溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移

發(fā)布時(shí)間:2021-07-30 16:51:32 來(lái)源:億速云 閱讀:170 作者:Leah 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說(shuō),跟著小編一起來(lái)看看吧。

1.背景

流式處理中經(jīng)常會(huì)遇到Kafka與其他系統(tǒng)進(jìn)行數(shù)據(jù)同步或者Kafka集群間數(shù)據(jù)遷移的情景。使用EMR Kafka Connect可以方便快速的實(shí)現(xiàn)數(shù)據(jù)同步或者數(shù)據(jù)遷移。

Kafka Connect是一種可擴(kuò)展的、可靠的,用于在Kafka和其他系統(tǒng)之間快速地進(jìn)行流式數(shù)據(jù)傳輸?shù)墓ぞ摺@缈梢允褂肒afka Connect獲取數(shù)據(jù)庫(kù)的binglog數(shù)據(jù),將數(shù)據(jù)庫(kù)的數(shù)據(jù)遷入Kafka集群,以同步數(shù)據(jù)庫(kù)的數(shù)據(jù),或者對(duì)接下游的流式處理系統(tǒng)。同時(shí),Kafka Connect提供的REST API接口可以方便的進(jìn)行Kafka Connect的創(chuàng)建和管理。
Kafka Connect分為standalone和distributed兩種運(yùn)行模式。standalone模式下,所有的worker都在一個(gè)進(jìn)程中運(yùn)行;相比之下,distributed模式更具擴(kuò)展性和容錯(cuò)性,是最常用的方式,也是生產(chǎn)環(huán)境推薦使用的模式。

本文介紹使用EMR Kafka Connect的REST API接口在Kafka集群間進(jìn)行數(shù)據(jù)遷移,使用distributed模式。

2.環(huán)境準(zhǔn)備

創(chuàng)建兩個(gè)EMR集群,集群類型為Kafka。EMR Kafka Connect安裝在task節(jié)點(diǎn)上,進(jìn)行數(shù)據(jù)遷移的目的Kafka集群需要?jiǎng)?chuàng)建task節(jié)點(diǎn)。集群創(chuàng)建好后,task節(jié)點(diǎn)上EMR Kafka Connect服務(wù)會(huì)默認(rèn)啟動(dòng),端口號(hào)為8083。

注意要保證兩個(gè)集群的網(wǎng)路互通,詳細(xì)的創(chuàng)建流程見創(chuàng)建集群https://help.aliyun.com/document_detail/28088.html

3.數(shù)據(jù)遷移

3.1準(zhǔn)備工作

EMR Kafka Connect的配置文件路徑為/etc/ecm/kafka-conf/connect-distributed.properties。

在源Kafka集群創(chuàng)建需要同步的topic,例如

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移

另外,Kafka Connect會(huì)將offsets, configs和任務(wù)狀態(tài)保存在topic中,topic名對(duì)應(yīng)配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三個(gè)配置項(xiàng)。默認(rèn)的,Kafka Connect會(huì)自動(dòng)的使用默認(rèn)的partition和replication factor創(chuàng)建這三個(gè)topic。

3.2創(chuàng)建Kafka Connect

在目的Kafka集群的task節(jié)點(diǎn)(例如emr-worker-3節(jié)點(diǎn)),使用curl命令通過(guò)json數(shù)據(jù)創(chuàng)建一個(gè)Kafka Connect。

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors

json數(shù)據(jù)中,name字段代表創(chuàng)建的connect的名稱,此處為connect-test;config字段需要根據(jù)實(shí)際情況進(jìn)行配置,其中的變量說(shuō)明如下表

字段說(shuō)明
topic.whitelist源Kafka集群中需要同步的topic,多個(gè)topic用逗號(hào)隔開,例如connect
topic.rename.format可選配置項(xiàng),目的Kafka集群中同步后的topic,默認(rèn)值為${topic.whitelist}.replica。例如源topic為connect,同步后的topic為connect.replica
src.kafka.bootstrap.servers源Kafka集群broker地址
src.zookeeper.connect源Kafka集群安裝了zookeeper服務(wù)的節(jié)點(diǎn)內(nèi)網(wǎng)IP
dest.zookeeper.connect目的Kafka集群安裝了zookeeper服務(wù)的節(jié)點(diǎn)內(nèi)網(wǎng)IP

3.3查看Kafka Connect

查看所有的Kafka Connect

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移


查看創(chuàng)建的connect-test的狀態(tài)

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移

查看創(chuàng)建的connect-test的狀態(tài)查看task的信息

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移

3.4數(shù)據(jù)同步

在源Kafka集群創(chuàng)建需要同步的數(shù)據(jù)。

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移

3.5查看同步結(jié)果

在目的Kafka集群消費(fèi)同步的數(shù)據(jù)。

EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移

可以看到,在源Kafka集群發(fā)送的100000條數(shù)據(jù)已經(jīng)遷移到了目的Kafka集群。

以上就是EMR-Kafka中怎么利用Connect實(shí)現(xiàn)數(shù)據(jù)遷移,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI