溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

大數(shù)據(jù)開(kāi)發(fā)中Spark Streaming處理數(shù)據(jù)及寫(xiě)入Kafka

發(fā)布時(shí)間:2021-12-15 11:06:36 來(lái)源:億速云 閱讀:246 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)大數(shù)據(jù)開(kāi)發(fā)中Spark Streaming處理數(shù)據(jù)及寫(xiě)入Kafka,文章內(nèi)容豐富且以專(zhuān)業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1.Spark Streaming簡(jiǎn)介

Spark Streaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時(shí)間間隔創(chuàng)建出來(lái)。在每個(gè)時(shí)間區(qū)間開(kāi)始的時(shí)候,一個(gè)新的批次就創(chuàng)建出來(lái),在該區(qū)間內(nèi)收到的數(shù)據(jù)都會(huì)被添加到這個(gè)批次中。在時(shí)間區(qū)間結(jié)束時(shí),批次停止增長(zhǎng),時(shí)間區(qū)間的大小是由批次間隔這個(gè)參數(shù)決定的。批次間隔一般設(shè)在500毫秒到幾秒之間,由開(kāi)發(fā)者配置。每個(gè)輸入批次都形成一個(gè)RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng),Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個(gè) RDD 序列,每個(gè)RDD代表數(shù)據(jù)流中一個(gè)時(shí)間片內(nèi)的數(shù)據(jù)。另外加入了窗口操作和狀態(tài)轉(zhuǎn)化,其他和批次處理類(lèi)似。

與StructedStreaming的區(qū)別

StructedStreaming誕生于2.x后,主要用于處理結(jié)構(gòu)化數(shù)據(jù),除了實(shí)現(xiàn)與Spark Streaming的批處理,還實(shí)現(xiàn)了long-running的task,主要理解為處理的時(shí)機(jī)可以是數(shù)據(jù)的生產(chǎn)時(shí)間,而非收到數(shù)據(jù)的時(shí)間,可以細(xì)看下表:

流處理模式SparkStreamingStructed Streaming
執(zhí)行模式Micro BatchMicro batch / Streaming
APIDstream/streamingContextDataset/DataFrame,SparkSession
Job 生成方式Timer定時(shí)器定時(shí)生成jobTrigger觸發(fā)
支持?jǐn)?shù)據(jù)源Socket,filstream,kafka,zeroMq,flume,kinesisSocket,filstream,kafka,ratesource
executed-basedExecuted based on dstream apiExecuted based on sparksql
Time basedProcessing TimeProcessingTime & eventTIme
UIBuilt-inNo



對(duì)于流處理,現(xiàn)在生產(chǎn)環(huán)境下使用Flink較多,數(shù)據(jù)源方式,現(xiàn)在基本是以kafka為主,所以本文對(duì)Spark Streaming的場(chǎng)景即ETL流處理結(jié)構(gòu)化日志,將結(jié)果輸入Kafka隊(duì)列

2.Spark Sreaming的運(yùn)行流程

1、客戶端提交Spark Streaming作業(yè)后啟動(dòng)Driver,Driver啟動(dòng)Receiver,Receiver接收數(shù)據(jù)源的數(shù)據(jù)

2、每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行task,SparkStreaming至少包含一個(gè)receiver task(一般情況下)

3、Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver,然后備份到另外一個(gè) Executor 上

4、ReceiverTracker維護(hù) Reciver 匯報(bào)的BlockId

5、Driver定時(shí)啟動(dòng)JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler

6、JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個(gè)stage包含一到多個(gè)Task,將TaskSet提交給TaskSchedule

7、TaskScheduler負(fù)責(zé)把 Task 調(diào)度到 Executor 上,并維護(hù) Task 的運(yùn)行狀態(tài)

常用數(shù)據(jù)源的讀取方式

常數(shù)據(jù)流:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

Socket:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

RDD隊(duì)列:

    val queue = new Queue[RDD[Int]]()
    val queueDStream: InputDStream[Int] = ssc.queueStream(queue)

文件夾:

    val lines: DStream[String] = ssc.textFileStream("data/log/")

3.案例說(shuō)明

生產(chǎn)上,常用流程如下,批處理原始Kafka日志,比如請(qǐng)求打點(diǎn)日志等,使用Spark Streaming來(lái)將數(shù)據(jù)清洗轉(zhuǎn)變?yōu)橐欢ǜ袷皆賹?dǎo)入Kafka中,為了保證exact-once,會(huì)將offer自己來(lái)保存,主要保存在redis-offset中

數(shù)據(jù)地址:鏈接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ提取碼:hell

3.1 原始Kafka日志

sample.log格式如下:

大數(shù)據(jù)開(kāi)發(fā)中Spark Streaming處理數(shù)據(jù)及寫(xiě)入Kafka

我們將它先放到文件里,模擬生產(chǎn)環(huán)境下xx.log

3.2 創(chuàng)建兩個(gè)topic,并創(chuàng)建KafkaProducer來(lái)嫁給你數(shù)據(jù)寫(xiě)入mytopic1

一個(gè)用來(lái)放原始的日志數(shù)據(jù),一個(gè)用來(lái)放處理過(guò)后的日志

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1

啟動(dòng)redis服務(wù):

./redis-server redis.conf

查看mytopic1數(shù)據(jù)

kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning

3.3 代碼實(shí)現(xiàn)

第一部分,處理原始文件數(shù)據(jù)寫(xiě)入mytopic1

package com.hoult.Streaming.work

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilerToKafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 定義 kafka producer參數(shù)
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // 定義 kafka producer參數(shù)
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 將讀取到的數(shù)據(jù)發(fā)送到mytopic1
    lines.foreachPartition{iter =>
      // KafkaProducer
      val producer = new KafkaProducer[String, String](prop)
      iter.foreach{line =>
        val record = new ProducerRecord[String, String]("mytopic1", line)
        producer.send(record)
      }
      producer.close()
    }
  }
}

第二部分,streaming讀取mytopic1的數(shù)據(jù),寫(xiě)入mytopic2

package com.hoult.Streaming.work

import java.util.Properties

import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * 每秒處理Kafka數(shù)據(jù),生成結(jié)構(gòu)化數(shù)據(jù),輸入另外一個(gè)Kafka topic
 */
object KafkaStreamingETL {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 需要消費(fèi)的topic
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"
    // 定義kafka相關(guān)參數(shù)
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
    // 從Redis獲取offset
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // 創(chuàng)建DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      // 從kafka中讀取數(shù)據(jù)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // 轉(zhuǎn)換后的數(shù)據(jù)發(fā)送到另一個(gè)topic
    dstream.foreachRDD{rdd =>
      if (!rdd.isEmpty) {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(process)
        // 將offset保存到Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // 啟動(dòng)作業(yè)
    ssc.start()
    ssc.awaitTermination()
  }

  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
//      .foreach(println)
      .foreach(line =>sendMsg2Topic(line, "mytopic2"))
  }

  def parse(text: String): String = {
    try{
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("解析數(shù)據(jù)出錯(cuò)!", e)
        ""
    }
  }

  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  }

  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }
}

第三部分,從redis中讀寫(xiě)offset的工具

package com.hoult.Streaming.kafka

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // 定義Redis參數(shù)
  private val redisHost = "linux121"
  private val redisPort = 6379

  // 獲取Redis的連接
  private val config = new JedisPoolConfig
  // 最大空閑數(shù)
  config.setMaxIdle(5)
  // 最大連接數(shù)
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key:kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // 根據(jù) key 獲取offsets
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._

      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }

    // 歸還資源
    jedis.close()

    offsets.flatten.toMap
  }

  // 將offsets保存到Redis中
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // 獲取連接
    val jedis: Jedis = getRedisConnection

    // 組織數(shù)據(jù)
    offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
        .groupBy(_._1)
      .foreach{case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // 保存數(shù)據(jù)
        jedis.hmset(key, maps)
      }

    jedis.close()
  }

  def main(args: Array[String]): Unit = {
    val topics = Array("mytopic1")
    val groupid = "mygroup1"
    val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
    x.foreach(println)
  }
}

3.4 演示

  • 啟動(dòng)redis ./redis-server ./redis.conf

  • 啟動(dòng)kafka并創(chuàng)建topic sh scripts/kafka.sh start 3.2 創(chuàng)建兩個(gè)topic,并創(chuàng)建KafkaProducer來(lái)嫁給你數(shù)據(jù)寫(xiě)入mytopic1

  • 啟動(dòng)FilerToKafka 和 KafkaStreamingETL

大數(shù)據(jù)開(kāi)發(fā)中Spark Streaming處理數(shù)據(jù)及寫(xiě)入Kafka 大數(shù)據(jù)開(kāi)發(fā)中Spark Streaming處理數(shù)據(jù)及寫(xiě)入Kafka

4.spark-streamin注意事項(xiàng)

spark-streaming讀文件讀不到的問(wèn)題 ,讀取本地文件時(shí)候,要注意,它不會(huì)讀取原本就存在于該文件里的文本,只會(huì)讀取在監(jiān)聽(tīng)期間,傳進(jìn)文件夾里的數(shù)據(jù),而且本文本還有要求,必須是它組后一次更改并且保存的操作,是在監(jiān)聽(tīng)開(kāi)始的那一刻 之后的,其實(shí)意思就是,如果要向被監(jiān)聽(tīng)的文件夾里傳一個(gè)文本,你就要在監(jiān)聽(tīng)開(kāi)始之后,先打開(kāi)這個(gè)文本,隨便輸入幾個(gè)空格,或者回車(chē),或者其他不影響文本內(nèi)容的操作,然后保存,最后再傳進(jìn)文件夾里,這樣它才能 檢測(cè)到這個(gè)被傳進(jìn)來(lái)的文本。

上述就是小編為大家分享的大數(shù)據(jù)開(kāi)發(fā)中Spark Streaming處理數(shù)據(jù)及寫(xiě)入Kafka了,如果剛好有類(lèi)似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI