您好,登錄后才能下訂單哦!
delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
鑒于merge操作的復(fù)雜性,下面主要對其進(jìn)行展開講解。
1.merge算子操作語法
merge操作的sql表達(dá)如下:
import io.delta.tables._
import org.apache.spark.sql.functions._
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
merge 編碼操作還是有些約束需要詳細(xì)描述的。
1.1 可以有(1,2,3)個wenMatched或者whenNotMatched的子語句。其中,whenMatched操作最多有兩個語句,whenNotMatched最多有一個子語句。
1.2 當(dāng)源表的數(shù)據(jù)和目標(biāo)表的數(shù)據(jù)滿足匹配條件的時候,執(zhí)行的是whenMatched語句。這些語句可以有以下幾個語義:
a) whenMatched語句最多有一個update和一個delete表達(dá)。merge中的update行為僅僅更新滿足條件的目標(biāo)表一行數(shù)據(jù)的指定列。而delete操作會刪除所有匹配的行。
b) 每個whenMatched語句都可以有一個可選的條件。如果該可選的條件存在,update和delete操作僅僅在該可選條件為true的時候,才會在匹配的目標(biāo)數(shù)據(jù)上執(zhí)行相應(yīng)操作。
c) 如果有兩個whenMatched子句,則將按照它們被指定的順序(即,子句的順序很重要)進(jìn)行執(zhí)行。第一個子句必須具有一個子句條件(否則,第二個子句將永遠(yuǎn)不會執(zhí)行)。
d) 如果兩個whenMatched子語句都有條件并且兩個子語句的條件都不為true,那不會對目標(biāo)數(shù)據(jù)進(jìn)行任何修改。
c) 支持滿足條件的源dataset中相關(guān)行的所有列同時更新到目標(biāo)detla表的相關(guān)列,表達(dá)式如下:
whenMatched(...).updateAll()
等價于:
whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
要保證源表和目標(biāo)表有相同的列,否則會拋出異常。
1.3 給定的條件,源表的一行數(shù)據(jù),跟目標(biāo)表沒有完成匹配的時候執(zhí)行whenNotMatched語句。該子語句有以下語法:
a) whenNotMatched僅僅支持insert表達(dá)。根據(jù)指定的列和相關(guān)的條件,該操作會在目標(biāo)表中插入一條新的數(shù)據(jù),當(dāng)目標(biāo)表中存在的列沒有明確的指定的時候,就插入null。
b) whenNotMatched語句可以有可選條件。如果指定了可選條件,數(shù)據(jù)僅僅會在可選條件為true的時候才會插入。否則,源列會被忽略。
c) 也可以插入匹配目標(biāo)表相關(guān)行的所有源表行的數(shù)據(jù)列,表達(dá)式:
whenNotMatched(...).insertAll()
等價于:
whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
要保證源表和目標(biāo)表有相同的列,否則就會拋出異常。
2.schema校驗
merge操作會自動校驗insert和update操作產(chǎn)生額數(shù)據(jù)schema是否與目標(biāo)表的schema匹配。規(guī)則如下:
a) 對于update和insert行為,指定的目標(biāo)列必須在目標(biāo)delta lake表中存在。
b) 對于updateAll和insertAll操作,源dataset必須包含所有目標(biāo)表的列。源dataset可以有目標(biāo)表中不存在的列,但是這些列會被忽略。當(dāng)然也可以通過配置保留僅源dataset有的列。
c) 對于所有操作,如果由生成目標(biāo)列的表達(dá)式生成的數(shù)據(jù)類型與目標(biāo)Delta表中的對應(yīng)列不同,則merge嘗試將其強(qiáng)制轉(zhuǎn)換為表中的類型。
3.自動schema轉(zhuǎn)換
默認(rèn)情況下,updateAll和insertAll操作僅僅會更新或插入在目標(biāo)表中有的相同列名的列,對于僅僅在源dataset中存在而目標(biāo)表中不存在的列,會被忽略。但是有些場景下,我們希望保留源dataset中新增的列。首先需要將前面介紹的一個參數(shù)spark.databricks.delta.schema.autoMerge.enabled設(shè)置為true。
注意:
a. schema自動增加僅僅是針對updateAll操作或者insertAll操作,或者兩者。
b. 僅僅頂層的列會被更改,而不是嵌套的列。
c. 更新和插入操作不能顯式引用目標(biāo)表中不存在的目標(biāo)列(即使其中有updateAll或insertAll作為子句之一)。
4.schema推斷與否對比
據(jù)一些例子,進(jìn)行schema自動推斷與不自動推斷的對比
對比一
目標(biāo)列(key,value),源列(key,value,newValue),對源源表執(zhí)行下面的sql操作:
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insertAll() .execute()
沒有使用自動schema推斷的話:目標(biāo)表的schema信息是不會變的。僅僅key,value列被更新。
使用了schema推斷的話:表的schema就會演變?yōu)?key,value,newValue)。updateAll操作,會更新value和newValue列。對于insertAll操作會插入整行(key,value,newValue)。
對比二
目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql:
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insertAll() .execute()
不使用schema推斷:updateAll和insertAll操作都會拋異常。
使用schema推斷:表的shema會演變?yōu)?key,oldValue,newValue)。updateAll操作會更新key和value列,而oldValue列不變。insertAll操作會插入(key,null,newValue),oldValue會插入null。
對比三
目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().update(Map( "newValue" -> col("s.newValue"))) .whenNotMatched().insertAll() .execute()
不使用schema推斷:update操作會拋出異常,因為newValue在目標(biāo)表中并不存在。
使用schema推斷:update操作會拋出異常,因為newValue在目標(biāo)表中并不存在。
對比四:
目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insert(Map( "key" -> col("s.key"), "newValue" -> col("s.newValue"))) .execute()
不使用schema推斷:insert操作會拋出異常,因為newValue在目標(biāo)表中并不存在。
使用schema推斷:insert操作依然會拋出異常,因為newValue在目標(biāo)表中并不存在。
5.性能調(diào)優(yōu)
下面幾個方法可以有效減少merge的處理時間:
a.減少匹配查找的數(shù)據(jù)量
默認(rèn)情況下,merge操作會掃描整個delta lake表找到滿足條件的數(shù)據(jù)。可以加些謂詞,以減少數(shù)據(jù)量。比如,數(shù)據(jù)是以country和date進(jìn)行分區(qū)的,而你只想更新特定國家的昨天的數(shù)據(jù)。就可以增加一些條件,比如:
events.date = current_date() AND events.country = 'USA'
這樣就只會處理指定分區(qū)的數(shù)據(jù),大大減少了數(shù)據(jù)掃描量。也可以避免不同分區(qū)之間操作的一些沖突。
b.合并文件
如果數(shù)據(jù)存儲的時候有很多小文件,就會降低數(shù)據(jù)的讀取速度。可以合并小文件成一些大文件,來提升讀取的速度。后面會說到這個問題。
c.控制shuffle的分區(qū)數(shù)
為了計算和更新數(shù)據(jù),merge操作會對數(shù)據(jù)進(jìn)行多次shuffle。shuffle過程中task數(shù)量是由參數(shù)spark.sql.shuffle.partitions來設(shè)置,默認(rèn)是200。該參數(shù)不僅能控制shuffle的并行度,也能決定輸出的文件數(shù)。增加這個值雖然可以增加并行度,但也相應(yīng)的增加了產(chǎn)生小文件數(shù)。
d.寫出數(shù)據(jù)之間進(jìn)行重分區(qū)
對與分區(qū)表,merge操作會產(chǎn)生很多小文件,會比shuffle分區(qū)數(shù)多很多。原因是每個shuffle任務(wù)會為多分區(qū)表產(chǎn)生更多的文件,這可能會是一個性能瓶頸。所以,很多場景中使用表的分區(qū)列對數(shù)據(jù)進(jìn)行寫入前重分區(qū)是很有效的??梢酝ㄟ^設(shè)置spark.delta.merge.repartitionBeforeWrite為true來生效。
關(guān)于delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。