您好,登錄后才能下訂單哦!
本篇文章為大家展示了如何解析spark-streaming中的socketTextStream,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
package hgs.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.storage.StorageLevel import org.apache.spark.HashPartitioner object SocketStreamingTest { def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setMaster("local[2]").setAppName("SocketStreaming") val context = new SparkContext(conf); //要添加spark-streaming的依賴包,spark的Seconds val streamContext = new StreamingContext(context,Seconds(5)); val ds = streamContext.socketTextStream("192.168.6.129", 8888, StorageLevel.MEMORY_ONLY); streamContext.checkpoint("d:\\chekpoint") //val ds2 = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y)=>(x+y))//這種方式只是對該批次數(shù)據(jù)進行處理,并沒有累計上一個批次 //updateFunc: (Iterator[(K, Seq[V], Option[S])]) K:單詞, Seq[V]該批次單詞出現(xiàn)次數(shù)列表,Option:上一次計算的結(jié)果 val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } val partitionner = new HashPartitioner(2) //通過updateStatByKey來進行累加 val ds2 = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, partitionner, true) //打印 ds2.print() streamContext.start() streamContext.awaitTermination() } }
上述內(nèi)容就是如何解析spark-streaming中的socketTextStream,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。