您好,登錄后才能下訂單哦!
這篇文章給大家介紹如何解析Flume與Kafka整合,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。
Flume與Kafka整合
一、概念
1、Flume:Cloudera 開發(fā)的分布式日志收集系統(tǒng),是一種分布式,可靠且可用的服務(wù),用于高效地收集,匯總和移動(dòng)大量日志數(shù)據(jù)。 它具有基于流式數(shù)據(jù)流的簡(jiǎn)單而靈活的架構(gòu)。它具有可靠的可靠性機(jī)制和許多故障轉(zhuǎn)移和恢復(fù)機(jī)制,具有強(qiáng)大的容錯(cuò)性和容錯(cuò)能力。它使用一個(gè)簡(jiǎn)單的可擴(kuò)展數(shù)據(jù)模型,允許在線分析應(yīng)用程序。Flume分為OG、NG版本,其中Flume OG 的最后一個(gè)發(fā)行版本 0.94.0,之后為NG版本。
2、Kafka:作為一個(gè)集群運(yùn)行在一臺(tái)或多臺(tái)可以跨越多個(gè)數(shù)據(jù)中心的服務(wù)器上。在Kafka中,客戶端和服務(wù)器之間的通信是通過一種簡(jiǎn)單的,高性能的,語言不可知的TCP協(xié)議完成的。協(xié)議是版本控制的,并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端可以使用多種語言。
3、Kafka通常用于兩大類應(yīng)用,如下:
A、構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時(shí)流數(shù)據(jù)管道
B、構(gòu)建實(shí)時(shí)流應(yīng)用程序,用于轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流
C、Kafka每個(gè)記錄由一個(gè)鍵,一個(gè)值和一個(gè)時(shí)間戳組成。
二、產(chǎn)述背景
基于大數(shù)據(jù)領(lǐng)域?qū)崿F(xiàn)日志數(shù)據(jù)時(shí)時(shí)采集及數(shù)據(jù)傳遞等需要,據(jù)此需求下試著完成flume+kafka扇入、扇出功能整合,其中扇出包括:復(fù)制流、復(fù)用流等功能性測(cè)試。后續(xù)根據(jù)實(shí)際需要,將完善kafka與spark streaming進(jìn)行整合整理工作。
注:此文檔僅限于功能性測(cè)試,性能優(yōu)化方面請(qǐng)大家根據(jù)實(shí)際情況增加。
三、部署安裝
1、測(cè)試環(huán)境說明:
操作系統(tǒng):CentOS 7
Flume版本:flume-ng-1.6.0-cdh6.7.0
Kafka版本:kafka_2.11-0.10.0.1
JDK版本:JDK1.8.0
Scala版本:2.11.8
2、測(cè)試步驟:
2.1、flume部署
2.1.1、下載安裝介質(zhì),并解壓:
此處)折疊或打開
此處)折疊或打開
此處)折疊或打開
cd /app/apache-flume-1.6.0-cdh6.7.0-bin
vi netcatOrKafka-memory-logger.conf
netcatagent.sources = netcat_sources
netcatagent.channels = c1 c2
netcatagent.sinks = logger_sinks kafka_sinks
netcatagent.sources.netcat_sources.type = netcat
netcatagent.sources.netcat_sources.bind = 0.0.0.0
netcatagent.sources.netcat_sources.port = 44444
netcatagent.channels.c1.type = memory
netcatagent.channels.c1.capacity = 1000
netcatagent.channels.c1.transactionCapacity = 100
netcatagent.channels.c2.type = memory
netcatagent.channels.c2.capacity = 1000
netcatagent.channels.c2.transactionCapacity = 100
netcatagent.sinks.logger_sinks.type = logger
netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink
netcatagent.sinks.kafka_sinks.topic = test
netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082
netcatagent.sinks.kafka_sinks.requiredAcks = 0
##netcatagent.sinks.kafka_sinks.batchSize = 20
netcatagent.sinks.kafka_sinks.producer.type=sync
netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8
netcatagent.sinks.kafka_sinks.partition.key=0
netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder
netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition
netcatagent.sinks.kafka_sinks.max.message.size=1000000
netcatagent.sources.netcat_sources.selector.type = replicating
netcatagent.sources.netcat_sources.channels = c1 c2
netcatagent.sinks.logger_sinks.channel = c1
netcatagent.sinks.kafka_sinks.channel = c2
2.4.2、啟動(dòng)各測(cè)試命令:
A、啟動(dòng)flume的agent(于192.168.137.130):
flume-ng agent --name netcatagent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
-Dflume.root.logger=INFO,console
B、啟動(dòng)kafka消費(fèi)者(于192.168.137.132):
kafka-console-consumer.sh \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--from-beginning --topic test
C、測(cè)試發(fā)送(于192.168.137.130與于192.168.137.132)
telnet發(fā)送結(jié)果
kafka消費(fèi)結(jié)果
最終logger接收結(jié)果
至此flume+kafka扇出--復(fù)制流測(cè)試(扇入源為:netcat;輸出為:kafka+Flume的Logger)測(cè)試與驗(yàn)證完成。
2.5、flume+kafka扇出--復(fù)用流測(cè)試(扇入源為:netcat;輸出為:kafka+Flume的Logger)
暫無,后續(xù)補(bǔ)充
四、部署安裝及驗(yàn)證過程中出現(xiàn)的問題
1、做flume+kafka扇入測(cè)試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時(shí),一直未收到kafka數(shù)據(jù)
主要原因是在做kafka的配置時(shí)在配置文件(server.properties)中寫成內(nèi)容:
zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
但在創(chuàng)建topics時(shí),使用的是:
kafka-topics.sh --create \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在創(chuàng)建topics的時(shí)增加了/kafka
最終使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令檢查沒有topics信息才發(fā)現(xiàn)此問題
解決辦法:將兩個(gè)信息同步即可
2、做flume+kafka扇入測(cè)試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時(shí),啟動(dòng)flume的agent時(shí)報(bào)錯(cuò)。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
... 11 more
解決辦法:官網(wǎng)資料存在問題,org.apache.flume.source.kafka,KafkaSource其中不應(yīng)該包括逗號(hào),改為:org.apache.flume.source.kafka.KafkaSource即可。詳細(xì)官網(wǎng)
關(guān)于如何解析Flume與Kafka整合就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。