溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

第4課:Spark Streaming的Exactly-Once的事務(wù)處理和不重復(fù)輸出徹底掌握

發(fā)布時間:2020-06-10 00:00:23 來源:網(wǎng)絡(luò) 閱讀:1288 作者:love205088 欄目:大數(shù)據(jù)

前置知識:

1、事務(wù)的特征:1)、處理且僅被處理一次;2)、輸出且只被輸出一次

2、SparkStreaming進行事務(wù)處理有沒有可能處理完全失???

   這個可能性不大,因為Spark是批處理的方式來進行流處理,在SparkStreaming應(yīng)用程序啟動的時候,已經(jīng)為應(yīng)用程序分配了相關(guān)的資源,而且在調(diào)度的過程中可以動態(tài)的分配資源,所以除非整個集群所有的硬件都奔潰了,否則一般情況下都會被處理的。

3、SparkStreaming寫程序的時候是基于Driver和Executor兩部分


SparkStreaming架構(gòu)流程:

1、SparkStreaming基本架構(gòu)流程:

   1)、Receiver(不斷的)接收到數(shù)據(jù)后匯報(把元數(shù)據(jù))給Driver,2)、Driver在收到數(shù)據(jù)之后為了數(shù)據(jù)的安全性會進行CheckPoint,3)、Job的執(zhí)行(在Executor中):完全基于SparkCore的調(diào)度模式

   SparkStreaming基本架構(gòu)流程圖:

第4課:Spark Streaming的Exactly-Once的事務(wù)處理和不重復(fù)輸出徹底掌握

WAL(write ahead log)的機制:寫數(shù)據(jù)的時候,先通過WAL機制寫入文件系統(tǒng)中,然后存儲到Executor,Executor在存儲到磁盤或者內(nèi)存中(這個是根據(jù)StorageLevel的設(shè)置) ,如果前面沒有寫成功的話,后面一定不會存儲到Executor中,而不存儲到Executor中的話,就不會匯報給Driver,數(shù)據(jù)就不會被處理了


Receiver接收的數(shù)據(jù)達到一定程度才會把數(shù)據(jù)存儲到內(nèi)存或者磁盤,當(dāng)還沒有積累到一定程度的時候,Executor或者Receiver奔潰了,這時數(shù)據(jù)就會丟失一點,


SparkStreaming:1、獲取數(shù)據(jù);2、產(chǎn)生作業(yè),執(zhí)行必須透過SparkContext


當(dāng)出現(xiàn)奔潰的時候數(shù)據(jù)恢復(fù)的過程:

1)、Driver級別的恢復(fù)是直接從Driver進行checkpoint的文件系統(tǒng)中把數(shù)據(jù)讀進來,而在內(nèi)部是重新啟動SparkContext(還有SparkContext),恢復(fù)出元數(shù)據(jù)再次產(chǎn)生RDD(恢復(fù)是基于上一次的job執(zhí)行的),提交給集群

2)、Receiver的恢復(fù)是在以前數(shù)據(jù)的基礎(chǔ)上接著去接收數(shù)據(jù),曾經(jīng)接收到的數(shù)據(jù)也會通過WAL機制從磁盤上恢復(fù)回來


Exactly Once的事務(wù)處理:

1)、數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的Receiver,且整個應(yīng)用程序的metadata必須進行CheckPoint,且通過WAL來保證數(shù)據(jù)安全;(我們以數(shù)據(jù)來自Kafka為例,運行在Executor上的Receiver在接收到來自Kafka的數(shù)據(jù)時會向Kafka發(fā)送ACK確認(rèn)收到信息并讀取下一條信息,kafka會updateOffset來記錄Receiver接收到的偏移,這種方式保證了在Executor數(shù)據(jù)零丟失。)

2)、Spark在1.3的時候為了避免WAL的性能損失和實現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統(tǒng)。此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢,至此Spark Streaming+Kafka就構(gòu)建了完美的流處理世界(1,數(shù)據(jù)不需要拷貝副本;2,不需要WAL對性能的損耗;3,Kafka使用ZeroCopy比HDFS更高效)。所有的Executors通過Kafka API直接消息數(shù)據(jù),直接管理Offset,所以也不會重復(fù)消費數(shù)據(jù)


數(shù)據(jù)丟失及其具體解決方式:

在Receiver收到數(shù)據(jù)且通過Driver的調(diào)度Executor開始計算數(shù)據(jù)的時候,如果Driver突然奔潰,則此時Executor會被kill掉,那么Executor中的數(shù)據(jù)就會丟失(如果沒有進行WAL的操作)。

解決方式:此時就必須通過例如WAL的方式,讓所有的數(shù)據(jù)都通過例如HDFS的方式首先進行安全性容錯處理。此時如果Executor中的數(shù)據(jù)丟失的話,就可以通過WAL恢復(fù)回來(這種方式的弊端是通過WAL的方式會極大額損傷SparkStreaming中Receivers接收數(shù)據(jù)的性能)


數(shù)據(jù)重復(fù)讀取的情況:

基于Kafka的情況下,Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒有來得及進行updateOffsets,此時Receiver崩潰后重新啟動就會通過管理Kafka的ZooKeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時SparkStreaming認(rèn)為是成功的,但是Kafka認(rèn)為是失敗的(因為沒有更新offset到ZooKeeper中),此時就會導(dǎo)致數(shù)據(jù)重新消費的情況。

解決方式:以Receiver基于ZooKeeper的方式,當(dāng)讀取數(shù)據(jù)時去訪問Kafka的元數(shù)據(jù)信息,在處理代碼中例如foreachRDD或transform時,將信息寫入到內(nèi)存數(shù)據(jù)庫中(memorySet),在計算時讀取內(nèi)存數(shù)據(jù)庫信息,判斷是否已處理過,如果以處理過則跳過計算。這些元數(shù)據(jù)信息可以保存到內(nèi)存數(shù)據(jù)結(jié)構(gòu)或者memsql,sqllite中(如果通過Kafka作為數(shù)據(jù)來源的話,Kafka中有數(shù)據(jù),然后Receiver接收的時候又會有數(shù)據(jù)副本,這個時候其實是存儲資源的浪費)


數(shù)據(jù)輸出多次重寫

為什么會有這個問題,因為Spark Streaming在計算的時候基于Spark Core,Spark Core天生會做以下事情導(dǎo)致Spark Streaming的部分結(jié)果重復(fù)輸出(例如數(shù)據(jù)輸出后,該Task的后續(xù)程序發(fā)生錯誤,而任務(wù)發(fā)生錯誤,Spark Core會進入如下程序):

  Task重試;慢任務(wù)推測(兩個相同任務(wù)可能會同時執(zhí)行),Stage重復(fù);Job重試;

解決方式:

設(shè)置spark.task.maxFailures次數(shù)為1;

設(shè)置spark.speculation為關(guān)閉狀態(tài)(因為慢任務(wù)推測其實非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能)

Spark Streaming on Kafka的話,Job失敗后可以設(shè)置auto.offset.reset為“l(fā)argest”的方式;



最后再次強調(diào)可以通過transform和foreachRDD基于業(yè)務(wù)邏輯代碼進行邏輯控制來實現(xiàn)數(shù)據(jù)不重復(fù)消費和輸出不重復(fù)!這兩個方式類似于Spark Streaming的后門,可以做任意想象的控制操作!

 

備注:

1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI