溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理

發(fā)布時(shí)間:2021-12-31 10:49:04 來源:億速云 閱讀:270 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

獲取流數(shù)據(jù)

用戶提交的代碼最終被封裝成了org.apache.flink.runtime.taskmanager.Task,Task是一個(gè)Runnable因此核心代碼就在run方法,run方法調(diào)用了doRun方法,在doRun中調(diào)用了invokable.invoke(),Task的整個(gè)處理流程其實(shí)就在這里面。org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable是一個(gè)抽象類,它的子類是不同類型的Task,這里我們主要關(guān)注流處理任務(wù)相關(guān)的org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask的invoke方法執(zhí)行了runMailboxLoop()方法。

runMailboxLoop()方法就是執(zhí)行org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor的runMailboxLoop方法。MailboxProcessor是一種線程模型,runMailboxLoop就是在while輪詢中不斷執(zhí)行任務(wù)和默認(rèn)動(dòng)作,其中默認(rèn)動(dòng)作就是StreamTask的processInput方法,該方法調(diào)用了StreamInputProcessor的inputProcessor方法,在這個(gè)方法中獲取并處理了流數(shù)據(jù)。StreamInputProcessor的子類StreamOneInputProcessor和StreamTwoInputProcessor分別用來處理有1個(gè)和2個(gè)入度的Task(StreamMultipleInputProcessor先不管)。StreamOneInputProcessor中有1個(gè)StreamTaskInput用來獲取數(shù)據(jù),1個(gè)DataOutput用來收集從StreamTaskInput獲取的數(shù)據(jù);同理,StreamTwoInputProcessor有2個(gè)StreamTaskInput和2個(gè)DataOutput。StreamTaskInput的子類StreamTaskNetworkInput用來從網(wǎng)絡(luò)中獲取流數(shù)據(jù),通過調(diào)用他它的emitNext不僅處理流數(shù)據(jù)還處理了checkpoint barrier,本篇文章只關(guān)注數(shù)據(jù)流的處理流程。StreamTaskNetworkInput從反序列化器中獲取到完整流數(shù)據(jù)后把數(shù)據(jù)交給DataOutput。DataOutput也有處理1個(gè)入度和2個(gè)入度的子類,它們都持有OperatorChain中第一個(gè)operator的引用,稱為headOperator,DataOutput從StreamTaskInput那里獲取到數(shù)據(jù)后會(huì)交給headOperator來處理。到此為止,流數(shù)據(jù)被獲取并傳入了OperatorChain。 這里總結(jié)一下:StreamTask的processInput方法在MailboxProcessor中被反復(fù)調(diào)用,在processInput方法中StreamTask使用StreamInputProcessor來獲取并處理流數(shù)據(jù)。StreamInputProcessor中的StreamTaskInput用來獲取數(shù)據(jù),獲取的數(shù)據(jù)交給DataOutput,DataOutput將數(shù)據(jù)傳入OperatorChain的第一個(gè)operator。其中StreamTask,StreamInputProcessor和DataOutput都有處理1個(gè)入度和2個(gè)入度的子類。

Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理

數(shù)據(jù)流過OperatorChain

OperatorChain的第一個(gè)operator獲取數(shù)據(jù)后,數(shù)據(jù)是怎樣在OperatorChain中流動(dòng)的呢?首先說說OperatorChain,StreamOperatorWrapper是chain的每個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都有指向下一個(gè)或上一個(gè)節(jié)點(diǎn)的引用,因此OperatorChain是一個(gè)雙向鏈表。但是數(shù)據(jù)的流動(dòng)并不依靠這個(gè)鏈?zhǔn)浇Y(jié)構(gòu)。上文我們提到DataOutput將數(shù)據(jù)交給了headOperator,OperatorChain的第一個(gè)節(jié)點(diǎn)都是StreamOperator的子類,我們編寫的filer算子,map算子等最終都會(huì)被封裝成StreamOperator,例如子類StreamFlatMap就是執(zhí)行flatMap方法,StreamFilter就是執(zhí)行fliter方法等。這些方法執(zhí)行的時(shí)候用org.apache.flink.streaming.api.operators.Output對處理后的結(jié)果進(jìn)行收集。例如StreamFilter當(dāng)FilterFunction返回true時(shí)收集數(shù)據(jù),而StreamFlatMap將Output傳入flatMap方法中由用戶代碼進(jìn)行收集數(shù)據(jù)。收集的數(shù)據(jù)是怎樣向OperatorChain的下一個(gè)節(jié)點(diǎn)傳遞的呢?原來Output中持有OneInputStreamOperator變量指向了chain中下一個(gè)節(jié)點(diǎn)的算子,調(diào)用Output的collect方法會(huì)調(diào)用下一個(gè)算子的processElement,數(shù)據(jù)就這樣在整個(gè)OperatorChain中傳遞了。

Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理

發(fā)向下游Task

當(dāng)數(shù)據(jù)傳到OperatorChain的最后一個(gè)算子時(shí)數(shù)據(jù)是怎樣發(fā)向下個(gè)Task的呢?最后一個(gè)算子擁有的Output實(shí)現(xiàn)類是org.apache.flink.streaming.runtime.io.RecordWriterOutput。RecordWriterOutput的collect方法會(huì)調(diào)用的org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit方法用來發(fā)送數(shù)據(jù),該方法會(huì)將序列化器中的數(shù)據(jù)復(fù)制到BufferBuilder中。BufferBuilder維護(hù)了一個(gè)內(nèi)存片段MemorySegment并且可以創(chuàng)建相應(yīng)的消費(fèi)者。RecordWriter有2個(gè)實(shí)現(xiàn)類ChannelSelectorRecordWriter和BroadcastRecordWriter。Task向下游節(jié)點(diǎn)的多個(gè)并行度發(fā)送數(shù)據(jù),每個(gè)并行度都對應(yīng)一個(gè)channel。ChannelSelectorRecordWriter為每個(gè)chanel都保存一個(gè)BufferBuilder并分別添加BufferConsumer:

BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);//按channel獲取BufferBuilder
addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);//按channel添加BufferConsumer
bufferBuilders[targetChannel] = bufferBuilder;

BroadcastRecordWriter只有一個(gè)BufferBuilder,使用同一個(gè)BufferBuilder給所有的channel添加BufferConsumer:

try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
    for (int channel = 0; channel < numberOfChannels; channel++) {
        addBufferConsumer(bufferConsumer.copy(), channel);//所有channel用同一個(gè)BufferBuilder達(dá)到廣播的目的
    }
}

RecordWriter#requestNewBufferBuilder方法會(huì)獲取BufferBuilder,如果獲取失敗會(huì)導(dǎo)致Task執(zhí)行線程阻塞造成反壓。

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);//嘗試獲取,獲取不到返回null
    if (builder == null) {
        long start = System.currentTimeMillis();
        builder = targetPartition.getBufferBuilder(targetChannel);//阻塞獲取,導(dǎo)致反壓
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}

BufferBuilder最終來自LocalBufferPool,LocalBufferPool有幾個(gè)重要的屬性:

//taskmanager的網(wǎng)絡(luò)緩存池,MemorySegment從這里獲取
private final NetworkBufferPool networkBufferPool;
//已經(jīng)獲取的MemorySegment被組織成一個(gè)隊(duì)列
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
//當(dāng)前l(fā)ocalBufferPool的大小
private int currentPoolSize;
//已經(jīng)獲取的MemorySegment
private int numberOfRequestedMemorySegments;
//每個(gè)channel能同時(shí)獲取的最大BufferBuilder數(shù)
private final int maxBuffersPerChannel;
//subpartition就是channel,數(shù)組存儲(chǔ)了每個(gè)channel同時(shí)使用的BufferBuilder數(shù)
private final int[] subpartitionBuffersCount;

BufferBuilder由requestMemorySegment方法和requestMemorySegmentBlocking方法獲取,requestMemorySegmentBlocking方法也是調(diào)用requestMemorySegment方法并在沒有獲取到MemorySegment時(shí)通過AvailableFuture的get方法來阻塞直到獲取成功為止,AvailableFuture是一個(gè)用CompletableFuture表示的狀態(tài)位,這里用到了CompletableFuture的get方法會(huì)阻塞直到complete的特性,沒有完成的future表示unavailable,完成了的表示available。requestMemorySegment方法中如果已經(jīng)獲取的MemorySegment(numberOfRequestedMemorySegments)大于了localBufferPool的大?。╟urrentPoolSize)需要將多余的MemorySegment先歸還給networkBufferPool。之后獲取MemorySegment,如果獲取不到就設(shè)置AvailableFuture為不可用,否則記錄channel使用的MemorySegment數(shù)量,如果大于maxBuffersPerChannel,也設(shè)置AvailableFuture為不可用。

@Nullable
private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
    MemorySegment segment = null;
    synchronized (availableMemorySegments) {
        returnExcessMemorySegments();//將多余的segment歸還給networkBufferPool

        if (availableMemorySegments.isEmpty()) {
            segment = requestMemorySegmentFromGlobal();//全局獲取
        }
        // segment may have been released by buffer pool owner
        if (segment == null) {
            segment = availableMemorySegments.poll();//局部獲取
        }
        if (segment == null) {
            availabilityHelper.resetUnavailable();//獲取不到設(shè)置為不可用
        }

        //記錄channel正在使用segment數(shù),如果超了設(shè)置為不可用
        if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
            if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                unavailableSubpartitionsCount++;
                availabilityHelper.resetUnavailable();
            }
        }
    }
    return segment;
}

反壓的采集

上面說的AvailableFuture設(shè)置為不可用其實(shí)和反壓有關(guān),Task的isBackPressured方法返回了該Task是否產(chǎn)生了反壓。

public boolean isBackPressured() {
    if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) {
        return false;
    }
    //獲取所有的AvailableFuture,如果有沒完成了則有反壓
    final CompletableFuture<?>[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length];
    for (int i = 0; i < outputFutures.length; ++i) {
        outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture();
    }
    return !CompletableFuture.allOf(outputFutures).isDone();
}

以上是“Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI