溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行數(shù)據(jù)湖deltalake流表的讀寫

發(fā)布時(shí)間:2021-12-23 16:47:38 來源:億速云 閱讀:124 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹如何進(jìn)行數(shù)據(jù)湖deltalake流表的讀寫,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常見的與流系統(tǒng)和文件整合帶來的相關(guān)限制,如下:

  • 保證了多個(gè)流(或并發(fā)批處理作業(yè))的僅一次處理。

  • 當(dāng)使用文件作為流源時(shí),可以有效地發(fā)現(xiàn)哪些文件是新文件。

1. 作為stream source

1.1 案例講解

當(dāng)你的structured streaming使用delta lake作為stream source的時(shí)候,應(yīng)用會(huì)處理delta 表中已有的數(shù)據(jù),以及delta 表新增的數(shù)據(jù)。

spark.readStream.format("delta").load("/delta/events")

也可以做一些優(yōu)化,如下:

a.通過maxFilesPerTrigger配置控制structured streaming從delta lake加載的微批文件數(shù)。要知道Structured streaming也是微批的概念。該參數(shù)就是控制每次trigger計(jì)算的最大新增文件數(shù),默認(rèn)是1000,實(shí)際情況要根據(jù)數(shù)據(jù)量和資源數(shù)量進(jìn)行控制。

b.通過maxBytesPerTrigger控制每次trigger處理的最大數(shù)據(jù)量。這是設(shè)置一個(gè)“ soft max”,這意味著一個(gè)批處理大約可以處理此數(shù)量的數(shù)據(jù),并且可能處理的數(shù)量超出這個(gè)限制。如果使用的是Trigger.Once,則 此配置無效。如果將此配置與maxFilesPerTrigger結(jié)合使用,兩個(gè)參數(shù)任意一個(gè)達(dá)到臨屆條件,都會(huì)生效。

1.2 忽略更新和刪除

structured streaming不處理不是追加的輸入數(shù)據(jù),并且如果對(duì)作為source的delta table的表進(jìn)行了任何修改,則structured streaming會(huì)拋出異常。 對(duì)于變更常見的企業(yè)場景,提供了兩種策略,來處理對(duì)delta 表變更給structured streaming 任務(wù)造成的影響:

  • 可以刪除輸出和checkpoint,并重新啟動(dòng)structured streaming對(duì)數(shù)據(jù)計(jì)算,也即是重新計(jì)算一次。

  • 可以設(shè)置以下兩個(gè)選項(xiàng)之一:

    • ignoreDeletes:忽略在分區(qū)表中刪除數(shù)據(jù)的事務(wù)。

    • ignoreChanges:如果由于諸如UPDATE,MERGE INTO,DELETE(在分區(qū)內(nèi))或OVERWRITE之類的數(shù)據(jù)更改操作而不得不在源表中重寫文件,則重新處理更新的文件。因此未更改的行仍可能會(huì)處理并向下游傳輸,因此structured streaming的下游應(yīng)該能夠處理重復(fù)數(shù)據(jù)。刪除不會(huì)傳輸?shù)较掠?。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則流不會(huì)因源表的刪除或更新而中斷。

1.3 案例

假設(shè)有一張表叫做user_events,有三個(gè)字段:date,user_email,action,而且該表以date字段進(jìn)行分區(qū)。structured streaming區(qū)處理這張表,且還有其程序會(huì)對(duì)該delta 表進(jìn)行插入和刪除操作。

假設(shè)僅僅是刪除操作,可以這么配置stream:

events.readStream  .format("delta")  .option("ignoreDeletes", "true")  .load("/delta/user_events")

假設(shè)對(duì)delta表修改操作,可以這么配置stream:

events.readStream  .format("delta")  .option("ignoreChanges", "true")  .load("/delta/user_events")

如果使用UPDATE語句更新了user_email字段某個(gè)值,則包含相關(guān)user_email的文件將被重寫,這個(gè)是delta lake更改操作實(shí)現(xiàn)機(jī)制后面會(huì)講。使用ignoreChanges時(shí),新記錄將與同一文件中的所有其他未更改記錄一起向下游傳輸。 所以下游程序應(yīng)該能夠處理這些傳入的重復(fù)記錄。

2.delta 表作為sink

delta table可以作為Structured Streaming的sink使用。delta lake的事務(wù)日志確保了其能實(shí)現(xiàn)僅一次處理。

2.1 append mode

默認(rèn)是append 模式,僅僅是追加數(shù)據(jù)到delta 表:

events.writeStream  .format("delta")  .outputMode("append")  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")  .start("/delta/events") // as a path

2.2 complete mode

也可以使用Structured Streaming每個(gè)批次覆蓋一次整張表。在某些聚合場景下會(huì)用到該模式:

  .format("delta")  .load("/delta/events")  .groupBy("customerId")  .count()  .writeStream  .format("delta")  .outputMode("complete")  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")  .start("/delta/eventsByCustomer")

對(duì)于延遲要求更寬松的應(yīng)用程序,可以使用Trigger.Once來節(jié)省計(jì)算資源。once trigger每次處理從開始到最新的數(shù)據(jù),典型的kappa模型,很適合這種場景了。

關(guān)于如何進(jìn)行數(shù)據(jù)湖deltalake流表的讀寫就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI