您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Kafka Connect及FileConnector的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
Kafka是一個使用越來越廣的消息系統(tǒng),尤其是在大數(shù)據(jù)開發(fā)中(實(shí)時數(shù)據(jù)處理和分析)。為何集成其他系統(tǒng)和解耦應(yīng)用,經(jīng)常使用Producer來發(fā)送消息到Broker,并使用Consumer來消費(fèi)Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統(tǒng)與Kafka的集成。Kafka Connect運(yùn)用用戶快速定義并實(shí)現(xiàn)各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數(shù)據(jù)導(dǎo)入/導(dǎo)出Kafka很方便。
如圖中所示,左側(cè)的Sources負(fù)責(zé)從其他異構(gòu)系統(tǒng)中讀取數(shù)據(jù)并導(dǎo)入到Kafka中;右側(cè)的Sinks是把Kafka中的數(shù)據(jù)寫入到其他的系統(tǒng)中。
Kafka Connector很多,包括開源和商業(yè)版本的。如下列表中是常用的開源的Connector
Connectors | References |
---|---|
Jdbc | Source, Sink |
Elastic Search | Sink1, Sink2, Sink3 |
Cassandra | Source1, Source 2, Sink1, Sink2 |
MongoDB | Source |
HBase | Sink |
Syslog | Source |
MQTT (Source) | Source |
Twitter (Source) | Source, Sink |
S3 | Sink1, Sink2 |
商業(yè)版的可以通過Confluent.io獲得
本例演示如何使用Kafka Connect把Source(test.txt)轉(zhuǎn)為流數(shù)據(jù)再寫入到Destination(test.sink.txt)中。如下圖所示:
本例使用到了兩個Connector:
FileStreamSource:從test.txt中讀取并發(fā)布到Broker中
FileStreamSink:從Broker中讀取數(shù)據(jù)并寫入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
3.3.3 打開console-consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line
關(guān)于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。