您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說(shuō),跟著小編一起來(lái)看看吧。
流式處理中經(jīng)常會(huì)遇到Kafka與其他系統(tǒng)進(jìn)行數(shù)據(jù)同步或者Kafka集群間數(shù)據(jù)遷移的情景。使用EMR Kafka Connect可以方便快速的實(shí)現(xiàn)數(shù)據(jù)同步或者數(shù)據(jù)遷移。
Kafka Connect是一種可擴(kuò)展的、可靠的,用于在Kafka和其他系統(tǒng)之間快速地進(jìn)行流式數(shù)據(jù)傳輸?shù)墓ぞ摺@缈梢允褂肒afka Connect獲取數(shù)據(jù)庫(kù)的binglog數(shù)據(jù),將數(shù)據(jù)庫(kù)的數(shù)據(jù)遷入Kafka集群,以同步數(shù)據(jù)庫(kù)的數(shù)據(jù),或者對(duì)接下游的流式處理系統(tǒng)。同時(shí),Kafka Connect提供的REST API接口可以方便的進(jìn)行Kafka Connect的創(chuàng)建和管理。
Kafka Connect分為standalone和distributed兩種運(yùn)行模式。standalone模式下,所有的worker都在一個(gè)進(jìn)程中運(yùn)行;相比之下,distributed模式更具擴(kuò)展性和容錯(cuò)性,是最常用的方式,也是生產(chǎn)環(huán)境推薦使用的模式。
本文介紹使用EMR Kafka Connect的REST API接口在Kafka集群間進(jìn)行數(shù)據(jù)遷移,使用distributed模式。
創(chuàng)建兩個(gè)EMR集群,集群類型為Kafka。EMR Kafka Connect安裝在task節(jié)點(diǎn)上,進(jìn)行數(shù)據(jù)遷移的目的Kafka集群需要?jiǎng)?chuàng)建task節(jié)點(diǎn)。集群創(chuàng)建好后,task節(jié)點(diǎn)上EMR Kafka Connect服務(wù)會(huì)默認(rèn)啟動(dòng),端口號(hào)為8083。
注意要保證兩個(gè)集群的網(wǎng)路互通,詳細(xì)的創(chuàng)建流程見創(chuàng)建集群https://help.aliyun.com/document_detail/28088.html
EMR Kafka Connect的配置文件路徑為/etc/ecm/kafka-conf/connect-distributed.properties
。
在源Kafka集群創(chuàng)建需要同步的topic,例如
另外,Kafka Connect會(huì)將offsets, configs和任務(wù)狀態(tài)保存在topic中,topic名對(duì)應(yīng)配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三個(gè)配置項(xiàng)。默認(rèn)的,Kafka Connect會(huì)自動(dòng)的使用默認(rèn)的partition和replication factor創(chuàng)建這三個(gè)topic。
在目的Kafka集群的task節(jié)點(diǎn)(例如emr-worker-3節(jié)點(diǎn)),使用curl命令通過(guò)json數(shù)據(jù)創(chuàng)建一個(gè)Kafka Connect。
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors
json數(shù)據(jù)中,name字段代表創(chuàng)建的connect的名稱,此處為connect-test;config字段需要根據(jù)實(shí)際情況進(jìn)行配置,其中的變量說(shuō)明如下表
字段 | 說(shuō)明 |
---|---|
topic.whitelist | 源Kafka集群中需要同步的topic,多個(gè)topic用逗號(hào)隔開,例如connect |
topic.rename.format | 可選配置項(xiàng),目的Kafka集群中同步后的topic,默認(rèn)值為${topic.whitelist}.replica。例如源topic為connect,同步后的topic為connect.replica |
src.kafka.bootstrap.servers | 源Kafka集群broker地址 |
src.zookeeper.connect | 源Kafka集群安裝了zookeeper服務(wù)的節(jié)點(diǎn)內(nèi)網(wǎng)IP |
dest.zookeeper.connect | 目的Kafka集群安裝了zookeeper服務(wù)的節(jié)點(diǎn)內(nèi)網(wǎng)IP |
查看所有的Kafka Connect
查看創(chuàng)建的connect-test的狀態(tài)
查看創(chuàng)建的connect-test的狀態(tài)查看task的信息
3.4數(shù)據(jù)同步
在源Kafka集群創(chuàng)建需要同步的數(shù)據(jù)。
3.5查看同步結(jié)果
在目的Kafka集群消費(fèi)同步的數(shù)據(jù)。
可以看到,在源Kafka集群發(fā)送的100000條數(shù)據(jù)已經(jīng)遷移到了目的Kafka集群。
以上就是EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。