溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink中Watermarks怎么用

發(fā)布時間:2021-12-31 10:27:20 來源:億速云 閱讀:182 作者:小新 欄目:大數(shù)據(jù)

這篇文章將為大家詳細講解有關(guān)Flink中Watermarks怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

Watermarks水印:為輸入的數(shù)據(jù)流的設(shè)置一個時間事件(時間戳),對窗口內(nèi)的數(shù)據(jù)輸入流無序與延遲提供解決方案

示例環(huán)境

java.version: 1.8.xflink.version: 1.11.1

TimestampsAndWatermarks.java

import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * @Description Watermarks水印:為輸入的數(shù)據(jù)流的設(shè)置一個時間事件(時間戳),對窗口內(nèi)的數(shù)據(jù)輸入流無序與延遲提供解決方案
 */
public class TimestampsAndWatermarks {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
     */

    /**
     * 遍歷集合,分別打印不同性別的信息,對于執(zhí)行超時,自動觸發(fā)定時器
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        TimeCharacteristic有三種時間類型:
            ProcessingTime:以operator處理的時間為準,它使用的是機器的系統(tǒng)時間來作為data stream的時間;
            IngestionTime:以數(shù)據(jù)進入flink streaming data flow的時間為準;
            EventTime:以數(shù)據(jù)自帶的時間戳字段為準,應用程序需要指定如何從record中抽取時間戳字段;需要實現(xiàn)assignTimestampsAndWatermarks方法,并設(shè)置時間水位線;
         */
        //使用event time,需要指定事件的時間戳
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //設(shè)置自動生成水印的時間周期,避免數(shù)據(jù)流量大的情況下,頻繁添加水印導致計算性能降低。
        env.getConfig().setAutoWatermarkInterval(1000L);
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();

        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple2<String, Integer>> dataStream = inStream
                //為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發(fā)此window的計算,Watermarks就是用來觸發(fā)window計算的。
                //Duration.ofSeconds(2),到數(shù)據(jù)流到達flink后,再水位線中設(shè)置延遲時間,也就是在所有數(shù)據(jù)流的最大的事件時間比window窗口結(jié)束時間大或相等時,再延遲多久觸發(fā)window窗口結(jié)束;
//                .assignTimestampsAndWatermarks(
//                        WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//                                .withTimestampAssigner((element, timestamp) -> {
//                                    long times = System.currentTimeMillis() ;
//                                    System.out.println(element.f1 + ","+ element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                                    return times;
//                                })
//                )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
                                long times = System.currentTimeMillis();
                                System.out.println(element.f1 + "," + element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                                return times;
                            }
                        }))
                //分區(qū)窗口
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //觸發(fā)3s滾動窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                //執(zhí)行窗口數(shù)據(jù),對keyBy數(shù)據(jù)流批量處理
                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>(){
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        long times = System.currentTimeMillis() ;
                        System.out.println();
                        System.out.println("窗口處理時間:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        Iterator<Tuple3<String, String, Integer>> iterator = input.iterator();
                        int total = 0;
                        int size = 0;
                        String sex = "";
                        while (iterator.hasNext()){
                            Tuple3<String, String, Integer> tuple3 = iterator.next();
                            total += tuple3.f2;
                            size ++;
                            sex = tuple3.f1;
                        }
                        out.collect(new Tuple2<>(sex, total / size));
                    }
                });

        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 定期水印生成器
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>>{
        @Override
        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
                //設(shè)置固定的延遲量3.5 seconds
                private final long maxOutOfOrderness = 3500;
                private long currentMaxTimestamp;

                /**
                 * 事件處理
                 * @param event             數(shù)據(jù)流對象
                 * @param eventTimestamp    事件水位線時間
                 * @param output            輸出
                 */
                @Override
                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                    currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp);
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // 拿上一個水印時間 - 延遲量 = 等于給的窗口最終數(shù)據(jù)最后時間(如果在窗口到期內(nèi),未發(fā)生新的水印事件,則按window正常結(jié)束時間計算,當在最后水印時間-延遲量的時間范圍內(nèi),有新的數(shù)據(jù)流進入,則會重新觸發(fā)窗口內(nèi)對全部數(shù)據(jù)流計算)
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }

    /**
     * 模擬數(shù)據(jù)持續(xù)輸出
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            int j = 0;
            for (int i=0;i<100;i++){
                if (i%6 == 0){
                    j=0;
                }
                ctx.collect(tuple3List.get(j));
                //1秒鐘輸出一個
                Thread.sleep(1 * 1000);
                j ++;
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

打印結(jié)果

man,張三的水位線為:2020-12-27 10:28:20
girl,李四的水位線為:2020-12-27 10:28:21
man,王五的水位線為:2020-12-27 10:28:22
girl,劉六的水位線為:2020-12-27 10:28:23
girl,伍七的水位線為:2020-12-27 10:28:24

窗口處理時間:2020-12-27 10:28:25
(man,20)
man,吳八的水位線為:2020-12-27 10:28:25
man,張三的水位線為:2020-12-27 10:28:26
girl,李四的水位線為:2020-12-27 10:28:27

窗口處理時間:2020-12-27 10:28:28
(girl,28)

窗口處理時間:2020-12-27 10:28:28
(man,29)

關(guān)于“Flink中Watermarks怎么用”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI