In this article, the editor walks through how Spark Streaming processes data and writes the results to Kafka in big-data development. The content is analyzed from a practical, professional angle; hopefully you will take something away from it.
Spark Streaming reads data from a variety of input sources and groups it into small batches. New batches are created at a regular interval: at the start of each interval a new batch is opened, any data received during that interval is appended to it, and at the end of the interval the batch stops growing. The interval size is set by the batch interval parameter, which the developer configures, typically between 500 milliseconds and a few seconds. Each input batch forms an RDD and is processed as a Spark job that produces other RDDs; the results can then be pushed to external systems batch by batch. Spark Streaming's programming abstraction is the discretized stream, or DStream: a sequence of RDDs, where each RDD holds the data from one time slice of the stream. On top of this it adds window operations and stateful transformations; everything else works much like ordinary batch processing.
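As a concrete illustration, here is a minimal sketch of the DStream model (not from the original article): a StreamingContext with a one-second batch interval and a word count over a socket source. The application name, host and port are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dstream-sketch").setMaster("local[*]")
    // Batch interval of 1 second: every second the received data becomes a new RDD
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each batch of lines is one RDD inside the DStream
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```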
Differences from Structured Streaming
Structured Streaming was introduced after Spark 2.x and is aimed at structured data. Besides the micro-batch processing that Spark Streaming also offers, it supports long-running tasks; the key difference is that processing can be driven by the time at which the data was produced (event time) rather than the time it was received. The table below (and the sketch after it) compares the two:
| Streaming mode | Spark Streaming | Structured Streaming |
|---|---|---|
| Execution model | Micro-batch | Micro-batch / continuous streaming |
| API | DStream / StreamingContext | Dataset/DataFrame / SparkSession |
| Job generation | Timer-driven (jobs generated on a fixed schedule) | Trigger-driven |
| Supported sources | Socket, file stream, Kafka, ZeroMQ, Flume, Kinesis | Socket, file stream, Kafka, rate source |
| Execution layer | Based on the DStream API | Based on Spark SQL |
| Time semantics | Processing time | Processing time & event time |
| UI | Built-in | No |
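For comparison, here is a minimal Structured Streaming sketch (my addition, not from the original article) that reads the same kind of Kafka topic through the Dataset/DataFrame API and uses a processing-time trigger instead of a fixed batch interval. The broker address and topic name are placeholders, and the Kafka source requires the spark-sql-kafka connector on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructuredSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-sketch")
      .master("local[*]")
      .getOrCreate()

    // The source is declared through the DataFrame reader instead of a DStream
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "linux121:9092")
      .option("subscribe", "mytopic1")
      .load()

    // Kafka records arrive as binary key/value columns
    val lines = df.selectExpr("CAST(value AS STRING) AS value")

    // A Trigger replaces Spark Streaming's fixed batch interval
    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
```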
For stream processing, Flink is now the more common choice in production, and Kafka has become the default data source. This article therefore focuses on a typical Spark Streaming scenario: an ETL job that stream-processes structured logs and writes the results into a Kafka topic.
A Spark Streaming job executes roughly as follows:

1. After the client submits a Spark Streaming job, the Driver starts; the Driver launches the Receiver, which receives data from the input source.
2. Each job runs on multiple Executors, and each Executor runs tasks as threads; a Spark Streaming application generally includes at least one receiver task (see the sketch after this list).
3. After receiving data, the Receiver generates blocks, reports the BlockIds to the Driver, and replicates the blocks to another Executor.
4. The ReceiverTracker maintains the BlockIds reported by the Receiver.
5. The Driver periodically runs the JobGenerator, which builds the logical RDDs from the DStream lineage, creates a JobSet, and hands it to the JobScheduler.
6. The JobScheduler schedules the JobSet and passes it to the DAGScheduler, which turns the logical RDDs into stages; each stage contains one or more tasks, and the resulting TaskSet is submitted to the TaskScheduler.
7. The TaskScheduler dispatches the tasks to Executors and tracks their running state.
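Two practical knobs follow from this flow and are worth a small sketch (my addition, not from the original): the Receiver in step 2 permanently occupies one core, so a receiver-based job run locally needs at least two threads, and the blocks in step 3 determine how many tasks each batch gets via spark.streaming.blockInterval. The direct Kafka API used later in this article has no Receiver, so the first point does not apply there.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[1]" would leave no core free for processing, because the Receiver
// itself runs as a long-lived task; use "local[2]" or more for receiver-based sources.
val conf = new SparkConf()
  .setAppName("receiver-knobs")
  .setMaster("local[2]")
  // Each block the Receiver produces becomes one partition of the batch RDD,
  // so tasks per batch ≈ batch interval / block interval (default 200ms).
  .set("spark.streaming.blockInterval", "500ms")

val ssc = new StreamingContext(conf, Seconds(5))
```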
Common ways to read data sources

Constant data stream:
```scala
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
```
Socket:
```scala
// Host and port are placeholders
val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
```
RDD queue:
```scala
val queue = new Queue[RDD[Int]]()
val queueDStream: InputDStream[Int] = ssc.queueStream(queue)
```
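The snippet above only declares the queue. As a hedged usage sketch (not in the original), RDDs have to be pushed into it while the context is running for any batch to contain data:

```scala
ssc.start()

// Enqueue one small RDD per second; each one feeds a batch of the queue stream
for (i <- 1 to 5) {
  queue.synchronized {
    queue += ssc.sparkContext.makeRDD(1 to 100, 2)
  }
  Thread.sleep(1000)
}

ssc.awaitTermination()
```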
Directory:
```scala
val lines: DStream[String] = ssc.textFileStream("data/log/")
```
In production the usual flow looks like this: raw Kafka logs (for example, request tracking logs) are cleaned and reshaped with Spark Streaming into a fixed format and then written back into Kafka. To guarantee exactly-once semantics, the offsets are managed by the application itself rather than committed automatically; here they are stored in Redis.
Data download: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ (extraction code: hell)
Each line of sample.log is a comma-separated request record (15 fields, which the parse function below relies on). We first put the data into a file to simulate an xx.log produced in a production environment.
Create two Kafka topics: one to hold the raw log data and one to hold the processed logs:
```bash
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1
```
Start the Redis service:
```bash
./redis-server redis.conf
```
Check the data in mytopic1:
```bash
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning
```
Part 1: process the raw file data and write it to mytopic1
```scala
package com.hoult.Streaming.work

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilerToKafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read the raw log file
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // Kafka producer configuration
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // Send the data to mytopic1, one producer per partition
    lines.foreachPartition { iter =>
      val producer = new KafkaProducer[String, String](prop)
      iter.foreach { line =>
        val record = new ProducerRecord[String, String]("mytopic1", line)
        producer.send(record)
      }
      producer.close()
    }
  }
}
```
Part 2: the streaming job reads from mytopic1 and writes to mytopic2
```scala
package com.hoult.Streaming.work

import java.util.Properties

import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Consume Kafka data every few seconds, turn it into structured records,
 * and write the result to another Kafka topic.
 */
object KafkaStreamingETL {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Topic to consume
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"

    // Kafka consumer parameters
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

    // Read the starting offsets from Redis
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // Create the DStream with the direct (receiver-less) Kafka API
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // Transform each batch and send the result to the other topic
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.foreachPartition(process)

        // Save the offsets to Redis after the batch has been processed
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }

  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  def parse(text: String): String = {
    try {
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("Failed to parse record!", e)
        ""
    }
  }

  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  }

  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }
}
```
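One design note: sendMsg2Topic above creates a new KafkaProducer for every record and never closes it, which is expensive. A hedged alternative sketch (my own, not part of the original code; the method name is hypothetical and it reuses the object's parse and getKafkaProducerParameters helpers) keeps a single producer per partition and closes it when the partition has been processed:

```scala
def processWithSharedProducer(iter: Iterator[ConsumerRecord[String, String]]): Unit = {
  // One producer per partition instead of one per record
  val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
  try {
    iter.map(record => parse(record.value))
      .filter(_.nonEmpty)
      .foreach { line =>
        producer.send(new ProducerRecord[String, String]("mytopic2", line))
      }
  } finally {
    producer.close()
  }
}
```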
Part 3: a utility for reading and writing offsets in Redis
```scala
package com.hoult.Streaming.kafka

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // Redis connection parameters
  private val redisHost = "linux121"
  private val redisPort = 6379

  // Connection pool configuration
  private val config = new JedisPoolConfig
  // Maximum number of idle connections
  config.setMaxIdle(5)
  // Maximum number of connections
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key layout: kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // Read the offsets stored under each key
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._
      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) =>
          new TopicPartition(topic, partition.toInt) -> offset.toLong
        }
    }

    // Return the connection to the pool
    jedis.close()

    offsets.flatten.toMap
  }

  // Save the offsets to Redis
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // Get a connection
    val jedis: Jedis = getRedisConnection

    // Group the ranges by topic: partition -> untilOffset
    offsets.map { range => (range.topic, (range.partition.toString, range.untilOffset.toString)) }
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // Write the hash
        jedis.hmset(key, maps)
      }

    jedis.close()
  }

  def main(args: Array[String]): Unit = {
    val topics = Array("mytopic1")
    val groupid = "mygroup1"
    val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
    x.foreach(println)
  }
}
```
Start Redis:

```bash
./redis-server ./redis.conf
```

Start Kafka and create the topics (scripts/kafka.sh is the author's own wrapper script):

```bash
sh scripts/kafka.sh start
```

Create the two topics as shown earlier, then run the producer job to write the raw data into mytopic1.

Start FilerToKafka and KafkaStreamingETL.
A note on Spark Streaming not picking up files: when reading from a local directory, textFileStream does not read text that already existed in the directory when monitoring started. It only picks up files that arrive in the directory during the monitoring window, and only if their last modification (save) happened after monitoring began. In practice, if you want to drop a file into the watched directory, you have to touch it after the job has started, for example by opening it, adding a space or newline that does not change the content, saving it, and only then moving it into the directory; otherwise the file is never detected. The sketch below shows a cleaner way to achieve the same thing.
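A more reliable approach, and the one the Spark documentation recommends for file streams, is to write the file outside the watched directory and then atomically move it in, so that its timestamp falls inside the monitoring window. A minimal sketch (the paths are placeholders):

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

// Stage the file outside the watched directory, then move it in atomically
// so textFileStream("data/log/") sees a brand-new file with a fresh timestamp.
Files.move(
  Paths.get("data/staging/sample.log"),
  Paths.get("data/log/sample.log"),
  StandardCopyOption.ATOMIC_MOVE
)
```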
That is the editor's walkthrough of processing data with Spark Streaming and writing it to Kafka in big-data development. If you have run into similar questions, the analysis above should help you work through them. For more on related topics, follow the Yisu Cloud industry news channel.