溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Spark分區(qū)并行度決定機(jī)制

發(fā)布時(shí)間:2021-08-31 14:47:09 來(lái)源:億速云 閱讀:193 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“Spark分區(qū)并行度決定機(jī)制”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Spark分區(qū)并行度決定機(jī)制”吧!

大家都知道Spark job中最小執(zhí)行單位為task,合理設(shè)置Spark job每個(gè)stage的task數(shù)是決定性能好壞的重要因素之一,但是Spark自己確定最佳并行度的能力有限,這就要求我們?cè)诹私馄渲袃?nèi)在機(jī)制的前提下,去各種測(cè)試、計(jì)算等來(lái)最終確定最佳參數(shù)配比。

Spark任務(wù)在執(zhí)行時(shí)會(huì)將RDD劃分為不同的stage,一個(gè)stage中task的數(shù)量跟最后一個(gè)RDD的分區(qū)數(shù)量相同。之前已經(jīng)介紹過(guò),stage劃分的關(guān)鍵是寬依賴(lài),而寬依賴(lài)往往伴隨著shuffle操作。對(duì)于一個(gè)stage接收另一個(gè)stage的輸入,這種操作通常都會(huì)有一個(gè)參數(shù)numPartitions來(lái)顯示指定分區(qū)數(shù)。最典型的就是一些ByKey算子,比如groupByKey(numPartitions: Int),但是這個(gè)分區(qū)數(shù)需要多次測(cè)試來(lái)確定合適的值。首先確定父RDD中的分區(qū)數(shù)(通過(guò)rdd.partitions().size()可以確定RDD的分區(qū)數(shù)),然后在此基礎(chǔ)上增加分區(qū)數(shù),多次調(diào)試直至在確定的資源任務(wù)能夠平穩(wěn)、安全的運(yùn)行。  
對(duì)于沒(méi)有父RDD的RDD,比如通過(guò)加載HDFS上的數(shù)據(jù)生成的RDD,它的分區(qū)數(shù)由InputFormat切分機(jī)制決定。通常就是一個(gè)HDFS block塊對(duì)應(yīng)一個(gè)分區(qū),對(duì)于不可切分文件則一個(gè)文件對(duì)應(yīng)一個(gè)分區(qū)。  

對(duì)于通過(guò)SparkContext的parallelize方法或者makeRDD生成的RDD分區(qū)數(shù)可以直接在方法中指定,如果未指定,則參考spark.default.parallelism的參數(shù)配置。下面是默認(rèn)情況下確定defaultParallelism的源碼:
override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

通常,RDD的分區(qū)數(shù)與其所依賴(lài)的RDD的分區(qū)數(shù)相同,除非shuffle。但有幾個(gè)特殊的算子:

1.coalesce和repartition算子

筆者先放兩張關(guān)于該coalesce算子分別在RDD和DataSet中的源碼圖:(DataSet是Spark SQL中的分布式數(shù)據(jù)集,后邊說(shuō)到Spark時(shí)再細(xì)講)

Spark分區(qū)并行度決定機(jī)制

Spark分區(qū)并行度決定機(jī)制

通過(guò)coalesce源碼分析,無(wú)論是在RDD中還是DataSet,默認(rèn)情況下coalesce不會(huì)產(chǎn)生shuffle,此時(shí)通過(guò)coalesce創(chuàng)建的RDD分區(qū)數(shù)小于等于父RDD的分區(qū)數(shù)。 

筆者這里就不放repartition算子的源碼了,分析起來(lái)也比較簡(jiǎn)單,圖中我有所提示。但筆者建議,如下兩種情況,請(qǐng)使用repartition算子:

1)增加分區(qū)數(shù)
repartition觸發(fā)shuffle,shuffle的情況下可以增加分區(qū)數(shù)。

coalesce默認(rèn)不觸發(fā)shuffle,即使調(diào)用該算子增加分區(qū)數(shù),實(shí)際情況是分區(qū)數(shù)仍然是當(dāng)前的分區(qū)數(shù)。

2)極端情況減少分區(qū)數(shù),比如將分區(qū)數(shù)減少為1
調(diào)整分區(qū)數(shù)為1,此時(shí)數(shù)據(jù)處理上游stage并行度降,很影響性能。此時(shí)repartition的優(yōu)勢(shì)即不改變?cè)瓉?lái)stage的并行度就體現(xiàn)出來(lái)了,在大數(shù)據(jù)量下,更為明顯。
但需要注意,因?yàn)閞epartition會(huì)觸發(fā)shuffle,而要衡量好shuffle產(chǎn)生的代價(jià)和因?yàn)橛胷epartition增加并行度帶來(lái)的效益。

2.union算子

還是直接看源碼:

Spark分區(qū)并行度決定機(jī)制

Spark分區(qū)并行度決定機(jī)制

Spark分區(qū)并行度決定機(jī)制

通過(guò)分析源碼,RDD在調(diào)用union算子時(shí),最終生成的RDD分區(qū)數(shù)分兩種情況:
1)union的RDD分區(qū)器已定義并且它們的分區(qū)器相同

多個(gè)父RDD具有相同的分區(qū)器,union后產(chǎn)生的RDD的分區(qū)器與父RDD相同且分區(qū)數(shù)也相同。比如,n個(gè)RDD的分區(qū)器相同且是defined,分區(qū)數(shù)是m個(gè)。那么這n個(gè)RDD最終union生成的一個(gè)RDD的分區(qū)數(shù)仍是m,分區(qū)器也是相同的

2)不滿(mǎn)足第一種情況,則通過(guò)union生成的RDD的分區(qū)數(shù)為父RDD的分區(qū)數(shù)之和
4.cartesian算子

通過(guò)上述coalesce、repartition、union算子介紹和源碼分析,很容易分析cartesian算子的源碼。通過(guò)cartesian得到RDD分區(qū)數(shù)是其父RDD分區(qū)數(shù)的乘積。

Spark分區(qū)并行度決定機(jī)制

在Spark SQL中,任務(wù)并行度參數(shù)則要參考spark.sql.shuffle.partitions,筆者這里先放一張圖,詳細(xì)的后面講到Spark SQL時(shí)再細(xì)說(shuō):

Spark分區(qū)并行度決定機(jī)制

看下圖在Spark流式計(jì)算中,通常將SparkStreaming和Kafka整合,這里又分兩種情況:

1.Receiver方式生成的微批RDD即BlockRDD,分區(qū)數(shù)就是block數(shù)

2.Direct方式生成的微批RDD即kafkaRDD,分區(qū)數(shù)和kafka分區(qū)數(shù)一一對(duì)應(yīng)  

到此,相信大家對(duì)“Spark分區(qū)并行度決定機(jī)制”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI