您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
小編主要對(duì)Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控的方法和架構(gòu)進(jìn)行了介紹,下面探討了一種數(shù)據(jù)管理架構(gòu),該架構(gòu)可以在數(shù)據(jù)到達(dá)時(shí),通過主動(dòng)監(jiān)控和分析來檢測(cè)流式數(shù)據(jù)中損壞或不良的數(shù)據(jù),并且不會(huì)造成瓶頸。
構(gòu)建流式數(shù)據(jù)分析和監(jiān)控流程
在Databricks,我們看到客戶中不斷涌現(xiàn)出許多數(shù)據(jù)處理模式,這些新模式的產(chǎn)生推動(dòng)了可能的極限,在速度和質(zhì)量問題上也不例外。為了幫助解決這一矛盾,我們開始考慮使用正確的工具,不僅可以支持所需的數(shù)據(jù)速度,還可以提供可接受的數(shù)據(jù)質(zhì)量水平。Structured Streaming和Delta Lake非常適合用于數(shù)據(jù)獲取和存儲(chǔ)層,因?yàn)樗麄兡軌蚺浜蟿?chuàng)造一個(gè)具有擴(kuò)展性、容錯(cuò)性和類實(shí)時(shí)的系統(tǒng),并且具有exactly-once處理保證。
為企業(yè)數(shù)據(jù)質(zhì)量分析找到可接受的工具要困難一些,特別是這個(gè)工具需要具有對(duì)數(shù)據(jù)質(zhì)量指標(biāo)的狀態(tài)匯總的能力。另外,還需要能夠?qū)φ麄€(gè)數(shù)據(jù)集進(jìn)行檢查(例如檢測(cè)出多少比例的記錄為空值),這些都會(huì)隨著所提取的數(shù)據(jù)量的增加而增加計(jì)算成本。這對(duì)所有流式系統(tǒng)而言都是需要的,這一要求就排除了很多可用的工具。
在我們最初的解決方案中,我們選擇了Amazon的數(shù)據(jù)質(zhì)量檢測(cè)工具Deequ,因?yàn)樗芴峁┖?jiǎn)單而強(qiáng)大的API,有對(duì)數(shù)據(jù)質(zhì)量指標(biāo)進(jìn)行狀態(tài)聚合的能力,以及對(duì)Scala的支持。將來,其他Spark原生的工具將提供額外的選擇。
我們通過在EC2實(shí)例上運(yùn)行一個(gè)小型的Kafka producer來模擬數(shù)據(jù)流,該實(shí)例將模擬的股票交易信息寫入Kafka topic,并使用原生的Databricks連接器將這些數(shù)據(jù)導(dǎo)入到Delta Lake表當(dāng)中。為了展示Spark Streaming中數(shù)據(jù)質(zhì)量檢查的功能,我們選擇在整個(gè)流程中實(shí)現(xiàn)Deequ的不同功能:
根據(jù)歷史數(shù)據(jù)生成約束條件;
使用foreachBatch算子對(duì)到達(dá)的數(shù)據(jù)進(jìn)行增量質(zhì)量分析;
使用foreachBatch算子對(duì)到達(dá)的數(shù)據(jù)執(zhí)行(較小的)單元測(cè)試,并將質(zhì)量不佳的batch隔離到質(zhì)量不佳記錄表中;
對(duì)于每個(gè)到達(dá)的batch,將最新的狀態(tài)指標(biāo)寫入到Delta表當(dāng)中;
對(duì)整個(gè)數(shù)據(jù)集定期執(zhí)行(較大的)單元測(cè)試,并在MLFlow中跟蹤結(jié)果;
根據(jù)驗(yàn)證結(jié)果發(fā)送通知(如通過電子郵件或Slack);
捕獲MLFlow中的指標(biāo)以進(jìn)行可視化和記錄。
我們結(jié)合了MLFlow來跟蹤一段時(shí)間內(nèi)數(shù)據(jù)性能指標(biāo)的質(zhì)量、Delta表的版本迭代以及結(jié)合了一個(gè)用于通知和告警的Slack連接器。整個(gè)流程可以用如下的圖片進(jìn)行表示:
由于Spark中具有統(tǒng)一的批處理/流式處理接口,因此我們能夠在這個(gè)流程的任何位置提取報(bào)告、告警和指標(biāo),作為實(shí)時(shí)更新或批處理快照。這對(duì)于設(shè)置觸發(fā)器或限制特別有用,因此,如果某個(gè)指標(biāo)超過了閾值,則可以執(zhí)行數(shù)據(jù)質(zhì)量改善措施。還要注意的是,我們并沒有對(duì)初始到達(dá)的原始數(shù)據(jù)造成影響,這些數(shù)據(jù)將立即提交到我們的Delta表,這意味著我們不會(huì)限制數(shù)據(jù)輸入的速率。下游系統(tǒng)可以直接從該表中讀取數(shù)據(jù),如果超過了上述任何觸發(fā)條件或質(zhì)量閾值,則可能會(huì)中斷。此外,我們可以輕松地創(chuàng)建一個(gè)排除質(zhì)量不佳記錄的view以提供一個(gè)干凈的表。
在一個(gè)較高的層次,執(zhí)行我們的數(shù)據(jù)質(zhì)量跟蹤和驗(yàn)證的代碼如下所示:
spark.readStream
.table("trades_delta")
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// reassign our current state to the previous next state
val stateStoreCurr = stateStoreNext
// run analysis on the current batch, aggregate with saved state
val metricsResult = AnalysisRunner.run(data=batchDF, ...)
// verify the validity of our current microbatch
val verificationResult = VerificationSuite()
.onData(batchDF)
.addCheck(...).run()
// if verification fails, write batch to bad records table
if (verificationResult.status != CheckStatus.Success) {...}
// write the current results into the metrics table
Metric_results.write
.format("delta")
.mode("overwrite")
.saveAsTable("deequ_metrics")
}
.start()
使用數(shù)據(jù)質(zhì)量工具Deequ
在Databricks中使用Deequ是相對(duì)比較容易的事情,你需要首先定義一個(gè)analyzer,然后在dataframe上運(yùn)行該analyzer。例如,我們可以跟蹤Deequ本地提供的幾個(gè)相關(guān)指標(biāo)檢查,包括檢查數(shù)量和價(jià)格是否為非負(fù)數(shù)、原始IP地址是否不為空以及符號(hào)字段在所有事務(wù)中的唯一性。Deequ的StateProvider對(duì)象在流式數(shù)據(jù)配置中特別有用,它能允許用戶將我們指標(biāo)的狀態(tài)保存在內(nèi)存或磁盤中,并在以后匯總這些指標(biāo)。這意味著每個(gè)處理的批次僅分析該批次中的數(shù)據(jù)記錄,而不會(huì)分析整個(gè)表。即使隨著數(shù)據(jù)大小的增長(zhǎng),這也可以使性能保持相對(duì)穩(wěn)定,這在長(zhǎng)時(shí)間運(yùn)行的生產(chǎn)環(huán)境中很重要,因?yàn)樯a(chǎn)環(huán)境需要在任意數(shù)量的數(shù)據(jù)上保持一致。
MLFlow還可以很好地跟蹤指標(biāo)隨時(shí)間的演變,在我們的notebook中,我們跟蹤在foreachBatch代碼中分析的所有Deequ約束作為指標(biāo),并使用Delta的versionID和時(shí)間戳作為參數(shù)。在Databricks的notebook中,集成的MLFlow服務(wù)對(duì)于指標(biāo)跟蹤特別方便。
通過使用Structured Streaming、Delta Lake和Deequ,我們能夠消除傳統(tǒng)情況下數(shù)據(jù)質(zhì)量和速度之間的權(quán)衡,而專注于實(shí)現(xiàn)兩者的可接受水平。這里特別重要的是靈活性——不僅在如何處理不良記錄(隔離、報(bào)錯(cuò)、告警等),而且在體系結(jié)構(gòu)上(例如何時(shí)以及在何處執(zhí)行檢查?)和生態(tài)上(如何使用我們的數(shù)據(jù)?)。開源技術(shù)(如Delta Lake、Structured Streaming和Deequ)是這種靈活性的關(guān)鍵。隨著技術(shù)的發(fā)展,能夠使用最新最、最強(qiáng)大的解決方案是提升其競(jìng)爭(zhēng)優(yōu)勢(shì)的驅(qū)動(dòng)力。最重要的是,你的數(shù)據(jù)的速度和質(zhì)量一定不能對(duì)立,而要保持一致,尤其是在流式數(shù)據(jù)處理越來越靠近核心業(yè)務(wù)運(yùn)營(yíng)時(shí)。很快,這將不會(huì)是一種選擇,而是一種期望和要求,我們正朝著這個(gè)未來方向一次一小步地不斷前進(jìn)。
以上就是Databricks如何使用Spark Streaming和Delta Lake對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。