溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark SQL中掌控sql語句的執(zhí)行是怎么樣的

發(fā)布時(shí)間:2021-12-17 10:34:34 來源:億速云 閱讀:154 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān) Spark SQL中掌控sql語句的執(zhí)行是怎么樣的,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

背景

自spark 2.x 的sql以及申明行DataFrame APi以來,在spark查詢數(shù)據(jù)越來越方便。僅僅用幾行代碼就能表達(dá)出復(fù)雜的查詢邏輯以及實(shí)現(xiàn)復(fù)雜的操作。 這個(gè)api最大的優(yōu)勢在于用戶不需要考慮太多的執(zhí)行情況,自動(dòng)有優(yōu)化器優(yōu)化出最有效率的執(zhí)行方式去執(zhí)行此次查詢。而且有效的查詢語句執(zhí)行不僅是因?yàn)槟軌蚬?jié)約資源,而且能夠減少終端用戶等待結(jié)果的時(shí)間。
Spark SQL 優(yōu)化器實(shí)際上是很成熟的,尤其是隨著3.0的到來,該版本會(huì)引入一些新特性,比如動(dòng)態(tài)分支裁剪以及動(dòng)態(tài)查詢執(zhí)行。 優(yōu)化器是工作在查詢計(jì)劃內(nèi)部的并且能夠應(yīng)用各種規(guī)則去優(yōu)化查詢計(jì)劃。 例如能夠改變transformation的執(zhí)行順序或者對于不影響最終結(jié)果的直接丟棄。雖然有很多優(yōu)秀的優(yōu)化,但是有些場景人是能夠做的更好的。在這篇文章里,我們就來看一下特例,并且使用一些技巧來更好的執(zhí)行查詢計(jì)劃。

例子

首先讓我們來引入一個(gè)例子。加入我們有下列json格式的數(shù)據(jù):

{"id": 1, "user_id": 100, "price": 50}
{"id": 2, "user_id": 100, "price": 200}
{"id": 3, "user_id": 101, "price": 120}
{"id": 4, "price": 120}

每一個(gè)記錄就像一個(gè)事務(wù),而user_id這一列可能包含了很多重復(fù)的值(也可能包含null),除此之外還有其他的列來描述這個(gè)事務(wù)。 現(xiàn)在我們的查詢是基于兩個(gè)聚合的union操作,兩個(gè)聚合的不同僅僅在于過濾條件的不同。在第一個(gè)聚合中我們想要獲取價(jià)格總和小于50的用戶,第二個(gè)聚合中我們想要獲取價(jià)格綜合大于100的用戶,而且在第二個(gè)聚合中我們只考慮user_id不為null的。這個(gè)例子只是復(fù)雜例子的簡化版本,但是這種復(fù)雜的例子是實(shí)際存在的。 以下是使用PySpark DataFrame API去表達(dá)我們想要的查詢:

df = spark.read.json(data_path)
df_small = (
df
.groupBy("user_id")
.agg(sum("price").alias("price"))
.filter(col("price") < 50)
)
df_big = (
df
.filter(col("user_id").isNotNull())
.groupBy("user_id")
.agg(sum("price").alias("price"))
.filter(col("price") > 100)  
)
result = df_small.union(df_big)

計(jì)劃的解釋翻譯

對于優(yōu)化查詢性能的關(guān)鍵點(diǎn)在于能夠去理解并解釋翻譯查詢計(jì)劃。計(jì)劃的本身是能夠通過Spark DataFrame explain函數(shù)展示出來的,或者如果計(jì)劃已經(jīng)是在運(yùn)行了,我們可以通過Spark UI找到SQL這個(gè)tab,從而找到該計(jì)劃。 Spark SQL中掌控sql語句的執(zhí)行是怎么樣的 這個(gè)SQL tab中有已經(jīng)完成的和正在運(yùn)行的查詢列表,所以選中我們的查詢就能看到物理計(jì)劃的圖形化展示(這里我們移除了指標(biāo)信息,這樣能夠使圖??更加簡單) Spark SQL中掌控sql語句的執(zhí)行是怎么樣的 這個(gè)計(jì)劃是樹形結(jié)構(gòu),每個(gè)節(jié)點(diǎn)代表了一些操作,并且攜帶了一些執(zhí)行信息。我們可以看到這個(gè)例子中我們有兩個(gè)分支,和一個(gè)root分支在最底層,葉子在最頂層,也是執(zhí)行開始的地方。scan json葉子節(jié)點(diǎn)代表從source中讀取數(shù)據(jù),然后這里有一對hashAggregate操作,代表著聚合。在這兩個(gè)聚合操作之間有一個(gè)Exchange操作,代表著shuffle。filters操作攜帶著過濾條件信息。
這個(gè)計(jì)劃是一個(gè)典型的union操作,每一個(gè)dataframe都有一個(gè)新的分支,而且因?yàn)槲覀兊睦又蠨ataFrame是基于同樣的數(shù)據(jù)源,這就意味著該數(shù)據(jù)源被scan了兩次。現(xiàn)在我們能明白這里是存在優(yōu)化的空間的.讓數(shù)據(jù)源只被scan一次是一個(gè)很好的優(yōu)化,尤其是在IO代價(jià)非常大的情況下。
在這里我們想要實(shí)現(xiàn)的是重利用計(jì)算--scan數(shù)據(jù)和聚合的計(jì)算,因?yàn)樵贒ataFrame上的操作是一樣的,原則上計(jì)算一次就足夠了。

Cache緩存

spark中一個(gè)典型的解決重新計(jì)算的方法是利用cache。在DataFrame中有一個(gè)cache函數(shù):

df.cache()

這個(gè)是一個(gè)延遲轉(zhuǎn)換,意味著只有在一些action觸發(fā)后數(shù)據(jù)才會(huì)放到緩存層,在spark中Caching是一個(gè)很普通的操作,然而這是有限制的,特別是數(shù)據(jù)量很大和集群集資源非常緊張的情況下。而且我們必須意識(shí)到存儲(chǔ)數(shù)據(jù)在緩沖層是需要額外的開銷的,而且操作自身也是需要開銷的。 在整個(gè)DataFrame df中調(diào)用cache操作并不能優(yōu)化因?yàn)檫@個(gè)操作會(huì)緩存所有的列到存儲(chǔ)中。一個(gè)更好的方法是只緩存選擇被使用的字段。

重新使用Exchage

除了緩存,也還有另一種方法,這個(gè)方法不好用圖形化描述,且基于重新利用Exchange。這個(gè)Exchange操作代表著用來集群之間移動(dòng)數(shù)據(jù)的shuffle操作。shuffle操作一般在聚合,join,和一些轉(zhuǎn)換操作中會(huì)用到。關(guān)于shuffle比較重要的事是spark總是會(huì)把shuffle 寫的數(shù)據(jù)存儲(chǔ)在磁盤,而且因?yàn)榇鎯?chǔ)在磁盤,在必要的時(shí)候可以重新被使用。實(shí)際上spark在某個(gè)時(shí)機(jī)上會(huì)重新利用該數(shù)據(jù)。比如在spark發(fā)現(xiàn)從葉子節(jié)點(diǎn)到exchange節(jié)點(diǎn)的多個(gè)分支時(shí)重復(fù)的時(shí)候就會(huì)進(jìn)行reuse操作[ReuseExchange規(guī)則],如果存在這種情況,說明我們這些重復(fù)的分支是有一樣的計(jì)算,是可以重新被使用的。我們可以從計(jì)劃中識(shí)別出來是否有這種場景,因?yàn)檫@些分支應(yīng)該像以下這樣: Spark SQL中掌控sql語句的執(zhí)行是怎么樣的 在我們的例子中,spark并不會(huì)重新利用Exchange,但是可以利用一些技巧而從使它被重新利用。為什么在我們的例子中Exchange不能被重新利用的原因是右邊的分支有著user_id不為null的條件。該過濾條件是union操作的兩個(gè)分支的唯一不同點(diǎn),如果我們能消除這個(gè)不同點(diǎn),spark將會(huì)重新利用EXchange。

計(jì)劃的改進(jìn)

我們怎么樣才能分支是一樣的呢?假如說是這個(gè)filer操作導(dǎo)致的,那我們可以顛倒filter的順序,在聚合之后再進(jìn)行過濾操作,因?yàn)檫@個(gè)對結(jié)果沒有影響。然而這有一個(gè)陷阱。假如我們?nèi)缦逻@樣修改:

df_big = (
 df.groupBy("user_id")
 .agg(sum("price").alias("price"))
 .filter(col("price") > 100)
 .filter(col("price").isNotNull())
)

再一次檢查最終的查詢計(jì)劃,我們發(fā)現(xiàn)這個(gè)計(jì)劃沒有改變。解釋很簡單--這個(gè)filter操作被優(yōu)化器移動(dòng)了。

從概念上來講,存在著兩種計(jì)劃 邏輯計(jì)劃和物理計(jì)劃,這個(gè)時(shí)很好理解的。并且邏輯計(jì)劃在轉(zhuǎn)換為物理計(jì)劃前會(huì)經(jīng)過一個(gè)優(yōu)化階段。當(dāng)我們改變了一些轉(zhuǎn)換以后,直接反應(yīng)在邏輯計(jì)劃中。優(yōu)化器會(huì)應(yīng)用一系列的優(yōu)化規(guī)則,這些規(guī)則通常是基于推斷的。在我們的例子中,這個(gè)規(guī)則是PushDownPredicate,該規(guī)則是確保filters操作盡量被移動(dòng)到靠近數(shù)據(jù)源的位置。它來源于進(jìn)行過濾操作再進(jìn)行數(shù)據(jù)集的操作效率更高。這個(gè)規(guī)則在大部分場景是很有用的。 然而在這里卻不適用我們的例子。
為了讓filter在合適的位置,我們必須限制優(yōu)化器。從spark 2.4以來我們可以通過配置項(xiàng)來讓優(yōu)化器排除某種規(guī)則:

spark.conf.set(
"spark.sql.optimizer.excludedRules",     "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")

設(shè)置了這個(gè)以后,再一次運(yùn)行查詢語句,我們能看到filters操作的位置就如我們想的一樣。這兩個(gè)分支是一樣的了,spark將會(huì)重新利用Exchange,數(shù)據(jù)將會(huì)只會(huì)被掃描一次,聚合操作也只會(huì)計(jì)算一次。
在spark 3.0 情況有些不用,優(yōu)化規(guī)則有不同的名字--PushDownPredicates,而且還有一個(gè)額外的規(guī)則用來下推filter-PushPredicateThroughNonJoin,所以實(shí)際上我們需要排除兩個(gè)規(guī)則。

總結(jié)

我們看到通過這個(gè),spark 開發(fā)者給了我們一種控制優(yōu)化器的能力。但是也伴隨著一種責(zé)任,我們列舉了一下當(dāng)使用這種技術(shù)的一些重點(diǎn):

  • 當(dāng)我們排除了PushDownPredicate,我們就得對這個(gè)查詢中所有的filter負(fù)責(zé),不僅僅是我們想要重新定位的filter。 這個(gè)還存在著另一種filter,這種filter很大概率出現(xiàn)的,例如分區(qū)filter,所以我們需要確保他們被放在合適的位置。

  • 限制了優(yōu)化器,使用filter就是用戶的工作了。在我們的例子中,加速查詢是在IO比較昂貴的情況下,因?yàn)槲覀兡軐?shí)現(xiàn)數(shù)據(jù)只能被瀏覽一次,如果數(shù)據(jù)有很多列,這適用在文件格式不是列格式的青情況下,像json或者csv格式

  • 如果數(shù)據(jù)集很小,就不值得控制優(yōu)化器了,反而cache能達(dá)到同樣的效果。然而當(dāng)數(shù)據(jù)集很大的時(shí)候,存儲(chǔ)數(shù)據(jù)的額外開銷就很明顯了。從另一方面說,重新利用Exchange就沒有額外的開銷了,因?yàn)閟huffle數(shù)據(jù)都存儲(chǔ)在磁盤

  • 這個(gè)技術(shù)基于spark內(nèi)部的行為,并沒有官方文檔,并且如果以后功能上有改動(dòng),很難去察覺。在我們的例子中,在spark 3.0中是有改動(dòng)的,首先規(guī)則被重命名,并且加上了另一個(gè)規(guī)則

結(jié)論

我們知道如果要實(shí)現(xiàn)優(yōu)化的前提是我們能夠理解查詢計(jì)劃。spark的優(yōu)化器通過一系列的推導(dǎo)規(guī)則能夠很好的優(yōu)化我們的查詢。然而這里也有一些場景優(yōu)化規(guī)則是不適用的。 有時(shí)候查詢重寫很好,有時(shí)候不好,因?yàn)橹貙懖樵儗?huì)實(shí)現(xiàn)不同的邏輯計(jì)劃,并且我們不能直接控制被執(zhí)行的物理計(jì)劃。因?yàn)閺膕park 2.4以來,我們可以通過配置excludedRules來限制優(yōu)化器,從未來定制了一些常規(guī)的物理計(jì)劃。
在很多場景中,依賴于優(yōu)化器我們可以得到固定的計(jì)劃,并且有一個(gè)高效的執(zhí)行。然而 這里有一些性能壓力,這里我們可以檢查最終的計(jì)劃,并且查看是否可以通過限制優(yōu)化器來進(jìn)行優(yōu)化。

上述就是小編為大家分享的 Spark SQL中掌控sql語句的執(zhí)行是怎么樣的了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI