溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

第4課:Spark Streaming的Exactly-One的事務(wù)處理

發(fā)布時(shí)間:2020-04-04 03:52:51 來(lái)源:網(wǎng)絡(luò) 閱讀:1694 作者:lqding1980 欄目:大數(shù)據(jù)

  Spark Streaming的事務(wù)處理和關(guān)系型數(shù)據(jù)庫(kù)的事務(wù)的概念有所不同,關(guān)系型數(shù)據(jù)庫(kù)事務(wù)關(guān)注的是語(yǔ)句級(jí)別的一致性,例如銀行轉(zhuǎn)賬。而Spark Streaming的事務(wù)關(guān)注的是某次job執(zhí)行的一致性。也就是如何保證Job在處理數(shù)據(jù)的過(guò)程中做到如下兩點(diǎn):

  • 不丟失數(shù)據(jù)

  • 不重復(fù)處理數(shù)據(jù)


SparkStreaming程序執(zhí)行架構(gòu)大致如下:

第4課:Spark Streaming的Exactly-One的事務(wù)處理


一、我們先來(lái)說(shuō)說(shuō)丟失數(shù)據(jù)的情況:

  1. Receiver接收到數(shù)據(jù)后,首先會(huì)在Executor級(jí)別上保存數(shù)據(jù)(根據(jù)StorageLevel的設(shè)置),例如socketTextStream的Receiver。在內(nèi)存和磁盤(pán)上保留2份副本數(shù)據(jù)

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

如果StorageLevel設(shè)置的是只進(jìn)行內(nèi)存級(jí)別的存儲(chǔ),那么當(dāng)程序崩潰后,即便對(duì)Driver進(jìn)行了Checkpoint,然后重新啟動(dòng)程序。該部分?jǐn)?shù)據(jù)也會(huì)丟失。因?yàn)镈river的Checkpoint并不對(duì)計(jì)算數(shù)據(jù)進(jìn)行保存。

我們假設(shè)StorageLevel設(shè)置了磁盤(pán)級(jí)別的存儲(chǔ),也不能完全保證數(shù)據(jù)不被丟失,因?yàn)镽eceiver并不是接收一條數(shù)據(jù)寫(xiě)一次磁盤(pán),而是按照數(shù)據(jù)塊為單位寫(xiě)數(shù)據(jù)。然后將數(shù)據(jù)塊的元數(shù)據(jù)信息發(fā)送給Driver,Driver的Checkpoint記錄的數(shù)Block的元數(shù)據(jù)信息。當(dāng)數(shù)據(jù)塊寫(xiě)到一半的時(shí)候,或者是元數(shù)據(jù)還沒(méi)有發(fā)送給Driver的時(shí)候,Executor崩潰了,數(shù)據(jù)也就丟失啦。

解決方案:為了減少這種情況的發(fā)送,可以在Receiver端引入WAL寫(xiě)機(jī)制,因?yàn)閃AL寫(xiě)的頻率要比數(shù)據(jù)塊的頻率高的多。這樣,當(dāng)Executor恢復(fù)的時(shí)候,可以讀取WAL日志恢復(fù)數(shù)據(jù)塊。

但是通過(guò)WAL方式會(huì)極大的損傷Spark Streaming中Receivers接受數(shù)據(jù)的性能;


WAL也不能完全的解決數(shù)據(jù)丟失的問(wèn)題,就像Oracle一樣,日志文件的寫(xiě),也是先寫(xiě)到內(nèi)存中,然后根據(jù)一定的觸發(fā)條件再將數(shù)據(jù)寫(xiě)到磁盤(pán)。如果還沒(méi)有來(lái)的及寫(xiě)WAL日志,此時(shí)數(shù)據(jù)也會(huì)有不一致的情況(數(shù)據(jù)已經(jīng)接收,但是還沒(méi)有寫(xiě)到WAL的這部分?jǐn)?shù)據(jù)是恢復(fù)不出來(lái)的。)。


Spark Streaming 1.3的時(shí)候?yàn)榱吮苊釽AL的性能損失和實(shí)現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲(chǔ)系統(tǒng)?。。〈藭r(shí)兼具有流的優(yōu)勢(shì)和文件系統(tǒng)的優(yōu)勢(shì),至此,Spark Streaming+Kafka就構(gòu)建了完美的流處理世界?。。∷械腅xecutors通過(guò)Kafka API直接消費(fèi)數(shù)據(jù),直接管理Offset,所以也不會(huì)重復(fù)消費(fèi)數(shù)據(jù);事務(wù)實(shí)現(xiàn)啦?。?!



2. Driver崩潰,此時(shí)Job正在處理的數(shù)據(jù),包括Receiver已經(jīng)接收到還未被處理的數(shù)據(jù)將全部丟失。

解決方案:對(duì)Driver進(jìn)行Checkpoint,此處的Checkpoint和RDD的Checkpoint并不一樣。

我們看看Checkpoint都包含哪些屬性:

private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
  val sparkConfPairs = ssc.conf.getAll

其中g(shù)raph是DStreamGraph的實(shí)例化,它里面包含了InputDStream

private val inputStreams = new ArrayBuffer[InputDStream[_]]()

我們以DirectKafkaInputDStream為例,其中包含了checkpointData

protected[streaming] override val checkpointData =
  new DirectKafkaInputDStreamCheckpointData

其中只是包含:

class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
  def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
    data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
  }

就是每個(gè)batch 的唯一標(biāo)識(shí) time 對(duì)象,以及每個(gè)KafkaRDD對(duì)應(yīng)的的Kafka偏移信息。

所以:

  checkpoint 是非常高效的。沒(méi)有涉及到實(shí)際數(shù)據(jù)的存儲(chǔ)。一般大小只有幾十K,因?yàn)橹淮媪薑afka的偏移量等信息。

  checkpoint 采用的是序列化機(jī)制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函數(shù)應(yīng)該也會(huì)被序列化。如果采用了CheckPoint機(jī)制,而你的程序包做了做了變更,恢復(fù)后可能會(huì)有一定的問(wèn)題。



二、關(guān)于數(shù)據(jù)重復(fù)處理涉及兩個(gè)方面:

  1. 數(shù)據(jù)被重復(fù)讀取:在使用Kafka的情況下,Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒(méi)有來(lái)得及進(jìn)行updateOffsets,此時(shí)Receiver崩潰后重新啟動(dòng)就會(huì)通過(guò)管理Kafka的ZooKeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時(shí)SparkStreaming認(rèn)為是成功的,但是Kafka認(rèn)為是失敗的(因?yàn)闆](méi)有更新offset到ZooKeeper中),此時(shí)就會(huì)導(dǎo)致數(shù)據(jù)重新消費(fèi)的情況。

  2. 數(shù)據(jù)輸出多次重寫(xiě)

    為什么會(huì)有這個(gè)問(wèn)題,因?yàn)镾park Streaming在計(jì)算的時(shí)候基于Spark Core,Spark Core天生會(huì)做以下事情導(dǎo)致Spark Streaming的部分結(jié)果重復(fù)輸出(例如數(shù)據(jù)輸出后,該Task的后續(xù)程序發(fā)生錯(cuò)誤,而任務(wù)發(fā)生錯(cuò)誤,Spark Core會(huì)進(jìn)入如下程序):

    Task重試;慢任務(wù)推測(cè)(兩個(gè)相同任務(wù)可能會(huì)同時(shí)執(zhí)行),Stage重復(fù);Job重試;

  具體解決方案:

設(shè)置spark.task.maxFailures次數(shù)為1;

設(shè)置spark.speculation為關(guān)閉狀態(tài)(因?yàn)槁蝿?wù)推測(cè)其實(shí)非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能)

Spark Streaming on Kafka的話,Job失敗后可以設(shè)置auto.offset.reset為“l(fā)argest”的方式;


Exactly Once的事務(wù)處理必須滿足:

  1. Receiver數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來(lái)源和可靠的Receiver,且通過(guò)WAL來(lái)保證數(shù)據(jù)安全。

  2. 整個(gè)應(yīng)用程序的metadata必須進(jìn)行checkpoint;


最后再次強(qiáng)調(diào)可以通過(guò)transform和foreachRDD基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來(lái)實(shí)現(xiàn)數(shù)據(jù)不重復(fù)消費(fèi)和輸出不重復(fù)!這兩個(gè)方式類(lèi)似于Spark Streaming的后門(mén),可以做任意想象的控制操作!


備注:

1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark 
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains




向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI