溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么應(yīng)對Spark-Redis行海量數(shù)據(jù)插入、查詢作業(yè)時碰到的問題

發(fā)布時間:2021-12-17 09:56:04 來源:億速云 閱讀:185 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)怎么應(yīng)對Spark-Redis行海量數(shù)據(jù)插入、查詢作業(yè)時碰到的問題,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

由于redis是基于內(nèi)存的數(shù)據(jù)庫,穩(wěn)定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis時也會碰到很多問題,尤其是執(zhí)行海量數(shù)據(jù)插入與查詢的場景中。

海量數(shù)據(jù)查詢

Redis是基于內(nèi)存讀取的數(shù)據(jù)庫,相比其它的數(shù)據(jù)庫,Redis的讀取速度會更快。但是當(dāng)我們要查詢上千萬條的海量數(shù)據(jù)時,即使是Redis也需要花費較長時間。這時候如果我們想要終止select作業(yè)的執(zhí)行,我們希望的是所有的running task立即killed。

Spark是有作業(yè)調(diào)度機制的。SparkContext是Spark的入口,相當(dāng)于應(yīng)用程序的main函數(shù)。SparkContext中的cancelJobGroup函數(shù)可以取消正在運行的job。

/**
  * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
  * for more information.
  */
 def cancelJobGroup(groupId: String) {
   assertNotStopped()
   dagScheduler.cancelJobGroup(groupId)
 }

按理說取消job之后,job下的所有task應(yīng)該也終止。而且當(dāng)我們?nèi)∠鹲elect作業(yè)時,executor會throw TaskKilledException,而這個時候負責(zé)task作業(yè)的TaskContext在捕獲到該異常之后,會執(zhí)行killTaskIfInterrupted。

 // If this task has been killed before we deserialized it, let's quit now. Otherwise,
 // continue executing the task.
 val killReason = reasonIfKilled
 if (killReason.isDefined) {
   // Throw an exception rather than returning, because returning within a try{} block
   // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
   // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
   // for the task.
   throw new TaskKilledException(killReason.get)
 }
/**
 * If the task is interrupted, throws TaskKilledException with the reason for the interrupt.
 */
 private[spark] def killTaskIfInterrupted(): Unit

但是Spark-Redis中還是會出現(xiàn)終止作業(yè)但是task仍然running。因為task的計算邏輯最終是在RedisRDD中實現(xiàn)的,RedisRDD的compute會從Jedis中取獲取keys。所以說要解決這個問題,應(yīng)該在RedisRDD中取消正在running的task。這里有兩種方法:

方法一:參考Spark的JDBCRDD,定義close(),結(jié)合InterruptibleIterator。

def close() {
   if (closed) return
   try {
     if (null != rs) {
       rs.close()
     }
   } catch {
     case e: Exception => logWarning("Exception closing resultset", e)
   }
   try {
     if (null != stmt) {
       stmt.close()
     }
   } catch {
     case e: Exception => logWarning("Exception closing statement", e)
   }
   try {
     if (null != conn) {
       if (!conn.isClosed && !conn.getAutoCommit) {
         try {
           conn.commit()
         } catch {
           case NonFatal(e) => logWarning("Exception committing transaction", e)
         }
       }
       conn.close()
     }
     logInfo("closed connection")
   } catch {
     case e: Exception => logWarning("Exception closing connection", e)
   }
   closed = true
 }
 
 context.addTaskCompletionListener{ context => close() } 
CompletionIterator[InternalRow, Iterator[InternalRow]](
   new InterruptibleIterator(context, rowsIterator), close())

方法二:異步線程執(zhí)行compute,主線程中判斷task isInterrupted

try{
   val thread = new Thread() {
     override def run(): Unit = {
       try {
          keys = doCall
       } catch {
         case e =>
           logWarning(s"execute http require failed.")
       }
       isRequestFinished = true
     }
   }
 
   // control the http request for quite if user interrupt the job
   thread.start()
   while (!context.isInterrupted() && !isRequestFinished) {
     Thread.sleep(GetKeysWaitInterval)
   }
   if (context.isInterrupted() && !isRequestFinished) {
     logInfo(s"try to kill task ${context.getKillReason()}")
     context.killTaskIfInterrupted()
   }
   thread.join()
   CompletionIterator[T, Iterator[T]](
     new InterruptibleIterator(context, keys), close)

我們可以異步線程來執(zhí)行compute,然后在另外的線程中判斷是否task isInterrupted,如果是的話就執(zhí)行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted無法殺掉task,再結(jié)合InterruptibleIterator:一種迭代器,以提供任務(wù)終止功能。通過檢查[TaskContext]中的中斷標(biāo)志來工作。

海量數(shù)據(jù)插入

我們都已經(jīng)redis的數(shù)據(jù)是保存在內(nèi)存中的。當(dāng)然Redis也支持持久化,可以將數(shù)據(jù)備份到硬盤中。當(dāng)插入海量數(shù)據(jù)時,如果Redis的內(nèi)存不夠的話,很顯然會丟失部分數(shù)據(jù)。這里讓使用者困惑的點在于: 當(dāng)Redis已使用內(nèi)存大于最大可用內(nèi)存時,Redis會報錯:command not allowed when used memory > ‘maxmemory’。但是當(dāng)insert job的數(shù)據(jù)大于Redis的可用內(nèi)存時,部分數(shù)據(jù)丟失了,并且還沒有任何報錯。

因為不管是Jedis客戶端還是Redis服務(wù)器,當(dāng)插入數(shù)據(jù)時內(nèi)存不夠,不會插入成功,但也不會返回任何response。所以目前能想到的解決辦法就是當(dāng)insert數(shù)據(jù)丟失時,擴大Redis內(nèi)存。

Spark-Redis是一個應(yīng)用還不是很廣泛的開源項目,不像Spark JDBC那樣已經(jīng)商業(yè)化。所以Spark-Redis還是存在很多問題。相信隨著commiter的努力,Spark-Redis也會越來越強大。

看完上述內(nèi)容,你們對怎么應(yīng)對Spark-Redis行海量數(shù)據(jù)插入、查詢作業(yè)時碰到的問題有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI