flume-1.6.0 high-availability test && loading data into Kafka

Published: 2020-07-31 06:32:09  Source: Internet  Views: 1300  Author: xiaobin0303  Category: Development

Machine list:

192.168.137.115  slave0     (agent) 
192.168.137.116  slave1     (agent) 
192.168.137.117  slave2     (agent) 
192.168.137.118  slave3     (collector) 
192.168.137.119  slave4     (collector)


Create the directories on every machine

mkdir -p /home/qun/data/flume/logs

mkdir -p /home/qun/data/flume/data

mkdir -p /home/qun/data/flume/checkpoint
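
The three mkdir calls above can be folded into one helper so that every machine ends up with the same layout. A minimal sketch, assuming bash; the function name make_flume_dirs is hypothetical, and the base path in the usage line is the one from the commands above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: create the logs, data and checkpoint directories
# under one base directory. mkdir -p does nothing if they already exist.
make_flume_dirs() {
  local base="$1"
  local d
  for d in logs data checkpoint; do
    mkdir -p "$base/$d" || return 1
  done
}

# Usage on each machine (path taken from the steps above):
# make_flume_dirs /home/qun/data/flume
```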


Download the latest Flume package:

wget 
tar -zxvf apache-flume-1.6.0-bin.tar.gz
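
The later steps refer to $FLUME_HOME and call flume-ng without a path, so both have to be set once the archive is unpacked. A minimal sketch, assuming the tarball was extracted under /home/qun, which is the location the start commands further down use:

```shell
# Assumption: apache-flume-1.6.0-bin.tar.gz was extracted in /home/qun
export FLUME_HOME=/home/qun/apache-flume-1.6.0-bin
# Put the flume-ng launcher on the PATH
export PATH="$FLUME_HOME/bin:$PATH"
```

Adding these two lines to the shell profile on each machine keeps them available in new sessions.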


Configure the collectors on slave3 and slave4

touch $FLUME_HOME/conf/server.conf

with the following contents

a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/home/qun/data/flume/checkpoint
a1.channels.c1.dataDirs=/home/qun/data/flume/data
# on the other collector node (slave4), replace slave3/SLAVE3 below with slave4/SLAVE4
a1.sources.r1.type = avro
a1.sources.r1.bind = slave3
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = SLAVE3
a1.sources.r1.channels = c1
#set sink to kafka
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=mytopic
a1.sinks.k1.brokerList=kafkahost:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=100
a1.sinks.k1.channel=c1
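
The configuration above binds the avro source to slave3 and tags events with Collector=SLAVE3, so the copy on slave4 presumably needs its own host name in both places. A sketch of deriving that copy, assuming bash and sed; render_collector_conf is a hypothetical helper and not part of Flume.

```shell
# Hypothetical helper: read a collector config written for slave3 on stdin
# and write the variant for another host to stdout. Only the avro bind
# address and the static interceptor value are rewritten.
render_collector_conf() {
  local host="$1"
  local upper
  upper=$(printf '%s' "$host" | tr '[:lower:]' '[:upper:]')
  sed -e "s/^\(a1\.sources\.r1\.bind *= *\).*/\1$host/" \
      -e "s/^\(a1\.sources\.r1\.interceptors\.i1\.value *= *\).*/\1$upper/"
}

# Usage (the output file name is an assumption):
# render_collector_conf slave4 < server.conf > server-slave4.conf
```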

 


Configure the agents on slave0, slave1 and slave2

touch $FLUME_HOME/conf/client.conf

with the following contents

agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set group
agent1.sinkgroups = g1 
#set channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir=/home/qun/data/flume/checkpoint
agent1.channels.c1.dataDirs=/home/qun/data/flume/data
agent1.sources.r1.channels = c1
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir=/home/qun/data/flume/logs
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = slave3
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = slave4
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
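
With the failover processor, events go to the available sink with the highest priority, so k1 (slave3, priority 10) is used until it fails and k2 (slave4, priority 1) takes over; maxpenalty caps the backoff, in milliseconds, applied to a failed sink. To check which sink a given config prefers, here is a sketch assuming bash and awk. preferred_sink is a hypothetical helper that only reads the file and does not talk to Flume.

```shell
# Hypothetical helper: print the sink name with the highest
# processor.priority value found in a Flume properties file.
preferred_sink() {
  awk -F'=' '
    # Consider only uncommented "...processor.priority.<sink> = <n>" lines
    /^[^#]*\.processor\.priority\./ {
      key = $1; val = $2
      gsub(/[ \t]/, "", key); gsub(/[ \t]/, "", val)
      n = split(key, parts, ".")
      if (best == "" || val + 0 > max) { max = val + 0; best = parts[n] }
    }
    END { if (best != "") print best }
  ' "$1"
}

# Usage:
# preferred_sink "$FLUME_HOME/conf/client.conf"
```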


Start the collectors on slave3 and slave4

flume-ng agent -n a1 -c conf -f /home/qun/apache-flume-1.6.0-bin/conf/server.conf -Dflume.root.logger=DEBUG,console


Start the agents on slave0, slave1 and slave2

flume-ng agent -n agent1 -c conf -f /home/qun/apache-flume-1.6.0-bin/conf/client.conf -Dflume.root.logger=DEBUG,console


Functional test


echo "hello flume">>/home/qun/data/flume/logs/test.txt
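
The spooling directory source expects a file to be complete and unchanged once it appears in the directory, and file names must not be reused. Appending with >> to a file inside the spool directory can break that assumption. A more careful pattern is to write the file elsewhere and move it in under a unique name. A sketch, assuming bash; spool_message and the staging directory are hypothetical, and the staging directory should sit on the same filesystem as the spool directory so that mv is a rename.

```shell
# Hypothetical helper: write one message to a temp file in a staging
# directory, then move it into the spool directory under a unique name.
# Prints the final path on success.
spool_message() {
  local spool_dir="$1" staging_dir="$2" message="$3"
  local tmp name
  tmp=$(mktemp "$staging_dir/event.XXXXXX") || return 1
  printf '%s\n' "$message" > "$tmp"
  # Random mktemp suffix plus a timestamp keeps names from being reused
  name="$(basename "$tmp").$(date +%s).log"
  mv "$tmp" "$spool_dir/$name" && printf '%s\n' "$spool_dir/$name"
}

# Usage (the staging path is an assumption):
# spool_message /home/qun/data/flume/logs /home/qun/data/flume/staging "hello flume"
```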

Log on collector slave3 after receiving the event from the agent:

16/05/26 12:44:24 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:44:24 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464235734894, queueSize: 0, queueHead: 0
16/05/26 12:44:24 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 786 logWriteOrderID: 1464235734894
16/05/26 12:44:24 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1
16/05/26 12:44:24 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1.meta
16/05/26 12:44:54 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:44:54 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464235734901, queueSize: 0, queueHead: 0
16/05/26 12:44:54 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 1179 logWriteOrderID: 1464235734901



Testing collector failover

Kill the Flume process on slave3 with kill -9 pid


echo "hello flume">>/home/qun/data/flume/logs/test.txt
Log on collector slave4 after receiving the event from the agent:
16/05/26 12:08:27 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:08:27 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464234987484, queueSize: 0, queueHead: 0
16/05/26 12:08:27 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 393 logWriteOrderID: 1464234987484
16/05/26 12:08:27 INFO file.LogFile: Closing RandomReader /home/qun/data/flume/data/log-1
16/05/26 12:54:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,host:xiaobin,port:9092 with correlation id 4 for 1 topic(s) Set(mytopic)
16/05/26 12:54:38 INFO producer.SyncProducer: Connected to xiaobin:9092 for producing
16/05/26 12:54:38 INFO producer.SyncProducer: Disconnecting from xiaobin:9092
16/05/26 12:54:38 INFO producer.SyncProducer: Disconnecting from xiaobin:9092
16/05/26 12:54:38 INFO producer.SyncProducer: Connected to xiaobin:9092 for producing
16/05/26 12:54:57 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:54:57 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464234987491, queueSize: 0, queueHead: 0
16/05/26 12:54:57 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 786 logWriteOrderID: 1464234987491
16/05/26 12:54:57 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1
16/05/26 12:54:57 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1.meta
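
The SyncProducer lines in the slave4 log show the Kafka sink connecting to the broker once events arrive after the failover. If the console output is saved to a file, that check can be scripted. A sketch, assuming bash and grep; kafka_connect_count and the log file are hypothetical, since the start commands above log to the console only.

```shell
# Hypothetical helper: count the lines in a saved collector log that
# report a successful Kafka producer connection.
kafka_connect_count() {
  grep -c 'producer\.SyncProducer: Connected to .* for producing' "$1"
}

# Usage (the log path is an assumption):
# kafka_connect_count /home/qun/data/flume/collector.log
```

A count of zero after a test event was written means the sink never reached the broker, which is worth checking before looking at the Kafka side.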


More to come later...

