溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的

發(fā)布時間:2021-12-23 16:49:04 來源:億速云 閱讀:202 作者:柒染 欄目:大數(shù)據(jù)

delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

鑒于merge操作的復(fù)雜性,下面主要對其進(jìn)行展開講解。

1.merge算子操作語法

merge操作的sql表達(dá)如下:

import io.delta.tables._import org.apache.spark.sql.functions._

DeltaTable.forPath(spark, "/data/events/")  .as("events")  .merge(    updatesDF.as("updates"),    "events.eventId = updates.eventId")  .whenMatched  .updateExpr(    Map("data" -> "updates.data"))  .whenNotMatched  .insertExpr(    Map(      "date" -> "updates.date",      "eventId" -> "updates.eventId",      "data" -> "updates.data"))  .execute()

merge 編碼操作還是有些約束需要詳細(xì)描述的。

1.1 可以有(1,2,3)個wenMatched或者whenNotMatched的子語句。其中,whenMatched操作最多有兩個語句,whenNotMatched最多有一個子語句。

1.2 當(dāng)源表的數(shù)據(jù)和目標(biāo)表的數(shù)據(jù)滿足匹配條件的時候,執(zhí)行的是whenMatched語句。這些語句可以有以下幾個語義:

a) whenMatched語句最多有一個update和一個delete表達(dá)。merge中的update行為僅僅更新滿足條件的目標(biāo)表一行數(shù)據(jù)的指定列。而delete操作會刪除所有匹配的行。

b) 每個whenMatched語句都可以有一個可選的條件。如果該可選的條件存在,update和delete操作僅僅在該可選條件為true的時候,才會在匹配的目標(biāo)數(shù)據(jù)上執(zhí)行相應(yīng)操作。

c) 如果有兩個whenMatched子句,則將按照它們被指定的順序(即,子句的順序很重要)進(jìn)行執(zhí)行。第一個子句必須具有一個子句條件(否則,第二個子句將永遠(yuǎn)不會執(zhí)行)。

d) 如果兩個whenMatched子語句都有條件并且兩個子語句的條件都不為true,那不會對目標(biāo)數(shù)據(jù)進(jìn)行任何修改。

c) 支持滿足條件的源dataset中相關(guān)行的所有列同時更新到目標(biāo)detla表的相關(guān)列,表達(dá)式如下:

whenMatched(...).updateAll()

等價于:

whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保證源表和目標(biāo)表有相同的列,否則會拋出異常。

1.3 給定的條件,源表的一行數(shù)據(jù),跟目標(biāo)表沒有完成匹配的時候執(zhí)行whenNotMatched語句。該子語句有以下語法:

a) whenNotMatched僅僅支持insert表達(dá)。根據(jù)指定的列和相關(guān)的條件,該操作會在目標(biāo)表中插入一條新的數(shù)據(jù),當(dāng)目標(biāo)表中存在的列沒有明確的指定的時候,就插入null。

b) whenNotMatched語句可以有可選條件。如果指定了可選條件,數(shù)據(jù)僅僅會在可選條件為true的時候才會插入。否則,源列會被忽略。

c) 也可以插入匹配目標(biāo)表相關(guān)行的所有源表行的數(shù)據(jù)列,表達(dá)式:

whenNotMatched(...).insertAll()

等價于:

whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保證源表和目標(biāo)表有相同的列,否則就會拋出異常。

2.schema校驗

merge操作會自動校驗insert和update操作產(chǎn)生額數(shù)據(jù)schema是否與目標(biāo)表的schema匹配。規(guī)則如下:

a) 對于update和insert行為,指定的目標(biāo)列必須在目標(biāo)delta lake表中存在。

b) 對于updateAll和insertAll操作,源dataset必須包含所有目標(biāo)表的列。源dataset可以有目標(biāo)表中不存在的列,但是這些列會被忽略。當(dāng)然也可以通過配置保留僅源dataset有的列。

c) 對于所有操作,如果由生成目標(biāo)列的表達(dá)式生成的數(shù)據(jù)類型與目標(biāo)Delta表中的對應(yīng)列不同,則merge嘗試將其強(qiáng)制轉(zhuǎn)換為表中的類型。

3.自動schema轉(zhuǎn)換

默認(rèn)情況下,updateAll和insertAll操作僅僅會更新或插入在目標(biāo)表中有的相同列名的列,對于僅僅在源dataset中存在而目標(biāo)表中不存在的列,會被忽略。但是有些場景下,我們希望保留源dataset中新增的列。首先需要將前面介紹的一個參數(shù)spark.databricks.delta.schema.autoMerge.enabled設(shè)置為true。

注意:

a. schema自動增加僅僅是針對updateAll操作或者insertAll操作,或者兩者。

b. 僅僅頂層的列會被更改,而不是嵌套的列。

c. 更新和插入操作不能顯式引用目標(biāo)表中不存在的目標(biāo)列(即使其中有updateAll或insertAll作為子句之一)。 

4.schema推斷與否對比

據(jù)一些例子,進(jìn)行schema自動推斷與不自動推斷的對比

對比一

目標(biāo)列(key,value),源列(key,value,newValue),對源源表執(zhí)行下面的sql操作:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

沒有使用自動schema推斷的話:目標(biāo)表的schema信息是不會變的。僅僅key,value列被更新。

使用了schema推斷的話:表的schema就會演變?yōu)?key,value,newValue)。updateAll操作,會更新value和newValue列。對于insertAll操作會插入整行(key,value,newValue)。

對比二

目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

不使用schema推斷:updateAll和insertAll操作都會拋異常。

使用schema推斷:表的shema會演變?yōu)?key,oldValue,newValue)。updateAll操作會更新key和value列,而oldValue列不變。insertAll操作會插入(key,null,newValue),oldValue會插入null。

對比三

目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().update(Map(    "newValue" -> col("s.newValue")))  .whenNotMatched().insertAll()  .execute()

不使用schema推斷:update操作會拋出異常,因為newValue在目標(biāo)表中并不存在。

使用schema推斷:update操作會拋出異常,因為newValue在目標(biāo)表中并不存在。

對比四:

目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insert(Map(    "key" -> col("s.key"),    "newValue" -> col("s.newValue")))  .execute()

不使用schema推斷:insert操作會拋出異常,因為newValue在目標(biāo)表中并不存在。

使用schema推斷:insert操作依然會拋出異常,因為newValue在目標(biāo)表中并不存在。

5.性能調(diào)優(yōu)

下面幾個方法可以有效減少merge的處理時間:

a.減少匹配查找的數(shù)據(jù)量

默認(rèn)情況下,merge操作會掃描整個delta lake表找到滿足條件的數(shù)據(jù)。可以加些謂詞,以減少數(shù)據(jù)量。比如,數(shù)據(jù)是以country和date進(jìn)行分區(qū)的,而你只想更新特定國家的昨天的數(shù)據(jù)。就可以增加一些條件,比如:

events.date = current_date() AND events.country = 'USA'

這樣就只會處理指定分區(qū)的數(shù)據(jù),大大減少了數(shù)據(jù)掃描量。也可以避免不同分區(qū)之間操作的一些沖突。

b.合并文件

如果數(shù)據(jù)存儲的時候有很多小文件,就會降低數(shù)據(jù)的讀取速度。可以合并小文件成一些大文件,來提升讀取的速度。后面會說到這個問題。

c.控制shuffle的分區(qū)數(shù)

為了計算和更新數(shù)據(jù),merge操作會對數(shù)據(jù)進(jìn)行多次shuffle。shuffle過程中task數(shù)量是由參數(shù)spark.sql.shuffle.partitions來設(shè)置,默認(rèn)是200。該參數(shù)不僅能控制shuffle的并行度,也能決定輸出的文件數(shù)。增加這個值雖然可以增加并行度,但也相應(yīng)的增加了產(chǎn)生小文件數(shù)。

d.寫出數(shù)據(jù)之間進(jìn)行重分區(qū)

對與分區(qū)表,merge操作會產(chǎn)生很多小文件,會比shuffle分區(qū)數(shù)多很多。原因是每個shuffle任務(wù)會為多分區(qū)表產(chǎn)生更多的文件,這可能會是一個性能瓶頸。所以,很多場景中使用表的分區(qū)列對數(shù)據(jù)進(jìn)行寫入前重分區(qū)是很有效的??梢酝ㄟ^設(shè)置spark.delta.merge.repartitionBeforeWrite為true來生效。

關(guān)于delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI