溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Executor容錯安全性實例分析

發(fā)布時間:2021-12-16 16:32:37 來源:億速云 閱讀:127 作者:iii 欄目:云計算

這篇文章主要介紹“Executor容錯安全性實例分析”,在日常操作中,相信很多人在Executor容錯安全性實例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Executor容錯安全性實例分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

sparkstreaming會不斷的接收數(shù)據(jù)、不斷的產生job、不斷的提交job。所以有一個至關重要的問題就是數(shù)據(jù)安全性。由于sparkstreaming是基于sparkcore的,如果我們可以確保數(shù)據(jù)安全可靠的話(sparkstreaming生產job的時候里面是基于RDD),即使運行的時候出現(xiàn)錯誤或者故障,也可以基于RDD的容錯的能力自動進行恢復。所以要確保數(shù)據(jù)的安全性。

對于executor的安全容錯主要是數(shù)據(jù)的安全容錯。Executor計算時候的安全容錯是借助spark core的RDD的,所以天然是安全的。

數(shù)據(jù)安全性的一種方式是存儲一份副本,另一種方式是不做副本,但是數(shù)據(jù)源支持重放(也就是可以反復的讀取數(shù)據(jù)源的數(shù)據(jù)),如果之前讀取的數(shù)據(jù)出現(xiàn)問題,可以重新讀取數(shù)據(jù)。

做副本的方式可以借助blockmanager做備份。Blockmanager存儲數(shù)據(jù)的時候有很多storagelevel,Receiver接收數(shù)據(jù)后,存儲的時候指定storagelevel為MEMORY_AND_DISK_SER_2的方式。Blockmanager早存儲的時候會先考慮memory,只有memory不夠的時候才會考慮disk,一般memory都是夠的。所以至少兩個executor上都會有數(shù)據(jù),假設一個executor掛掉,就會馬上切換到另一個executor。

ReceiverSupervisorImpl在存儲數(shù)據(jù)的時候會有兩種方式,一種是WAL的方式,究竟是不是WAL得方式是通過配置修改的。默認是false。如果用WAL的方式必須有checkpoint的目錄,因為WAL的數(shù)據(jù)是放在checkpoint的目錄之下的。

def enableReceiverLog(conf: SparkConf): Boolean = {
  conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}

Storagelevel是在構建inputDstream的時候傳入的,默認就是MEMORY_AND_DISK_SER_2。

* @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

現(xiàn)在來看ReceiverSupervisorImpl在存儲數(shù)據(jù)的另一種方式(副本方式)。注釋中說的很清楚,根據(jù)指定的storagelevel把接收的blocks交給blockmanager。也就是通過blockmanager來存儲。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */

private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)

Blockmanager存儲的時候會分為多種不同的數(shù)據(jù)類型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。

Blockmanager存儲數(shù)據(jù)前面已經講過了。Receiver在接收到數(shù)據(jù)后除了在自己這個executor上面存儲,還會在另外一個executor上存儲。如果一個executor出現(xiàn)問題會瞬間切換到另一個executor。

WAL的方式原理:在具體的目錄下會做一份日志,假設后續(xù)處理的過程中出了問題,可以基于日志恢復,日志是寫在checkpoint下。在生產環(huán)境下checkpoint是在HDFS上,這樣日志就會有三份副本。

下面就是用WAL存儲數(shù)據(jù)的類,先寫日志再交給blockmanager存儲。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */

private[streaming] class WriteAheadLogBasedBlockHandler(

如果采用WAL的方式,存儲數(shù)據(jù)的時候就不需要有兩份副本,這樣太浪費內存,如果storagelevel.replication大于1就會打印警告日志。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

這里采用兩條線程的線程池,使得blockmanager存儲數(shù)據(jù)和write ahead log可以并發(fā)的執(zhí)行。

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

這個是把日志寫入WAL中

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
  writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

負責讀寫WAL的是WriteAheadLog,這是一個抽象類,負責寫入、讀取、清除數(shù)據(jù)的功能。在寫入數(shù)據(jù)后會返回一個句柄,以供讀取數(shù)據(jù)使用。

看一下具體寫入數(shù)據(jù)的實現(xiàn)。如果失敗并且失敗次數(shù)小于最大的失敗次數(shù)就會重試。確實是返回了一個句柄。

/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */

def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
  var fileSegment: FileBasedWriteAheadLogSegment = null
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      fileSegment = getLogWriter(time).write(byteBuffer)
      if (closeFileAfterWrite) {
        resetWriter()
      }
      succeeded = true
    catch {
      case ex: Exception =>
        lastException = ex
        logWarning("Failed to write to write ahead log")
        resetWriter()
        failures += 1
    }
  }
  if (fileSegment == null) {
    logError(s"Failed to write to write ahead log after $failures failures")
    throw lastException
  }
  fileSegment
}

下面就是把數(shù)據(jù)寫入HDFS的代碼

/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
  assertOpen()
  data.rewind() // Rewind to ensure all data in the buffer is retrieved
  val lengthToWrite = data.remaining()
  val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
  stream.writeInt(lengthToWrite)
  if (data.hasArray) {
    stream.write(data.array())
  } else {
    // If the buffer is not backed by an array, we transfer using temp array
    // Note that despite the extra array copy, this should be faster than byte-by-byte copy
    while (data.hasRemaining) {
      val array = new Array[Byte](data.remaining)
      data.get(array)
      stream.write(array)
    }
  }
  flush()
  nextOffset stream.getPos()
  segment
}

不管是WAL還是直接交給blockmanager都是采用副本的方式。還有一種是數(shù)據(jù)源支持數(shù)據(jù)存放,典型的就是kafka。Kafka已經成為了數(shù)據(jù)存儲系統(tǒng),它天然具有容錯和數(shù)據(jù)副本。

Kafka有receiver和direct的方式。Receiver的方式其實是交給zookeper來管理matadata的(偏移量offset),如果數(shù)據(jù)處理失敗后,kafka會基于offset重新讀取數(shù)據(jù)。為什么可以重新讀???如果程序崩潰或者數(shù)據(jù)沒處理完是不會給zookeper發(fā)ack。Zookeper就認為這個數(shù)據(jù)沒有被消費。實際生產環(huán)境下越來越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。這就可以保證有且只有一次的容錯處理。DirectKafkaInputDstream,它會去看最新的offset,并把這個內容放入batch中。

獲取最新的offset,通過最新的offset減去上一個offset就可以確定讀哪些數(shù)據(jù),也就是一個batch中的數(shù)據(jù)。

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}

容錯的弊端就是消耗性能,占用時間。也不是所有情況都不能容忍數(shù)據(jù)丟失。有些情況下可以不進行容錯來提高性能。

假如一次處理1000個block,但是有1個block出錯,就需要把1000個block進行重新讀取或者恢復,這也有性能問題。

到此,關于“Executor容錯安全性實例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI