您好,登錄后才能下訂單哦!
這篇文章主要介紹Flink 算子狀態(tài)怎么用,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
算子狀態(tài)的作用范圍限定為算子并行子任務(wù)。這意味著由同一并行子任務(wù)所處理的所有數(shù)據(jù)都可以訪問(wèn)到相同的狀態(tài),狀態(tài)對(duì)于同一子任務(wù)而言是共享的。算子狀態(tài)不能由相同或不同算子的另一個(gè)并行子任務(wù)訪問(wèn)。
Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu),主要介紹當(dāng)并行度改變(擴(kuò)縮容)時(shí),從保存點(diǎn)重新啟動(dòng)時(shí),算子狀態(tài)如何分配:
列表狀態(tài)(List state):將狀態(tài)表示為一組數(shù)據(jù)的列表。
帶有算子列表狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)對(duì)列表中的條目進(jìn)行重新分配。理論上,所有并行算子任務(wù)的列表?xiàng)l目會(huì)被統(tǒng)一收集起來(lái),隨后均勻分配到更少或更多的任務(wù)之上。如果列表?xiàng)l目的數(shù)量小于算子新設(shè)置的并行度,部分任務(wù)在啟動(dòng)時(shí)的狀態(tài)就可能為空。
聯(lián)合列表狀態(tài)(Union list state) 也將狀態(tài)表示為數(shù)據(jù)的列表。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障從保存點(diǎn)(savepoint)啟動(dòng)應(yīng)用程序時(shí)進(jìn)行恢復(fù),如果并行度發(fā)生改變,帶有算子聯(lián)合列表狀態(tài)的算子會(huì)在擴(kuò)縮容時(shí)把狀態(tài)列表的全部條目廣播到全部任務(wù)上,隨后由任務(wù)自己決定哪些條目應(yīng)該保留,哪些應(yīng)該丟棄。
對(duì)于同一個(gè)算子來(lái)說(shuō),假如之前的并行度為2,那么就會(huì)有兩個(gè)子任務(wù),也就是兩個(gè)狀態(tài),假如改變其并行度為3,那么就把之前的兩個(gè)狀態(tài),給每個(gè)并行子任務(wù)都發(fā)一份,這樣每個(gè)并行子任務(wù)上都有所有的狀態(tài),然后由并行子任務(wù)去決定使用哪個(gè)狀態(tài)。
廣播狀態(tài)(Broadcast state):不同于普通的算子狀態(tài),每個(gè)并行子任務(wù)的狀態(tài)相同。但是仍然是每個(gè)并行子任務(wù)訪問(wèn)自己的狀態(tài),但是狀態(tài)都是一樣的。 如果一個(gè)算子有多項(xiàng)任務(wù),而它的每個(gè)并行子任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。
帶有算子廣播狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)把狀態(tài)拷貝到全部新任務(wù)上,這樣做的原因是廣播狀態(tài)能確保所有任務(wù)的狀態(tài)相同。在縮容的情況下,由于狀態(tài)經(jīng)過(guò)復(fù)制不會(huì)丟失,我們可以簡(jiǎn)單的停掉多出的任務(wù)。
public class StateTest1_OperatorState { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // socket文本流 DataStream<String> inputStream = env.socketTextStream("localhost", 7777); // 轉(zhuǎn)換成SensorReading類型 DataStream<SensorReading> dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); // 定義一個(gè)有狀態(tài)的map操作,統(tǒng)計(jì)當(dāng)前分區(qū)數(shù)據(jù)個(gè)數(shù) SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper()); resultStream.print(); env.execute(); } // 自定義MapFunction public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer>{ // 定義一個(gè)本地變量,作為算子狀態(tài) private Integer count = 0; @Override public Integer map(SensorReading value) throws Exception { count++; return count; } @Override public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { return Collections.singletonList(count); } @Override public void restoreState(List<Integer> state) throws Exception { for( Integer num: state ) count += num; } } }
算子狀態(tài)的定義和普通的成員變量定義相同,但是對(duì)應(yīng)的算子處理函數(shù)要繼承對(duì)應(yīng)的接口,例如ListCheckpointed,自定義狀態(tài)進(jìn)行快照和恢復(fù)的邏輯。
以上是“Flink 算子狀態(tài)怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。