溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

【Flink】Flink對(duì)于遲到數(shù)據(jù)的處理

發(fā)布時(shí)間:2020-06-05 01:09:20 來(lái)源:網(wǎng)絡(luò) 閱讀:1015 作者:13157330443 欄目:大數(shù)據(jù)

設(shè)置允許延遲的時(shí)間是通過(guò)allowedLateness(lateness: Time)設(shè)置


保存延遲數(shù)據(jù)則是通過(guò)sideOutputLateData(outputTag: OutputTag[T])保存


獲取延遲數(shù)據(jù)是通過(guò)DataStream.getSideOutput(tag: OutputTag[X])獲取


下面先分別講解這幾個(gè)方法,再給出具體的實(shí)例加深理解


1、allowedLateness(lateness: Time)

def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {

? ? javaStream.allowedLateness(lateness)

? ? this

}

該方法傳入一個(gè)Time值,設(shè)置允許數(shù)據(jù)遲到的時(shí)間,這個(gè)時(shí)間和waterMark中的時(shí)間概念不同。再來(lái)回顧一下,


waterMark=數(shù)據(jù)的事件時(shí)間-允許亂序時(shí)間值


隨著新數(shù)據(jù)的到來(lái),waterMark的值會(huì)更新為最新數(shù)據(jù)事件時(shí)間-允許亂序時(shí)間值,但是如果這時(shí)候來(lái)了一條歷史數(shù)據(jù),waterMark值則不會(huì)更新??偟膩?lái)說(shuō),waterMark是為了能接收到盡可能多的亂序數(shù)據(jù)。


那這里的Time值呢?主要是為了等待遲到的數(shù)據(jù),在一定時(shí)間范圍內(nèi),如果屬于該窗口的數(shù)據(jù)到來(lái),仍會(huì)進(jìn)行計(jì)算,后面會(huì)對(duì)計(jì)算方式仔細(xì)說(shuō)明


注意:該方法只針對(duì)于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值則會(huì)拋出異常


2、sideOutputLateData(outputTag: OutputTag[T])

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {

? ? javaStream.sideOutputLateData(outputTag)

? ? this

}

該方法是將遲來(lái)的數(shù)據(jù)保存至給定的outputTag參數(shù),而OutputTag則是用來(lái)標(biāo)記延遲數(shù)據(jù)的一個(gè)對(duì)象。


3、DataStream.getSideOutput(tag: OutputTag[X])

通過(guò)window等操作返回的DataStream調(diào)用該方法,傳入標(biāo)記延遲數(shù)據(jù)的對(duì)象來(lái)獲取延遲的數(shù)據(jù)


4、對(duì)延遲數(shù)據(jù)的理解

延遲數(shù)據(jù)是指:


在當(dāng)前窗口【假設(shè)窗口范圍為10-15】已經(jīng)計(jì)算之后,又來(lái)了一個(gè)屬于該窗口的數(shù)據(jù)【假設(shè)事件時(shí)間為13】,這時(shí)候仍會(huì)觸發(fā)window操作,這種數(shù)據(jù)就稱為延遲數(shù)據(jù)。


那么問(wèn)題來(lái)了,延遲時(shí)間怎么計(jì)算呢?


假設(shè)窗口范圍為10-15,延遲時(shí)間為2s,則只要waterMark<15+2,并且屬于該窗口,就能觸發(fā)window操作。而如果來(lái)了一條數(shù)據(jù)使得waterMark>=15+2,10-15這個(gè)窗口就不能再觸發(fā)window操作,即使新來(lái)的數(shù)據(jù)的event-time<15+2+3


5、代碼實(shí)例講解

大概講解一下代碼的流程:


1、監(jiān)聽(tīng)某主機(jī)的9000端口,讀取socket數(shù)據(jù)(格式為? name:timestamp)


2、給當(dāng)前進(jìn)入flink程序的數(shù)據(jù)加上waterMark,值為eventTime-3s


3、根據(jù)name值進(jìn)行分組,根據(jù)窗口大小為5s劃分窗口,設(shè)置允許遲到時(shí)間為2s,依次統(tǒng)計(jì)窗口中各name值的數(shù)據(jù)


4、輸出統(tǒng)計(jì)結(jié)果以及遲到數(shù)據(jù)


5、啟動(dòng)Job


import org.apache.commons.lang3.time.FastDateFormat

import org.apache.flink.api.java.tuple.Tuple

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks

import org.apache.flink.streaming.api.scala.function.WindowFunction

import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}

import org.apache.flink.streaming.api.watermark.Watermark

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.util.Collector

?

import scala.collection.mutable.ArrayBuffer

?

/**

? * 延遲測(cè)試

? * 詳細(xì)講解博客地址:https://blog.csdn.net/hlp4207/article/details/90717905

? */

object WaterMarkFunc02 {

? // 線程安全的時(shí)間格式化對(duì)象

? val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

?

? def main(args: Array[String]): Unit = {

? ? val hostName = "s102"

? ? val port = 9000

? ? val delimiter = '\n'

? ? val env = StreamExecutionEnvironment.getExecutionEnvironment

? ? // 將EventTime設(shè)置為流數(shù)據(jù)時(shí)間類型

? ? env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

? ? env.setParallelism(1)

? ? val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)

? ? import org.apache.flink.api.scala._

? ? val data = streams.map(data => {

? ? ? // 輸入數(shù)據(jù)格式:name:時(shí)間戳

? ? ? // flink:1559223685000

? ? ? try {

? ? ? ? val items = data.split(":")

? ? ? ? (items(0), items(1).toLong)

? ? ? } catch {

? ? ? ? case _: Exception => println("輸入數(shù)據(jù)不符合格式:" + data)

? ? ? ? ? ("0", 0L)

? ? ? }

? ? }).filter(data => !data._1.equals("0") && data._2 != 0L)

?

? ? //為數(shù)據(jù)流中的元素分配時(shí)間戳,并定期創(chuàng)建水印以監(jiān)控事件時(shí)間進(jìn)度

? ? val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {

? ? ? // 事件時(shí)間

? ? ? var currentMaxTimestamp = 0L

? ? ? val maxOutOfOrderness = 3000L

? ? ? var lastEmittedWatermark: Long = Long.MinValue

?

? ? ? // Returns the current watermark

? ? ? override def getCurrentWatermark: Watermark = {

? ? ? ? // 允許延遲三秒

? ? ? ? val potentialWM = currentMaxTimestamp - maxOutOfOrderness

? ? ? ? // 保證水印能依次遞增

? ? ? ? if (potentialWM >= lastEmittedWatermark) {

? ? ? ? ? lastEmittedWatermark = potentialWM

? ? ? ? }

? ? ? ? new Watermark(lastEmittedWatermark)

? ? ? }

?

? ? ? // Assigns a timestamp to an element, in milliseconds since the Epoch

? ? ? override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {

? ? ? ? // 將元素的時(shí)間字段值作為該數(shù)據(jù)的timestamp

? ? ? ? val time = element._2

? ? ? ? if (time > currentMaxTimestamp) {

? ? ? ? ? currentMaxTimestamp = time

? ? ? ? }

? ? ? ? val outData = String.format("key: %s? ? EventTime: %s? ? waterMark:? %s",

? ? ? ? ? element._1,

? ? ? ? ? sdf.format(time),

? ? ? ? ? sdf.format(getCurrentWatermark.getTimestamp))

? ? ? ? println(outData)

? ? ? ? time

? ? ? }

? ? })

? ? val lateData = new OutputTag[(String,Long)]("late")

? ? val result: DataStream[String] = waterStream.keyBy(0)// 根據(jù)name值進(jìn)行分組

? ? ? .window(TumblingEventTimeWindows.of(Time.seconds(5L)))// 5s跨度的基于事件時(shí)間的翻滾窗口

? ? /**

? ? ? * 對(duì)于此窗口而言,允許2秒的遲到數(shù)據(jù),即第一次觸發(fā)是在watermark > end-of-window時(shí)

? ? ? * 第二次(或多次)觸發(fā)的條件是watermark < end-of-window + allowedLateness時(shí)間內(nèi),這個(gè)窗口有l(wèi)ate數(shù)據(jù)到達(dá)

? ? ? */

? ? ? .allowedLateness(Time.seconds(2L))

? ? ? .sideOutputLateData(lateData)

? ? ? .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {

? ? ? ? override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {

? ? ? ? ? val timeArr = ArrayBuffer[String]()

? ? ? ? ? val iterator = input.iterator

? ? ? ? ? while (iterator.hasNext) {

? ? ? ? ? ? val tup2 = iterator.next()

? ? ? ? ? ? timeArr.append(sdf.format(tup2._2))

? ? ? ? ? }

? ? ? ? ? val outData = String.format("key: %s? ? data: %s? ? startTime:? %s? ? endTime:? %s",

? ? ? ? ? ? key.toString,

? ? ? ? ? ? timeArr.mkString("-"),

? ? ? ? ? ? sdf.format(window.getStart),

? ? ? ? ? ? sdf.format(window.getEnd))

? ? ? ? ? out.collect(outData)

? ? ? ? }

? ? ? })

? ? result.print("window計(jì)算結(jié)果:")

?

? ? val late = result.getSideOutput(lateData)

? ? late.print("遲到的數(shù)據(jù):")

?

? ? env.execute(this.getClass.getName)

? }

}

接下來(lái)開(kāi)始輸入數(shù)據(jù)進(jìn)行測(cè)試驗(yàn)證:




可以看到window范圍為【15-20】,這時(shí)候我們?cè)佥斎霂讞l屬于該范圍的數(shù)據(jù):






輸入了事件時(shí)間為17、16、15三條數(shù)據(jù),都觸發(fā)了window操作,那我們?cè)囍斎胍幌麓翱诜秶鸀椤?0-15】的數(shù)據(jù):






窗口范圍為【10-15】的數(shù)據(jù)則屬于遲到的數(shù)據(jù),已經(jīng)超過(guò)了最大等待時(shí)間,我們可以來(lái)試著計(jì)算一下允許上個(gè)窗口遲到數(shù)據(jù)的waterMark值


窗口結(jié)束時(shí)間+延遲時(shí)間=最大waterMark值


15 + 2 = 17


當(dāng)前的waterMark值為20,大于17,所以窗口范圍為10-15的數(shù)據(jù)已經(jīng)是遲到的數(shù)據(jù)了


再來(lái)計(jì)算一下窗口時(shí)間范圍為15-20的臨界值:


20 + 2 = 22


即當(dāng)waterMark上漲到22,15-20窗口范圍內(nèi)的數(shù)據(jù)就屬于遲到數(shù)據(jù),不能再參與計(jì)算了


記住我們算出的臨界值22,繼續(xù)輸入數(shù)據(jù)測(cè)試:






輸入數(shù)據(jù)A時(shí),waterMark上漲至21,此時(shí)輸入屬于15-20窗口范圍內(nèi)的數(shù)據(jù)B,依然能觸發(fā)窗口操作;


輸入數(shù)據(jù)C,waterMark上漲至22,等于剛才我們算出來(lái)的臨界值,此時(shí)輸入,數(shù)據(jù)B,則已不能觸發(fā)窗口操作,屬于遲到的數(shù)據(jù)。


最后,總結(jié)一下flink對(duì)于延遲數(shù)據(jù)的處理:


如果延遲的數(shù)據(jù)有業(yè)務(wù)需要,則設(shè)置好允許延遲的時(shí)間,每個(gè)窗口都有屬于自己的最大等待延遲數(shù)據(jù)的時(shí)間限制:


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI