您好,登錄后才能下訂單哦!
這篇文章主要介紹“Flink基礎(chǔ)知識(shí)點(diǎn)有哪些”,在日常操作中,相信很多人在Flink基礎(chǔ)知識(shí)點(diǎn)有哪些問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink基礎(chǔ)知識(shí)點(diǎn)有哪些”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
流式計(jì)算是大數(shù)據(jù)計(jì)算的痛點(diǎn),第1代實(shí)時(shí)計(jì)算引擎Storm對Exactly Once 語義和窗口支持較弱,使用的場景有限且無法支持高吞吐計(jì)算;Spark Streaming 采用“微批處理”模擬流計(jì)算,在窗口設(shè)置很小的場景中有性能瓶頸,Spark 本身也在嘗試連續(xù)執(zhí)行模式(Continuous Processing),但進(jìn)展緩慢。
Flink是一個(gè)低延遲、高吞吐的實(shí)時(shí)計(jì)算引擎,其利用分布式一致性快照實(shí)現(xiàn)檢查點(diǎn)容錯(cuò)機(jī)制,并實(shí)現(xiàn)了更好的狀態(tài)管理,F(xiàn)link可在毫秒級(jí)的延遲下處理上億次/秒的消息或者事件,同時(shí)提供了一個(gè)Exactly-once的一致性語義,保證了數(shù)據(jù)的正確性,使得Flink可以提供金融級(jí)的數(shù)據(jù)處理能力,總結(jié)其高級(jí)特性包括CSTW(CheckPoint,Statue,Time,windows)
Spark的技術(shù)理念是基于批來模擬流,微批處理的延時(shí)較高(無法優(yōu)化到秒以下的數(shù)量級(jí)),且無法支持基于event_time的時(shí)間窗口做聚合邏輯。Flink和spark相反,它基于流計(jì)算來模擬批計(jì)算,更切合數(shù)據(jù)的生成方式,技術(shù)上有更好的擴(kuò)展性。
流處理任務(wù)要對數(shù)據(jù)進(jìn)行統(tǒng)計(jì),如Sum, Count, Min, Max,這些值是需要存儲(chǔ)的,因?yàn)橐粩喔拢@些值或者變量就可以理解為一種狀態(tài),如果數(shù)據(jù)源是在讀取Kafka, RocketMQ,可能要記錄讀取到什么位置,并記錄Offset,這些Offset變量都是要計(jì)算的狀態(tài)。
Flink提供了內(nèi)置的狀態(tài)管理,可以把這些狀態(tài)存儲(chǔ)在Flink內(nèi)部,而不需要把它存儲(chǔ)在外部系統(tǒng),這樣做的好處:
① 降低了計(jì)算引擎對外部系統(tǒng)的依賴以及部署,使運(yùn)維更加簡單;
② 對性能帶來了極大的提升:如果通過外部去訪問如Redis , HBase 需要網(wǎng)絡(luò)及RPC資源,如果通過Flink內(nèi)部去訪問,只通過自身的進(jìn)程去訪問這些變量。
同時(shí)Flink會(huì)定期將這些狀態(tài)做Checkpoint持久化,把Checkpoint存儲(chǔ)到一個(gè)分布式的持久化系統(tǒng)中,比如HDFS,這樣當(dāng)Flink的任務(wù)出現(xiàn)任何故障時(shí),它都會(huì)從最近的一次Checkpoint將整個(gè)流的狀態(tài)進(jìn)行恢復(fù),然后繼續(xù)運(yùn)行它的流處理,對用戶沒有任何數(shù)據(jù)上的影響。
Flink是一個(gè)分層的架構(gòu)系統(tǒng),每一層所包含的組件都提供了特定的抽象,用來服務(wù)于上層組件,F(xiàn)link的分層體現(xiàn)有四層,分別是Deploy層、core層、API層/Libraries層,其中Deploy層主要涉及的是Flink的部署模式及同資源調(diào)度組件的交互模式,Core層提供了支持Flink計(jì)算的全部核心實(shí)現(xiàn),API層/Libraries層提供了Flink的API接口和基于API接口的特定應(yīng)用的計(jì)算框架;
Deploy層:該層主要涉及了Flink的部署模式,F(xiàn)link支持多種部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2),Standalone 部署模式與Spark類似;
Runtime層:Runtime層提供了支持Flink計(jì)算的全部核心實(shí)現(xiàn),比如:支持分布式Stream處理、Job Graph到Execution Graph的映射、調(diào)度 等,為上層API層提供基礎(chǔ)服務(wù)。
API層:API層主要實(shí)現(xiàn)了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應(yīng)DataStream API,面向批處理對應(yīng)DataSet API。
Libraries層:該層也可以稱為Flink應(yīng)用框架層,根據(jù)API層的劃分,在API層之上構(gòu)建的滿足特定應(yīng)用的實(shí)時(shí)計(jì)算框架,也分別對應(yīng)于面向流處理 和面向批處理兩類。面向流處理支持:CEP(復(fù)雜事件處理)、SQL-like的操作(基于Table的關(guān)系操作);面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)、Gelly(圖處理)。
Flink支持增量迭代,具有對迭代自行優(yōu)化的功能,因此在on yarn上提交的任務(wù)性能略好于 Spark,F(xiàn)link提供2種方式在yarn上提交任務(wù):啟動(dòng)1個(gè)一直運(yùn)行的 Yarn session(分離模式)和在 Yarn 上運(yùn)行1個(gè) Flink 任務(wù)(客戶端模式);
分離模式:通過命令yarn-session.sh的啟動(dòng)方式本質(zhì)上是在yarn集群上啟動(dòng)一個(gè)flink集群,由yarn預(yù)先給flink集群分配若干個(gè)container,在yarn的界面上只能看到一個(gè)Flink session with X TaskManagers的任務(wù),并且只有一個(gè)Flink界面,可以從Yarn的Application Master鏈接進(jìn)入;
客戶端模式:通過命令bin/flink run -m yarn-cluster啟動(dòng),每次發(fā)布1個(gè)任務(wù),本質(zhì)上給每個(gè)Flink任務(wù)啟動(dòng)了1個(gè)集群,yarn在任務(wù)發(fā)布時(shí)啟動(dòng)JobManager(對應(yīng)Yarn的AM)和TaskManager,如果一個(gè)任務(wù)指定了n個(gè)TaksManager(-yn n),則會(huì)啟動(dòng)n+1個(gè)Container,其中一個(gè)是JobManager,發(fā)布m個(gè)應(yīng)用,則有m個(gè)Flink界面,不同的任務(wù)不可能在一個(gè)Container(JVM)中,實(shí)現(xiàn)了資源隔離。
進(jìn)入Flink的bin目錄下運(yùn)行./yarn-session.sh –help 查看幫助驗(yàn)證yarn是否成功配置,使用./yarn-session.sh –q 顯示yarn所有nodeManager節(jié)點(diǎn)資源;部署On yarn模式的Flink只需要修改配置conf/flink-conf.yaml ,詳細(xì)參數(shù)請參考官網(wǎng):通用配置:Configuration,HA配置:High Availability (HA)
采用分離模式來啟動(dòng)Flink Yarn Session,提交后提示該yarn application成功提交到y(tǒng)arn并返回id,使用yarn application –kill application_id 來停止yarn上提交的任務(wù);
yarn-session.sh -n 3 -jm 700 -tm 700 -s 8 -nm FlinkOnYarnSession -d –st
可以直接提交自帶的詞頻統(tǒng)計(jì)用例,驗(yàn)證on yarn模式是否配置成功:
~/bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048 ~/flink/examples/batch/WordCount.jar
分離模式:通過命令yarn-session.sh先啟動(dòng)集群,然后再提交作業(yè),接著會(huì)向yarn申請一塊空間后,資源永遠(yuǎn)保持不變。如果資源滿了,下一個(gè)作業(yè)就無法提交,只能等到y(tǒng)arn中的其中一個(gè)作業(yè)執(zhí)行完成后,釋放了資源,下個(gè)作業(yè)才會(huì)正常提交。所有作業(yè)共享Dispatcher和ResourceManager;共享資源;適合規(guī)模小執(zhí)行時(shí)間短的作業(yè)。
客戶端模式: 通過命令bin/flink run -m yarn-cluster提交任務(wù),每提交一個(gè)作業(yè)會(huì)根據(jù)自身的情況,都會(huì)單獨(dú)向yarn申請資源,直到作業(yè)執(zhí)行完成,一個(gè)作業(yè)的失敗與否并不會(huì)影響下一個(gè)作業(yè)的正常提交和運(yùn)行,適合規(guī)模大長時(shí)間運(yùn)行的作業(yè);
DataStream是Flink的較低級(jí)API,用于進(jìn)行數(shù)據(jù)的實(shí)時(shí)處理任務(wù),可以將該編程模型分為DataSource、Transformation、Sink三個(gè)部分;
源是程序讀取輸入數(shù)據(jù)的位置,可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 將源添加到程序,F(xiàn)link 有許多預(yù)先實(shí)現(xiàn)的源函數(shù),也可以通過實(shí)現(xiàn) SourceFunction 方法自定義非并行源 ,或通過實(shí)現(xiàn) ParallelSourceFunction 或擴(kuò)展 RichParallelSourceFunction 自定義并行源。
有幾個(gè)預(yù)定義的流數(shù)據(jù)源可從 StreamExecutionEnvironment 訪問:
基于文件:
readTextFile(path) #逐行讀取文本文件(文件符合 TextInputFormat 格式),并作為字符串返回每一行。
readFile(fileInputFormat, path) #按指定的文件輸入格式(fileInputFormat)讀取指定路徑的文件。
readFile(fileInputFormat, path, watchType, interval, pathFilter) #前兩個(gè)方法的內(nèi)部調(diào)用方法。根據(jù)給定文件格式(fileInputFormat)讀取指定路徑的文件。根據(jù) watchType,定期監(jiān)聽路徑下的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理當(dāng)前在路徑中的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE),使用 pathFilter,可以進(jìn)一步排除正在處理的文件。
基于Socket:socketTextStream 從 Socket 讀取,元素可以用分隔符分隔。
基于集合:
fromCollection(Seq) #用 Java.util.Collection 對象創(chuàng)建數(shù)據(jù)流,集合中的所有元素必須屬于同一類型;
fromCollection(Iterator) #用迭代器創(chuàng)建數(shù)據(jù)流。指定迭代器返回的元素的數(shù)據(jù)類型;
fromElements(elements: _*) #從給定的對象序列創(chuàng)建數(shù)據(jù)流。所有對象必須屬于同一類型;
fromParallelCollection(SplittableIterator) #并行地從迭代器創(chuàng)建數(shù)據(jù)流。指定迭代器返回的元素的數(shù)據(jù)類型;
generateSequence(from, to) #并行生成給定間隔的數(shù)字序列。
自定義:addSource 附加新的源函數(shù)。例如從 Apache Kafka 中讀取,可以使用 addSource(new FlinkKafkaConsumer08<>(...))。請?jiān)敿?xì)查看 連接器。
Transformation操作將1個(gè)或多個(gè)DataStream轉(zhuǎn)換為新的DataStream,多個(gè)轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流拓?fù)?,如下圖所示,DataStream會(huì)由不同的Transformation操作、轉(zhuǎn)換、過濾、聚合成其他不同的流,從而完成業(yè)務(wù)要求;
Map:DataStream -> DataStream,一個(gè)數(shù)據(jù)元生成一個(gè)新的數(shù)據(jù)元。將輸入流的元素翻倍:dataStream.map { x => x * 2 }
FlatMap:DataStream -> DataStream,一個(gè)數(shù)據(jù)元生成多個(gè)數(shù)據(jù)元(可以為0)。將句子分割為單詞:
dataStream.flatMap { str => str.split(" ") }
Filter:DataStream -> DataStream,每個(gè)數(shù)據(jù)元執(zhí)行布爾函數(shù),只保存函數(shù)返回 true 的數(shù)據(jù)元。過濾掉零值的過濾器:
dataStream.filter { _ != 0 }
KeyBy :DataStream -> KeyedStream,將流劃分為不相交的分區(qū)。具有相同 Keys 的所有記錄在同一分區(qū)。指定 key 的取值:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce :KeyedStream -> DataStream,KeyedStream 元素滾動(dòng)執(zhí)行 Reduce。將當(dāng)前數(shù)據(jù)元與最新的一個(gè) Reduce 值組合作為新值發(fā)送。創(chuàng)建 key 的值求和:keyedStream.reduce { _ + _ }
Aggregations :KeyedStream -> DataStream,應(yīng)用于 KeyedStream 上的滾動(dòng)聚合。
Window:KeyedStream -> WindowedStream,Windows 可以在已經(jīng)分區(qū)的 KeyedStream 上定義。Windows 根據(jù)某些特征(例如,在最近5秒內(nèi)到達(dá)的數(shù)據(jù))對每個(gè)Keys中的數(shù)據(jù)進(jìn)行分組。更多說明參考 Windows 或 譯版。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAll :DataStream -> AllWindowedStream,Windows 也可以在 DataStream 上定義。在許多情況下,這是非并行轉(zhuǎn)換。所有記錄將收集在 windowAll 算子的一個(gè)任務(wù)中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window Apply :WindowedStream -> DataStream 或 AllWindowedStream -> DataStream,將函數(shù)應(yīng)用于整個(gè)窗口。一個(gè)對窗口數(shù)據(jù)求和:
windowedStream.apply { WindowFunction }
allWindowedStream.apply { AllWindowFunction }
Window Reduce:WindowedStream -> DataStream,Reduce 函數(shù)應(yīng)用于窗口并返回結(jié)果值。windowedStream.reduce { _ + _ }
Aggregations on windows:WindowedStream -> DataStream,聚合窗口內(nèi)容;
Union :DataStream* -> DataStream,兩個(gè)或多個(gè)數(shù)據(jù)流的合并,創(chuàng)建包含來自所有流的所有數(shù)據(jù)元的新流。如果將數(shù)據(jù)流與自身聯(lián)合,則會(huì)在結(jié)果流中獲取兩次數(shù)據(jù)元。
dataStream.union(otherStream1, otherStream2, ...)
Window Join :DataStream,DataStream -> DataStream,Join 連接兩個(gè)流,指定 Key 和窗口。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup :DataStream,DataStream -> DataStream,CoGroup 連接兩個(gè)流,指定 Key 和窗口。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
CoGroup 與 Join 的區(qū)別:CoGroup 會(huì)輸出未匹配的數(shù)據(jù),Join 只輸出匹配的數(shù)據(jù)
Connect :DataStream,DataStream -> ConnectedStreams,連接兩個(gè)有各自類型的數(shù)據(jù)流。允許兩個(gè)流之間的狀態(tài)共享。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
可用于數(shù)據(jù)流關(guān)聯(lián)配置流;
CoMap, CoFlatMap :ConnectedStreams -> DataStream,作用域連接數(shù)據(jù)流(connected data stream)上的 map 和 flatMap:
Split :DataStream -> SplitStream,將數(shù)據(jù)流拆分為兩個(gè)或更多個(gè)流。
Select :SplitStream -> DataStream,從 SpliteStream 中選擇一個(gè)流或多個(gè)流。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate :DataStream -> IterativeStream -> DataStream,將一個(gè)算子的輸出重定向到某個(gè)先前的算子,在流中創(chuàng)建 feedback 循環(huán)。這對于定義不斷更新模型的算法特別有用。以下代碼以流開頭并連續(xù)應(yīng)用迭代體。大于0的數(shù)據(jù)元將被發(fā)送回 feedback,其余數(shù)據(jù)元將向下游轉(zhuǎn)發(fā)。
Project:DataStream -> DataStream,作用于元組的轉(zhuǎn)換,從元組中選擇字段的子集。
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
Data Sink 消費(fèi) DataStream 并轉(zhuǎn)發(fā)到文件,套接字,外部系統(tǒng)或打印到頁面。Flink 帶有各種內(nèi)置輸出格式,封裝在 DataStreams 上的算子操作后面:
writeAsText() / TextOutputFormat:按字符串順序?qū)懭胛募?。通過調(diào)用每個(gè)元素的 toString() 方法獲得字符串。
writeAsCsv(...) / CsvOutputFormat:將元組寫為逗號(hào)分隔的形式寫入文件。行和字段分隔符是可配置的。每個(gè)字段的值來自對象的 toString() 方法。
print() / printToErr():在標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯(cuò)誤流上打印每個(gè)元素的 toString() 值??梢远x輸出前綴,這有助于區(qū)分不同的打印調(diào)用。如果并行度大于1,輸出也包含生成輸出的任務(wù)的標(biāo)識(shí)符。
writeUsingOutputFormat() / FileOutputFormat:自定義文件輸出的方法和基類。支持自定義對象到字節(jié)的轉(zhuǎn)換。
writeToSocket:將元素寫入 Socket,使用 SerializationSchema 進(jìn)行序列化。
addSink:調(diào)用自定義接收器函數(shù)。請?jiān)敿?xì)查看 連接器。
DataStream 的 write*() 方法主要用于調(diào)試目的。他們沒有參與 Flink checkpoint,這意味著這些函數(shù)通常具有至少一次的語義。刷新到目標(biāo)系統(tǒng)的數(shù)據(jù)取決于 OutputFormat 的實(shí)現(xiàn),并非所有發(fā)送到 OutputFormat 的數(shù)據(jù)都會(huì)立即顯示在目標(biāo)系統(tǒng)中。此外,在失敗的情況下,這些記錄可能會(huì)丟失。
要將流可靠、準(zhǔn)確地傳送到文件系統(tǒng),請使用 flink-connector-filesystem。通過 .addSink(...) 方法的自定義實(shí)現(xiàn),可以實(shí)現(xiàn)在 checkpoint 中精確一次的語義。
流式數(shù)據(jù)處理最大的特點(diǎn)是數(shù)據(jù)具有時(shí)間屬性特征,F(xiàn)link根據(jù)時(shí)間產(chǎn)生的位置不同,將時(shí)間區(qū)分為三種概念:數(shù)據(jù)生成時(shí)間(Event_time)、事件接入時(shí)間(Ingestion_time)、事件處理時(shí)間(Processing_time),用戶可以根據(jù)需要選擇事件類型作為流式數(shù)據(jù)的時(shí)間屬性,極大增強(qiáng)了數(shù)據(jù)處理的靈活性和準(zhǔn)確性;
Event_time:獨(dú)立事件在產(chǎn)生它的設(shè)備上的發(fā)生時(shí)間,這個(gè)時(shí)間通常在到達(dá)Flink之前已經(jīng)嵌入到生產(chǎn)數(shù)據(jù)中,因此時(shí)間順序取決于事件產(chǎn)生的地方,和下游的數(shù)據(jù)處理系統(tǒng)的事件無關(guān),需要在Flink中指定事件的時(shí)間屬性或者設(shè)定時(shí)間提取器提取事件時(shí)間;
Processing_time:指在操作算子計(jì)算過程中獲取到的所在主機(jī)的時(shí)間,用戶選擇了Processing_time后,所有和時(shí)間相關(guān)的計(jì)算算子都直接使用其所在主機(jī)的系統(tǒng)時(shí)間,使用Processing_time的程序性能相對較高,延時(shí)相對較低,因?yàn)槠渌胁僮鞑恍枰鋈魏螘r(shí)間上的對比和協(xié)調(diào);
Ingestion_time:指數(shù)據(jù)接入Flink系統(tǒng)的時(shí)間,依賴于Source Operator所在主機(jī)的系統(tǒng)時(shí)鐘;
一般場景中選擇event_time作為事件時(shí)間戳是最貼近生產(chǎn)的,但大多數(shù)情況下由于數(shù)據(jù)的延遲和亂序使用processing_time;
在流式計(jì)算中,數(shù)據(jù)持續(xù)不斷的流入計(jì)算引擎,需要一個(gè)窗口限定計(jì)算范圍,比如監(jiān)控場景的近2分鐘或者精準(zhǔn)計(jì)算的每隔2分鐘計(jì)算一次,窗口定義了該范圍,輔助完成有界范圍的數(shù)據(jù)處理;
Flink的DataStream API將窗口抽象成獨(dú)立的Operator,且支持很多窗口算子,每個(gè)窗口算子包含Window Assigner 、Windows Function、觸發(fā)器、剔除器、時(shí)延設(shè)定等部分屬性,其中Window Assigner 和 Windows Function是必須要指定的屬性;
Window Assigner用來決定某個(gè)元素被分配到哪個(gè)/哪些窗口中去;Trigger觸發(fā)器決定了一個(gè)窗口何時(shí)能夠被計(jì)算或清除,每個(gè)窗口都會(huì)擁有一個(gè)自己的Trigger;
Evictor驅(qū)逐者在Trigger觸發(fā)之后,在窗口被處理之前,Evictor(如果有Evictor的話)會(huì)用來剔除窗口中不需要的元素,相當(dāng)于一個(gè)filter。
Flink支持多種窗口類型,按照驅(qū)動(dòng)類型分為:時(shí)間驅(qū)動(dòng)的Time Window(如每30秒鐘)和數(shù)據(jù)驅(qū)動(dòng)的Count Window(如每100個(gè)事件),按照窗口的滾動(dòng)方式又可以分成:翻滾窗口(Tumbling Window,無重疊),滾動(dòng)窗口(Sliding Window,有重疊)和會(huì)話窗口(Session Window,活動(dòng)間隙),下圖可以看出分類區(qū)別:
Time Window 是根據(jù)時(shí)間對數(shù)據(jù)流進(jìn)行分組的,且窗口機(jī)制和時(shí)間類型是完全解耦的,也就是說當(dāng)需要改變時(shí)間類型時(shí)(三種時(shí)間)不需要更改窗口邏輯相關(guān)的代碼,Time Window 中常見的即為Tumbling Time Window和Sliding Time Window;
Count Window 是根據(jù)元素個(gè)數(shù)對數(shù)據(jù)流進(jìn)行分組的,也包括Tumbling Count Window和Sliding Count Window;
每一個(gè)窗口都擁有一個(gè)屬于自己的 Trigger,Trigger上會(huì)有定時(shí)器,用來決定一個(gè)窗口何時(shí)能夠被計(jì)算或清除,每當(dāng)有元素加入到該窗口,或者之前注冊的定時(shí)器超時(shí)了,那么Trigger都會(huì)被調(diào)用。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個(gè)Trigger的調(diào)用結(jié)果只是fire的話,那么會(huì)計(jì)算窗口并保留窗口原樣,也就是說窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時(shí)候再次執(zhí)行計(jì)算。一個(gè)窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會(huì)一直占用著內(nèi)存。
當(dāng)Trigger fire了,窗口中的元素集合就會(huì)交給Evictor(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表,并決定最先進(jìn)入窗口的多少個(gè)元素需要被移除。剩余的元素會(huì)交給用戶指定的函數(shù)進(jìn)行窗口的計(jì)算。如果沒有 Evictor 的話,窗口中的所有元素會(huì)一起交給函數(shù)進(jìn)行計(jì)算。
計(jì)算函數(shù)收到了窗口的元素(可能經(jīng)過了 Evictor 的過濾),并計(jì)算出窗口的結(jié)果值,并發(fā)送給下游。窗口的結(jié)果值可以是一個(gè)也可以是多個(gè)。DataStream API 上可以接收不同類型的計(jì)算函數(shù),包括預(yù)定義的sum(),min(),max(),還有 ReduceFunction,F(xiàn)oldFunction,還有WindowFunction。WindowFunction 是最通用的計(jì)算函數(shù),其他的預(yù)定義的函數(shù)基本都是基于該函數(shù)實(shí)現(xiàn)的。
Flink 對于一些聚合類的窗口計(jì)算(如sum,min)做了優(yōu)化,因?yàn)榫酆项惖挠?jì)算不需要將窗口中的所有數(shù)據(jù)都保存下來,只需要保存一個(gè)result值就可以了。每個(gè)進(jìn)入窗口的元素都會(huì)執(zhí)行一次聚合函數(shù)并修改result值。這樣可以大大降低內(nèi)存的消耗并提升性能。但是如果用戶定義了 Evictor,則不會(huì)啟用對聚合窗口的優(yōu)化,因?yàn)?Evictor 需要遍歷窗口中的所有元素,必須要將窗口中所有元素都存下來。
在運(yùn)用窗口計(jì)算時(shí),F(xiàn)link根據(jù)上有數(shù)據(jù)集是否是KeyedStream類型(數(shù)據(jù)是否按照Key分區(qū)),如果上游數(shù)據(jù)未分組則調(diào)用window()方法指定Windows Assigner,數(shù)據(jù)會(huì)根據(jù)Key在不同Task實(shí)例中并行計(jì)算,最后得出針對每個(gè)Key的統(tǒng)計(jì)結(jié)果,如果是Non-Keyed類型則調(diào)用WindowsAll()方法指定Windows Assigner,所有的數(shù)據(jù)都會(huì)在窗口算子中路由得到一個(gè)Task中計(jì)算,并得到全局統(tǒng)計(jì)結(jié)果;
定義完窗口分配器后,需要為每一個(gè)窗口指定計(jì)算邏輯,也就是Windows Function,F(xiàn)link提供了四種類型Window Function,分別是ReduceFunction、AggreateFunction、FoldFunction、ProcessWindowFunction,其中FoldFunction將逐漸不再使用;四種類型有分為增量聚合操作(ReduceFunction、AggreateFunction、FoldFunction)和全量聚合操作(ProcessWindowFunction);
增量聚合函數(shù)計(jì)算性能高,占用存儲(chǔ)空間少,因?yàn)槠渲恍枰S護(hù)窗口的中間結(jié)果狀態(tài)值,不需要緩存原始數(shù)據(jù);全量聚合函數(shù)使用代價(jià)相對高,性能較弱,因?yàn)樗阕有枰彺嬖摯翱诘慕尤霐?shù)據(jù),然后等窗口觸發(fā)后對所有原始數(shù)據(jù)進(jìn)行匯總計(jì)算,若接入數(shù)據(jù)量大或窗口時(shí)間長容易導(dǎo)致計(jì)算性能下降;
ReduceFunction和AggreateFunction相似,但前者的輸出類型和輸入類型一致(如使用tuple的某個(gè)字段聚合),后者更加靈活地提供3個(gè)復(fù)寫方法,add()定義數(shù)據(jù)的添加邏輯,getResult()定義根據(jù)Accumulator計(jì)算結(jié)果的邏輯,merge()方法定義合并accumulator的邏輯;
ProcessWindowFunction可以支撐更復(fù)雜的算子,其支持基于窗口全部數(shù)據(jù)元素的結(jié)果計(jì)算,當(dāng)算子需要窗口的元數(shù)據(jù)或狀態(tài)數(shù)據(jù),或者算子不支持運(yùn)算交換律和結(jié)合律(統(tǒng)計(jì)所有元素的中位數(shù)和眾數(shù)),需要該函數(shù)中的Context對象,Context類定義了Window的元數(shù)據(jù)及可以操作的Window的狀態(tài)數(shù)據(jù)包括GlobalState和WindowState;
大部分情況下,需要增量計(jì)算和全量計(jì)算結(jié)合,因?yàn)樵隽坑?jì)算雖然一定程度能夠提升窗口性能,但靈活性不及ProcessWindowFunction,兩者整合使用,既可以得到增量算子又可以得到窗口的元數(shù)據(jù)(窗口開始、終止時(shí)間等),比如在計(jì)算TOP N的場景中,分窗口計(jì)算完數(shù)據(jù)的計(jì)算后需要根據(jù)商品ID匯聚總的點(diǎn)擊數(shù);
由于網(wǎng)絡(luò)或系統(tǒng)等外部因素影響,事件數(shù)據(jù)不能及時(shí)傳輸?shù)紽link系統(tǒng)中,導(dǎo)致數(shù)據(jù)亂序、延遲等問題,因此需要一種機(jī)制能夠控制數(shù)據(jù)處理的過程和進(jìn)度;基于event_time時(shí)間的Windows創(chuàng)建后,具體如何確定屬于該Windows中的數(shù)據(jù)元素已經(jīng)全部到達(dá),如果確定全部到達(dá)就可以對所有數(shù)據(jù)進(jìn)行窗口計(jì)算操作(匯總、分組),如果數(shù)據(jù)沒有全部到達(dá),則繼續(xù)等待該窗口中的數(shù)據(jù),但是又不能無限期的等下去,需要有機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,此時(shí)watermark發(fā)揮作用了,它表示當(dāng)達(dá)到watermark后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù));Watermark是處理EventTime 窗口計(jì)算提出的機(jī)制,本質(zhì)上是一種時(shí)間戳,可以在讀取 Source時(shí)候指定或者在transformation操作之前,用自定義的Watermark生成器按照需求指定;
正常情況下,流式數(shù)據(jù)的到達(dá)時(shí)間是有序的,如下圖:
一般情況存在數(shù)據(jù)的亂序(out-of-order)和延遲(late element),此時(shí)水位線機(jī)制能表明該時(shí)間戳之前到當(dāng)前水位線時(shí)間戳的數(shù)據(jù)已經(jīng)全部達(dá)到,沒有比它(水位線)更早的數(shù)據(jù)了,并觸發(fā)計(jì)算;
Flink中生成水位線的方式有兩種:Periodic Watermarks(周期性)和Punctuated Watermarks,前者假設(shè)當(dāng)前時(shí)間戳減去固定時(shí)間,所有數(shù)據(jù)都能達(dá)到,后者要在特定事件指示后觸發(fā)生成水位線;
舉例說明Periodic Watermarks 工作方式:當(dāng)前window為10s,設(shè)想理想情況下消息都沒有延遲,那么eventTime等于系統(tǒng)當(dāng)前時(shí)間,假如設(shè)置watermark等于eventTime的時(shí)候,當(dāng)watermark = 00:00:10的時(shí)候,就會(huì)觸發(fā)w1的計(jì)算,這個(gè)時(shí)后因?yàn)橄⒍紱]有延遲,watermark之前的消息(00:00:00~00:00:10)都已經(jīng)落入到window中,所以會(huì)計(jì)算window中全量的數(shù)據(jù)。那么假如有一條消息eventTime是00:00:01 應(yīng)該屬于w1,在00:00:11才到達(dá),因?yàn)榧僭O(shè)消息沒有延遲,那么watermark等于當(dāng)前時(shí)間,00:00:11,這個(gè)時(shí)候w1已經(jīng)計(jì)算完畢,那么這條消息就會(huì)被丟棄,沒有加入計(jì)算,這樣就會(huì)出現(xiàn)問題。這是已經(jīng)可以理解,代碼中為什么要減去一個(gè)常量作為watermark,假設(shè)每次提取eventTime的時(shí)減去2s,那么當(dāng)data1在00:00:11到達(dá)的時(shí)候,watermark是00:00:09這個(gè)時(shí)候,w1還沒有觸發(fā)計(jì)算,那么data1會(huì)被加入w1,這個(gè)時(shí)候計(jì)算完全沒有問題,所以減去一個(gè)常量是為了對延時(shí)的消息進(jìn)行容錯(cuò);
Punctuated Watermarks提供自定義條件生成水位,例如判斷某個(gè)數(shù)據(jù)元素的當(dāng)前狀態(tài)或tuple類型的某個(gè)值,如果接入事件中狀態(tài)為0則觸發(fā)生成watermark,如果狀態(tài)不為0則不觸發(fā),需要分別復(fù)寫extractTimestamp和checkAndGetNextWatermark方法;
Flink允許提前預(yù)定義數(shù)據(jù)的提取器Timestamp Extractors,在讀取source時(shí)候定義提取時(shí)間戳;
基于Event_time的窗口計(jì)算雖然可以使用warterMark機(jī)制容忍部分延遲,但只能一定程度的緩解該問題,無法應(yīng)對某些延遲特別嚴(yán)重的場景。Flink默認(rèn)丟失延遲數(shù)據(jù),但用戶可以自定義延遲數(shù)據(jù)的處理方式,此時(shí)需要Allowed Lateness機(jī)制近數(shù)據(jù)的額外處理;
DataStream API提供Allowed Lateness方法指定是否對遲到數(shù)據(jù)進(jìn)行處理,參數(shù)是Time類型的時(shí)間間隔大小,代表允許的最大延遲時(shí)間,F(xiàn)link的窗口計(jì)算中會(huì)將Window的Endtime加上該時(shí)間作為窗口最后釋放的結(jié)束時(shí)間(P),當(dāng)接入的數(shù)據(jù)中Event time未超過該時(shí)間(P),但WaterMark已經(jīng)超過Window的Event_Time時(shí)直接觸發(fā)窗口計(jì)算,若Event_Time超過了時(shí)間P,則做丟棄處理;
通常情況下可以使用sideOutputLateData 方法對遲到數(shù)據(jù)進(jìn)行標(biāo)記,然后使用getSideOutput()方法得到被標(biāo)記的延遲數(shù)據(jù),分析延遲原因;
Connect:Flink 提供connect方法實(shí)現(xiàn)兩個(gè)流或多個(gè)流的合并,合并后生成ConnectedStreams,會(huì)對兩個(gè)流的數(shù)據(jù)應(yīng)用不同的處理方法,并且雙流之間可以共享狀態(tài)(比如計(jì)數(shù));ConnectedStream提供的map()和flatMap()需要定義CoMapFunction和CoFlatMapFunction分別處理輸入的DataStream數(shù)據(jù)集;
Union:Union算子主要實(shí)現(xiàn)兩個(gè)或者多個(gè)輸入流合并成一個(gè)數(shù)據(jù)集,需要保證兩個(gè)流的格式一致,輸出的流與輸入完全一致;
Flink支持窗口的多流關(guān)聯(lián),即在一個(gè)窗口上按照相同條件對多個(gè)輸入流進(jìn)行join操作,需要保證輸入的Stream構(gòu)建在相同的Windows上,且有相同類型的Key做為關(guān)聯(lián)條件;
數(shù)據(jù)集inputStream1通過join方法形成JoinedStreams類型數(shù)據(jù)集,調(diào)用where()方法指定inputStream1數(shù)據(jù)集的key,調(diào)用equalTo()方法指定inputStream2對應(yīng)關(guān)聯(lián)的key,通過window()方法指定Window Assigner,最后通過apply()方法中傳入用戶自定義的JoinFunction或者FlatJoinFunction對輸入數(shù)據(jù)元素進(jìn)行窗口計(jì)算;
Windows Join過程中所有的Join操作都是Inner Join類型,也就是必須滿足相同窗口中,每個(gè)Stream都有Key,且key相同才能完成關(guān)聯(lián)操作并輸出結(jié)果;
有狀態(tài)計(jì)算是Flink重要特性,其內(nèi)部存儲(chǔ)計(jì)算產(chǎn)生的中間結(jié)果并提供給后續(xù)的Function或算子使用,狀態(tài)數(shù)據(jù)維系在本地存儲(chǔ)中,可以是Flink的堆內(nèi)存或者堆外內(nèi)存中,也可以借助于第三方的存儲(chǔ)介質(zhì),同storm+ redis / hbase模式相比,F(xiàn)link完善的狀態(tài)管理減少了對外部系統(tǒng)的依賴,減少維護(hù)成本;
Flink根據(jù)數(shù)據(jù)集是否根據(jù)key分區(qū)將狀態(tài)分為Keyed State和 Operator State兩種類型,Keyed State只能用于KeyedStream類型數(shù)據(jù)集對應(yīng)的Function和Operation上,它是Operator State的特例;
Operator State只和并行的算子實(shí)例綁定,和數(shù)據(jù)元素中的key無關(guān),支持當(dāng)算子實(shí)例并行度發(fā)生變化后自動(dòng)重新分配狀態(tài)數(shù)據(jù);
Keyed State和 Operator State均有兩種形式,一種是托管狀態(tài),一種是原始狀態(tài),前者有Flink Runtime控制和管理狀態(tài)數(shù)據(jù)并將狀態(tài)數(shù)據(jù)轉(zhuǎn)換成內(nèi)存Hash tables 或RocksDB的對象存儲(chǔ),后者由算子自己管理數(shù)據(jù)結(jié)構(gòu),當(dāng)觸發(fā)CheckPoint后,F(xiàn)link并不知道狀態(tài)數(shù)據(jù)內(nèi)部的數(shù)據(jù)結(jié)構(gòu),只是將數(shù)據(jù)轉(zhuǎn)換成bytes數(shù)據(jù)存儲(chǔ)在CheckPoint中,當(dāng)從Checkpoint恢復(fù)任務(wù)時(shí),算子自己反序列化出狀態(tài)的數(shù)據(jù)結(jié)構(gòu);
Flink基于輕量級(jí)分布式快照算法提供了CheckPoint機(jī)制,分布式快照可以將同一時(shí)間點(diǎn)的Task/Operator狀態(tài)數(shù)據(jù)全局統(tǒng)一快照處理,包括Keyed State和Operator State
Savepoints是檢查點(diǎn)的一種特殊實(shí)現(xiàn),底層使用CheckPoint機(jī)制,Savepoint是用戶以手工命令方式觸發(fā)CheckPoint,并將結(jié)果持久化到指定的存儲(chǔ)路徑中,其主要目的是幫助用戶在升級(jí)和維護(hù)集群過程中保存系統(tǒng)的狀態(tài)數(shù)據(jù),避免因停機(jī)運(yùn)維或者升級(jí)到知道正常終止的應(yīng)用數(shù)據(jù)狀態(tài)無法恢復(fù)。
到此,關(guān)于“Flink基礎(chǔ)知識(shí)點(diǎn)有哪些”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。