溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

coalesce與repartition怎么使用

發(fā)布時(shí)間:2021-12-09 16:52:27 來(lái)源:億速云 閱讀:161 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“coalesce與repartition怎么使用”,在日常操作中,相信很多人在coalesce與repartition怎么使用問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”coalesce與repartition怎么使用”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer:Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

一、功能介紹

coalesce算子最基本的功能就是返回一個(gè)numPartitions個(gè)partition的RDD。

二、使用及注意事項(xiàng)

這個(gè)算子的結(jié)果默認(rèn)是窄依賴,舉個(gè)例子

coalesce(100)

如果你想把1000個(gè)partition減少到100個(gè)partition,此時(shí)不會(huì)發(fā)生shuffle,而是每一個(gè)你設(shè)定的新partition都會(huì)替代原來(lái)的10個(gè)partition。如果初始的最大partition是100個(gè),而你想用coalesce(1000)把partition數(shù)增至1000,這是不行的。
現(xiàn)在有一個(gè)需求,需要將某一個(gè)文件做ETL,最后想輸出成一個(gè)文件,你會(huì)怎么辦呢?
這樣么?

val logs=sc.textFile(args(0),6)//你想初始化6個(gè)分區(qū),并行執(zhí)行,之后再合并成1個(gè)文件

logs.map(x=>{
      if(x.split("\t").length==72){
        val clean=parse(x)  //此處是進(jìn)行了ETL
        clean
      }
    }).coalesce(2).saveAsTextFile(args(1))

如果你同意的話,可以寫個(gè)demo測(cè)試一下,你會(huì)發(fā)現(xiàn),僅僅有一個(gè)task!在生產(chǎn)上這是絕對(duì)不行!因?yàn)樯鲜鯡TL的spark job僅僅有一個(gè)stage,你雖然初始化RDD是設(shè)定的6個(gè)partition,但是在action之前你使用了.coalesce(1),此時(shí)會(huì)優(yōu)先使用coalesce里面的partition數(shù)量初始化RDD,所以僅僅有一個(gè)task。生產(chǎn)中文件很大的話,你就只能用兩個(gè)節(jié)點(diǎn)處理,這樣無(wú)法發(fā)揮集群的優(yōu)勢(shì)了。解決:要在coalesce中加shuffle=tule

val logs=sc.textFile(args(0),6)

logs.map(x=>{
      if(x.split("\t").length==72){
        val clean=parse(x)  //此處是進(jìn)行了ETL
        clean
      }
    }).coalesce(2,shuffle = true).saveAsTextFile(args(1))

這樣,我們就會(huì)有兩個(gè)stage,stage1是6個(gè)并行高速ETL處理,stage2是通過(guò)shuffle合并成2個(gè)文件
如下圖
coalesce與repartition怎么使用
我們知道了,可以手動(dòng)設(shè)定shuffle的發(fā)生,那么問(wèn)題來(lái)了,剛剛我們不能將初始化的分區(qū)數(shù)變大,如果加上shuffle可不可以呢?答案是可以的~
如果出事RDD為100個(gè)分區(qū),你覺(jué)得并行度不夠,你可以coalesce(1000,shuffle = true),將分區(qū)數(shù)增加到1000(默認(rèn)hash partitioner進(jìn)行重新),當(dāng)然你也可以使用自定義分區(qū)器,但是一定要序列化。

三、總結(jié)

  1. coalesce算子默認(rèn)只能減少分區(qū)數(shù)量,但是可以通過(guò)開(kāi)啟shuffle增加分區(qū)數(shù)量

  2. coalesce的作用常常是減少分區(qū)數(shù),已達(dá)到輸出時(shí)合并小文件的效果。

  3. 在一個(gè)stage中,coalesce中設(shè)定的分區(qū)數(shù)是優(yōu)先級(jí)最高的,如果想增加并行度,并合并文件,那么請(qǐng)開(kāi)啟coalesce中的shuffle,這樣就會(huì)變成兩個(gè)stage。達(dá)到并行且合并的效果。

repartition

/**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   *
   * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

這個(gè)算子前后是一個(gè)寬依賴,字面就是重新分區(qū)的意思,與coalesce不同,repartition一定會(huì)將分區(qū)變成numPartitions個(gè)的!通過(guò)看源碼可知,它底層時(shí)調(diào)用的coalesce算子,并且使用該算子一定會(huì)shuffle。
coalesce與repartition怎么使用

到此,關(guān)于“coalesce與repartition怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI