溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

flume 整合kafka

發(fā)布時間:2020-07-21 16:39:15 來源:網(wǎng)絡(luò) 閱讀:552 作者:crazy_charles 欄目:開發(fā)技術(shù)

1,安裝并成功能運行flume

2,安裝并成功能運行kafka

3,安裝并成功能運行zookeeper

4,開始整合flume收集的數(shù)據(jù),寫入kafka

a,修改flume的配置文加:

vim  flume_kafka.conf

agent1.sources = r1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.r1.type = exec

agent1.sources.r1.command=tail -f /opt/logs/usercenter.log


# Use a channel which buffers events in memory

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

agent1.sources.r1.channels = c1

agent1.sinks.k1.channel = c1


# # Describe the sink  這部分就是輸入到kafka的寫法

##############################################

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = test

agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 20

##############################################


b,下載第三方插件

下載flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin


lib目錄下的

flume 整合kafka

package下的

flume 整合kafka

 

都放到flumelib目錄

如果,報錯,請看這個文檔

http://wenda.chinahadoop.cn/question/4079?notification_id=290954&rf=false&item_id=10382#!answer_10382


修改原有的flume-conf文件


在插件包里有一個flume-conf.properties,把這個文件放到flume的conf文件夾里

然后修改以下內(nèi)容

producer.sources.s.type = exec
 producer.sources.s.command = tail -f -n+1  /opt/logs/test.log
 producer.sources.s.channels = c
……
producer.sinks.r.custom.topic.name=test
……
consumer.sources.s.custom.topic.name=test

c:啟動服務(wù)

 

啟動zookeeper集群

zkServer.sh start

zkServer.sh start

zkServer.sh start

還需要創(chuàng)建一個新的地址

zookeeper/bin/zkCli.sh

create /kafka  test

啟動kafka broker 集群

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh config/server.properties


創(chuàng)建kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test


啟動kafka consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test --from-beginning


啟動flume

bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.properties --name producer -Dflume.root.logger=INFO,console


測試

echo "this is a test" >> /opt/logs/test.log

此時只要能在consumer里現(xiàn)“this is a test”就表示成功


錯誤總結(jié):

http://472053211.blog.51cto.com/3692116/1655844

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI