溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

發(fā)布時(shí)間:2020-07-26 06:11:45 來源:網(wǎng)絡(luò) 閱讀:439 作者:DataPipeline 欄目:大數(shù)據(jù)

一、為什么選擇Kafka

為什么選Kafka?鑒于龐大的數(shù)據(jù)量,需要將其做成分布式,這時(shí)需要將Q里面的數(shù)據(jù)分到許多機(jī)器上進(jìn)行存儲,除此之外還有分布式的計(jì)算需求。同時(shí)需要支持多語言,如Java、GO、php等,另外還有高可用的需求。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

二、Kafka集群

Realtime的Kafka集群通過Mirror Maker將數(shù)據(jù)全部同步到Analysis的Kafka集群。

Realtime的Kafka集群主要負(fù)責(zé)在線實(shí)時(shí)讀寫,Analysis負(fù)責(zé)很多工作,諸如數(shù)據(jù)的導(dǎo)入導(dǎo)出,數(shù)據(jù)的多次流出給集群和網(wǎng)絡(luò)硬盤帶來了較大壓力。為保證線上的穩(wěn)定性,要保證兩邊是隔開的。另外關(guān)于Topic目前有五萬多,每秒可能會有100多萬的數(shù)據(jù)流入流出。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

三、Kafka的用戶使用問題

1.參數(shù)配置問題, Kafka有很多參數(shù)需要配置,常用的集群配置,延遲,重要性等,需要封裝。

  1. 開發(fā)測試不方便,使用者通常會有這樣的需求:我的數(shù)據(jù)寫進(jìn)去沒,消費(fèi)沒,他寫的數(shù)據(jù)長什么樣子,結(jié)構(gòu)化的數(shù)據(jù)還需自己寫代碼來解析,等等。這些問題沒有工具和平臺來解決,會大大降低開發(fā)效率。

  2. Topic申請不方便,topic是不能開放自己創(chuàng)建的,我們曾在測試環(huán)境開放過Topic,發(fā)現(xiàn)一周內(nèi)漲到了好幾萬,而且參數(shù)千奇百怪,有全用默認(rèn)參數(shù)的,有根據(jù)文檔,時(shí)間先來10個(gè)9的,也有partition直接來100的。工單方式對管理者很不友好,需要登上服務(wù)器敲命令,效率低下,且容易出錯(cuò)。

  3. 結(jié)構(gòu)化數(shù)據(jù)查詢不方便,瓜子的結(jié)構(gòu)化使用的是AVRO, 序列化之后的數(shù)據(jù)很難查看原始數(shù)據(jù)。

  4. 消費(fèi)異常定位不便,比如消費(fèi)的數(shù)據(jù)或者位置不對,如果想要回滾重新消費(fèi)或跳過臟數(shù)據(jù)就面臨各種問題。從哪個(gè)offset開始重新消費(fèi)呢?或者跳到之后的哪個(gè)offset呢?另外就是滾動重啟了一個(gè)服務(wù),結(jié)果發(fā)現(xiàn)消費(fèi)的數(shù)據(jù)少了一批,很有可能是某一個(gè)隱藏的consumer同時(shí)在用這個(gè)consumer group,但是找了一圈沒找到哪個(gè)服務(wù)還沒關(guān)掉。

  5. 不知道下游,如果寫了生產(chǎn)者生產(chǎn)的Topic數(shù)據(jù),卻不知道有哪些consumer,如果要對Topic信息發(fā)生改變時(shí),不知該通知誰,這是很復(fù)雜的事情。要么先上,下游出問題了自己叫,要么躊躇不前,先收集下游,當(dāng)然實(shí)際情況一般是前者,經(jīng)常雞飛狗跳。

  6. 運(yùn)維復(fù)雜,日常運(yùn)維包括topic partition增加,幫助定位臟數(shù)據(jù)(因?yàn)樗麄儾恢烙信K數(shù)據(jù)),幫助排除配置問題等等。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

四、解決方案:Kafka platform

為解決上述問題,瓜子上線了Kafka platform,主要面向用戶和管理兩方面的功能。

面向用戶包括:查看數(shù)據(jù),了解消費(fèi)情況,方便地添加監(jiān)控報(bào)警,以及如果出現(xiàn)問題后,快速查錯(cuò)的工具。

管理方面包括: 權(quán)限管理, 申請審批,還有一些常用操作。比如,seek offset, 或是刪掉一個(gè)Topic,對partitions進(jìn)行擴(kuò)容等。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

  1. 數(shù)據(jù)查詢

可以通過offset查詢對應(yīng)offset的數(shù)據(jù),也可以通過進(jìn)入Kafka的大致時(shí)間,查詢那段內(nèi)的數(shù)據(jù),可以看到每條信息的partition,offset,入Kafka的時(shí)間,AVRO的版本信息等。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

  1. 消費(fèi)查詢

通過下圖顯示的界面可以查看一條消息,了解哪些consumer group已經(jīng)消費(fèi)了,哪些沒有消費(fèi)。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

同時(shí)可以查看它現(xiàn)在正在被哪個(gè)IP進(jìn)行消費(fèi),這時(shí)我們可以方便地定位到有個(gè)consumer沒有關(guān)閉,它是在哪臺機(jī)器上,這些來自于我們的實(shí)踐經(jīng)驗(yàn)。還可以看到每個(gè)consumer group的消費(fèi)延遲情況,精確到條數(shù),partition的延遲。也可以看到partition消息總數(shù),可以排查一些消息不均的問題。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

下圖為監(jiān)控報(bào)警,可以了解Topic的流入、流出數(shù)據(jù),每秒寫入多少條消息,多大的size,每秒流出的情況。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

報(bào)警是對Topic建一些流量報(bào)警,或是一些延遲報(bào)警,建好之后只需要訂閱一下即可,非常方便。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

五、瓜子結(jié)構(gòu)化數(shù)據(jù)流

目前有許多使用場景,比如前端埋點(diǎn),tracking日志,Mysql數(shù)據(jù)同步,操作日志,一些諸如服務(wù)之間的交換,基于SQL的streaming,APM的數(shù)據(jù),還有基于SQL的ETL等,都可以通過結(jié)構(gòu)化將其快速同步到大數(shù)據(jù)中做后續(xù)分析。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

我們是通過confluent提供的一整套解決方案來實(shí)現(xiàn)的。其中最主要的兩個(gè)組件是:Schema Registry和Kafka Connect。Schema Registry用于存儲schema信息,Kafka connect用于數(shù)據(jù)轉(zhuǎn)移。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

目前,瓜子除日志部分外,90%以上為結(jié)構(gòu)化。為什么選擇Avro?因?yàn)锳vro速度快,并且跨語言支持,所有的Schema AVSC都是用JSON做的,對JSON支持的特別好,如果可能沒人想為一個(gè)schema定義再學(xué)一門語言吧。而且通過JSON無需code generation。

但僅有avro還不夠,我們在使用中會面臨更多的問題,比如:

  • 統(tǒng)一的schema中心,這與配置中心的必要性是一樣的道理,沒有統(tǒng)一的地方,口口相傳,配置亂飛不是我們想看到的。

  • 多版本的需求,schema是肯定會有更新需求的,也肯定有回滾需求,也會有兼容需求,所以多版本是需要滿足的。

  • 版本兼容性檢查,設(shè)想一下上游改了一個(gè)schema的列名,下游寫到hive的時(shí)候就蒙了,歷史數(shù)據(jù)咋辦啊,現(xiàn)在這列數(shù)據(jù)又怎么處理。所以得有版本兼容,而且最好滿足下游所有組件的需求。

  • schema得有注釋,給人展示的schema最好能有給人讀的注釋,很多人昨天定義的enum,今天就忘了,這個(gè)事情很常見。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

為解決這些問題,我們引入了confluence的Schema Registry。Confluence的Schema registry,通過RESTful接口,提供了類似配置中心的能力,還有完善的UI,支持版本兼容性檢查,支持多版本等,完全滿足了我們的需求。而且自帶HA,通過Kafka存儲配置信息,保證一致性。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

五、瓜子的實(shí)踐

當(dāng)然,僅有這些還不夠,我們在實(shí)踐中遇到了很多問題,比如schema注冊不可能完全開放,歷史告訴我們完全的自由意味著混亂。為在實(shí)踐中利用好avro,我們前后改了兩個(gè)方案,來保證schema可控。

  1. 最初的方案

為實(shí)現(xiàn)統(tǒng)一管控,所有schema會通過git來管理,如果需要使用可以fork該git。如果要做一個(gè)上線,更新或添加一個(gè)schema,可以通過提merge request提交給管理員。管理員檢查沒有問題后直接通過gitlab-ci自動注冊,管理員只需完成確認(rèn)的操作。

但這樣會出現(xiàn)一些問題,首先是上線流程太長,要上線或更新一個(gè)schema時(shí),需要提交merge request,要等管理員收到郵件后才可查看,屆時(shí)如果管理員發(fā)現(xiàn)schema寫的不對,還需重新再走一次流程,中間可能花一天時(shí)間。且回滾復(fù)雜,沒有權(quán)限管理。而且很多人會犯同樣的錯(cuò)誤,客服表示相當(dāng)?shù)睦速M(fèi)時(shí)間。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

六、平臺化解決方案

通過平臺化解決方案,我們提供了一個(gè)類似于git的頁面,可在上面直接提交schema,在下面直接點(diǎn)擊校驗(yàn),在評估新上線的schema是否有問題后,等待后臺審批即可。其中可以加諸如權(quán)限管理等一些功能。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

七、為什么用到Kafka connect

Kafka connect專注copy數(shù)據(jù),把一個(gè)數(shù)據(jù)從data source轉(zhuǎn)到Kafka,再從Kafka轉(zhuǎn)到其它地方。它支持批和流,同時(shí)支持實(shí)時(shí)和批處理,比如5min同步一次。

另外,它支持多個(gè)系統(tǒng)之間互相copy,數(shù)據(jù)源可能是Mysql、SQL Server 也可能是Oracle 。sink可以是Hbase、Hive等。它自己定義了一套plugin接口,可以自己寫很多數(shù)據(jù)源和不支持的sink。

并且他自己做到了分布式并行,支持完善的HA和load balance,還提供方便的RESTful 接口。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

在沒有Kafka connect之前,運(yùn)維ETL非常麻煩。拿canal來說,canal有server和client,都需手動部署,如果你有100臺canal節(jié)點(diǎn)1000個(gè)數(shù)據(jù)庫,想想看吧,管理員如何知道哪臺機(jī)器上跑了哪些庫表,新增的任務(wù)又放在哪臺機(jī)來運(yùn)行。

此外,如果Mysql修改了一個(gè)字段,還需要讓程序員去機(jī)器上看一下那張表是如何修改的,相應(yīng)的所有下游都需相應(yīng)的完成表結(jié)構(gòu)修改之后, 才能跑起來,響應(yīng)速度非常慢。

目前Kafka connect已經(jīng)解決了這些問題。其具備一個(gè)非常重要的特性,如果上游數(shù)據(jù)根據(jù)AVRO兼容性進(jìn)行的修改,connect會在下游同樣做一些兼容性的修改,自動更改下游表結(jié)構(gòu),減輕了運(yùn)維負(fù)擔(dān)。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

我們來看看Kafka connect 的架構(gòu),Kafka connect會把所有信息存到Kafka 中,其中config topic存元數(shù)據(jù),Stutas topic指當(dāng)前哪些節(jié)點(diǎn)正在跑什么樣的job,offset topic指當(dāng)前比如某個(gè)Topic的某個(gè)partitions到底消費(fèi)到哪條數(shù)據(jù)。

WorKer都是無狀態(tài)的,在上面可以跑許多task,同樣一個(gè)task1,可能對應(yīng)5個(gè)partitions,如果只給它三個(gè)并發(fā),它會分布在三臺機(jī)器上。如果一臺機(jī)器掛了,這些job都會分配到另外兩臺機(jī)器,而且是實(shí)時(shí)同步的。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

八、瓜子Plugins

瓜子對Kafka connect的很多plugins做了修改。

  1. Maxwell

其中我們把canal通過maxwell替換,并且把maxwell做成了Kafka connect的plugin。

原生的Maxwell不支持AVRO,瓜子通過debezium思想對Maxwell進(jìn)行了修改,使其支持avro格式,并用Mysql管理meta,并且支持Mysql的數(shù)據(jù)庫切換。

  1. HDFS

我們采用的是confluence公司的hdfs插件,但是其本身存在很多問題,比如寫入hive的時(shí)候會把當(dāng)做partition的列也寫到主表數(shù)據(jù)中,雖然不影響hive的使用,但是影響presto讀取hive,這里我們改了源碼,去掉了主表中的這些列。

Hdfs在插件重啟時(shí)會去hdfs中讀取所有文件來確定從哪個(gè)offset繼續(xù),這里會有兩個(gè)問題:耗時(shí)太長,切換集群時(shí)offset無法接續(xù),我們也對他做了修改。

plugin寫入hive時(shí)支持用Kafka的timestamp做分區(qū),也支持用數(shù)據(jù)內(nèi)的某些列做分區(qū),但是不支持兩者同時(shí)用,我們也修改了一下。

  1. HBase

Hbase的plugin只支持最原始的導(dǎo)出,我們會有些特殊的需求,比如對rowkey自定義一下,通常mysql主鍵是自增ID,hbase不推薦用自增ID做rowkey,我們會有reverse的需求,還有多列聯(lián)合做rowkey的需求等,這個(gè)我們也改了源碼,支持通過配置自定義rowkey生成。

原始plugin不支持kerberos,而我們online hbase是帶權(quán)限的,所以也改了一下

還有一些小功能,比如把所有類型都先轉(zhuǎn)成string再存,支持delete,支持json等。

  1. KUDU

我們對kudu的使用很多,kudu開源的plugin有些bug,我們發(fā)現(xiàn)后也fix了一下。

Kudu的數(shù)據(jù)來源都是mysql,但是經(jīng)常會有mysql刷庫的情況,這時(shí)量就會很大,kudu sink會有較大的延時(shí),我們改了一下plugin,添加了自適應(yīng)流量控制,自動擴(kuò)充成多線程處理,也會在流量小時(shí),自動縮容。

九、瓜子數(shù)據(jù)庫的Data Pipeline

瓜子的數(shù)據(jù)倉庫完全是基于Kafka、AVRO的結(jié)構(gòu)化數(shù)據(jù)來做的。數(shù)據(jù)源非常多,需要將多個(gè)業(yè)務(wù)線的幾千張表同步到數(shù)倉,對外提供服務(wù)。

整個(gè)數(shù)據(jù)倉庫采用Lambda架構(gòu),分為T+1的存量處理和T+0.1的增量處理兩個(gè)流程。

先說T+1的存量處理部分,目前瓜子將所有mysql表通過Maxwell插件放到Kafka中,再通過Kafka connect寫到Hbase里,Hbase每天晚上做一次snapshot(快照),寫到hive中,然后經(jīng)過多輪ETL:DWB-->DWD-->DW-->DM,最后將DM層數(shù)據(jù)導(dǎo)入Kudu中,對外提供BI分析服務(wù),當(dāng)然離線olap分析還是通過presto直接訪問Hive查詢。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

再說T+0.1的增量流程,同T+1一樣,數(shù)據(jù)通過maxwell進(jìn)入Kafka,這部分流程共用,然后增量數(shù)據(jù)會實(shí)時(shí)通過kudu的插件寫入kudu中,再通過impala做ETL,生成數(shù)據(jù)對外提供T+0.1的查詢,對外提供的查詢是通過另一套impala來做的。 未來我們還會考慮通過Flink直接讀取Kafka中數(shù)據(jù)來做實(shí)時(shí)ETL,提高實(shí)時(shí)性。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

這是我們數(shù)倉架構(gòu)的整體架構(gòu)圖

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI