溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

flink1.2版本時(shí)間、水位線的介紹和用法

發(fā)布時(shí)間:2021-06-29 09:34:21 來(lái)源:億速云 閱讀:340 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“flink1.2版本時(shí)間、水位線的介紹和用法”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“flink1.2版本時(shí)間、水位線的介紹和用法”吧!

水位線

  • 水位線是flink的一種處理延時(shí)數(shù)據(jù)的機(jī)制,主要對(duì)設(shè)定時(shí)間內(nèi)延時(shí)數(shù)據(jù)的自動(dòng)容錯(cuò),水位線的本質(zhì)是時(shí)間戳,計(jì)算公式為:當(dāng)前事件最大時(shí)間值 - 數(shù)據(jù)延時(shí)時(shí)間。(看了幾遍有點(diǎn)懵)

  • 個(gè)人理解:

    • 水位線是收到數(shù)據(jù)邏輯時(shí)間便簽,是處理延時(shí)數(shù)據(jù)的基礎(chǔ),通過(guò)與數(shù)據(jù)自帶的生成時(shí)間Timestamps,實(shí)現(xiàn)延遲數(shù)據(jù)矯正。

種類(lèi)

順序事件中的Watermarks

  • 理想狀態(tài)下的水位線,即數(shù)據(jù)元素的事件事件是有序的,Watermark時(shí)間戳?xí)S著數(shù)據(jù)元素的事件時(shí)間安裝順序生成,此時(shí),水位線時(shí)間和時(shí)間時(shí)間保持一致。

亂序事件中的Watermarks

  • 現(xiàn)實(shí)情況數(shù)據(jù)元素往往并不按照其生產(chǎn)順序接入Flink,而頻繁處理亂序或遲到情況,這時(shí)候需要watermark來(lái)處理,當(dāng)事件8和事件11同時(shí)進(jìn)入系統(tǒng),flink系統(tǒng)將根據(jù)設(shè)定延時(shí)值分別計(jì)算它們的watermark,兩個(gè)事件到達(dá)一個(gè)operator中后,匹配事件時(shí)間的虛擬時(shí)間與watermark匹配,觸發(fā)響應(yīng)的計(jì)算。

并行數(shù)據(jù)流中的Watermarks

  • Watermark在Source Operator中生成,且在每個(gè)Operator的子Task中獨(dú)立生成。

  • 如果一個(gè)watermark同時(shí)更新一個(gè)算子Task的當(dāng)前事件時(shí)間,F(xiàn)link會(huì)選擇最小的水位線進(jìn)行更新。當(dāng)一個(gè)Window算子Task中水位線大于Window結(jié)束時(shí)間,立即觸發(fā)窗口計(jì)算。

時(shí)間概念

  • 流式處理中最大的特點(diǎn)是數(shù)據(jù)上具有時(shí)間的屬性特征,F(xiàn)link根據(jù)時(shí)間產(chǎn)生的位置不同,將時(shí)間分為三種概念:事件生成時(shí)間(Event Time)、事件接入時(shí)間(Ingestion TIme)、事件處理時(shí)間(Processing Time)。

    • 事件生成時(shí)間:數(shù)據(jù)從終端或系統(tǒng)中產(chǎn)生的過(guò)程消耗的時(shí)間。

    • 數(shù)據(jù)接入時(shí)間:數(shù)據(jù)接入DataSource時(shí)的時(shí)間。

    • 事件處理時(shí)間:處理過(guò)程中獲取的主機(jī)時(shí)間。

Event Time

  • Timestamps和Watermark成對(duì)對(duì)存在,使用時(shí),都要指定

watermark
  • watermark設(shè)定Flink中Watermark默認(rèn)200ms生成一次,也可以手動(dòng)指定,代碼如下:

// 1、創(chuàng)建flink運(yùn)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 設(shè)置并行度
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);  //處理模式設(shè)定:流或批

// 生成 watermark 的時(shí)間間隔(每 n 毫秒),設(shè)置周期性的產(chǎn)生水位線的時(shí)間間隔。當(dāng)數(shù)據(jù)流很大的時(shí)候,如果每個(gè)事件都產(chǎn)生水位線,會(huì)影響性能。
//env.getConfig().setAutoWatermarkInterval(1000); // 自動(dòng)水印時(shí)間間隔 12版本不用設(shè)置,有默認(rèn)

指定Timestamps

  • 此處以滾動(dòng)窗口為例,窗口知識(shí)下次分享,首先對(duì)數(shù)據(jù)進(jìn)行機(jī)構(gòu)化,數(shù)據(jù)結(jié)構(gòu):"yyyy-MM-dd HH:mm:ss|type|num",處理代碼如下:

SingleOutputStreamOperator<Tuple3<String,String, Integer>> formatData =text.map(new MapFunction<String, Tuple3<String, String, Integer>>() { 
  // 數(shù)據(jù)格式轉(zhuǎn)換
  private static final long serialVersionUID = 1L;
  @Override
  public Tuple3<String, String, Integer> map(String value) throws Exception {
    Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>();
    String[] dataTmp = value.split("\\|");

    data.f0 = dataTmp[0];
    data.f1 = dataTmp[1];
    data.f2 = Integer.parseInt(dataTmp[2]);
    return data;
  }
});
  • 設(shè)置Timestamps和最大時(shí)延

SingleOutputStreamOperator<Tuple3<String,String, Integer>> orderDSWithWatemark=formatData
    .assignTimestampsAndWatermarks( // 設(shè)置watermark  watemark = 最大事件時(shí)間 - 最大延遲或亂序時(shí)間
    WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定maxOutOfOrderness最大無(wú)序度時(shí)間即最大延遲時(shí)間/亂序時(shí)間
    .withTimestampAssigner((data,timestamp) -> Long.parseLong(DateUtil.dateToUTC(data.f0))*1000)  //時(shí)間為毫秒級(jí)

);
  • 設(shè)定窗口大小和處理邏輯

SingleOutputStreamOperator<Tuple3<String,String, Integer>> result=orderDSWithWatemark.keyBy(one -> one.f1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 設(shè)定窗口大小
//		.allowedLateness(Time.seconds(1))  //延時(shí)處理時(shí)間
//		.sideOutputLateData(lateOutputTag)   //側(cè)輸出
.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() { // 處理邏輯
    private static final long serialVersionUID = -6695049408336015245L;

    @Override
    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1,
        Tuple3<String, String, Integer> value2) throws Exception {
      Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>();
      data.f0 = value2.f0;
      data.f1 = value1.f1;
      data.f2 = value1.f2 + value2.f2;
      System.out.println(data);
      return data;
    }
  });
result.print("滾動(dòng)事件時(shí)間");
env.execute();

總結(jié)

  • 時(shí)間和水位線是flink中比較難理解且重要的概念,我也是一知半解,在使用的過(guò)程中再慢慢深化,基本邏輯是針對(duì)數(shù)據(jù)建立自己的時(shí)間標(biāo)簽,并通過(guò)時(shí)間范圍(窗口)和數(shù)據(jù)延遲完成事件內(nèi)數(shù)據(jù)的匯集、計(jì)算和輸出,以此,完成更精確的實(shí)時(shí)事件數(shù)據(jù)計(jì)算。

  • 技術(shù)是需求的一種呈現(xiàn),基礎(chǔ)本質(zhì)相互交疊,編程語(yǔ)言、技術(shù)框架都是,最重要的細(xì)微處的優(yōu)化和整體的使用的簡(jiǎn)便,功能的穩(wěn)定和強(qiáng)大。

到此,相信大家對(duì)“flink1.2版本時(shí)間、水位線的介紹和用法”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI