您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“flink1.2版本時(shí)間、水位線的介紹和用法”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“flink1.2版本時(shí)間、水位線的介紹和用法”吧!
水位線是flink的一種處理延時(shí)數(shù)據(jù)的機(jī)制,主要對(duì)設(shè)定時(shí)間內(nèi)延時(shí)數(shù)據(jù)的自動(dòng)容錯(cuò),水位線的本質(zhì)是時(shí)間戳,計(jì)算公式為:當(dāng)前事件最大時(shí)間值 - 數(shù)據(jù)延時(shí)時(shí)間。(看了幾遍有點(diǎn)懵)
個(gè)人理解:
水位線是收到數(shù)據(jù)邏輯時(shí)間便簽,是處理延時(shí)數(shù)據(jù)的基礎(chǔ),通過(guò)與數(shù)據(jù)自帶的生成時(shí)間Timestamps,實(shí)現(xiàn)延遲數(shù)據(jù)矯正。
理想狀態(tài)下的水位線,即數(shù)據(jù)元素的事件事件是有序的,Watermark時(shí)間戳?xí)S著數(shù)據(jù)元素的事件時(shí)間安裝順序生成,此時(shí),水位線時(shí)間和時(shí)間時(shí)間保持一致。
現(xiàn)實(shí)情況數(shù)據(jù)元素往往并不按照其生產(chǎn)順序接入Flink,而頻繁處理亂序或遲到情況,這時(shí)候需要watermark來(lái)處理,當(dāng)事件8和事件11同時(shí)進(jìn)入系統(tǒng),flink系統(tǒng)將根據(jù)設(shè)定延時(shí)值分別計(jì)算它們的watermark,兩個(gè)事件到達(dá)一個(gè)operator中后,匹配事件時(shí)間的虛擬時(shí)間與watermark匹配,觸發(fā)響應(yīng)的計(jì)算。
Watermark在Source Operator中生成,且在每個(gè)Operator的子Task中獨(dú)立生成。
如果一個(gè)watermark同時(shí)更新一個(gè)算子Task的當(dāng)前事件時(shí)間,F(xiàn)link會(huì)選擇最小的水位線進(jìn)行更新。當(dāng)一個(gè)Window算子Task中水位線大于Window結(jié)束時(shí)間,立即觸發(fā)窗口計(jì)算。
流式處理中最大的特點(diǎn)是數(shù)據(jù)上具有時(shí)間的屬性特征,F(xiàn)link根據(jù)時(shí)間產(chǎn)生的位置不同,將時(shí)間分為三種概念:事件生成時(shí)間(Event Time)、事件接入時(shí)間(Ingestion TIme)、事件處理時(shí)間(Processing Time)。
事件生成時(shí)間:數(shù)據(jù)從終端或系統(tǒng)中產(chǎn)生的過(guò)程消耗的時(shí)間。
數(shù)據(jù)接入時(shí)間:數(shù)據(jù)接入DataSource時(shí)的時(shí)間。
事件處理時(shí)間:處理過(guò)程中獲取的主機(jī)時(shí)間。
Timestamps和Watermark成對(duì)對(duì)存在,使用時(shí),都要指定
watermark設(shè)定Flink中Watermark默認(rèn)200ms生成一次,也可以手動(dòng)指定,代碼如下:
// 1、創(chuàng)建flink運(yùn)行環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); // 設(shè)置并行度 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //處理模式設(shè)定:流或批 // 生成 watermark 的時(shí)間間隔(每 n 毫秒),設(shè)置周期性的產(chǎn)生水位線的時(shí)間間隔。當(dāng)數(shù)據(jù)流很大的時(shí)候,如果每個(gè)事件都產(chǎn)生水位線,會(huì)影響性能。 //env.getConfig().setAutoWatermarkInterval(1000); // 自動(dòng)水印時(shí)間間隔 12版本不用設(shè)置,有默認(rèn)
此處以滾動(dòng)窗口為例,窗口知識(shí)下次分享,首先對(duì)數(shù)據(jù)進(jìn)行機(jī)構(gòu)化,數(shù)據(jù)結(jié)構(gòu):"yyyy-MM-dd HH:mm:ss|type|num",處理代碼如下:
SingleOutputStreamOperator<Tuple3<String,String, Integer>> formatData =text.map(new MapFunction<String, Tuple3<String, String, Integer>>() { // 數(shù)據(jù)格式轉(zhuǎn)換 private static final long serialVersionUID = 1L; @Override public Tuple3<String, String, Integer> map(String value) throws Exception { Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>(); String[] dataTmp = value.split("\\|"); data.f0 = dataTmp[0]; data.f1 = dataTmp[1]; data.f2 = Integer.parseInt(dataTmp[2]); return data; } });
設(shè)置Timestamps和最大時(shí)延
SingleOutputStreamOperator<Tuple3<String,String, Integer>> orderDSWithWatemark=formatData .assignTimestampsAndWatermarks( // 設(shè)置watermark watemark = 最大事件時(shí)間 - 最大延遲或亂序時(shí)間 WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定maxOutOfOrderness最大無(wú)序度時(shí)間即最大延遲時(shí)間/亂序時(shí)間 .withTimestampAssigner((data,timestamp) -> Long.parseLong(DateUtil.dateToUTC(data.f0))*1000) //時(shí)間為毫秒級(jí) );
設(shè)定窗口大小和處理邏輯
SingleOutputStreamOperator<Tuple3<String,String, Integer>> result=orderDSWithWatemark.keyBy(one -> one.f1) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 設(shè)定窗口大小 // .allowedLateness(Time.seconds(1)) //延時(shí)處理時(shí)間 // .sideOutputLateData(lateOutputTag) //側(cè)輸出 .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() { // 處理邏輯 private static final long serialVersionUID = -6695049408336015245L; @Override public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception { Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>(); data.f0 = value2.f0; data.f1 = value1.f1; data.f2 = value1.f2 + value2.f2; System.out.println(data); return data; } }); result.print("滾動(dòng)事件時(shí)間"); env.execute();
時(shí)間和水位線是flink中比較難理解且重要的概念,我也是一知半解,在使用的過(guò)程中再慢慢深化,基本邏輯是針對(duì)數(shù)據(jù)建立自己的時(shí)間標(biāo)簽,并通過(guò)時(shí)間范圍(窗口)和數(shù)據(jù)延遲完成事件內(nèi)數(shù)據(jù)的匯集、計(jì)算和輸出,以此,完成更精確的實(shí)時(shí)事件數(shù)據(jù)計(jì)算。
技術(shù)是需求的一種呈現(xiàn),基礎(chǔ)本質(zhì)相互交疊,編程語(yǔ)言、技術(shù)框架都是,最重要的細(xì)微處的優(yōu)化和整體的使用的簡(jiǎn)便,功能的穩(wěn)定和強(qiáng)大。
到此,相信大家對(duì)“flink1.2版本時(shí)間、水位線的介紹和用法”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。