溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃

發(fā)布時(shí)間:2021-12-13 09:58:56 來源:億速云 閱讀:162 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹了spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

基本例子

我們考慮一個(gè)簡單的例子,一個(gè)查詢中涉及到filter以及aggregation,join操作的語句:

# in PySpark API:
query  = (
    questionsDF
    .filter(col('year') == 2019)
    .groupBy('user_id')
    .agg(
        count('*').alias('cnt')
    )
    .join(usersDF, 'user_id')
)

我們把例子中的usersDF是一組問問題的用戶,這些問題用questionsDF來表示。這些問題用year的這一列來進(jìn)行分區(qū),代表著哪一年問的問題。在這個(gè)查詢里,我們對2019年問問題的用戶感興趣,并且想知道每個(gè)人問了多少問題,而且我們想知道在輸出中我們想知道一些額外信息,這就是為什么我們在聚合之后進(jìn)行了usersDF的join操作。
這里有兩種基本的方式去查看物理計(jì)劃。第一種是在DataFrame上調(diào)用explain函數(shù),該函數(shù)展現(xiàn)這個(gè)計(jì)劃的文本化的展示: spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃

這在spark 3.0有了一些優(yōu)化,explain函數(shù)帶有了一個(gè)新參數(shù) mode,這個(gè)參數(shù)的值可以是:formatted,cost,codegen。使用formatted模式將會把查詢計(jì)劃轉(zhuǎn)化為更加有組織的輸出(這里之展現(xiàn)了一部分): spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃 在formatted計(jì)劃中,我們能看到裸數(shù),改裸數(shù)只是展現(xiàn)了操作的名字并帶有一個(gè)括號的數(shù)字。在數(shù)的下面,這里有一些數(shù)字對應(yīng)的細(xì)節(jié)描述。cost模式將會展示除了物理計(jì)劃之外的優(yōu)化的邏輯計(jì)劃,這些邏輯計(jì)劃帶有每個(gè)操作的統(tǒng)計(jì)信息,所以我們能看到在不同執(zhí)行階段的數(shù)據(jù)大小。最終codegen模式展現(xiàn)了將會執(zhí)行的生成的java代碼。
第二種方式是查看spark ui中的sql tab,這里有正在跑的和已經(jīng)完成了的查詢。通過點(diǎn)擊你要查看的查詢,我們可以看到物理計(jì)劃的文本表示。在下面這個(gè)圖片中,我們結(jié)合圖形表示,文本表示以及它們之間的對應(yīng)關(guān)系: spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃 不同點(diǎn)是圖形表示的葉子節(jié)點(diǎn)在上面,根節(jié)點(diǎn)在下面,而文本表示的是反過來的。

CollapseCodegenStages

在物理計(jì)劃的圖形表示中,你能看到一些操作被組織成了一大塊藍(lán)色的矩形。這些大矩形對應(yīng)著codegen階段。這是發(fā)生在物理計(jì)劃的優(yōu)化階段。這個(gè)是叫做CollapseCodegenStages來負(fù)責(zé)優(yōu)化的,原理是把支持代碼生成的操作聚合到一起,通過消除虛擬函數(shù)的調(diào)用來加速。但是并不是所有的操作支持代碼生成。所以一些操作(如exchange操作)并不是大矩形的一部分。在我們的例子中,這里有三個(gè)codegen stages,對應(yīng)著三個(gè)大矩形,你能在操作的括號中看到codegen stage的id。從這個(gè)樹我們也可以分辨出一個(gè)操作是夠支持代碼生成,因?yàn)榧尤胫С执a生成的話,這里將會在對應(yīng)的操作的括號里有個(gè)星號。 spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃

我們簡單的分析一下在我們查詢中的每一個(gè)操作。

Scan parquet

scan parquet操作代表著從parquet文件中讀取數(shù)據(jù)。從明細(xì)信息中,我們能直接看到從這個(gè)數(shù)據(jù)源中我們選擇了哪些列。雖然我們沒指定具體的字段,但是這里也會應(yīng)用ColumnPruning規(guī)則,這個(gè)規(guī)則會確保只有真正字段才會從這個(gè)數(shù)據(jù)源中提取出來。我們也能看到有兩種filters:PartitionFilters和PushFilters。PartitionFilters應(yīng)用在數(shù)據(jù)源分區(qū)的字段上。這是非常重要的因?yàn)槲覀兡芴^我們不需要的數(shù)據(jù)。檢查對應(yīng)的filters是否傳播到正確的位置總是沒錯(cuò)的。這是因?yàn)槲覀儽M可能讀取少量的數(shù)據(jù),因?yàn)镮O是比較費(fèi)時(shí)的。在spark 2.4,這里還有一個(gè)代表實(shí)際讀取到的分區(qū)的partitionCount字段,這個(gè)字段在spark 3.0已經(jīng)去掉了。
PushFilters把字段直接下推到parquet文件中去,假如parquet文件過濾的列是按照過濾字段排序的話,這個(gè)規(guī)則就很有用了,因?yàn)檫@種情況下,我們能利用parquet內(nèi)部結(jié)構(gòu)去過濾數(shù)據(jù)。parquet文件是按照行組和每個(gè)行組的元數(shù)據(jù)文件組成的。這個(gè)元數(shù)據(jù)包含了每個(gè)行組的最大最小值,基于這個(gè)信息,我們就能判斷是否讀取這個(gè)行組。

Filter

Filter操作佷容易理解。它僅僅是代表過濾條件。但是這個(gè)操作怎么創(chuàng)建的并不是很明顯,因?yàn)樵诓樵冎兴⒉皇侵苯訉?yīng)著過濾條件。因?yàn)樗械膄ilters首先被Catalyst optimzer處理,改規(guī)則可能修改或者重新移動(dòng)她們。這里有好幾個(gè)規(guī)則在她們轉(zhuǎn)換為物理計(jì)劃前的邏輯計(jì)劃。我們列舉了一下:

  • PushDownPredicates-這個(gè)規(guī)則通過其他的操作把filter下推到離數(shù)據(jù)源更近的地方,但不是所有的操作都支持。比如,如果表達(dá)式不是確定性的,這就不行,假如我們使用類似first,last,collect_set,collect_list,rand等,filters操作就不能通過這些操作而進(jìn)行下推,因?yàn)檫@些函數(shù)是不確定性的。

  • CombineFilters-結(jié)合兩個(gè)臨近的操作合成一個(gè)(收集兩個(gè)filters條件合成一個(gè)更為復(fù)雜的的條件)

  • InferFiltersFromConstraints-這個(gè)規(guī)則實(shí)際上會創(chuàng)建新的filter操作,如從join操作(從inner join中創(chuàng)建一個(gè)joining key is not null)

  • PruneFilters-移除多余的filters(比如一個(gè)filters總是true)

Exchange

Exchange操作代表著shuffle操作,意味著物理數(shù)據(jù)的集群范圍內(nèi)的移動(dòng)。這個(gè)操作是很費(fèi)時(shí)的,因?yàn)樗鼤ㄟ^網(wǎng)絡(luò)移動(dòng)數(shù)據(jù)。查詢計(jì)劃的信息也包含了一些數(shù)據(jù)重新分區(qū)的細(xì)節(jié)。在我們的例子中,是hashPartitioning(user_id,200): spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃 這意味著數(shù)據(jù)將會根據(jù)user_id列重新分區(qū)為200個(gè)分區(qū),有著同樣user_id的行將會屬于同一個(gè)分區(qū),將會分配到同一個(gè)executor上。為了確保只有200分區(qū),spark將會計(jì)算user_id的hashcode并且對200取模。這個(gè)結(jié)果就是不同的user_ids就會分到同一個(gè)分區(qū)。同時(shí)有些分區(qū)可能是空的。這里也有其他類型的分區(qū)值的去留意一下:

  • RoundRobinPartitioning-數(shù)據(jù)將會隨機(jī)分配到n個(gè)分區(qū)中,n在函數(shù)repartition(n)中指定

  • SinglePartition-所有數(shù)據(jù)將會分配到一個(gè)分區(qū)中,進(jìn)而到一個(gè)executor中。

  • RangePartitioning-這個(gè)用在對數(shù)據(jù)排序中,用在orderBy或者sort操作中

HashAggregate

這個(gè)代表著數(shù)據(jù)聚合,這個(gè)經(jīng)常是兩個(gè)操作,要么被Exchange分開或者不分開: spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃 為什么這里有兩個(gè)HashAggregate操作的原因是第一個(gè)是部分聚合,它在每個(gè)executor上每個(gè)分區(qū)分別進(jìn)行聚合。在我們的例子中,你能看到partial_count(1)的function字段,最終的部分聚合結(jié)果就是第二個(gè)聚合。這個(gè)操作也展示了數(shù)據(jù)按照哪個(gè)分組的Keys字段。results字段展示了在聚合以后的可用的列。

BroadcastHashJoin & BroadcastExchange

BroadcastHashJoin(BHJ)代表著join算法的操作,除了這個(gè),還有SortMergeJoin和ShuffleHashJoin。BHJ總是伴隨著BroadcastExchange,這個(gè)代表著廣播shuffle-數(shù)據(jù)將會收集到driver端并且會被傳播到需要的executor上。

ColumnarToRow

這是在spark 3.0引入的新操作,用于列行之間的轉(zhuǎn)換

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“spark 3.0中如何實(shí)現(xiàn)查詢計(jì)劃”這篇文章對大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI