溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何解析Flume與Kafka整合

發(fā)布時(shí)間:2021-12-15 15:46:40 來源:億速云 閱讀:268 作者:柒染 欄目:互聯(lián)網(wǎng)科技

這篇文章給大家介紹如何解析Flume與Kafka整合,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

Flume與Kafka整合


一、概念
1、Flume:Cloudera 開發(fā)的分布式日志收集系統(tǒng),是一種分布式,可靠且可用的服務(wù),用于高效地收集,匯總和移動(dòng)大量日志數(shù)據(jù)。 它具有基于流式數(shù)據(jù)流的簡(jiǎn)單而靈活的架構(gòu)。它具有可靠的可靠性機(jī)制和許多故障轉(zhuǎn)移和恢復(fù)機(jī)制,具有強(qiáng)大的容錯(cuò)性和容錯(cuò)能力。它使用一個(gè)簡(jiǎn)單的可擴(kuò)展數(shù)據(jù)模型,允許在線分析應(yīng)用程序。Flume分為OG、NG版本,其中Flume OG 的最后一個(gè)發(fā)行版本 0.94.0,之后為NG版本。
 
2、Kafka:作為一個(gè)集群運(yùn)行在一臺(tái)或多臺(tái)可以跨越多個(gè)數(shù)據(jù)中心的服務(wù)器上。在Kafka中,客戶端和服務(wù)器之間的通信是通過一種簡(jiǎn)單的,高性能的,語言不可知的TCP協(xié)議完成的。協(xié)議是版本控制的,并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端可以使用多種語言。

3、Kafka通常用于兩大類應(yīng)用,如下:
   A、構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時(shí)流數(shù)據(jù)管道
   B、構(gòu)建實(shí)時(shí)流應(yīng)用程序,用于轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流
   C、Kafka每個(gè)記錄由一個(gè)鍵,一個(gè)值和一個(gè)時(shí)間戳組成。

二、產(chǎn)述背景
    基于大數(shù)據(jù)領(lǐng)域?qū)崿F(xiàn)日志數(shù)據(jù)時(shí)時(shí)采集及數(shù)據(jù)傳遞等需要,據(jù)此需求下試著完成flume+kafka扇入、扇出功能整合,其中扇出包括:復(fù)制流、復(fù)用流等功能性測(cè)試。后續(xù)根據(jù)實(shí)際需要,將完善kafka與spark streaming進(jìn)行整合整理工作。
    注:此文檔僅限于功能性測(cè)試,性能優(yōu)化方面請(qǐng)大家根據(jù)實(shí)際情況增加。

三、部署安裝
1、測(cè)試環(huán)境說明:
   操作系統(tǒng):CentOS 7
   Flume版本:flume-ng-1.6.0-cdh6.7.0
   Kafka版本:kafka_2.11-0.10.0.1
   JDK版本:JDK1.8.0
   Scala版本:2.11.8
2、測(cè)試步驟:
2.1、flume部署
2.1.1、下載安裝介質(zhì),并解壓:

此處)折疊或打開

此處)折疊或打開

此處)折疊或打開

  1. cd /app/apache-flume-1.6.0-cdh6.7.0-bin

  2. vi netcatOrKafka-memory-logger.conf

  3.     netcatagent.sources = netcat_sources

  4.     netcatagent.channels = c1 c2

  5.     netcatagent.sinks = logger_sinks kafka_sinks

  6.     

  7.     netcatagent.sources.netcat_sources.type = netcat

  8.     netcatagent.sources.netcat_sources.bind = 0.0.0.0

  9.     netcatagent.sources.netcat_sources.port = 44444

  10.     

  11.     netcatagent.channels.c1.type = memory

  12.     netcatagent.channels.c1.capacity = 1000

  13.     netcatagent.channels.c1.transactionCapacity = 100

  14.     

  15.     netcatagent.channels.c2.type = memory

  16.     netcatagent.channels.c2.capacity = 1000

  17.     netcatagent.channels.c2.transactionCapacity = 100

  18.     

  19.     netcatagent.sinks.logger_sinks.type = logger

  20.     

  21.     netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink

  22.     netcatagent.sinks.kafka_sinks.topic = test

  23.     netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082

  24.     netcatagent.sinks.kafka_sinks.requiredAcks = 0

  25.     ##netcatagent.sinks.kafka_sinks.batchSize = 20

  26.     netcatagent.sinks.kafka_sinks.producer.type=sync

  27.     netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8

  28.     netcatagent.sinks.kafka_sinks.partition.key=0

  29.     netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder

  30.     netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition

  31.     netcatagent.sinks.kafka_sinks.max.message.size=1000000

  32.     

  33.     netcatagent.sources.netcat_sources.selector.type = replicating

  34.     

  35.     netcatagent.sources.netcat_sources.channels = c1 c2

  36.     netcatagent.sinks.logger_sinks.channel = c1

  37.     netcatagent.sinks.kafka_sinks.channel = c2

2.4.2、啟動(dòng)各測(cè)試命令:
   A、啟動(dòng)flume的agent(于192.168.137.130):
      flume-ng agent --name netcatagent \
       --conf $FLUME_HOME/conf \
       --conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
       -Dflume.root.logger=INFO,console
    B、啟動(dòng)kafka消費(fèi)者(于192.168.137.132):
        kafka-console-consumer.sh \
         --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
          --from-beginning --topic test
     C、測(cè)試發(fā)送(于192.168.137.130與于192.168.137.132)
telnet發(fā)送結(jié)果

如何解析Flume與Kafka整合

kafka消費(fèi)結(jié)果

如何解析Flume與Kafka整合

最終logger接收結(jié)果

如何解析Flume與Kafka整合

         
   至此flume+kafka扇出--復(fù)制流測(cè)試(扇入源為:netcat;輸出為:kafka+Flume的Logger)測(cè)試與驗(yàn)證完成。
   
2.5、flume+kafka扇出--復(fù)用流測(cè)試(扇入源為:netcat;輸出為:kafka+Flume的Logger)

   暫無,后續(xù)補(bǔ)充



四、部署安裝及驗(yàn)證過程中出現(xiàn)的問題

    1、做flume+kafka扇入測(cè)試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時(shí),一直未收到kafka數(shù)據(jù)
        主要原因是在做kafka的配置時(shí)在配置文件(server.properties)中寫成內(nèi)容:
       zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
       但在創(chuàng)建topics時(shí),使用的是:
       kafka-topics.sh --create \
   --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
 
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在創(chuàng)建topics的時(shí)增加了/kafka
最終使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令檢查沒有topics信息才發(fā)現(xiàn)此問題
   
   解決辦法:將兩個(gè)信息同步即可
   
2、做flume+kafka扇入測(cè)試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時(shí),啟動(dòng)flume的agent時(shí)報(bào)錯(cuò)。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more


   解決辦法:官網(wǎng)資料存在問題,org.apache.flume.source.kafka,KafkaSource其中不應(yīng)該包括逗號(hào),改為:org.apache.flume.source.kafka.KafkaSource即可。詳細(xì)官網(wǎng)
如何解析Flume與Kafka整合

關(guān)于如何解析Flume與Kafka整合就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI