溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Kafka Connect及FileConnector的示例分析

發(fā)布時間:2021-12-15 09:19:52 來源:億速云 閱讀:201 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)Kafka Connect及FileConnector的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

一. Kafka Connect簡介

Kafka是一個使用越來越廣的消息系統(tǒng),尤其是在大數(shù)據(jù)開發(fā)中(實(shí)時數(shù)據(jù)處理和分析)。為何集成其他系統(tǒng)和解耦應(yīng)用,經(jīng)常使用Producer來發(fā)送消息到Broker,并使用Consumer來消費(fèi)Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統(tǒng)與Kafka的集成。Kafka Connect運(yùn)用用戶快速定義并實(shí)現(xiàn)各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數(shù)據(jù)導(dǎo)入/導(dǎo)出Kafka很方便。

Kafka Connect及FileConnector的示例分析

如圖中所示,左側(cè)的Sources負(fù)責(zé)從其他異構(gòu)系統(tǒng)中讀取數(shù)據(jù)并導(dǎo)入到Kafka中;右側(cè)的Sinks是把Kafka中的數(shù)據(jù)寫入到其他的系統(tǒng)中。

二. 各種Kafka Connector

Kafka Connector很多,包括開源和商業(yè)版本的。如下列表中是常用的開源的Connector

ConnectorsReferences
JdbcSource, Sink
Elastic SearchSink1, Sink2, Sink3
CassandraSource1, Source 2, Sink1, Sink2
MongoDBSource
HBaseSink
SyslogSource
MQTT (Source)Source
Twitter (Source)Source, Sink
S3Sink1, Sink2

商業(yè)版的可以通過Confluent.io獲得

三. 示例

3.1 FileConnector Demo

 本例演示如何使用Kafka Connect把Source(test.txt)轉(zhuǎn)為流數(shù)據(jù)再寫入到Destination(test.sink.txt)中。如下圖所示:

Kafka Connect及FileConnector的示例分析

本例使用到了兩個Connector:

  • FileStreamSource:從test.txt中讀取并發(fā)布到Broker中

  • FileStreamSink:從Broker中讀取數(shù)據(jù)并寫入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
3.2 運(yùn)行Demo

需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

3.2.1 啟動Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
3.2.2 啟動Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

3.3.3 打開console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
3.3.4 寫入到test.txt文件中,并觀察3.3.3中的變化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打開的窗口輸出如下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line

關(guān)于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI