溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink的窗口操作有哪些

發(fā)布時間:2022-02-19 11:51:03 來源:億速云 閱讀:220 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹Flink的窗口操作有哪些,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!

我們經(jīng)常需要在一個時間窗口維度上對數(shù)據(jù)進(jìn)行聚合,窗口是流處理應(yīng)用中經(jīng)常需要解決的問題。Flink的窗口算子為我們提供了方便易用的API,我們可以將數(shù)據(jù)流切分成一個個窗口,對窗口內(nèi)的數(shù)據(jù)進(jìn)行處理。

Flink的窗口操作有哪些

一、窗口(window)的類型

對于窗口的操作主要分為兩種,分別對于Keyedstream和Datastream。他們的主要區(qū)別也僅僅在于建立窗口的時候一個為.window(…),一個為.windowAll(…)。對于Keyedstream的窗口來說,他可以使得多任務(wù)并行計算,每一個logical key stream將會被獨立的進(jìn)行處理。

stream
      .keyBy(...)               "assigner"
     [.trigger(...)]            "trigger" (else default trigger)
     [.evictor(...)]            "evictor" (else no evictor)
     [.allowedLateness(...)]    "lateness" (else zero)
     [.sideOutputLateData(...)] "output tag" (else no side output for late data)
      .reduce/aggregate/fold/apply()      "function"
     [.getSideOutput(...)]      "output tag"

按照窗口的Assigner來分,窗口可以分為

Tumbling window, sliding window,session window,global window,custom window

每種窗口又可分別基于processing time和event time,這樣的話,窗口的類型嚴(yán)格來說就有很多。

還有一種window叫做count window,依據(jù)元素到達(dá)的數(shù)量進(jìn)行分配,之后也會提到。

窗口的生命周期開始在第一個屬于這個窗口的元素到達(dá)的時候,結(jié)束于第一個不屬于這個窗口的元素到達(dá)的時候。

二、窗口的操作

2.1 Tumbling window

固定相同間隔分配窗口,每個窗口之間沒有重疊看圖一眼明白。

Flink的窗口操作有哪些

下面的例子定義了每隔3毫秒一個窗口的流:

WindowedStream Rates = rates
   .keyBy(MovieRate::getUserId)
   .window(TumblingEventTimeWindows.of(Time.milliseconds(3)));

2.2 Sliding Windows

跟上面一樣,固定相同間隔分配窗口,只不過每個窗口之間有重疊。窗口重疊的部分如果比窗口小,窗口將會有多個重疊,即一個元素可能被分配到多個窗口里去。

Flink的窗口操作有哪些

下面的例子給出窗口大小為10毫秒,重疊為5毫秒的流:

WindowedStream Rates = rates
               .keyBy(MovieRate::getUserId)
               .window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)));

2.3 Session window

這種窗口主要是根據(jù)活動的事件進(jìn)行窗口化,他們通常不重疊,也沒有一個固定的開始和結(jié)束時間。一個session window關(guān)閉通常是由于一段時間沒有收到元素。在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續(xù)活躍的周期),由非活躍的間隙分隔開。

Flink的窗口操作有哪些
// 靜態(tài)間隔時間
WindowedStream Rates = rates
               .keyBy(MovieRate::getUserId)
               .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
// 動態(tài)時間
WindowedStream Rates = rates
               .keyBy(MovieRate::getUserId)
               .window(EventTimeSessionWindows.withDynamicGap(()));

2.4 Global window

將所有相同keyed的元素分配到一個窗口里。好吧,就這樣:

WindowedStream Rates = rates
   .keyBy(MovieRate::getUserId)
   .window(GlobalWindows.create());

三、窗口函數(shù)

窗口函數(shù)就是這四個:ReduceFunction,AggregateFunction,F(xiàn)oldFunction,ProcessWindowFunction。前兩個執(zhí)行得更有效,因為Flink可以增量地聚合每個到達(dá)窗口的元素。

Flink必須在調(diào)用函數(shù)之前在內(nèi)部緩沖窗口中的所有元素,所以使用ProcessWindowFunction進(jìn)行操作效率不高。不過ProcessWindowFunction可以跟其他的窗口函數(shù)結(jié)合使用,其他函數(shù)接受增量信息,ProcessWindowFunction接受窗口的元數(shù)據(jù)。

舉一個AggregateFunction的例子吧,下面代碼為MovieRate按user分組,且分配5毫秒的Tumbling窗口,返回每個user在窗口內(nèi)評分的所有分?jǐn)?shù)的平均值。

DataStream> Rates = rates
               .keyBy(MovieRate::getUserId)
               .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
               .aggregate(new AggregateFunction>() {
                   @Override
                   public AverageAccumulator createAccumulator() {
                       return new AverageAccumulator();
                   }

                   @Override
                   public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
                       acc.userId = movieRate.userId;
                       acc.sum += movieRate.rate;
                       acc.count++;
                       return acc;
                   }

                   @Override
                   public Tuple2 getResult(AverageAccumulator acc) {
                       return  Tuple2.of(acc.userId, acc.sum/(double)acc.count);
                   }

                   @Override
                   public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
                       acc0.count += acc1.count;
                       acc0.sum += acc1.sum;
                       return acc0;
                   }
               });

public static class AverageAccumulator{
       int userId;
       int count;
       double sum;
   }

以下是部分輸出:

...
1> (44,3.0)
4> (96,0.5)
2> (51,0.5)
3> (90,2.75)
...

看上面的代碼,會發(fā)現(xiàn)add()函數(shù)特別生硬,因為我們想返回Tuple2類型,即Integer為key,但AggregateFunction似乎沒有提供這個機(jī)制可以讓AverageAccumulator的構(gòu)造函數(shù)提供參數(shù)。所以,這里引入ProcessWindowFunction與AggregateFunction的結(jié)合版,AggregateFunction進(jìn)行增量疊加,當(dāng)窗口關(guān)閉時,ProcessWindowFunction將會被提供AggregateFunction返回的結(jié)果,進(jìn)行Tuple封裝:

DataStream> Rates = rates
   .keyBy(MovieRate::getUserId)
   .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
   .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());


public static class MyAggregateFunction implements AggregateFunction {
   @Override
   public AverageAccumulator createAccumulator() {
       return new AverageAccumulator();
   }

   @Override
   public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
       acc.sum += movieRate.rate;
       acc.count++;
       return acc;
   }

   @Override
   public Double getResult(AverageAccumulator acc) {
       return  acc.sum/(double)acc.count;
   }

   @Override
   public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
       acc0.count += acc1.count;
       acc0.sum += acc1.sum;
       return acc0;
   }
}

public static class MyProcessWindowFunction extends
   ProcessWindowFunction, Integer, TimeWindow> {

   @Override
   public void process(Integer key,
                       Context context,
                       Iterable results,
                       Collector> out) throws Exception {
       Double result = results.iterator().next();
       out.collect(new Tuple2(key, result));
   }
}

public static class AverageAccumulator{
   int count;
   double sum;
}

可以得到,結(jié)果與上面一樣,但代碼好看了很多。

四、其他操作

4.1 Triggers(觸發(fā)器)

觸發(fā)器定義了窗口何時準(zhǔn)備好被窗口處理。每個窗口分配器默認(rèn)都有一個觸發(fā)器,如果默認(rèn)的觸發(fā)器不符合你的要求,就可以使用trigger(…)自定義觸發(fā)器。

通常來說,默認(rèn)的觸發(fā)器適用于多種場景。例如,多有的event-time窗口分配器都有一個EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過窗口末尾時出發(fā)。

PS:GlobalWindow默認(rèn)的觸發(fā)器時NeverTrigger,該觸發(fā)器從不出發(fā),所以在使用GlobalWindow時必須自定義觸發(fā)器。

4.2 Evictors(驅(qū)逐器)

Evictors可以在觸發(fā)器觸發(fā)之后以及窗口函數(shù)被應(yīng)用之前和/或之后可選擇的移除元素。使用Evictor可以防止預(yù)聚合,因為窗口的所有元素都必須在應(yīng)用計算邏輯之前先傳給Evictor進(jìn)行處理

4.3 Allowed Lateness

當(dāng)使用event-time窗口時,元素可能會晚到,例如Flink用于跟蹤event-time進(jìn)度的watermark已經(jīng)超過了窗口的結(jié)束時間戳。

默認(rèn)來說,當(dāng)watermark超過窗口的末尾時,晚到的元素會被丟棄。但是flink也允許為窗口operator指定最大的allowed lateness,以至于可以容忍在徹底刪除元素之前依然接收晚到的元素,其默認(rèn)值是0。

為了支持該功能,F(xiàn)link會保持窗口的狀態(tài),知道allowed lateness到期。一旦到期,flink會刪除窗口并刪除其狀態(tài)。

把晚到的元素當(dāng)作side output。

SingleOutputStreamOperator result = input
   .keyBy()
   .window()
   .allowedLateness()
   .sideOutputLateData(lateOutputTag)
   .(function>);

以上是“Flink的窗口操作有哪些”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI