溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

DataPipeline在大數(shù)據(jù)平臺(tái)的數(shù)據(jù)流實(shí)踐

發(fā)布時(shí)間:2020-07-12 10:57:37 來(lái)源:網(wǎng)絡(luò) 閱讀:1535 作者:DataPipeline 欄目:大數(shù)據(jù)

文 | 呂鵬 DataPipeline架構(gòu)師

DataPipeline在大數(shù)據(jù)平臺(tái)的數(shù)據(jù)流實(shí)踐

進(jìn)入大數(shù)據(jù)時(shí)代,實(shí)時(shí)作業(yè)有著越來(lái)越重要的地位。本文將從以下幾個(gè)部分進(jìn)行講解DataPipeline在大數(shù)據(jù)平臺(tái)的實(shí)時(shí)數(shù)據(jù)流實(shí)踐。


一、企業(yè)級(jí)數(shù)據(jù)面臨的主要問(wèn)題和挑戰(zhàn)


1.數(shù)據(jù)量不斷攀升

隨著互聯(lián)網(wǎng)+的蓬勃發(fā)展和用戶規(guī)模的急劇擴(kuò)張,企業(yè)數(shù)據(jù)量也在飛速增長(zhǎng),數(shù)據(jù)的量以GB為單位,逐漸的開始以TB/GB/PB/EB,甚至ZB/YB等。同時(shí)大數(shù)據(jù)也在不斷深入到金融、零售、制造等行業(yè),發(fā)揮著越來(lái)越大的作用。

2. 數(shù)據(jù)質(zhì)量的要求不斷地提升

當(dāng)前比較流行的AI、數(shù)據(jù)建模,對(duì)數(shù)據(jù)質(zhì)量要求高。尤其在金融領(lǐng)域,對(duì)于數(shù)據(jù)質(zhì)量的要求是非常高的。

3. 數(shù)據(jù)平臺(tái)架構(gòu)的復(fù)雜化

企業(yè)級(jí)應(yīng)用架構(gòu)的變化隨著企業(yè)規(guī)模而變。規(guī)模小的企業(yè),用戶少、數(shù)據(jù)量也小,可能只需一個(gè)MySQL就搞能搞;中型企業(yè),隨著業(yè)務(wù)量的上升,這時(shí)候可能需要讓主庫(kù)做OLTP,備庫(kù)做OLAP;當(dāng)企業(yè)進(jìn)入規(guī)模化,數(shù)據(jù)量非常大,原有的OLTP可能已經(jīng)不能滿足了,這時(shí)候我們會(huì)做一些策略,來(lái)保證OLTP和OLAP隔離,業(yè)務(wù)系統(tǒng)和BI系統(tǒng)分開互不影響,但做了隔離后同時(shí)帶來(lái)了一個(gè)新的困難,數(shù)據(jù)流的實(shí)時(shí)同步的需求,這時(shí)企業(yè)就需要一個(gè)可擴(kuò)展、可靠的流式傳輸工具。

DataPipeline在大數(shù)據(jù)平臺(tái)的數(shù)據(jù)流實(shí)踐


二、大數(shù)據(jù)平臺(tái)上的實(shí)踐案例


下圖是一個(gè)典型的BI平臺(tái)設(shè)計(jì)場(chǎng)景,以MySQL為例,DataPipeline是如何實(shí)現(xiàn)MySQL的SourceConnector。MySQL作為Source端時(shí):

  • 全量+ 增量;

  • 全量:通過(guò)select 方式,將數(shù)據(jù)加載到kafka中;

  • 增量:實(shí)時(shí)讀取 binlog的方式;

使用binlog時(shí)需要注意開啟row 模式并且image設(shè)置為 full。

1. MySQL SourceConnector 全量+增量實(shí)時(shí)同步的實(shí)現(xiàn)

下面是具體的實(shí)現(xiàn)流程圖,首先開啟repeatable read事務(wù),保證在執(zhí)行讀鎖之前的數(shù)據(jù)可以確實(shí)的讀到。然后進(jìn)行flush table with read lock 操作,添加一個(gè)讀鎖,防止這個(gè)時(shí)候有新的數(shù)據(jù)進(jìn)入影響數(shù)據(jù)的讀取,這時(shí)開始一個(gè)truncation with snapshot,我們可以記錄當(dāng)前binlog的offset 并標(biāo)記一個(gè)snapshot start,這時(shí)的offset 為增量讀取時(shí)開始的offset。當(dāng)事務(wù)開始后可以進(jìn)行全量數(shù)據(jù)的讀取。record marker這時(shí)會(huì)將生成record 寫到 kafka 中,然后commit 這個(gè)事務(wù)。當(dāng)全量數(shù)據(jù)push完畢后我們解除讀鎖并且標(biāo)記snapshot stop,此時(shí)全量數(shù)據(jù)已經(jīng)都進(jìn)入kafka了,之后從之前記錄的offset開始增量數(shù)據(jù)的同步。

DataPipeline在大數(shù)據(jù)平臺(tái)的數(shù)據(jù)流實(shí)踐

2. DataPipeline做了哪些優(yōu)化工作

1)以往在數(shù)據(jù)同步環(huán)節(jié)都分為全量同步和增量同步,全量同步為一個(gè)批處理。在批處理時(shí)我們都是進(jìn)行all or nothing的處理,但當(dāng)大數(shù)據(jù)情況下一個(gè)批量會(huì)占用相當(dāng)長(zhǎng)的時(shí)間,時(shí)間越長(zhǎng)可靠性就越難保障,所以往往會(huì)出現(xiàn)斷掉的情況,這時(shí)一個(gè)重新處理會(huì)讓很多人崩潰。DataPipeline 解決了這一痛點(diǎn),通過(guò)管理數(shù)據(jù)傳輸時(shí)的position 來(lái)做到斷點(diǎn)續(xù)傳,這時(shí)當(dāng)一個(gè)大規(guī)模的數(shù)據(jù)任務(wù)即使發(fā)生了意外,也可以重?cái)嗟舻狞c(diǎn)來(lái)繼續(xù)之前的任務(wù),大大縮短了同步的時(shí)間,提高了同步的效率。

2)在同步多個(gè)任務(wù)的時(shí)候,很難平衡數(shù)據(jù)傳輸對(duì)源端的壓力和目的端的實(shí)時(shí)性,在大數(shù)據(jù)量下的傳輸尤其能夠體現(xiàn),這時(shí)DataPipeline 在此做了大量相關(guān)測(cè)試來(lái)優(yōu)化不同的連接池,開放數(shù)據(jù)傳輸效率的自定義化,供客戶針對(duì)自己的業(yè)務(wù)系統(tǒng)定制合適的傳輸任務(wù),對(duì)于不同種類的數(shù)據(jù)庫(kù)的傳輸進(jìn)行優(yōu)化和調(diào)整,保證數(shù)據(jù)傳輸?shù)母咝浴?/p>

3)自定義異構(gòu)數(shù)據(jù)類型的轉(zhuǎn)化,往往開源類大數(shù)據(jù)傳輸工具如 sqoop 等,對(duì)異構(gòu)數(shù)據(jù)類型的支持不夠靈活,種類也不夠齊全。像金融領(lǐng)域中對(duì)數(shù)據(jù)精度要求較高的場(chǎng)景,在傳統(tǒng)數(shù)據(jù)庫(kù)向大數(shù)據(jù)平臺(tái)傳輸時(shí)造成的精度丟失是很大的一個(gè)問(wèn)題。DataPipeline 對(duì)此做了更多數(shù)據(jù)類型的支持,比如hive 支持的復(fù)雜類型以及 decimal 和 timestamp 等。

3. Sink端之Hive

1)Hive的特性

  • Hive 內(nèi)部表和外部表;

  • 依賴HDFS;

  • 支持事務(wù)和非事務(wù);

  • 多種壓縮格式;

  • 分區(qū)分桶。

2)Hive同步的問(wèn)題

  • 如何保證實(shí)時(shí)的寫入?

  • schema change了怎么辦?

  • 怎么擴(kuò)展我想保存的格式?

  • 怎么實(shí)現(xiàn)多種分區(qū)方式?

  • 同步中斷了怎么辦?

  • 如何保證我的數(shù)據(jù)不丟?

3)KafkaConnect HDFS 的 Hive 同步實(shí)踐

  • 使用外表:Hive外部表,能夠提高寫入效率,直接寫HDFS,減少IO消耗,而內(nèi)表會(huì)比外表多一次IO;

  • Schema change:目前的做法是目的端根據(jù)源端的變化而變化,當(dāng)有增加列刪除列的情況,目標(biāo)端會(huì)跟隨源端改動(dòng);

  • 目前支持的存儲(chǔ)格式:parquet,avro ,csv

  • 插件化的partitioner,提供多種分區(qū)方式,如 Wallclock RecordRecordField:wallclock是使用寫入到hive端時(shí)的系統(tǒng)時(shí)間,Record 使用是讀取時(shí)生成record的時(shí)間,RecordField是使用用戶自定義的時(shí)間戳來(lái)定義分區(qū),未來(lái)會(huì)實(shí)現(xiàn)可自定義化的partitioner 來(lái)滿足不同的需求;

  • Recover 機(jī)制保障中斷后不會(huì)丟失數(shù)據(jù);

  • 使用WAL (Write-AheadLogging)機(jī)制,保證數(shù)據(jù)目的端數(shù)據(jù)一致性。

4)Recover的機(jī)制

recover 是一種恢復(fù)的機(jī)制,在數(shù)據(jù)傳輸?shù)碾A段往往可能出現(xiàn)各種不同的問(wèn)題,如網(wǎng)絡(luò)問(wèn)題等等。當(dāng)出現(xiàn)問(wèn)題后我們需要恢復(fù)數(shù)據(jù)同步,那么recover是怎么保證數(shù)據(jù)正常傳輸不丟失呢?當(dāng)recover開始的時(shí)候,獲取目標(biāo)文件在hdfs 上的租約,如果這時(shí)候需要讀寫的HDFS當(dāng)前文件是被占用的,那我們需要等待它直到可以獲取到租約。當(dāng)我們獲取到租約后就可以開始讀之前寫入時(shí)候的log,如果第一次會(huì)創(chuàng)建一個(gè)新的log,并標(biāo)記一個(gè)begin,然后記錄了當(dāng)時(shí)的kafka offset。這時(shí)候需要清理之前遺留下來(lái)的臨時(shí)數(shù)據(jù),清理掉之后再重新開始同步直到同步結(jié)束會(huì)標(biāo)記一個(gè)end。如果沒(méi)有結(jié)束的話就相當(dāng)于正在進(jìn)行中,正在進(jìn)行中每次都會(huì)提交當(dāng)前同步的offset,來(lái)保證出現(xiàn)意外后會(huì)回滾到之前offset。

DataPipeline在大數(shù)據(jù)平臺(tái)的數(shù)據(jù)流實(shí)踐

5)WAL (Write-Ahead Logging)機(jī)制

Write-Ahead Logging機(jī)制其實(shí)就是核心思想在數(shù)據(jù)寫入到數(shù)據(jù)庫(kù)之前,它先寫臨時(shí)文件,當(dāng)一個(gè)批次結(jié)束后,在將這個(gè)臨時(shí)文件改名為正式文件,確保每次提交后的正式文件一致性,如果中途出現(xiàn)寫入錯(cuò)誤將臨時(shí)文件刪除重新寫入,相當(dāng)于一個(gè)回滾。hive 的同步主要利用這種實(shí)現(xiàn)方式來(lái)保證一致性。首先它同步數(shù)據(jù)寫入到HDFS臨時(shí)文件上,確保一個(gè)批次的數(shù)據(jù)正常后再重命名到正式文件當(dāng)中。正式的文件名會(huì)包含kafka offset,例如一個(gè)avro 文件的文件名為 xxxx+001+0020.avro ,這表示當(dāng)前文件中有offset 1 到 20 的20條數(shù)據(jù)。

4. Sink端之GreenPlum

GreenPlum,是一個(gè)MPP架構(gòu)的數(shù)據(jù)倉(cāng)庫(kù),底層由多個(gè)postgres數(shù)據(jù)庫(kù)作為計(jì)算節(jié)點(diǎn),擅長(zhǎng)OLAP,作為BI數(shù)據(jù)倉(cāng)庫(kù)有著良好的性能。

1)DataPipeline對(duì)GreenPlum 同步實(shí)踐以及優(yōu)化策略

  • greenplum 支持多種數(shù)據(jù)加載方式,目前我們使用copy的加載方式。

  • 批量處理提高sink端寫入效率,不進(jìn)行insert 和 update 的操作,一律使用 delete + copy 的方式批量加載;

  • 多線程加預(yù)加載機(jī)制:

? 每個(gè)需要同步的表單獨(dú)記錄一個(gè)offset,當(dāng)整個(gè)任務(wù)失敗時(shí)可以分開進(jìn)行恢復(fù);

? 使用一個(gè)線程池管理加載數(shù)據(jù)的線程,每個(gè)同步的表單獨(dú)一個(gè)線程來(lái)進(jìn)行加載數(shù)據(jù),多表同時(shí)同步;

? 在加載數(shù)據(jù)的時(shí)間里,提前對(duì)kafka進(jìn)行消費(fèi),緩存處理好的一個(gè)數(shù)據(jù)集,當(dāng)一個(gè)線程加載數(shù)據(jù)結(jié)束后馬上開始新的線程加載數(shù)據(jù),減少處理加載數(shù)據(jù)的時(shí)間;

  • delete + copy的方式可以保證數(shù)據(jù)最終一致性;

  • source 端有主鍵的表可以通過(guò)主鍵來(lái)合并一個(gè)批次需要同步的數(shù)據(jù),如一個(gè)需要同步的批量數(shù)據(jù)中包含一條 insert 的數(shù)據(jù),后面跟著 update 該條數(shù)據(jù),那就無(wú)需同步兩遍,將該數(shù)據(jù)更新到 update 之后的狀態(tài) copy 到 gp 當(dāng)中即可。

同步GreenPlum需要注意:因?yàn)槭峭ㄟ^(guò)copy 寫入文件的,需要文件是結(jié)構(gòu)化數(shù)據(jù),典型的是使用CSV,CSV 寫入時(shí)需注意spiltquote,escapequote,避免出現(xiàn)數(shù)據(jù)錯(cuò)位的現(xiàn)象。update主鍵的問(wèn)題 , 當(dāng)源端是update一個(gè)主鍵時(shí),同時(shí)需要記錄update前的主鍵,并在目標(biāo)端進(jìn)行刪除。還有 \0 特殊字符的問(wèn)題,因?yàn)楹诵氖怯肅語(yǔ)言,所以在同步的時(shí)候\0需要特殊處理掉。


三、DataPipeline未來(lái)的工作


1. 目前我們碰到kafka connect rebalance的一些問(wèn)題,所以我們對(duì)其進(jìn)行了改造。以往的rebalance機(jī)制是假如我們?cè)黾踊蛘邉h除一個(gè)task,會(huì)導(dǎo)致整個(gè)集群rebalance,這樣造成很多無(wú)謂的開銷而且頻繁的rebalance 不利于數(shù)據(jù)同步的任務(wù)的穩(wěn)定。于是我們將rebalance機(jī)制改造成一個(gè)黏性的機(jī)制:

  • 當(dāng)我們?cè)黾右粋€(gè)新的任務(wù)的時(shí)候,我們會(huì)檢查所有的worker使用率比較低的,當(dāng)worker的task比較少,我們只把它加進(jìn)比較少的worker就可以了,也不需要做全量的平衡,當(dāng)然這時(shí)候可能還是有一些不平衡的資源浪費(fèi),這是我們可以容忍的,至少比我們做一次全量的rebalance開銷要??;

  • 假如刪除一個(gè)task,以往的機(jī)制是刪除一個(gè)task的時(shí)候也會(huì)做全量的Rebalance,新的機(jī)制不會(huì)觸發(fā)rebalance。這時(shí)候如果時(shí)間長(zhǎng)也會(huì)造成一個(gè)資源不平衡,這是我們可以自動(dòng)化rebalance一下所有的集群;

  • 假如說(shuō)集群的某個(gè)節(jié)點(diǎn)宕掉了,該節(jié)點(diǎn)的task怎么辦呢?我們不會(huì)馬上就把這個(gè)節(jié)點(diǎn)上的 task分配出去,會(huì)先等待10分鐘,因?yàn)橛械臅r(shí)候它可能只是短暫的連接超時(shí),過(guò)一段時(shí)間后就會(huì)恢復(fù),如果根據(jù)這個(gè)來(lái)做一次rebalance,可能這是不太值的。當(dāng)?shù)却?0分鐘后節(jié)點(diǎn)還是沒(méi)有恢復(fù),我們?cè)僮鰎ebalance,將宕掉的節(jié)點(diǎn)任務(wù)分配到其他節(jié)點(diǎn)上;

2. 源端的數(shù)據(jù)一致性,目前通過(guò)WAL的機(jī)制可以保證目的端的一致性;

3. 大數(shù)據(jù)量下的同步優(yōu)化以及提高同步的穩(wěn)定性。


四、總結(jié)


1. 大數(shù)據(jù)時(shí)代企業(yè)數(shù)據(jù)集成主要面臨各種復(fù)雜的架構(gòu),應(yīng)對(duì)這些復(fù)雜的系統(tǒng)對(duì)ETL的要求也越來(lái)越高。我們能做的就是需要權(quán)衡利弊選取一個(gè)符合業(yè)務(wù)需求的框架;

2. Kafka Connect 比較適合對(duì)數(shù)據(jù)量大,且有實(shí)時(shí)性需求的業(yè)務(wù);

3. 基于Kafka Connect 優(yōu)良特性可以依據(jù)不同的數(shù)據(jù)倉(cāng)庫(kù)特性來(lái)提高數(shù)據(jù)時(shí)效性和同步效率;

4. DataPipeline針對(duì)目前企業(yè)在大規(guī)模實(shí)時(shí)數(shù)據(jù)流的痛點(diǎn),進(jìn)行了相關(guān)的改造和優(yōu)化,首先數(shù)據(jù)端到端一致性的保證是幾乎所有企業(yè)在數(shù)據(jù)同步過(guò)程中碰到的,目前已經(jīng)做到基于kafka connect 的框架中 rebalance 中的優(yōu)化和改造。

—end—

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI