溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

flink的Transformation數(shù)據(jù)處理方法是什么

發(fā)布時(shí)間:2021-12-31 15:25:51 來(lái)源:億速云 閱讀:103 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“flink的Transformation數(shù)據(jù)處理方法是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“flink的Transformation數(shù)據(jù)處理方法是什么”吧!

Transformation 數(shù)據(jù)處理

  • 將一個(gè)或多個(gè)DataStream生成新的DataStream的過(guò)程被稱為Transformation。轉(zhuǎn)換過(guò)程中,每種操作類型被定義為不同的Operator,F(xiàn)link能將多個(gè)Transformation組合為一個(gè)DataFlow的拓?fù)洹?/p>

  • 所以DataStream的轉(zhuǎn)換操作可以分為SingleDataStream、MultiDataStream、物理分區(qū)三個(gè)類型。

    • SingleDataStream:?jiǎn)蝹€(gè)DataStream的處理邏輯。

    • MultiDataStream:多個(gè)DataStream的處理邏輯。

    • 物理分區(qū):對(duì)數(shù)據(jù)集中的并行度和數(shù)據(jù)分區(qū)調(diào)整轉(zhuǎn)換的處理邏輯。

SingleDataStream

Map

  • 常用作對(duì)數(shù)據(jù)集內(nèi)數(shù)據(jù)的清晰和轉(zhuǎn)換。如將輸入數(shù)據(jù)的每個(gè)數(shù)值全部加1,并將數(shù)據(jù)輸出到下游。

val dataStream = evn.formElements(("a",3),("d",4),("c",4),("c",5),("a",5))
//方法一
val mapStream:DataStream[(String,Int)] = dataStream.map(t => (t._1,t._2+1))
//方法二
val mapStream:DataStream[(String,Int)] = dataStream.map( new MapFunction[(String,Int),(String, Int)]{
  override def map(t: (String,Int)): (String,Int) ={
    (t._1, t._2+1)
  }
})

FlatMap

  • 主要應(yīng)用于處理輸入一個(gè)元素轉(zhuǎn)換為多個(gè)元素場(chǎng)景,如WordCount,將沒行文本數(shù)據(jù)分割,生成單詞序列。

val dataStream:DataStream[String] = environment.fromCollections()
val resultStream[String] =dataStream.flatMap{str => str.split(" ")}

Filter

  • 按條件對(duì)輸入數(shù)據(jù)集進(jìn)行篩選,輸出符合條件的數(shù)據(jù)。

//通配符
val filter:DataStream[Int] = dataStream.filter{ _ %2 == 0}
//運(yùn)算表達(dá)式
val filter:DataStream[Int] = dataStream.filter { x => x % 2 ==0}

KeyBy

  • 根據(jù)指定的key對(duì)輸入的數(shù)據(jù)集執(zhí)行Partition操作,將相同的key值的數(shù)據(jù)放置到相同的區(qū)域中。

  • 將下標(biāo)為1相同的數(shù)據(jù)放到一個(gè)分區(qū)

val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一個(gè)字段為分區(qū)key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)

Reduce

  • 與MapReduce中reduce原理基本一致,將輸入的KeyedStream通過(guò)傳入用戶自定義的ReduceFunction滾動(dòng)進(jìn)行數(shù)據(jù)聚合處理,定義的ReduceFunction必須滿足運(yùn)算結(jié)合律和交換律。

val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
//指定第一個(gè)字段為分區(qū)key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
//實(shí)現(xiàn)一:滾動(dòng)第二個(gè)字段進(jìn)行reduce相加求和
val reduceStream = keyedStream.reduce{(t1,t2) => (t1._1, t1._2+t2._2)}
//實(shí)現(xiàn)二:實(shí)現(xiàn)ReduceFunction
val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] {
  override def reduce(t1: (String,Int), t2:(String,Int)):(String, int) = {
    (t1._1, t1._2+ t2._2)
  }
})
  • 運(yùn)行結(jié)果為:(c,2)(c,7)(a,3)(d,4)(a,8),結(jié)果不是最后求和的值,是將每條記錄累加后的結(jié)果輸出。

Aggregations

  • DataStream提供的聚合算子,根據(jù)指定的字段進(jìn)行聚合操作,滾動(dòng)產(chǎn)生一系列數(shù)據(jù)聚合結(jié)果。實(shí)際是將Reduce算子中函數(shù)進(jìn)行封裝,封裝的聚合操作有sum、min、minBy、max、maxBy等。這樣就不需要用戶自己定義Reduce函數(shù)。

val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一個(gè)字段為分區(qū)key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
//對(duì)第二個(gè)字段進(jìn)行sum統(tǒng)計(jì)
val sumStream: DataStream[(Int,Int)] = keyedStream.sum(1)
//輸出統(tǒng)計(jì)結(jié)果
sumStream.print()
  • 聚合函數(shù)中傳入?yún)?shù)必須數(shù)值型,否則會(huì)拋出異常。

//統(tǒng)計(jì)計(jì)算指定key最小值
val minStream: DataStream[(Int,Int)] = keyedStream.min(1)
//統(tǒng)計(jì)計(jì)算指定key最大值
val maxStream: DataStream[(Int,Int)] = keyedStream.max(1)
//統(tǒng)計(jì)計(jì)算指定key最小值,返回最小值對(duì)應(yīng)元素
val minByStream: DataStream[(Int,Int)] = keyedStream.minBy(1)
//統(tǒng)計(jì)計(jì)算指定key最大值,返回最大值對(duì)應(yīng)元素
val maxByStream: DataStream[(Int,Int)] = keyedStream.maxBy(1)

MultiDataStream

Unio

  • 將兩個(gè)或多個(gè)輸入的數(shù)據(jù)集合并為一個(gè)數(shù)據(jù)集,需要保證輸入待合并數(shù)據(jù)集和輸出數(shù)據(jù)集格式一致。

//創(chuàng)建不同數(shù)據(jù)集
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [(String ,Int)]= env.fromElements(("d",1),("s",2),("a",4),("e",5),("a",6))
val dataStream3: DataStream [(String ,Int)]= env.fromElements(("a",2),("d",1),("s",2),("c",3),("b",1))
//合并兩個(gè)數(shù)據(jù)集
val unionStream = dataStream1.union(dataStream2)
//合并多個(gè)數(shù)據(jù)集
val allUnionStream = dataStream1.union(dataStream2,dataStream3)

Connect,CoMap,CoflatMap

  • 該算子為了合并兩種或多種不同類型的數(shù)據(jù)集,合并后會(huì)保留原始數(shù)據(jù)集的數(shù)類型。連接操作允許共享狀態(tài)數(shù)據(jù),也就是說(shuō)在多個(gè)數(shù)據(jù)集之間可以操作和查看對(duì)方數(shù)據(jù)集的狀態(tài)。

  • 實(shí)例:dataStream1數(shù)據(jù)集為(String,Int)元祖類型,dataStream2數(shù)據(jù)集為Int類型,通過(guò)connect連接將兩種類型數(shù)據(jù)結(jié)合在一起,形成格式為ConnectedStream是的數(shù)據(jù)集,其內(nèi)部數(shù)據(jù)為[(String,Int),Int]的混合數(shù)據(jù)類型,保留兩個(gè)數(shù)據(jù)集的數(shù)據(jù)類型。

val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [Int]= env.fromElements(1,2,4,5,6)
//連接兩個(gè)數(shù)據(jù)集
val connectedStream :ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
  • 注意:ConnectedStreams類型的數(shù)據(jù)集不能進(jìn)行類似Print()操作,需轉(zhuǎn)換為DataStream類型數(shù)據(jù)集。

  • ConnectedStreams提供map()和flatMap()需要定義CoMapFunction或CoFlatMapFunction分別處理輸入的DataStream數(shù)據(jù)集,或直接傳入MapFunction來(lái)分別處理兩個(gè)數(shù)據(jù)集。

  • map()實(shí)例如下:

val resultStream = connectedStream.map(new CoMapFunction[(String,Int),Int,(Int, String)]{
  //定義第一個(gè)數(shù)據(jù)集函數(shù)處理邏輯,輸入值為第一個(gè)DataStream
  override def map1(in1: (String,Int)): (Int ,String) = {
    (int1._2 , in1._1)
  }
  //定義第二個(gè)數(shù)據(jù)集函數(shù)處理邏輯
  override def amp2(in2: Int):(Int,String) = {
    (int2,"default")
  }

})
  • 以上實(shí)例中,兩個(gè)函數(shù)會(huì)多線程交替執(zhí)行產(chǎn)生結(jié)果,最后根據(jù)定義生成目標(biāo)數(shù)據(jù)集。

  • flatMap()方法中指定CoFlatMapFunction。兩個(gè)函數(shù)共享number變量,代碼如下:

val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String,Int), Int ,(String ,Int , Int)]{
  //定義共享變量
  var number=0
  //定義第一個(gè)數(shù)據(jù)集處理函數(shù)
  override def flatMap1(in1:(String ,Int ), collector : Collector[(String,Int ,Int)]): Unit = {
    collector.collect((in1._1,in1._2,number))
  }
  //定義第二個(gè)數(shù)據(jù)集處理函數(shù)
  override def flatMap2(in2: Int, collector : Collector[(String , Int ,Int)]):Unit = {
    number=in2
  }
})
  • 如果想通過(guò)指定的條件對(duì)兩個(gè)數(shù)據(jù)集進(jìn)行關(guān)聯(lián),可以借助keyBy韓碩或broadcast廣播變量實(shí)現(xiàn)。keyBy會(huì)將相同key的數(shù)據(jù)路由在同一個(gè)Operator中。broadcast會(huì)在執(zhí)行計(jì)算邏輯前,將DataStream2數(shù)據(jù)集廣播到所有并行計(jì)算的Operator中,再根據(jù)條件對(duì)數(shù)據(jù)集進(jìn)行關(guān)聯(lián)。這兩種方式本質(zhì)是分布式j(luò)oin算子的基本實(shí)現(xiàn)方式。

//通過(guò)keyby函數(shù)根據(jù)指定的key連接兩個(gè)數(shù)據(jù)集
val keyedConnect: ConnectedStreams[(String ,Int ), Int] = dataStream1.connect(dataStream2).keyBy(1,0)
//通過(guò)broadcast關(guān)聯(lián)兩個(gè)數(shù)據(jù)集
val broadcastConnect: BroadcastConnectedStream [(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())

split

  • 將一個(gè)DataStream數(shù)據(jù)集按條件進(jìn)行拆分,形成兩個(gè)數(shù)據(jù)集的過(guò)程,union的逆向操作。實(shí)例:如調(diào)用split函數(shù),指定條件判斷,根據(jù)第二個(gè)字段的奇偶性將數(shù)據(jù)集標(biāo)記出來(lái),偶數(shù)標(biāo)記為event,奇數(shù)標(biāo)記為odd,再通過(guò)集合將標(biāo)記返回,最終生成SplitStream數(shù)據(jù)集。

//創(chuàng)建數(shù)據(jù)集
val DataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
//合并連個(gè)DataStream數(shù)據(jù)集
val splitedStream : SplitStream[(String,Int)] = dataStream1.split(t => if(t._2 % 2 ==0 ) Seq("even") else Seq("odd"))

Select

  • split函數(shù)只是標(biāo)記數(shù)據(jù),沒有拆分?jǐn)?shù)據(jù),因此需要select函數(shù)根據(jù)標(biāo)記將數(shù)據(jù)切分為不同數(shù)據(jù)集。

//篩選出偶數(shù)數(shù)據(jù)集
val evenStream: DataStream[(String,Int)] = splitedStream.select("even")
//篩選出奇數(shù)數(shù)據(jù)集
val oddStream: DataStream[(String,Int)] = splitedStream.select("odd")
//篩選出偶數(shù)和奇數(shù)數(shù)據(jù)集
val allStream: DataStream[(String,Int)] = splitedStream.select("even","odd")

Iterate

  • Iterate適合于迭代計(jì)算,通過(guò)每一次的迭代計(jì)算,并將計(jì)算結(jié)果反饋到下一次迭代計(jì)算中。

//創(chuàng)建數(shù)據(jù)集,map處理為對(duì)數(shù)據(jù)分區(qū)根據(jù)默認(rèn)并行度進(jìn)行平衡
val DataStream = env.fromElements(3,1,2,1,5).map{ t:Int => t}

val iterated = dataStream.iterate((input: ConnectedStreams[Int , String]) => {
  //定義兩個(gè)map處理數(shù)據(jù)集,第一個(gè)map反饋操作,第二個(gè)map將數(shù)據(jù)輸出到下游
  val head= input.map(i => (i+1).toString, s => s) (head.filter( _ == "2"), head.filter (_ != "2"))
},1000)  //超過(guò)1000ms沒有數(shù)據(jù)接入終止迭代

物理分區(qū)

  • 根據(jù)指定的分區(qū)策略將數(shù)據(jù)重新分發(fā)到不同節(jié)點(diǎn)的Task實(shí)例上執(zhí)行,以此優(yōu)化DataStream自身API對(duì)數(shù)據(jù)的分區(qū)控制。

隨機(jī)分區(qū)(Random Partitioning)

  • 隨機(jī)將數(shù)據(jù)集中數(shù)據(jù)分配到下游算子的每個(gè)分區(qū)中,優(yōu)點(diǎn)數(shù)據(jù)相對(duì)均衡,缺點(diǎn)失去原有數(shù)據(jù)的分區(qū)結(jié)構(gòu)

val shuffleStream=dataStream.shuffle

平衡分區(qū)(Roundrobin Partitioning)

  • 循環(huán)將數(shù)據(jù)集中數(shù)據(jù)進(jìn)行重分區(qū),能盡可能保證每個(gè)分區(qū)的數(shù)據(jù)平衡,可有效解決數(shù)據(jù)集的傾斜問(wèn)題。

val shuffleStream= dataStream.rebalance();

Rescaling partitioning

  • 一種通過(guò)循環(huán)方式進(jìn)行數(shù)據(jù)重平衡的分區(qū)策略,與Roundrobin Partitioning不同,它僅會(huì)對(duì)上下游繼承的算子數(shù)據(jù)進(jìn)行重新平衡,具體主要根據(jù)上下游算子的并行度決定。如上游算子的并發(fā)度為2,下游算子的并發(fā)度為4,上游算子中第一個(gè)分區(qū)數(shù)據(jù)按照同等比例將數(shù)據(jù)路由在下游的固定兩個(gè)分區(qū)中,另一個(gè)分區(qū)也是一樣。

//通過(guò)調(diào)用DataStream API中rescale()方法實(shí)現(xiàn)Rescaling Partitioning操作
val shuffleStream = dataStream.rescale();

廣播操作

  • 將輸入的數(shù)據(jù)集復(fù)制到下游算子的并行的Tasks實(shí)例中,下游算子Tasks可直接從本地內(nèi)存中獲取廣播數(shù)據(jù)集,不再依賴網(wǎng)絡(luò)傳輸。

  • 這種分區(qū)策略適合于小集群,如大數(shù)據(jù)集關(guān)聯(lián)小數(shù)據(jù)集時(shí),可通過(guò)廣播方式將小數(shù)據(jù)分發(fā)到算子的分區(qū)中。

//通過(guò)DataStream API的broadcast() 方法實(shí)現(xiàn)廣播分區(qū)
val shuffleStream= dataStream.broadcast()

自定義分區(qū)

  • 實(shí)現(xiàn)自定義分區(qū)器,調(diào)用DataStream API上的partitionCustom()方法將創(chuàng)建的分區(qū)器應(yīng)用到數(shù)據(jù)集上。

  • 如下,自定義分區(qū)器實(shí)現(xiàn)將字段中包含flink關(guān)鍵字的數(shù)據(jù)放在partition為0的分區(qū)中,其余數(shù)據(jù)執(zhí)行隨機(jī)分區(qū)策略,其中num Partitions是從系統(tǒng)中獲取的并行度參數(shù)。

Object customPartitioner extends Partitioner[String]{
  //獲取隨機(jī)數(shù)生成器
  val r=scala.util.Random
  override def partition(key: String, numPartitions: Int): Int ={
    //定義分區(qū)策略,key中如果包含a則放入0分區(qū)中,其他情況則根據(jù)Partitions num隨機(jī)分區(qū)
    if(key.contains("flink")) 0 else r.nextInt(numPartitions)
  }
}
  • 完成自定義分區(qū)器,調(diào)用DataStream API的partitionCustom應(yīng)用分區(qū)器,第二個(gè)參數(shù)指定分區(qū)器使用到的字段,對(duì)于Tuple類型數(shù)據(jù),分區(qū)字段可以通過(guò)字段名稱指定,其他類型數(shù)據(jù)集則通過(guò)位置索引指定。

//通過(guò)數(shù)據(jù)集字段名稱指定分區(qū)字段
dataStream.partitionCustom(customPartitioner,"filed_name");
//通過(guò)數(shù)據(jù)集字段索引指定分區(qū)字段
dataStream.partitionCustom(customPartitioner,0)

到此,相信大家對(duì)“flink的Transformation數(shù)據(jù)處理方法是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI