您好,登錄后才能下訂單哦!
這篇文章主要介紹“flink中新的水印策略是什么”,在日常操作中,相信很多人在flink中新的水印策略是什么問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”flink中新的水印策略是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
在flink 1.11之前的版本中,提供了兩種生成水?。╓atermark)的策略,分別是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,這兩個(gè)接口都繼承自TimestampAssigner接口。
用戶(hù)想使用不同的水印生成方式,則需要實(shí)現(xiàn)不同的接口,但是這樣引發(fā)了一個(gè)問(wèn)題,對(duì)于想給水印添加一些通用的、公共的功能則變得復(fù)雜,因?yàn)槲覀冃枰o這兩個(gè)接口都同時(shí)添加新的功能,這樣還造成了代碼的重復(fù)。
所以為了避免代碼的重復(fù),在flink 1.11 中對(duì)flink的水印生成接口進(jìn)行了重構(gòu),
當(dāng)我們構(gòu)建了一個(gè)DataStream之后,使用assignTimestampsAndWatermarks方法來(lái)構(gòu)造水印,新的接口需要傳入一個(gè)WatermarkStrategy對(duì)象。
DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
WatermarkStrategy 這個(gè)接口是做什么的呢?這里面提供了很多靜態(tài)的方法和帶有缺省實(shí)現(xiàn)的方法,只有一個(gè)方法是非default和沒(méi)有缺省實(shí)現(xiàn)的,就是下面的這個(gè)方法。
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
所以默認(rèn)情況下,我們只需要實(shí)現(xiàn)這個(gè)方法就行了,這個(gè)方法主要是返回一個(gè) WatermarkGenerator,我們?cè)谶M(jìn)入這里邊看看。
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine and remember the
* event timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
這個(gè)方法簡(jiǎn)單明了,主要是有兩個(gè)方法:
我們自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的周期性的發(fā)射水印的例子:
在這個(gè)onEvent方法里,我們從每個(gè)元素里抽取了一個(gè)時(shí)間字段,但是我們并沒(méi)有生成水印發(fā)射給下游,而是自己保存了在一個(gè)變量里,在onPeriodicEmit方法里,使用最大的日志時(shí)間減去我們想要的延遲時(shí)間作為水印發(fā)射給下游。
DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
new WatermarkStrategy<Tuple2<String,Long>>(){
@Override
public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context){
return new WatermarkGenerator<Tuple2<String,Long>>(){
private long maxTimestamp;
private long delay = 3000;
@Override
public void onEvent(
Tuple2<String,Long> event,
long eventTimestamp,
WatermarkOutput output){
maxTimestamp = Math.max(maxTimestamp, event.f1);
}
@Override
public void onPeriodicEmit(WatermarkOutput output){
output.emitWatermark(new Watermark(maxTimestamp - delay));
}
};
}
});
為了方便開(kāi)發(fā),flink提供了一些內(nèi)置的水印生成方法供我們使用。
通過(guò)靜態(tài)方法forBoundedOutOfOrderness提供,入?yún)⒔邮找粋€(gè)Duration類(lèi)型的時(shí)間間隔,也就是我們可以接受的最大的延遲時(shí)間.使用這種延遲策略的時(shí)候需要我們對(duì)數(shù)據(jù)的延遲時(shí)間有一個(gè)大概的預(yù)估判斷。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
我們實(shí)現(xiàn)一個(gè)延遲3秒的固定延遲水印,可以這樣做:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
他的底層使用的WatermarkGenerator接口的一個(gè)實(shí)現(xiàn)類(lèi)BoundedOutOfOrdernessWatermarks。我們看下源碼中的這兩個(gè)方法,是不是和我們上面自己寫(xiě)的很像.
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
通過(guò)靜態(tài)方法forMonotonousTimestamps來(lái)提供.
WatermarkStrategy.forMonotonousTimestamps()
這個(gè)也就是相當(dāng)于上述的延遲策略去掉了延遲時(shí)間,以event中的時(shí)間戳充當(dāng)了水印。
在程序中可以這樣使用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
它的底層實(shí)現(xiàn)是AscendingTimestampsWatermarks,其實(shí)它就是BoundedOutOfOrdernessWatermarks類(lèi)的一個(gè)子類(lèi),沒(méi)有了延遲時(shí)間,我們來(lái)看看具體源碼的實(shí)現(xiàn).
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/**
* Creates a new watermark generator with for ascending timestamps.
*/
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
上述我們講了flink自帶的兩種水印生成策略,但是對(duì)于我們使用eventtime語(yǔ)義的時(shí)候,我們想從我們的自己的數(shù)據(jù)中抽取eventtime,這個(gè)就需要TimestampAssigner了.
@Public
@FunctionalInterface
public interface TimestampAssigner<T> {
............
long extractTimestamp(T element, long recordTimestamp);
}
使用的時(shí)候我們主要就是從我們自己的元素element中提取我們想要的eventtime。
使用flink自帶的水印策略和eventtime抽取類(lèi),可以這樣用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp)->event.f1));
在某些情況下,由于數(shù)據(jù)產(chǎn)生的比較少,導(dǎo)致一段時(shí)間內(nèi)沒(méi)有數(shù)據(jù)產(chǎn)生,進(jìn)而就沒(méi)有水印的生成,導(dǎo)致下游依賴(lài)水印的一些操作就會(huì)出現(xiàn)問(wèn)題,比如某一個(gè)算子的上游有多個(gè)算子,這種情況下,水印是取其上游兩個(gè)算子的較小值,如果上游某一個(gè)算子因?yàn)槿鄙贁?shù)據(jù)遲遲沒(méi)有生成水印,就會(huì)出現(xiàn)eventtime傾斜問(wèn)題,導(dǎo)致下游沒(méi)法觸發(fā)計(jì)算。
所以filnk通過(guò)WatermarkStrategy.withIdleness()方法允許用戶(hù)在配置的時(shí)間內(nèi)(即超時(shí)時(shí)間內(nèi))沒(méi)有記錄到達(dá)時(shí)將一個(gè)流標(biāo)記為空閑。這樣就意味著下游的數(shù)據(jù)不需要等待水印的到來(lái)。
當(dāng)下次有水印生成并發(fā)射到下游的時(shí)候,這個(gè)數(shù)據(jù)流重新變成活躍狀態(tài)。
通過(guò)下面的代碼來(lái)實(shí)現(xiàn)對(duì)于空閑數(shù)據(jù)流的處理
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
到此,關(guān)于“flink中新的水印策略是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。