溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

flink中新的水印策略是什么

發(fā)布時(shí)間:2021-12-31 10:32:13 來(lái)源:億速云 閱讀:130 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“flink中新的水印策略是什么”,在日常操作中,相信很多人在flink中新的水印策略是什么問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”flink中新的水印策略是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

背景

在flink 1.11之前的版本中,提供了兩種生成水?。╓atermark)的策略,分別是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,這兩個(gè)接口都繼承自TimestampAssigner接口。

用戶(hù)想使用不同的水印生成方式,則需要實(shí)現(xiàn)不同的接口,但是這樣引發(fā)了一個(gè)問(wèn)題,對(duì)于想給水印添加一些通用的、公共的功能則變得復(fù)雜,因?yàn)槲覀冃枰o這兩個(gè)接口都同時(shí)添加新的功能,這樣還造成了代碼的重復(fù)。

所以為了避免代碼的重復(fù),在flink 1.11 中對(duì)flink的水印生成接口進(jìn)行了重構(gòu),

新的水印生成接口

當(dāng)我們構(gòu)建了一個(gè)DataStream之后,使用assignTimestampsAndWatermarks方法來(lái)構(gòu)造水印,新的接口需要傳入一個(gè)WatermarkStrategy對(duì)象。

DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
 

WatermarkStrategy 這個(gè)接口是做什么的呢?這里面提供了很多靜態(tài)的方法和帶有缺省實(shí)現(xiàn)的方法,只有一個(gè)方法是非default和沒(méi)有缺省實(shí)現(xiàn)的,就是下面的這個(gè)方法。

 /**
  * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
  */
 @Override
 WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
 

所以默認(rèn)情況下,我們只需要實(shí)現(xiàn)這個(gè)方法就行了,這個(gè)方法主要是返回一個(gè) WatermarkGenerator,我們?cè)谶M(jìn)入這里邊看看。

@Public
public interface WatermarkGenerator<T> {

 /**
  * Called for every event, allows the watermark generator to examine and remember the
  * event timestamps, or to emit a watermark based on the event itself.
  */
 void onEvent(T event, long eventTimestamp, WatermarkOutput output);

 /**
  * Called periodically, and might emit a new watermark, or not.
  *
  * <p>The interval in which this method is called and Watermarks are generated
  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
  */
 void onPeriodicEmit(WatermarkOutput output);
}
 

這個(gè)方法簡(jiǎn)單明了,主要是有兩個(gè)方法:

  • onEvent :每個(gè)元素都會(huì)調(diào)用這個(gè)方法,如果我們想依賴(lài)每個(gè)元素生成一個(gè)水印,然后發(fā)射到下游(可選,就是看是否用output來(lái)收集水印),我們可以實(shí)現(xiàn)這個(gè)方法.
  • onPeriodicEmit : 如果數(shù)據(jù)量比較大的時(shí)候,我們每條數(shù)據(jù)都生成一個(gè)水印的話(huà),會(huì)影響性能,所以這里還有一個(gè)周期性生成水印的方法。這個(gè)水印的生成周期可以這樣設(shè)置:env.getConfig().setAutoWatermarkInterval(5000L);

我們自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的周期性的發(fā)射水印的例子:

在這個(gè)onEvent方法里,我們從每個(gè)元素里抽取了一個(gè)時(shí)間字段,但是我們并沒(méi)有生成水印發(fā)射給下游,而是自己保存了在一個(gè)變量里,在onPeriodicEmit方法里,使用最大的日志時(shí)間減去我們想要的延遲時(shí)間作為水印發(fā)射給下游。

  DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
    new WatermarkStrategy<Tuple2<String,Long>>(){
     @Override
     public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(
       WatermarkGeneratorSupplier.Context context){
      return new WatermarkGenerator<Tuple2<String,Long>>(){
       private long maxTimestamp;
       private long delay = 3000;
       @Override
       public void onEvent(
         Tuple2<String,Long> event,
         long eventTimestamp,
         WatermarkOutput output){
        maxTimestamp = Math.max(maxTimestamp, event.f1);
       }
       @Override
       public void onPeriodicEmit(WatermarkOutput output){
        output.emitWatermark(new Watermark(maxTimestamp - delay));
       }
      };
     }
    });
     

內(nèi)置水印生成策略

為了方便開(kāi)發(fā),flink提供了一些內(nèi)置的水印生成方法供我們使用。 

固定延遲生成水印

通過(guò)靜態(tài)方法forBoundedOutOfOrderness提供,入?yún)⒔邮找粋€(gè)Duration類(lèi)型的時(shí)間間隔,也就是我們可以接受的最大的延遲時(shí)間.使用這種延遲策略的時(shí)候需要我們對(duì)數(shù)據(jù)的延遲時(shí)間有一個(gè)大概的預(yù)估判斷。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

 

我們實(shí)現(xiàn)一個(gè)延遲3秒的固定延遲水印,可以這樣做:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

 

他的底層使用的WatermarkGenerator接口的一個(gè)實(shí)現(xiàn)類(lèi)BoundedOutOfOrdernessWatermarks。我們看下源碼中的這兩個(gè)方法,是不是和我們上面自己寫(xiě)的很像.

 @Override
 public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
  maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
 }

 @Override
 public void onPeriodicEmit(WatermarkOutput output) {
  output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
 }
   

單調(diào)遞增生成水印

通過(guò)靜態(tài)方法forMonotonousTimestamps來(lái)提供.

WatermarkStrategy.forMonotonousTimestamps()
 

這個(gè)也就是相當(dāng)于上述的延遲策略去掉了延遲時(shí)間,以event中的時(shí)間戳充當(dāng)了水印。

在程序中可以這樣使用:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

 

它的底層實(shí)現(xiàn)是AscendingTimestampsWatermarks,其實(shí)它就是BoundedOutOfOrdernessWatermarks類(lèi)的一個(gè)子類(lèi),沒(méi)有了延遲時(shí)間,我們來(lái)看看具體源碼的實(shí)現(xiàn).

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

 /**
  * Creates a new watermark generator with for ascending timestamps.
  */
 public AscendingTimestampsWatermarks() {
  super(Duration.ofMillis(0));
 }
}
   

event時(shí)間的獲取

上述我們講了flink自帶的兩種水印生成策略,但是對(duì)于我們使用eventtime語(yǔ)義的時(shí)候,我們想從我們的自己的數(shù)據(jù)中抽取eventtime,這個(gè)就需要TimestampAssigner了.

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {

    ............
    
 long extractTimestamp(T element, long recordTimestamp);
}

 

使用的時(shí)候我們主要就是從我們自己的元素element中提取我們想要的eventtime。

使用flink自帶的水印策略和eventtime抽取類(lèi),可以這樣用:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(
    WatermarkStrategy
      .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
      .withTimestampAssigner((event, timestamp)->event.f1));
   

處理空閑數(shù)據(jù)源

在某些情況下,由于數(shù)據(jù)產(chǎn)生的比較少,導(dǎo)致一段時(shí)間內(nèi)沒(méi)有數(shù)據(jù)產(chǎn)生,進(jìn)而就沒(méi)有水印的生成,導(dǎo)致下游依賴(lài)水印的一些操作就會(huì)出現(xiàn)問(wèn)題,比如某一個(gè)算子的上游有多個(gè)算子,這種情況下,水印是取其上游兩個(gè)算子的較小值,如果上游某一個(gè)算子因?yàn)槿鄙贁?shù)據(jù)遲遲沒(méi)有生成水印,就會(huì)出現(xiàn)eventtime傾斜問(wèn)題,導(dǎo)致下游沒(méi)法觸發(fā)計(jì)算。

所以filnk通過(guò)WatermarkStrategy.withIdleness()方法允許用戶(hù)在配置的時(shí)間內(nèi)(即超時(shí)時(shí)間內(nèi))沒(méi)有記錄到達(dá)時(shí)將一個(gè)流標(biāo)記為空閑。這樣就意味著下游的數(shù)據(jù)不需要等待水印的到來(lái)。

當(dāng)下次有水印生成并發(fā)射到下游的時(shí)候,這個(gè)數(shù)據(jù)流重新變成活躍狀態(tài)。

通過(guò)下面的代碼來(lái)實(shí)現(xiàn)對(duì)于空閑數(shù)據(jù)流的處理

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

到此,關(guān)于“flink中新的水印策略是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI