溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何解析spark-streaming中的socketTextStream

發(fā)布時間:2021-12-17 13:56:07 來源:億速云 閱讀:170 作者:柒染 欄目:大數(shù)據(jù)

本篇文章為大家展示了如何解析spark-streaming中的socketTextStream,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

package hgs.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
object SocketStreamingTest {
  def main(args: Array[String]): Unit = {
    
    val conf = new SparkConf();
    conf.setMaster("local[2]").setAppName("SocketStreaming")
    val context = new SparkContext(conf);
    //要添加spark-streaming的依賴包,spark的Seconds
    val streamContext  = new StreamingContext(context,Seconds(5));
    val ds = streamContext.socketTextStream("192.168.6.129", 8888, StorageLevel.MEMORY_ONLY);
    streamContext.checkpoint("d:\\chekpoint")
    //val ds2 = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y)=>(x+y))//這種方式只是對該批次數(shù)據(jù)進行處理,并沒有累計上一個批次
    
    
    //updateFunc: (Iterator[(K, Seq[V], Option[S])]) K:單詞, Seq[V]該批次單詞出現(xiàn)次數(shù)列表,Option:上一次計算的結(jié)果
    val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
      //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一
      //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二
      iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三
    }
    val partitionner = new HashPartitioner(2)
    //通過updateStatByKey來進行累加
    val ds2 = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, partitionner, true)
    //打印
    ds2.print()
    streamContext.start()
    streamContext.awaitTermination()
  }
}

上述內(nèi)容就是如何解析spark-streaming中的socketTextStream,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI