溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink的SideOutputSplit分流怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-31 15:29:28 來源:億速云 閱讀:146 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink的SideOutputSplit分流怎么實(shí)現(xiàn)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink的SideOutputSplit分流怎么實(shí)現(xiàn)”吧!

版本說明:

環(huán)境: Windiws

Scala: 2.11.8

Flink :1.10.1

大部分的DataStream API的算子的輸出是單一輸出,也就是某種數(shù)據(jù)類型的流。

除了split算子,可以將一條流分成多條流,這些流的數(shù)據(jù)類型也都相同。

process function的side outputs功能可以產(chǎn)生多條流(Flink 1.9版本之后推薦此種方案),并且這些流的數(shù)據(jù)類型可以不一樣。一個(gè)side output可以定義為OutputTag[X]對(duì)象,X是輸出流的數(shù)據(jù)類型。process function可以通過Context對(duì)象發(fā)射一個(gè)事件到一個(gè)或者多個(gè)side outputs。

import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
  *
  * @param deviceNo    設(shè)備號(hào)
  * @param timestamp   時(shí)間戳
  * @param temperature 溫度
  */
case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object SensorReadingSplitStreaming {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //設(shè)置時(shí)間語義  時(shí)間發(fā)生時(shí)間
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)


        val socketSource: DataStream[String] = env.readTextFile("D:\\tmp\\file1.txt")

        val mapStream: DataStream[SensorReading] = socketSource
            .map(data => {
                val split: Array[String] = data.split(",")
                SensorReading(split(0).trim, split(1).trim.toLong, split(2).trim.toDouble)
            })

        //對(duì)數(shù)據(jù)流進(jìn)行分流處理
        val tmpStageStream: DataStream[SensorReading] = mapStream.process(new TempStageProcess())

        tmpStageStream.print("main");
        val lowStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("low-tmp"))
        val highStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("high-tmp"))
        lowStream.print("low")
        highStream.print("high")
        env.execute()
    }

}


class TempStageProcess() extends ProcessFunction[SensorReading, SensorReading] {
    // 定義側(cè)輸出流
    lazy val lowTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("low-tmp");
    lazy val HighTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("high-tmp");

    //處理數(shù)據(jù)
    override def processElement(value: SensorReading, context: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit = {
        if (value.temperature < 10) {
            context.output(lowTmp, (value.deviceNo, value.temperature))
        } else if (value.temperature > 70) {
            context.output(HighTmp, (value.deviceNo, value.temperature))
        } else {
            collector.collect(value)
        }
    }
}  //測(cè)試文件內(nèi)容如下↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

/*
設(shè)備8,1610035289736,84.3
設(shè)備5,1610035371758,38.8
設(shè)備5,1610035458637,60.2
設(shè)備1,1610035543127,10.2
設(shè)備7,1610035623427,51.6
設(shè)備5,1610035705302,20.1
設(shè)備5,1610035787387,12.9
設(shè)備7,1610035877019,88.2
設(shè)備6,1610035960537,33.5
設(shè)備7,1610036043040,63.0
設(shè)備5,1610036125179,64.5
設(shè)備6,1610036214972,30.2
設(shè)備5,1610036296542,56.5
設(shè)備7,1610036377999,29.7
設(shè)備6,1610036467523,59.4
設(shè)備4,1610036557446,71.1
設(shè)備5,1610036641100,28.2
設(shè)備2,1610036725803,88.8
設(shè)備8,1610036808041,73.5
設(shè)備1,1610036897060,18.0
設(shè)備7,1610036980127,14.9
設(shè)備2,1610037069523,47.4
設(shè)備4,1610037154507,59.5
設(shè)備5,1610037235099,35.0
設(shè)備6,1610037317868,76.4
設(shè)備2,1610037403367,10.0
設(shè)備2,1610037484177,18.5
設(shè)備4,1610037571384,98.7
設(shè)備5,1610037653666,95.6
設(shè)備6,1610037735520,32.6
設(shè)備6,1610037823906,83.3
設(shè)備3,1610037913756,29.1
設(shè)備7,1610037994980,74.6
設(shè)備6,1610038081606,22.2
設(shè)備3,1610038163043,10.4
設(shè)備5,1610038244717,56.9
設(shè)備3,1610038326227,64.8
設(shè)備4,1610038411053,65.0
設(shè)備8,1610038500538,93.2
設(shè)備8,1610038583924,76.2
設(shè)備1,1610038670150,42.1
設(shè)備5,1610038756839,35.1
設(shè)備3,1610038840180,75.9
設(shè)備3,1610038929751,83.4
設(shè)備7,1610039019422,24.1
設(shè)備3,1610039101778,85.0
設(shè)備8,1610039183077,45.6
設(shè)備3,1610039264498,79.5
設(shè)備1,1610039351600,44.4
設(shè)備8,1610039434187,73.3
設(shè)備3,1610039518048,77.9
設(shè)備7,1610039598556,9.79
設(shè)備4,1610039679144,19.0
設(shè)備2,1610039761967,56.1
設(shè)備3,1610039847823,88.2
設(shè)備6,1610039933024,77.4
設(shè)備7,1610040014212,14.4
設(shè)備4,1610040101627,98.2
設(shè)備8,1610040182379,85.0
設(shè)備6,1610040265210,61.8
設(shè)備2,1610040345769,48.0
設(shè)備3,1610040432855,19.9
設(shè)備4,1610040515943,30.9
設(shè)備4,1610040601373,51.7
設(shè)備1,1610040681803,29.7
設(shè)備8,1610040770779,31.6
設(shè)備3,1610040851986,67.1
設(shè)備4,1610040941421,93.2
設(shè)備7,1610041022836,37.2
設(shè)備8,1610041105401,84.6
設(shè)備6,1610041189301,19.2
設(shè)備4,1610041270735,99.0
設(shè)備4,1610041354109,77.0
設(shè)備5,1610041435113,49.7
設(shè)備1,1610041521773,74.2
設(shè)備8,1610041603035,42.2
設(shè)備3,1610041687230,87.1
設(shè)備1,1610041767985,82.7
設(shè)備3,1610041848130,0.59
設(shè)備4,1610041933021,7.38
設(shè)備2,1610042016080,28.9
設(shè)備2,1610042103229,99.2
設(shè)備2,1610042190222,42.2
設(shè)備3,1610042277841,12.0
設(shè)備7,1610042364076,93.5
設(shè)備7,1610042444652,10.5
設(shè)備4,1610042530461,68.5
設(shè)備1,1610042615421,78.2
設(shè)備3,1610042702219,18.5
設(shè)備6,1610042787478,64.8
設(shè)備5,1610042874301,6.34
設(shè)備2,1610042956073,65.6
設(shè)備8,1610043038793,10.6
設(shè)備8,1610043122971,30.3
設(shè)備7,1610043203810,17.5
設(shè)備8,1610043291566,83.8
設(shè)備5,1610043373188,30.5
設(shè)備2,1610043456107,84.7
設(shè)備1,1610043545998,53.4
設(shè)備3,1610043627174,97.4

 */

輸出結(jié)果:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/.m2/repository/ch/qos/logback/logback-classic/1.2.0/logback-classic-1.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17:19:42,659 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:42,725 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:43,088 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
17:19:43,089 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
high> (設(shè)備8,84.3)
main> SensorReading(設(shè)備5,1610035371758,38.8)
main> SensorReading(設(shè)備5,1610035458637,60.2)
main> SensorReading(設(shè)備1,1610035543127,10.2)
main> SensorReading(設(shè)備7,1610035623427,51.6)
main> SensorReading(設(shè)備5,1610035705302,20.1)
main> SensorReading(設(shè)備5,1610035787387,12.9)
high> (設(shè)備7,88.2)
main> SensorReading(設(shè)備6,1610035960537,33.5)
main> SensorReading(設(shè)備7,1610036043040,63.0)
main> SensorReading(設(shè)備5,1610036125179,64.5)
main> SensorReading(設(shè)備6,1610036214972,30.2)
main> SensorReading(設(shè)備5,1610036296542,56.5)
main> SensorReading(設(shè)備7,1610036377999,29.7)
main> SensorReading(設(shè)備6,1610036467523,59.4)
high> (設(shè)備4,71.1)
main> SensorReading(設(shè)備5,1610036641100,28.2)
high> (設(shè)備2,88.8)
high> (設(shè)備8,73.5)
main> SensorReading(設(shè)備1,1610036897060,18.0)
main> SensorReading(設(shè)備7,1610036980127,14.9)
main> SensorReading(設(shè)備2,1610037069523,47.4)
main> SensorReading(設(shè)備4,1610037154507,59.5)
main> SensorReading(設(shè)備5,1610037235099,35.0)
high> (設(shè)備6,76.4)
main> SensorReading(設(shè)備2,1610037403367,10.0)
main> SensorReading(設(shè)備2,1610037484177,18.5)
high> (設(shè)備4,98.7)
high> (設(shè)備5,95.6)
main> SensorReading(設(shè)備6,1610037735520,32.6)
high> (設(shè)備6,83.3)
main> SensorReading(設(shè)備3,1610037913756,29.1)
high> (設(shè)備7,74.6)
main> SensorReading(設(shè)備6,1610038081606,22.2)
main> SensorReading(設(shè)備3,1610038163043,10.4)
main> SensorReading(設(shè)備5,1610038244717,56.9)
main> SensorReading(設(shè)備3,1610038326227,64.8)
main> SensorReading(設(shè)備4,1610038411053,65.0)
high> (設(shè)備8,93.2)
high> (設(shè)備8,76.2)
main> SensorReading(設(shè)備1,1610038670150,42.1)
main> SensorReading(設(shè)備5,1610038756839,35.1)
high> (設(shè)備3,75.9)
high> (設(shè)備3,83.4)
main> SensorReading(設(shè)備7,1610039019422,24.1)
high> (設(shè)備3,85.0)
main> SensorReading(設(shè)備8,1610039183077,45.6)
high> (設(shè)備3,79.5)
main> SensorReading(設(shè)備1,1610039351600,44.4)
high> (設(shè)備8,73.3)
high> (設(shè)備3,77.9)
low> (設(shè)備7,9.79)
main> SensorReading(設(shè)備4,1610039679144,19.0)
main> SensorReading(設(shè)備2,1610039761967,56.1)
high> (設(shè)備3,88.2)
high> (設(shè)備6,77.4)
main> SensorReading(設(shè)備7,1610040014212,14.4)
high> (設(shè)備4,98.2)
high> (設(shè)備8,85.0)
main> SensorReading(設(shè)備6,1610040265210,61.8)
main> SensorReading(設(shè)備2,1610040345769,48.0)
main> SensorReading(設(shè)備3,1610040432855,19.9)
main> SensorReading(設(shè)備4,1610040515943,30.9)
main> SensorReading(設(shè)備4,1610040601373,51.7)
main> SensorReading(設(shè)備1,1610040681803,29.7)
main> SensorReading(設(shè)備8,1610040770779,31.6)
main> SensorReading(設(shè)備3,1610040851986,67.1)
high> (設(shè)備4,93.2)
main> SensorReading(設(shè)備7,1610041022836,37.2)
high> (設(shè)備8,84.6)
main> SensorReading(設(shè)備6,1610041189301,19.2)
high> (設(shè)備4,99.0)
high> (設(shè)備4,77.0)
main> SensorReading(設(shè)備5,1610041435113,49.7)
high> (設(shè)備1,74.2)
main> SensorReading(設(shè)備8,1610041603035,42.2)
high> (設(shè)備3,87.1)
high> (設(shè)備1,82.7)
low> (設(shè)備3,0.59)
low> (設(shè)備4,7.38)
main> SensorReading(設(shè)備2,1610042016080,28.9)
high> (設(shè)備2,99.2)
main> SensorReading(設(shè)備2,1610042190222,42.2)
main> SensorReading(設(shè)備3,1610042277841,12.0)
high> (設(shè)備7,93.5)
main> SensorReading(設(shè)備7,1610042444652,10.5)
main> SensorReading(設(shè)備4,1610042530461,68.5)
high> (設(shè)備1,78.2)
main> SensorReading(設(shè)備3,1610042702219,18.5)
main> SensorReading(設(shè)備6,1610042787478,64.8)
low> (設(shè)備5,6.34)
main> SensorReading(設(shè)備2,1610042956073,65.6)
main> SensorReading(設(shè)備8,1610043038793,10.6)
main> SensorReading(設(shè)備8,1610043122971,30.3)
main> SensorReading(設(shè)備7,1610043203810,17.5)
high> (設(shè)備8,83.8)
main> SensorReading(設(shè)備5,1610043373188,30.5)
high> (設(shè)備2,84.7)
main> SensorReading(設(shè)備1,1610043545998,53.4)
high> (設(shè)備3,97.4)

Process finished with exit code 0

感謝各位的閱讀,以上就是“Flink的SideOutputSplit分流怎么實(shí)現(xiàn)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Flink的SideOutputSplit分流怎么實(shí)現(xiàn)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI