溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark性能優(yōu)化用foreachPartition還是與foreach

發(fā)布時(shí)間:2021-12-09 16:15:36 來(lái)源:億速云 閱讀:163 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“Spark性能優(yōu)化用foreachPartition還是與foreach”,在日常操作中,相信很多人在Spark性能優(yōu)化用foreachPartition還是與foreach問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Spark性能優(yōu)化用foreachPartition還是與foreach”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

首先,我們對(duì)比一下foreachPartition和foreach兩個(gè)方法的實(shí)現(xiàn),有什么不同的地方:

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

2個(gè)方法,參數(shù)都是一個(gè)函數(shù)文本,不同的是foreach當(dāng)中,函數(shù)文本希望的參數(shù)是T,也就是RDD當(dāng)中的元素類(lèi)型;foreachPartition當(dāng)中,函數(shù)文本希望的參數(shù)是Iterator[T],也就是一個(gè)partition。

而在內(nèi)部實(shí)現(xiàn)上,其實(shí)是大同小異的。對(duì)于foreachPartition而言,直接在各個(gè)partition上運(yùn)行傳入的函數(shù)文本;而對(duì)于foreach而言,是把傳入的函數(shù)文本,交給各個(gè)partition的foreach去執(zhí)行。
我們查看一些spark性能優(yōu)化指南,會(huì)提到用foreachPartition替代foreach,有助于性能的提高。那么我們要怎樣來(lái)理解這句話呢?看看下面這段代碼:

rdd.foreach { x => {

    val dbClient = new DBClient

    dbClient.ins(x)

}}

在上面這段代碼當(dāng)中,針對(duì)RDD當(dāng)中的每一條數(shù)據(jù),都會(huì)new一個(gè)db client,這樣的效率,顯然是無(wú)比底下的。正確的寫(xiě)法應(yīng)該是這個(gè)樣子的:

rdd.foreachPartition { part => {

    val dbClient = new DBClient

    part.foreach{ x => {

        dbClient.ins(x)

    }}

}}
那么這種寫(xiě)法究竟好在哪里,還是要從spark的核心概念開(kāi)始講起,我們都知道spark是一個(gè)分布式的實(shí)時(shí)計(jì)算系統(tǒng),而RDD是分布式計(jì)算的基礎(chǔ),而partition分區(qū)又是這個(gè)當(dāng)中的關(guān)鍵,比如我們搭建一個(gè)3*4core的spark集群,對(duì)于一個(gè)大任務(wù)而言,我們往往是希望有12個(gè)線程一起來(lái)完成這個(gè)任務(wù),用下面的代碼來(lái)構(gòu)建rdd就能夠達(dá)到我們的目的:

val rdd = sc.textFile("hdfs://master:9000/woozoom/mavlink1.log", 12)

注意紅色字體的部分,代表著構(gòu)建出來(lái)的rdd的分區(qū)數(shù)量。之后,rdd.foreachPartition,spark集群會(huì)把12個(gè)分區(qū)分別交給12個(gè)線程來(lái)分別進(jìn)行處理。結(jié)合上面的代碼,dbClient 會(huì)在每個(gè)線程當(dāng)中分別構(gòu)建,會(huì)有12個(gè)db client被構(gòu)建。

那么有沒(méi)有另一種可能性,我們只構(gòu)建一個(gè)db client,12個(gè)線程都用這一個(gè)db client來(lái)執(zhí)行數(shù)據(jù)庫(kù)操作,像下面這樣:

val dbClient = new DBClient

rdd.foreach { x => {   

    dbClient.ins(x)

}}

要這么寫(xiě),需要有2個(gè)前提:1、dbClient 是線程安全的,2、dbClient 實(shí)現(xiàn)了java的序列化接口。而在很多情況下,例如在對(duì)hbase進(jìn)行訪問(wèn)的時(shí)候,這兩個(gè)條件都是不滿足的。

到此,關(guān)于“Spark性能優(yōu)化用foreachPartition還是與foreach”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI