溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控

發(fā)布時(shí)間:2021-12-17 09:10:38 來源:億速云 閱讀:449 作者:柒染 欄目:云計(jì)算

本篇文章給大家分享的是有關(guān)Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

小編主要對(duì)Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控的方法和架構(gòu)進(jìn)行了介紹,下面探討了一種數(shù)據(jù)管理架構(gòu),該架構(gòu)可以在數(shù)據(jù)到達(dá)時(shí),通過主動(dòng)監(jiān)控和分析來檢測(cè)流式數(shù)據(jù)中損壞或不良的數(shù)據(jù),并且不會(huì)造成瓶頸。

構(gòu)建流式數(shù)據(jù)分析和監(jiān)控流程

在Databricks,我們看到客戶中不斷涌現(xiàn)出許多數(shù)據(jù)處理模式,這些新模式的產(chǎn)生推動(dòng)了可能的極限,在速度和質(zhì)量問題上也不例外。為了幫助解決這一矛盾,我們開始考慮使用正確的工具,不僅可以支持所需的數(shù)據(jù)速度,還可以提供可接受的數(shù)據(jù)質(zhì)量水平。Structured Streaming和Delta Lake非常適合用于數(shù)據(jù)獲取和存儲(chǔ)層,因?yàn)樗麄兡軌蚺浜蟿?chuàng)造一個(gè)具有擴(kuò)展性、容錯(cuò)性和類實(shí)時(shí)的系統(tǒng),并且具有exactly-once處理保證。

為企業(yè)數(shù)據(jù)質(zhì)量分析找到可接受的工具要困難一些,特別是這個(gè)工具需要具有對(duì)數(shù)據(jù)質(zhì)量指標(biāo)的狀態(tài)匯總的能力。另外,還需要能夠?qū)φ麄€(gè)數(shù)據(jù)集進(jìn)行檢查(例如檢測(cè)出多少比例的記錄為空值),這些都會(huì)隨著所提取的數(shù)據(jù)量的增加而增加計(jì)算成本。這對(duì)所有流式系統(tǒng)而言都是需要的,這一要求就排除了很多可用的工具。

在我們最初的解決方案中,我們選擇了Amazon的數(shù)據(jù)質(zhì)量檢測(cè)工具Deequ,因?yàn)樗芴峁┖?jiǎn)單而強(qiáng)大的API,有對(duì)數(shù)據(jù)質(zhì)量指標(biāo)進(jìn)行狀態(tài)聚合的能力,以及對(duì)Scala的支持。將來,其他Spark原生的工具將提供額外的選擇。

Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控

流式數(shù)據(jù)質(zhì)量監(jiān)控的實(shí)現(xiàn)

我們通過在EC2實(shí)例上運(yùn)行一個(gè)小型的Kafka producer來模擬數(shù)據(jù)流,該實(shí)例將模擬的股票交易信息寫入Kafka topic,并使用原生的Databricks連接器將這些數(shù)據(jù)導(dǎo)入到Delta Lake表當(dāng)中。為了展示Spark Streaming中數(shù)據(jù)質(zhì)量檢查的功能,我們選擇在整個(gè)流程中實(shí)現(xiàn)Deequ的不同功能:

  • 根據(jù)歷史數(shù)據(jù)生成約束條件;

  • 使用foreachBatch算子對(duì)到達(dá)的數(shù)據(jù)進(jìn)行增量質(zhì)量分析;

  • 使用foreachBatch算子對(duì)到達(dá)的數(shù)據(jù)執(zhí)行(較小的)單元測(cè)試,并將質(zhì)量不佳的batch隔離到質(zhì)量不佳記錄表中;

  • 對(duì)于每個(gè)到達(dá)的batch,將最新的狀態(tài)指標(biāo)寫入到Delta表當(dāng)中;

  • 對(duì)整個(gè)數(shù)據(jù)集定期執(zhí)行(較大的)單元測(cè)試,并在MLFlow中跟蹤結(jié)果;

  • 根據(jù)驗(yàn)證結(jié)果發(fā)送通知(如通過電子郵件或Slack);

  • 捕獲MLFlow中的指標(biāo)以進(jìn)行可視化和記錄。

我們結(jié)合了MLFlow來跟蹤一段時(shí)間內(nèi)數(shù)據(jù)性能指標(biāo)的質(zhì)量、Delta表的版本迭代以及結(jié)合了一個(gè)用于通知和告警的Slack連接器。整個(gè)流程可以用如下的圖片進(jìn)行表示:

Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控

由于Spark中具有統(tǒng)一的批處理/流式處理接口,因此我們能夠在這個(gè)流程的任何位置提取報(bào)告、告警和指標(biāo),作為實(shí)時(shí)更新或批處理快照。這對(duì)于設(shè)置觸發(fā)器或限制特別有用,因此,如果某個(gè)指標(biāo)超過了閾值,則可以執(zhí)行數(shù)據(jù)質(zhì)量改善措施。還要注意的是,我們并沒有對(duì)初始到達(dá)的原始數(shù)據(jù)造成影響,這些數(shù)據(jù)將立即提交到我們的Delta表,這意味著我們不會(huì)限制數(shù)據(jù)輸入的速率。下游系統(tǒng)可以直接從該表中讀取數(shù)據(jù),如果超過了上述任何觸發(fā)條件或質(zhì)量閾值,則可能會(huì)中斷。此外,我們可以輕松地創(chuàng)建一個(gè)排除質(zhì)量不佳記錄的view以提供一個(gè)干凈的表。

在一個(gè)較高的層次,執(zhí)行我們的數(shù)據(jù)質(zhì)量跟蹤和驗(yàn)證的代碼如下所示:

spark.readStream.table("trades_delta").writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
   // reassign our current state to the previous next state    val stateStoreCurr = stateStoreNext
   // run analysis on the current batch, aggregate with saved state    val metricsResult = AnalysisRunner.run(data=batchDF, ...)        // verify the validity of our current microbatch    val verificationResult = VerificationSuite()        .onData(batchDF)        .addCheck(...).run()
   // if verification fails, write batch to bad records table    if (verificationResult.status != CheckStatus.Success) {...}
   // write the current results into the metrics table    Metric_results.write    .format("delta")    .mode("overwrite")    .saveAsTable("deequ_metrics")}.start()

使用數(shù)據(jù)質(zhì)量工具Deequ

在Databricks中使用Deequ是相對(duì)比較容易的事情,你需要首先定義一個(gè)analyzer,然后在dataframe上運(yùn)行該analyzer。例如,我們可以跟蹤Deequ本地提供的幾個(gè)相關(guān)指標(biāo)檢查,包括檢查數(shù)量和價(jià)格是否為非負(fù)數(shù)、原始IP地址是否不為空以及符號(hào)字段在所有事務(wù)中的唯一性。Deequ的StateProvider對(duì)象在流式數(shù)據(jù)配置中特別有用,它能允許用戶將我們指標(biāo)的狀態(tài)保存在內(nèi)存或磁盤中,并在以后匯總這些指標(biāo)。這意味著每個(gè)處理的批次僅分析該批次中的數(shù)據(jù)記錄,而不會(huì)分析整個(gè)表。即使隨著數(shù)據(jù)大小的增長(zhǎng),這也可以使性能保持相對(duì)穩(wěn)定,這在長(zhǎng)時(shí)間運(yùn)行的生產(chǎn)環(huán)境中很重要,因?yàn)樯a(chǎn)環(huán)境需要在任意數(shù)量的數(shù)據(jù)上保持一致。

MLFlow還可以很好地跟蹤指標(biāo)隨時(shí)間的演變,在我們的notebook中,我們跟蹤在foreachBatch代碼中分析的所有Deequ約束作為指標(biāo),并使用Delta的versionID和時(shí)間戳作為參數(shù)。在Databricks的notebook中,集成的MLFlow服務(wù)對(duì)于指標(biāo)跟蹤特別方便。

通過使用Structured Streaming、Delta Lake和Deequ,我們能夠消除傳統(tǒng)情況下數(shù)據(jù)質(zhì)量和速度之間的權(quán)衡,而專注于實(shí)現(xiàn)兩者的可接受水平。這里特別重要的是靈活性——不僅在如何處理不良記錄(隔離、報(bào)錯(cuò)、告警等),而且在體系結(jié)構(gòu)上(例如何時(shí)以及在何處執(zhí)行檢查?)和生態(tài)上(如何使用我們的數(shù)據(jù)?)。開源技術(shù)(如Delta Lake、Structured Streaming和Deequ)是這種靈活性的關(guān)鍵。隨著技術(shù)的發(fā)展,能夠使用最新最、最強(qiáng)大的解決方案是提升其競(jìng)爭(zhēng)優(yōu)勢(shì)的驅(qū)動(dòng)力。最重要的是,你的數(shù)據(jù)的速度和質(zhì)量一定不能對(duì)立,而要保持一致,尤其是在流式數(shù)據(jù)處理越來越靠近核心業(yè)務(wù)運(yùn)營(yíng)時(shí)。很快,這將不會(huì)是一種選擇,而是一種期望和要求,我們正朝著這個(gè)未來方向一次一小步地不斷前進(jìn)。

以上就是Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI