溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink 算子狀態(tài)怎么用

發(fā)布時(shí)間:2021-12-31 10:46:52 來(lái)源:億速云 閱讀:258 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹Flink 算子狀態(tài)怎么用,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

1. 算子狀態(tài)分類

算子狀態(tài)的作用范圍限定為算子并行子任務(wù)。這意味著由同一并行子任務(wù)所處理的所有數(shù)據(jù)都可以訪問(wèn)到相同的狀態(tài),狀態(tài)對(duì)于同一子任務(wù)而言是共享的。算子狀態(tài)不能由相同或不同算子的另一個(gè)并行子任務(wù)訪問(wèn)。

Flink 算子狀態(tài)怎么用

Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu),主要介紹當(dāng)并行度改變(擴(kuò)縮容)時(shí),從保存點(diǎn)重新啟動(dòng)時(shí),算子狀態(tài)如何分配:

  1. 列表狀態(tài)(List state):將狀態(tài)表示為一組數(shù)據(jù)的列表。

帶有算子列表狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)對(duì)列表中的條目進(jìn)行重新分配。理論上,所有并行算子任務(wù)的列表?xiàng)l目會(huì)被統(tǒng)一收集起來(lái),隨后均勻分配到更少或更多的任務(wù)之上。如果列表?xiàng)l目的數(shù)量小于算子新設(shè)置的并行度,部分任務(wù)在啟動(dòng)時(shí)的狀態(tài)就可能為空。

Flink 算子狀態(tài)怎么用

  1. 聯(lián)合列表狀態(tài)(Union list state) 也將狀態(tài)表示為數(shù)據(jù)的列表。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障從保存點(diǎn)(savepoint)啟動(dòng)應(yīng)用程序時(shí)進(jìn)行恢復(fù),如果并行度發(fā)生改變,帶有算子聯(lián)合列表狀態(tài)的算子會(huì)在擴(kuò)縮容時(shí)把狀態(tài)列表的全部條目廣播到全部任務(wù)上,隨后由任務(wù)自己決定哪些條目應(yīng)該保留,哪些應(yīng)該丟棄。

對(duì)于同一個(gè)算子來(lái)說(shuō),假如之前的并行度為2,那么就會(huì)有兩個(gè)子任務(wù),也就是兩個(gè)狀態(tài),假如改變其并行度為3,那么就把之前的兩個(gè)狀態(tài),給每個(gè)并行子任務(wù)都發(fā)一份,這樣每個(gè)并行子任務(wù)上都有所有的狀態(tài),然后由并行子任務(wù)去決定使用哪個(gè)狀態(tài)。 Flink 算子狀態(tài)怎么用

  1. 廣播狀態(tài)(Broadcast state):不同于普通的算子狀態(tài),每個(gè)并行子任務(wù)的狀態(tài)相同。但是仍然是每個(gè)并行子任務(wù)訪問(wèn)自己的狀態(tài),但是狀態(tài)都是一樣的。 如果一個(gè)算子有多項(xiàng)任務(wù),而它的每個(gè)并行子任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。

帶有算子廣播狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)把狀態(tài)拷貝到全部新任務(wù)上,這樣做的原因是廣播狀態(tài)能確保所有任務(wù)的狀態(tài)相同。在縮容的情況下,由于狀態(tài)經(jīng)過(guò)復(fù)制不會(huì)丟失,我們可以簡(jiǎn)單的停掉多出的任務(wù)。

2.算子狀態(tài)的使用

public class StateTest1_OperatorState {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 轉(zhuǎn)換成SensorReading類型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定義一個(gè)有狀態(tài)的map操作,統(tǒng)計(jì)當(dāng)前分區(qū)數(shù)據(jù)個(gè)數(shù)
        SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());

        resultStream.print();

        env.execute();
    }

    // 自定義MapFunction
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer>{
        // 定義一個(gè)本地變量,作為算子狀態(tài)
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for( Integer num: state )
                count += num;
        }
    }
}
  1. 算子狀態(tài)的定義和普通的成員變量定義相同,但是對(duì)應(yīng)的算子處理函數(shù)要繼承對(duì)應(yīng)的接口,例如ListCheckpointed,自定義狀態(tài)進(jìn)行快照和恢復(fù)的邏輯。

以上是“Flink 算子狀態(tài)怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI