您好,登錄后才能下訂單哦!
本期內(nèi)容:
Direct Access
Kafka
前面有幾期我們講了帶Receiver的Spark Streaming 應(yīng)用的相關(guān)源碼解讀。但是現(xiàn)在開發(fā)Spark Streaming的應(yīng)用越來越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的優(yōu)勢(shì):
1. 更強(qiáng)的控制自由度
2. 語義一致性
其實(shí)No Receivers的方式更符合我們讀取數(shù)據(jù),操作數(shù)據(jù)的思路的。因?yàn)镾park 本身是一個(gè)計(jì)算框架,他底層會(huì)有數(shù)據(jù)來源,如果沒有Receivers,我們直接操作數(shù)據(jù)來源,這其實(shí)是一種更自然的方式。 如果要操作數(shù)據(jù)來源,肯定要有一個(gè)封裝器,這個(gè)封裝器一定是RDD類型。 以直接訪問Kafka中的數(shù)據(jù)為例:
object DirectKafkaWordCount { def main(args: Array[String]) { val Array(brokers, topics) = args // Create context with 2 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) // Create direct kafka stream with brokers and topics val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) // Get the lines, split them into words, count the words and print val lines = messages.map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() // Start the computation ssc.start() ssc.awaitTermination() } }
Spark Streaming會(huì)封裝一個(gè)KafkaRDD:
/** * A batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka <a to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD * @param messageHandler function for translating each message into the desired type */private[kafka]class KafkaRDD[ K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag, R: ClassTag] private[spark] ( sc: SparkContext, kafkaParams: Map[String, String], val offsetRanges: Array[OffsetRange], leaders: Map[TopicAndPartition, (String, Int)], messageHandler: MessageAndMetadata[K, V] => R ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray } ... override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { val part = thePart.asInstanceOf[KafkaRDDPartition] assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " + s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { new KafkaRDDIterator(part, context) } }
RDD中重要的方法 getPartitions 和 compute 其中compute中返回了一個(gè) KafkaRDDIterator:
private class KafkaRDDIterator( part: KafkaRDDPartition, context: TaskContext) extends NextIterator[R] { val kc = new KafkaCluster(kafkaParams) ... private def fetchBatch: Iterator[MessageAndOffset] = { val req = new FetchRequestBuilder() .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes) .build() val resp = consumer.fetch(req) handleFetchErr(resp) // kafka may return a batch that starts before the requested offset resp.messageSet(part.topic, part.partition) .iterator .dropWhile(_.offset < requestOffset) } override def close(): Unit = { if (consumer != null) { consumer.close() } } override def getNext(): R = { if (iter == null || !iter.hasNext) { iter = fetchBatch } if (!iter.hasNext) { assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part)) finished = true null.asInstanceOf[R] } else { val item = iter.next() if (item.offset >= part.untilOffset) { assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part)) finished = true null.asInstanceOf[R] } else { requestOffset = item.nextOffset messageHandler(new MessageAndMetadata( part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder)) } } } }
其中會(huì)調(diào)用KafkaCluster的connect方法:
org/apache/spark/streaming/kafka/KafkaCluster.scala def connect(host: String, port: Int): SimpleConsumer = new SimpleConsumer(host, port, config.socketTimeoutMs, config.socketReceiveBufferBytes, config.clientId)
KafkaCluster的connect方法返回了一個(gè) SimpleConsumer,如果想自定義控制kafka消息的消費(fèi),則可自定義Kafka的consumer。
我們?cè)倩剡^頭看看:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)
實(shí)際生成了什么:
def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] = { val cleanedHandler = ssc.sc.clean(messageHandler) new DirectKafkaInputDStream[K, V, KD, VD, R] ssc, kafkaParams, fromOffsets, cleanedHandler) }
生成了一個(gè)DirectKafkaInputDStream:
org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = { val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) val rdd = KafkaRDD[K, V, U, T, R]( context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker. val offsetRanges = currentOffsets.map { case (tp, fo) => val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset) } val description = offsetRanges.filter { offsetRange => // Don't display empty ranges. offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange => s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the user val metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) }
這里面即產(chǎn)生了KafkaRDD實(shí)例。
我們?cè)僦匦滤伎加蠷eceiver和No Receiver的Spark Streaming應(yīng)用 Direct訪問的好處:
1. 不需要緩存,不會(huì)出現(xiàn)OOM等問題(數(shù)據(jù)緩存在Kafka中)
2. 如果采用Receiver的方式,Receiver和Worker上Executor綁定了,不方便做分布式(配置一下也可以做)。如果采用Direct的方式,直接是RDD操作,數(shù)據(jù)默認(rèn)分布在多個(gè)Executor上,天然就是分布式的。
3. 數(shù)據(jù)消費(fèi)的問題,在實(shí)際操作的時(shí)候,如果采用Receiver的方式,如果數(shù)據(jù)操作來不及消費(fèi),Delay多次之后,Spark Streaming程序有可能崩潰。如果是Direct的方式,就不會(huì)。
4. 完全的語義一致性,不會(huì)重復(fù)消費(fèi),且只被消費(fèi)一次。
備注:
1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。