您好,登錄后才能下訂單哦!
這篇文章主要介紹“Executor容錯安全性實例分析”,在日常操作中,相信很多人在Executor容錯安全性實例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Executor容錯安全性實例分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
sparkstreaming會不斷的接收數(shù)據(jù)、不斷的產生job、不斷的提交job。所以有一個至關重要的問題就是數(shù)據(jù)安全性。由于sparkstreaming是基于sparkcore的,如果我們可以確保數(shù)據(jù)安全可靠的話(sparkstreaming生產job的時候里面是基于RDD),即使運行的時候出現(xiàn)錯誤或者故障,也可以基于RDD的容錯的能力自動進行恢復。所以要確保數(shù)據(jù)的安全性。
對于executor的安全容錯主要是數(shù)據(jù)的安全容錯。Executor計算時候的安全容錯是借助spark core的RDD的,所以天然是安全的。
數(shù)據(jù)安全性的一種方式是存儲一份副本,另一種方式是不做副本,但是數(shù)據(jù)源支持重放(也就是可以反復的讀取數(shù)據(jù)源的數(shù)據(jù)),如果之前讀取的數(shù)據(jù)出現(xiàn)問題,可以重新讀取數(shù)據(jù)。
做副本的方式可以借助blockmanager做備份。Blockmanager存儲數(shù)據(jù)的時候有很多storagelevel,Receiver接收數(shù)據(jù)后,存儲的時候指定storagelevel為MEMORY_AND_DISK_SER_2的方式。Blockmanager早存儲的時候會先考慮memory,只有memory不夠的時候才會考慮disk,一般memory都是夠的。所以至少兩個executor上都會有數(shù)據(jù),假設一個executor掛掉,就會馬上切換到另一個executor。
ReceiverSupervisorImpl在存儲數(shù)據(jù)的時候會有兩種方式,一種是WAL的方式,究竟是不是WAL得方式是通過配置修改的。默認是false。如果用WAL的方式必須有checkpoint的目錄,因為WAL的數(shù)據(jù)是放在checkpoint的目錄之下的。
def enableReceiverLog(conf: SparkConf): Boolean = {
conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}
Storagelevel是在構建inputDstream的時候傳入的,默認就是MEMORY_AND_DISK_SER_2。
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
現(xiàn)在來看ReceiverSupervisorImpl在存儲數(shù)據(jù)的另一種方式(副本方式)。注釋中說的很清楚,根據(jù)指定的storagelevel把接收的blocks交給blockmanager。也就是通過blockmanager來存儲。
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
*/
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
Blockmanager存儲的時候會分為多種不同的數(shù)據(jù)類型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。
Blockmanager存儲數(shù)據(jù)前面已經講過了。Receiver在接收到數(shù)據(jù)后除了在自己這個executor上面存儲,還會在另外一個executor上存儲。如果一個executor出現(xiàn)問題會瞬間切換到另一個executor。
WAL的方式原理:在具體的目錄下會做一份日志,假設后續(xù)處理的過程中出了問題,可以基于日志恢復,日志是寫在checkpoint下。在生產環(huán)境下checkpoint是在HDFS上,這樣日志就會有三份副本。
下面就是用WAL存儲數(shù)據(jù)的類,先寫日志再交給blockmanager存儲。
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both, a write ahead log and a block manager.
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
如果采用WAL的方式,存儲數(shù)據(jù)的時候就不需要有兩份副本,這樣太浪費內存,如果storagelevel.replication大于1就會打印警告日志。
private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
s" write ahead log is enabled, change to serialization false")
}
if (storageLevel.replication > 1) {
logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
s"write ahead log is enabled, change to replication 1")
}
StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}
這里采用兩條線程的線程池,使得blockmanager存儲數(shù)據(jù)和write ahead log可以并發(fā)的執(zhí)行。
// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
這個是把日志寫入WAL中
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
負責讀寫WAL的是WriteAheadLog,這是一個抽象類,負責寫入、讀取、清除數(shù)據(jù)的功能。在寫入數(shù)據(jù)后會返回一個句柄,以供讀取數(shù)據(jù)使用。
看一下具體寫入數(shù)據(jù)的實現(xiàn)。如果失敗并且失敗次數(shù)小于最大的失敗次數(shù)就會重試。確實是返回了一個句柄。
/**
* Write a byte buffer to the log file. This method synchronously writes the data in the
* ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
* to HDFS, and will be available for readers to read.
*/
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
var fileSegment: FileBasedWriteAheadLogSegment = null
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
fileSegment = getLogWriter(time).write(byteBuffer)
if (closeFileAfterWrite) {
resetWriter()
}
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
logWarning("Failed to write to write ahead log")
resetWriter()
failures += 1
}
}
if (fileSegment == null) {
logError(s"Failed to write to write ahead log after $failures failures")
throw lastException
}
fileSegment
}
下面就是把數(shù)據(jù)寫入HDFS的代碼
/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
assertOpen()
data.rewind() // Rewind to ensure all data in the buffer is retrieved
val lengthToWrite = data.remaining()
val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
stream.writeInt(lengthToWrite)
if (data.hasArray) {
stream.write(data.array())
} else {
// If the buffer is not backed by an array, we transfer using temp array
// Note that despite the extra array copy, this should be faster than byte-by-byte copy
while (data.hasRemaining) {
val array = new Array[Byte](data.remaining)
data.get(array)
stream.write(array)
}
}
flush()
nextOffset = stream.getPos()
segment
}
不管是WAL還是直接交給blockmanager都是采用副本的方式。還有一種是數(shù)據(jù)源支持數(shù)據(jù)存放,典型的就是kafka。Kafka已經成為了數(shù)據(jù)存儲系統(tǒng),它天然具有容錯和數(shù)據(jù)副本。
Kafka有receiver和direct的方式。Receiver的方式其實是交給zookeper來管理matadata的(偏移量offset),如果數(shù)據(jù)處理失敗后,kafka會基于offset重新讀取數(shù)據(jù)。為什么可以重新讀???如果程序崩潰或者數(shù)據(jù)沒處理完是不會給zookeper發(fā)ack。Zookeper就認為這個數(shù)據(jù)沒有被消費。實際生產環(huán)境下越來越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。這就可以保證有且只有一次的容錯處理。DirectKafkaInputDstream,它會去看最新的offset,并把這個內容放入batch中。
獲取最新的offset,通過最新的offset減去上一個offset就可以確定讀哪些數(shù)據(jù),也就是一個batch中的數(shù)據(jù)。
@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
容錯的弊端就是消耗性能,占用時間。也不是所有情況都不能容忍數(shù)據(jù)丟失。有些情況下可以不進行容錯來提高性能。
假如一次處理1000個block,但是有1個block出錯,就需要把1000個block進行重新讀取或者恢復,這也有性能問題。
到此,關于“Executor容錯安全性實例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。