溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

四、flink--window、eventTime和wate

發(fā)布時(shí)間:2020-05-18 13:41:13 來(lái)源:網(wǎng)絡(luò) 閱讀:439 作者:隔壁小白 欄目:大數(shù)據(jù)

一、flink的window機(jī)制

1.1 window概述

? streaming流式計(jì)算是一種被設(shè)計(jì)用于處理無(wú)限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無(wú)限數(shù)據(jù)集是指一種不斷增長(zhǎng)的本質(zhì)上無(wú)限的數(shù)據(jù)集,而window是一種切割無(wú)限數(shù)據(jù)為有限塊進(jìn)行處理的手段。
? Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計(jì)算操作。

1.2 window的類(lèi)型

window可以分為兩大類(lèi):
CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個(gè)Window,與時(shí)間無(wú)關(guān)。比較少用
TimeWindow:按照時(shí)間生成Window。非常常用,下面主要將時(shí)間窗口有哪些類(lèi)型。主要有四類(lèi):滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)、會(huì)話窗口(Session Window)和全局窗口(global window比較少用 )。

1.2.1 滾動(dòng)窗口(Tumbling Windows)

概述:將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切片。只有一個(gè)工作參數(shù),就是窗口大小
特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊。
? 滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊(前后時(shí)間點(diǎn)都是緊接著的)。例如:如果你指定了一個(gè)5分鐘大小的滾動(dòng)窗口,窗口的創(chuàng)建如下圖所示:
四、flink--window、eventTime和wate
? 圖 1.2.1 滾動(dòng)窗口
適用場(chǎng)景:適合做BI統(tǒng)計(jì)等(做每個(gè)時(shí)間段的聚合計(jì)算)。

1.2.2 滑動(dòng)窗口(Sliding Windows)

概述:滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口工作參數(shù)由固定的窗口長(zhǎng)度和滑動(dòng)間隔組成。
特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,有重疊。
? 滑動(dòng)窗口分配器將元素分配到固定長(zhǎng)度的窗口中,與滾動(dòng)窗口類(lèi)似,窗口的大小由窗口大小參數(shù)來(lái)配置,另一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口開(kāi)始的頻率。因此,滑動(dòng)窗口如果滑動(dòng)參數(shù)小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動(dòng),那么每個(gè)窗口中5分鐘的窗口里包含著上個(gè)10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:
四、flink--window、eventTime和wate
? 圖 1.2.2 滑動(dòng)窗口
適用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來(lái)決定是否要報(bào)警)。

1.2.3 會(huì)話窗口(Session Windows)

概述:由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的timeout間隙組成,類(lèi)似于web應(yīng)用的session,也就是一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口。
特點(diǎn):時(shí)間無(wú)對(duì)齊。窗口無(wú)固定長(zhǎng)度
? session窗口分配器通過(guò)session活動(dòng)來(lái)對(duì)元素進(jìn)行分組,session窗口跟滾動(dòng)窗口和滑動(dòng)窗口相比,不會(huì)有重疊和固定的開(kāi)始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那個(gè)這個(gè)窗口就會(huì)關(guān)閉。一個(gè)session窗口通過(guò)一個(gè)session間隔來(lái)配置,這個(gè)session間隔定義了非活躍周期的長(zhǎng)度,當(dāng)這個(gè)非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。
四、flink--window、eventTime和wate
? 圖1.2.3 會(huì)話窗口

1.3 window窗口api

1.3.1 window api分類(lèi)

window數(shù)據(jù)源分為兩種,一種是典型的KV類(lèi)型(keyedStream),另一種是非KV類(lèi)型(Non-keyedStream)。
區(qū)別:
keyedStream:
需要在使用窗口操作前,調(diào)用 keyBy對(duì)KV按照key進(jìn)行分區(qū),然后才可以調(diào)用window操作的api,比如 countWindow,timeWindow等

Non-keyedstream:
如果使用窗口操作前,沒(méi)有使用keyBy算子,那么就認(rèn)為是Non-keyedstream,調(diào)用的window api就是 xxxWindowAll,比如countWindowAll,timeWindowAll,而且因?yàn)槭欠荎V,所以無(wú)法分區(qū),也就是只有一個(gè)分區(qū),那么這個(gè)窗口并行度只能是1。這個(gè)是要注意的。

1.3.2 countWindow

CountWindow根據(jù)窗口中相同key元素的數(shù)量來(lái)觸發(fā)執(zhí)行,執(zhí)行時(shí)只計(jì)算元素?cái)?shù)量達(dá)到窗口大小的key對(duì)應(yīng)的結(jié)果。

有兩個(gè)用法:
countWindow(window_size):只指定窗口大小,此時(shí)窗口是滾動(dòng)窗口
countWindow(window_size, slide):指定窗口大小以及滑動(dòng)間隔,此時(shí)窗口是滑動(dòng)窗口

注意:CountWindow的window_size指的是相同Key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。

1、滾動(dòng)窗口
默認(rèn)的CountWindow是一個(gè)滾動(dòng)窗口,只需要指定窗口大小即可,當(dāng)元素?cái)?shù)量達(dá)到窗口大小時(shí),就會(huì)觸發(fā)窗口的執(zhí)行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("/test.txt");
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String s1 : s.split(" ")) {
                    collector.collect(new Tuple2<>(s1, 1));
                }
            }
        }).keyBy(0).countWindow(5).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        env.execute("滾動(dòng)窗口");
    }

}

2、滑動(dòng)窗口
動(dòng)窗口和滾動(dòng)窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。
下面代碼中的sliding_size設(shè)置為了2,也就是說(shuō),每收到兩個(gè)相同key的數(shù)據(jù)就計(jì)算一次,每一次計(jì)算的window范圍是5個(gè)元素。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("/test.txt");
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String s1 : s.split(" ")) {
                    collector.collect(new Tuple2<>(s1, 1));
                }
            }
        }).keyBy(0).countWindow(5,2).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        env.execute("滑動(dòng)窗口");
    }

}

1.3.3 timeWindow

? TimeWindow是將指定時(shí)間范圍內(nèi)的所有數(shù)據(jù)組成一個(gè)window,一次對(duì)一個(gè)window里面的所有數(shù)據(jù)進(jìn)行計(jì)算。同樣支持類(lèi)似上面的滾動(dòng)窗口和滑動(dòng)窗口模式。有兩個(gè)工作參數(shù):window_size和slide。只指定window_size時(shí)是滾動(dòng)窗口。

1、滾動(dòng)窗口
? Flink默認(rèn)的時(shí)間窗口根據(jù)Processing Time 進(jìn)行窗口的劃分,將Flink獲取到的數(shù)據(jù)根據(jù)進(jìn)入Flink的時(shí)間劃分到不同的窗口中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("/test.txt");
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String s1 : s.split(" ")) {
                    collector.collect(new Tuple2<>(s1, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(2)).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        env.execute("滾動(dòng)窗口");
    }

}

2、滑動(dòng)窗口
和上面類(lèi)似,就是參數(shù)里面增加了slide參數(shù),也就是滑動(dòng)時(shí)間間隔。時(shí)間間隔可以通過(guò)Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個(gè)來(lái)指定。

1.3.4 window reduce

也就是在窗口算子之后執(zhí)行reduce算子,用法和普通的reduce一樣,只不過(guò)reduce的單位是一個(gè)窗口。即每一個(gè)窗口返回一次reduce結(jié)果。程序在上面,不重復(fù)了。

1.3.5 window fold

也就是在窗口算子之后執(zhí)行fold算子,用法和普通的fold一樣,只不過(guò)fold的單位是一個(gè)窗口。即每一個(gè)窗口返回一次reduce結(jié)果。程序在上面,不重復(fù)了。

1.3.6 window聚合操作

指的是max、min等這些聚合算子,只不過(guò)是在window算子之后使用,以窗口為單位,每一個(gè)窗口返回一次聚合結(jié)果,而不是像普通那樣,每一次聚合結(jié)果都返回。

二、time、watermark和window

2.1 flink中 time的分類(lèi)

在flink中,time有不同分類(lèi),如下:
Event Time:
是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,F(xiàn)link通過(guò)時(shí)間戳分配器訪問(wèn)事件時(shí)間戳。

Ingestion Time:
是數(shù)據(jù)進(jìn)入Flink的時(shí)間。

Processing Time:
是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。也就是數(shù)據(jù)被處理時(shí)的當(dāng)前時(shí)間。

這些時(shí)間有什么不同呢?因網(wǎng)絡(luò)傳輸需要時(shí)間,所以Ingestion Time不一定和Event Time相等,很多情況下是不等的。同樣Processing Time表示數(shù)據(jù)處理時(shí)的時(shí)間,如果數(shù)據(jù)是很久之前采集的,現(xiàn)在才處理,那么很明顯,三個(gè)時(shí)間time都不會(huì)相等的。
四、flink--window、eventTime和wate
? 圖 2.1 flink--時(shí)間的概念

例子:
一條日志進(jìn)入Flink的時(shí)間為2017-11-12 10:00:00.123,到達(dá)Window的系統(tǒng)時(shí)間為2017-11-12 10:00:01.234,日志的內(nèi)容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
可以看到,三個(gè)time都不相等。而對(duì)于業(yè)務(wù)來(lái)說(shuō),要統(tǒng)計(jì)1min內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?—— eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。但是flink默認(rèn)的窗口的時(shí)間是Processing Time,那么如何引入eventTime呢?

2.2 eventTime的引入

? 在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用eventTime,一般只在eventTime無(wú)法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。默認(rèn)使用的是ProcessingTime。那么如何指定flink使用指定的time呢?

2.2.1 引入方式1:設(shè)置env時(shí)間類(lèi)型

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(時(shí)間類(lèi)型);

//三種類(lèi)型的time對(duì)應(yīng)如下:
TimeCharacteristic.EventTime;  eventtime
TimeCharacteristic.IngestionTime;  到達(dá)flink的時(shí)間
TimeCharacteristic.ProcessingTime;  處理數(shù)據(jù)的時(shí)間

這種方式是整個(gè)env全局生效的,是直接將env默認(rèn)的時(shí)間設(shè)置為eventtime。后面的窗口操作默認(rèn)就會(huì)使用eventtime作為時(shí)間依據(jù)。如果想不同的窗口設(shè)置不同的時(shí)間類(lèi)型,這種方式就行不通了。

2.2.2 引入方式2:?jiǎn)为?dú)設(shè)置window的實(shí)際類(lèi)型

stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.window這個(gè)api就是所有窗口總的api,其他窗口api都是通過(guò)這個(gè)api封裝出來(lái)的??梢酝ㄟ^(guò)這個(gè)總api,參數(shù)直接窗口的類(lèi)型,比如上面的就是指定eventtime 的timewindow,這樣并不會(huì)影響整個(gè)env的時(shí)間類(lèi)型。

同樣的,其他時(shí)間類(lèi)型窗口,比如:
SlidingEventTimeWindows  滑動(dòng)eventtime窗口

基本上看名字就知道是什么時(shí)間類(lèi)型(三大時(shí)間類(lèi)型)、以及什么類(lèi)型(滑動(dòng)、滾動(dòng)、會(huì)話窗口)的窗口了。注意:eventtime沒(méi)有session窗口,processingTime和

2.3 watermark的原理

2.3.1 引入背景

? 我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。
四、flink--window、eventTime和wate
? 圖 2.3 數(shù)據(jù)的亂序
? 那么此時(shí)出現(xiàn)一個(gè)問(wèn)題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無(wú)限期的等下去,此時(shí)必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。
解釋?zhuān)?
如果只按照到達(dá)的event的eventtime來(lái)觸發(fā)窗口操作,假設(shè)有event1~5。如果到達(dá)順序是亂的,比如event5最先達(dá)到,然后event1也達(dá)到了,那么flink這邊怎么知道這中間還有沒(méi)有數(shù)據(jù)呢?沒(méi)辦法的,不能確定數(shù)據(jù)是否完整到達(dá),也不能無(wú)限制等待下去。所以需要一種機(jī)制來(lái)處理這種情況。

2.3.2 watermark機(jī)制原理

? Watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性,數(shù)據(jù)本身攜帶著對(duì)應(yīng)的Watermark。Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。
? 數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
? Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長(zhǎng)t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被watermark觸發(fā)執(zhí)行。
解釋?zhuān)?br/>? watermark是一種概率性的機(jī)制。假設(shè)event1~5,如果event5已經(jīng)到達(dá)了,那么其實(shí)按照event產(chǎn)生的先后順序,正常情況下,前面的event1~4應(yīng)該也到達(dá)了。而為了保證前面的event1~4的到達(dá)(其實(shí)是更多的到達(dá),但是不一定全部都到達(dá)),在event5到達(dá)了之后,提供一定的延遲時(shí)間t。當(dāng)event5到達(dá),且經(jīng)過(guò) t 時(shí)間之后,正常情況下,前面的event1~4 大概率會(huì)到達(dá)了,如果沒(méi)有到達(dá),屬于少數(shù)情況,那么就認(rèn)為event5之前的event都到達(dá)了,無(wú)論是否真的全部到達(dá)了。如果在延遲時(shí)間之后到達(dá)了,這個(gè)舊數(shù)據(jù)直接會(huì)被丟棄。所以其實(shí)watermark就是一種保障更多event亂序到達(dá)的機(jī)制,提供了一定的延時(shí)機(jī)制,而因?yàn)橹粫?huì)延遲一定的時(shí)間,所以也不會(huì)導(dǎo)致flink無(wú)限期地等待下去。

有序數(shù)據(jù)流的watermark如下:(watermark設(shè)置為0)
四、flink--window、eventTime和wate
? 圖 2.4 有序數(shù)據(jù)流的watermark
亂序數(shù)據(jù)流的watermark如下:(watermark設(shè)置為2)
四、flink--window、eventTime和wate
? 圖 2.5 亂序數(shù)據(jù)流的watermark
? 當(dāng)Flink接收到每一條數(shù)據(jù)時(shí),都會(huì)產(chǎn)生一條Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime - 延遲時(shí)長(zhǎng)t,也就是說(shuō),Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過(guò)程中無(wú)法獲取新的數(shù)據(jù),那么沒(méi)有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
? 上圖中,我們?cè)O(shè)置的允許最大延遲到達(dá)時(shí)間為2s,所以時(shí)間戳為7s的事件對(duì)應(yīng)的Watermark是5s,時(shí)間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時(shí)間戳為7s的事件到達(dá)時(shí)的Watermarker恰好觸發(fā)窗口1,時(shí)間戳為12s的事件到達(dá)時(shí)的Watermark恰好觸發(fā)窗口2。
? Window會(huì)不斷產(chǎn)生,屬于這個(gè)Window范圍的數(shù)據(jù)會(huì)被不斷加入到Window中,所有未被觸發(fā)的Window都會(huì)等待觸發(fā),只要Window還沒(méi)觸發(fā),屬于這個(gè)Window范圍的數(shù)據(jù)就會(huì)一直被加入到Window中,直到Window被觸發(fā)才會(huì)停止數(shù)據(jù)的追加,而當(dāng)Window觸發(fā)之后才接受到的屬于被觸發(fā)Window的數(shù)據(jù)會(huì)被丟棄。如果產(chǎn)生的窗口中沒(méi)有新到的數(shù)據(jù),也就不會(huì)有watermark,那么窗口就不會(huì)被觸發(fā)計(jì)算。

2.3.3 watermark的觸發(fā)計(jì)算的條件

watermark時(shí)間(max_eventTime-t) >= window_end_time;
在[window_start_time,window_end_time)中有數(shù)據(jù)存在。

2.3.4 watermark的產(chǎn)生方式

Punctuated:不間斷產(chǎn)生
數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark。
在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。

Periodic:周期性產(chǎn)生
周期性的(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。
在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會(huì)有很大的延時(shí)。

這兩種有不同的api實(shí)現(xiàn),下面會(huì)講

2.4 watermark的引入以及接口

2.4.1 watermark引入

需要先引入eventime,然后引入watermark

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStreamSource<String> source = env.readTextFile("/test.txt");

//引入的watermark的實(shí)現(xiàn)類(lèi)
source.assignTimestampsAndWatermarks(xx)

watermark的實(shí)現(xiàn)有兩大類(lèi),對(duì)應(yīng)上面的兩種watermark的產(chǎn)生方式,有兩個(gè)接口:

AssignerWithPeriodicWatermarks;   周期性產(chǎn)生watermark,即Period
AssignerWithPunctuatedWatermarks;  Punctuated:不間斷產(chǎn)生

2.4.2 AssignerWithPeriodicWatermarks接口

看看AssignerWithPeriodicWatermarks這個(gè)接口的源碼,主要用于周期性產(chǎn)生watermark

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    //獲取當(dāng)前的watermark
    @Nullable
    Watermark getCurrentWatermark();
}

//父接口===================
public interface TimestampAssigner<T> extends Function {
    //獲取當(dāng)前的時(shí)間戳
    long extractTimestamp(T var1, long var2);
}

主要就是有兩個(gè)方法需要覆蓋,getCurrentWatermark()用于生成watermark,extractTimestamp用于獲取每個(gè)event的timestamp。
由于這是一個(gè)周期性產(chǎn)生watermark的接口,所以需要指定這個(gè)生成周期有多長(zhǎng),需要env的配置中指定,如:

env.getConfig().setAutoWatermarkInterval(n ms);
記住間隔時(shí)間單位是毫秒

例子:

/*根據(jù)eventTime 創(chuàng)建處理watermark
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    //watermark延遲時(shí)間 t,單位是毫秒
    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    //保存當(dāng)前最大的時(shí)間戳
    private long currentMaxTimestamp;

    //根據(jù)傳遞進(jìn)來(lái)的event,獲取time,然后如果比當(dāng)前最大的time還大,就替換,否則保持。因?yàn)閿?shù)據(jù)亂序到達(dá)是無(wú)法保證時(shí)間是遞增的
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    //返回watermark
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

再加上設(shè)置的setAutoWatermarkInterval(n ms),就可以周期性生成watermark。

2.4.3 AssignerWithPunctuatedWatermarks接口

看看AssignerWithPunctuatedWatermarks這個(gè)接口的源碼,主要用于實(shí)時(shí)產(chǎn)生watermark

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    //獲取最新的watermark
    @Nullable
    Watermark checkAndGetNextWatermark(T var1, long var2);
}

//父接口
public interface TimestampAssigner<T> extends Function {
    //從event中獲取timestamp
    long extractTimestamp(T var1, long var2);
}

寫(xiě)法其實(shí)和上面的類(lèi)似,只是這里不會(huì)設(shè)置生成watermark的時(shí)間間隔

2.4.4 flink自帶的watermark實(shí)現(xiàn)類(lèi)

1、BoundedOutOfOrdernessTimestampExtractor
繼承了AssignerWithPeriodicWatermarks接口的一個(gè)類(lèi),看看它的源碼

package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = -9223372036854775808L;
    private final long maxOutOfOrderness;

    //構(gòu)造方法中接收一個(gè)參數(shù),就是延遲時(shí)間 t
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0L) {
            throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        } else {
            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
        }
    }

    public long getMaxOutOfOrdernessInMillis() {
        return this.maxOutOfOrderness;
    }

    //需要重寫(xiě)的方法,用于獲取timestamp
    public abstract long extractTimestamp(T var1);

    //獲取watermark的方法已經(jīng)寫(xiě)好了,用傳遞進(jìn)來(lái)的延遲時(shí)間t來(lái)計(jì)算得出watermark
    public final Watermark getCurrentWatermark() {
        long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
        if (potentialWM >= this.lastEmittedWatermark) {
            this.lastEmittedWatermark = potentialWM;
        }

        return new Watermark(this.lastEmittedWatermark);
    }

    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = this.extractTimestamp(element);
        if (timestamp > this.currentMaxTimestamp) {
            this.currentMaxTimestamp = timestamp;
        }

        return timestamp;
    }
}

這個(gè)類(lèi)就是實(shí)現(xiàn)了用戶(hù)可以自定義設(shè)定延遲時(shí)間t 的一個(gè)watermark。

2、AscendingTimestampExtractor
也是繼承了AssignerWithPeriodicWatermarks接口的一個(gè)類(lèi)。具有穩(wěn)定的遞增時(shí)間戳的數(shù)據(jù)源,比如kafka的分區(qū)數(shù)據(jù),每一條信息都是遞增+1的,適用于這個(gè)類(lèi)。只需要重寫(xiě)
extractAscendingTimestamp方法。

2.5 eventTime、window和watermark結(jié)合使用例子

package flinktest;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class EventTimeTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        DataStreamSource<String> source = env.readTextFile("/tmp/test.txt");

        source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(3000)) {
            @Override
            public long extractTimestamp(String s) {
                return Integer.valueOf(s.split(" ")[0]);
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Tuple2<String, Integer> tmpTuple = new Tuple2<>();
                for (String s1 : s.split(" ")) {
                    tmpTuple.setFields(s1, 1);
                    collector.collect(tmpTuple);
                }
            }
        }).keyBy(0)
                .timeWindow(Time.seconds(10))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                .print();
         try {
            env.execute("eventtime test");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

window api的類(lèi)繼承結(jié)構(gòu)

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI