您好,登錄后才能下訂單哦!
本文首發(fā)于 vivo互聯(lián)網(wǎng)技術(shù) 微信公眾號(hào)
作者:李勇
目錄:
1.SparkSql
2.連接查詢和連接條件
3.謂詞下推
4.內(nèi)連接查詢中的謂詞下推規(guī)則
4.1.Join后條件通過(guò)AND連接
4.2.Join后條件通過(guò)OR連接
4.3.分區(qū)表使用OR連接過(guò)濾條件
SparkSql 是架構(gòu)在 Spark 計(jì)算框架之上的分布式 Sql 引擎,使用 DataFrame 和 DataSet 承載結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)來(lái)實(shí)現(xiàn)數(shù)據(jù)復(fù)雜查詢處理,提供的 DSL可以直接使用 scala 語(yǔ)言完成 Sql 查詢,同時(shí)也使用? thriftserver 提供服務(wù)化的 Sql 查詢功能。
SparkSql 提供了 DataSource API ,用戶通過(guò)這套 API 可以自己開(kāi)發(fā)一套 Connector,直接查詢各類數(shù)據(jù)源,數(shù)據(jù)源包括 NoSql、RDBMS、搜索引擎以及 HDFS 等分布式文件系統(tǒng)上的文件等。和 SparkSql 類似的系統(tǒng)有 Hive、PrestoDB 以及 Impala,這類系統(tǒng)都屬于所謂的" Sql on Hadoop "系統(tǒng),每個(gè)都相當(dāng)火爆,畢竟在這個(gè)不搞 SQL 就是耍流氓的年代,沒(méi) SQL 確實(shí)很難找到用戶使用。
Sql中的連接查詢(join),主要分為內(nèi)連接查詢(inner join)、外連接查詢(outter join)和半連接查詢(semi join),具體的區(qū)別可以參考wiki的解釋。
連接條件(join condition),則是指當(dāng)這個(gè)條件滿足時(shí)兩表的兩行數(shù)據(jù)才能"join"在一起被返回,例如有如下查詢:
其中的"LT.id=RT.idAND LT.id>1"這部分條件被稱為"join中條件",直接用來(lái)判斷被join的兩表的兩行記錄能否被join在一起,如果不滿足這個(gè)條件,兩表的這兩行記錄并非全部被踢出局,而是根據(jù)連接查詢類型的不同有不同的處理,所以這并非一個(gè)單表的過(guò)濾過(guò)程或者兩個(gè)表的的“聯(lián)合過(guò)濾”過(guò)程;而where后的"RT.id>2"這部分被稱為"join后條件",這里雖然成為"join后條件",但是并非一定要在join后才能去過(guò)濾數(shù)據(jù),只是說(shuō)明如果在join后進(jìn)行過(guò)濾,肯定可以得到一個(gè)正確的結(jié)果,這也是我們后邊分析問(wèn)題時(shí)得到正確結(jié)果的基準(zhǔn)方法。
所謂謂詞(predicate),英文定義是這樣的:A predicate is a function that returns bool (or something that can be implicitly converted to bool),也就是返回值是true或者false的函數(shù),使用過(guò)scala或者spark的同學(xué)都知道有個(gè)filter方法,這個(gè)高階函數(shù)傳入的參數(shù)就是一個(gè)返回true或者false的函數(shù)。
但是如果是在sql語(yǔ)言中,沒(méi)有方法,只有表達(dá)式。where后邊的表達(dá)式起的作用正是過(guò)濾的作用,而這部分語(yǔ)句被sql層解析處理后,在數(shù)據(jù)庫(kù)內(nèi)部正是以謂詞的形式呈現(xiàn)的。
那么問(wèn)題來(lái)了,謂詞為什么要下推呢? SparkSql中的謂詞下推有兩層含義,第一層含義是指由誰(shuí)來(lái)完成數(shù)據(jù)過(guò)濾,第二層含義是指何時(shí)完成數(shù)據(jù)過(guò)濾。要解答這兩個(gè)問(wèn)題我們需要了解SparkSql的Sql語(yǔ)句處理邏輯,大致可以把SparkSql中的查詢處理流程做如下的劃分:
SparkSql首先會(huì)對(duì)輸入的Sql語(yǔ)句進(jìn)行一系列的分析(Analyse),包括詞法解析(可以理解為搜索引擎中的分詞這個(gè)過(guò)程)、語(yǔ)法分析以及語(yǔ)義分析(例如判斷database或者table是否存在、group by必須和聚合函數(shù)結(jié)合等規(guī)則);之后是執(zhí)行計(jì)劃的生成,包括邏輯計(jì)劃和物理計(jì)劃。其中在邏輯計(jì)劃階段會(huì)有很多的優(yōu)化,對(duì)謂詞的處理就在這個(gè)階段完成;而物理計(jì)劃則是RDD的DAG圖的生成過(guò)程;這兩步完成之后則是具體的執(zhí)行了(也就是各種重量級(jí)的計(jì)算邏輯,例如join、groupby、filter以及distinct等),這就會(huì)有各種物理操作符(RDD的Transformation)的亂入。
能夠完成數(shù)據(jù)過(guò)濾的主體有兩個(gè),第一是分布式Sql層(在execute階段),第二個(gè)是數(shù)據(jù)源。那么謂詞下推的第一層含義就是指由Sql層的Filter操作符來(lái)完成過(guò)濾,還是由Scan操作符在掃描階段完成過(guò)濾。
上邊提到,我們可以通過(guò)封裝SparkSql的Data Source API完成各類數(shù)據(jù)源的查詢,那么如果底層數(shù)據(jù)源無(wú)法高效完成數(shù)據(jù)的過(guò)濾,就會(huì)執(zhí)行全局掃描,把每條相關(guān)的數(shù)據(jù)都交給SparkSql的Filter操作符完成過(guò)濾,雖然SparkSql使用的Code Generation技術(shù)極大的提高了數(shù)據(jù)過(guò)濾的效率,但是這個(gè)過(guò)程無(wú)法避免大量數(shù)據(jù)的磁盤讀取,甚至在某些情況下會(huì)涉及網(wǎng)絡(luò)IO(例如數(shù)據(jù)非本地化存儲(chǔ)時(shí));如果底層數(shù)據(jù)源在進(jìn)行掃描時(shí)能非??焖俚耐瓿蓴?shù)據(jù)的過(guò)濾,那么就會(huì)把過(guò)濾交給底層數(shù)據(jù)源來(lái)完成(至于哪些數(shù)據(jù)源能高效完成數(shù)據(jù)的過(guò)濾以及SparkSql又是如何完成高效數(shù)據(jù)過(guò)濾的則不是本文討論的重點(diǎn),會(huì)在其他系列的文章中介紹)。
那么謂詞下推第二層含義,即何時(shí)完成數(shù)據(jù)過(guò)濾則一般是在指連接查詢中,是先對(duì)單表數(shù)據(jù)進(jìn)行過(guò)濾再和其他表連接還是在先把多表進(jìn)行連接再對(duì)連接后的臨時(shí)表進(jìn)行過(guò)濾,則是本系列文章要分析和討論的重點(diǎn)。
假設(shè)我們有兩張表,表結(jié)構(gòu)很簡(jiǎn)單,數(shù)據(jù)也都只有兩條,但是足以講清楚我們的下推規(guī)則,兩表如下,一個(gè)lefttable,一個(gè)righttable:
先來(lái)看一條查詢語(yǔ)句:
這個(gè)查詢是一個(gè)內(nèi)連接查詢,join后條件是用and連接的兩個(gè)表的過(guò)濾條件,假設(shè)我們不下推,而是先做內(nèi)連接判斷,這時(shí)是可以得到正確結(jié)果的,步驟如下:
左表id為1的行在右表中可以找到,即這兩行數(shù)據(jù)可以"join"在一起
至此,join的臨時(shí)結(jié)果表(之所以是臨時(shí)表,因?yàn)檫€沒(méi)有進(jìn)行過(guò)濾)如下:
然后使用where條件進(jìn)行過(guò)濾,顯然臨時(shí)表中的第一行不滿足條件,被過(guò)濾掉,最后結(jié)果如下:
來(lái)看看先進(jìn)行謂詞下推的情況。先對(duì)兩表進(jìn)行過(guò)濾,過(guò)濾的結(jié)果分別如下:
然后再對(duì)這兩個(gè)過(guò)濾后的表進(jìn)行內(nèi)連接處理,結(jié)果如下:
可見(jiàn),這和先進(jìn)行join再過(guò)濾得到的結(jié)果一致。
再來(lái)看一條查詢語(yǔ)句:
我們先進(jìn)行join處理,臨時(shí)表的結(jié)果如下:
然后使用where條件進(jìn)行過(guò)濾,最終查詢結(jié)果如下:
如果我們先使用where條件后每個(gè)表各自的過(guò)濾條件進(jìn)行過(guò)濾,那么兩表的過(guò)濾結(jié)果如下:
然后對(duì)這兩個(gè)臨時(shí)表進(jìn)行內(nèi)連接處理,結(jié)果如下:
表格有問(wèn)題吧,只有字段名,沒(méi)有字段值,怎么回事?是的,你沒(méi)看錯(cuò),確實(shí)沒(méi)有值,因?yàn)樽蟊磉^(guò)濾結(jié)果只有id為1的行,右表過(guò)濾結(jié)果只有id為2的行,這兩行是不能內(nèi)連接上的,所以沒(méi)有結(jié)果。
那么為什么where條件中兩表的條件被or連接就會(huì)出現(xiàn)錯(cuò)誤的查詢結(jié)果呢?分析原因主要是因?yàn)?,?duì)于or兩側(cè)的過(guò)濾條件,任何一個(gè)滿足條件即可以返回TRUE,那么對(duì)于"LT.value = 'two' OR RT.value = 'two' "這個(gè)查詢條件,如果使用LT.value='two'把只有LT.value為'two'的左表記錄過(guò)濾出來(lái),那么對(duì)于左表中LT.value不為two的行,他們可能在跟右表使用id字段連接上之后,右表的RT.value恰好為two,也滿足"LT.value = 'two' OR RT.value = 'two' ",但是可惜呀可惜,這行記錄因?yàn)橹暗拇直┨幚恚呀?jīng)被過(guò)濾掉,結(jié)果就是得到了錯(cuò)誤的查詢結(jié)果。所以這種情況下謂詞是不能下推的。
但是OR連接兩表join后條件也有兩個(gè)例外,這里順便分析第一個(gè)例外。第一個(gè)例外是過(guò)濾條件字段恰好為Join字段,比如如下的查詢:
在這個(gè)查詢中,join后條件依然是使用OR連接兩表的過(guò)濾條件,不同的是,join中條件不再是id相等,而是value字段相等,也就是說(shuō)過(guò)濾條件字段恰好就是join條件字段。大家可以自行采用上邊的分步法分析謂詞下推和不下推時(shí)的查詢結(jié)果,得到的結(jié)果是相同的。
我們來(lái)看看上邊不能下推時(shí)出現(xiàn)的情況在這種查詢里會(huì)不會(huì)出現(xiàn)。對(duì)于左表,如果使用LT.value='two'過(guò)濾掉不符合條件的其他行,那么因?yàn)閖oin條件字段也是value字段,說(shuō)明在左表中LT.value不等于two的行,在右表中也不能等于two,否則就不滿足"LT.value=RT.value"了。這里其實(shí)有一個(gè)條件傳遞的過(guò)程,通過(guò)join中條件,已經(jīng)在邏輯上提前把兩表整合成了一張表。
至于第二個(gè)例外,則涉及了SparkSql中的一個(gè)優(yōu)化,所以需要單獨(dú)介紹。
如果兩個(gè)表都是分區(qū)表,會(huì)出現(xiàn)什么情況呢?我們先來(lái)看如下的查詢:
此時(shí)左表和右表都不再是普通的表,而是分區(qū)表,分區(qū)字段是pt,按照日期進(jìn)行數(shù)據(jù)分區(qū)。同時(shí)兩表查詢條件依然使用OR進(jìn)行連接。試想,如果不能提前對(duì)兩表進(jìn)行過(guò)濾,那么會(huì)有非常巨量的數(shù)據(jù)要首先進(jìn)行連接處理,這個(gè)代價(jià)是非常大的。但是如果按照我們?cè)?中的分析,使用OR連接兩表的過(guò)濾條件,又不能隨意的進(jìn)行謂詞下推,那要如何處理呢?SparkSql在這里使用了一種叫做“分區(qū)裁剪”的優(yōu)化手段,即把分區(qū)并不看做普通的過(guò)濾條件,而是使用了“一刀切”的方法,把不符合查詢分區(qū)條件的目錄直接排除在待掃描的目錄之外。
我們知道分區(qū)表在HDFS上是按照目錄來(lái)存儲(chǔ)一個(gè)分區(qū)的數(shù)據(jù)的,那么在進(jìn)行分區(qū)裁剪時(shí),直接把要掃描的HDFS目錄通知Spark的Scan操作符,這樣,Spark在進(jìn)行掃描時(shí),就可以直接咔嚓掉其他的分區(qū)數(shù)據(jù)了。但是,要完成這種優(yōu)化,需要SparkSql的語(yǔ)義分析邏輯能夠正確的分析出Sql語(yǔ)句所要表達(dá)的精確目的,所以分區(qū)字段在SparkSql的元數(shù)據(jù)中也是獨(dú)立于其他普通字段,進(jìn)行了單獨(dú)的標(biāo)示,就是為了方便語(yǔ)義分析邏輯能區(qū)別處理Sql語(yǔ)句中where條件里的這種特殊情況。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。