溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flume如何安裝并整合kafka

發(fā)布時間:2021-12-04 11:05:05 來源:億速云 閱讀:220 作者:小新 欄目:大數(shù)據(jù)

這篇文章給大家分享的是有關(guān)flume如何安裝并整合kafka的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

flume

Flume agent之間的通信(參考圖書)

   flume內(nèi)置了專門的RPC sink-source對來處理agent之間的數(shù)據(jù)傳輸。
   source是負責接收數(shù)據(jù)到Flume Agent的組件。包括Avro Source、Thrift source 、HTTP Source、Spooling Directory Source、Syslog Source、Exec Source、JMS Source等。
         channel是位于source和sink之間的緩沖區(qū),是保證數(shù)據(jù)不丟失的關(guān)鍵。
         sink從Channel中讀取事件,每一個sink只能從一個Channel鐘讀取事件,必須給每一個sink配置Channel,否則會從agent中移除。

安裝flume

下載安裝

cd /data/
wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar axf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin

修改環(huán)境變量

vim /etc/profile

#FLUSM
export FLUME_HOME=/data/apache-flume-1.8.0-bin
export PATH=$PATH:${FLUME_HOME}/bin
export HADOOP_HOME=/data/hadoop

source /etc/profile

修改配置文件

cd ${FLUME_HOME}/conf/
cp flume-env.sh.template flume-env.sh
修改 flume-env.sh

export JAVA_HOME=/usr/local/jdk
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_HOME=/data/hadoop

驗證安裝
flume-ng version
flume如何安裝并整合kafka

使用flume

單節(jié)點agent傳輸信息

cd ${FLUME_HOME}/conf/
添加配置文件
vim avro.conf

#Name the components on this agent
agent.sources = avroSrc
agent.channels = avroChannel

#Describe/configure the source
agent.sources.avroSrc.type = netcat
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 62000

#Describe the sink
agent.sinks.avroSink.type = logger

#Use a channel that buffers events in memory
agent.channels.avroChannel.type = memory
agent.channels.avroChannel.capacity = 1000
agent.channels.avroChannel.transactionCapacity = 100

#Bind the source and sink to the channel
agent.sinks = avroSink
agent.sources.avroSrc.channels = avroChannel
agent.sinks.avroSink.channel = avroChannel

“#測試agent.sources.avroSrc.type用avro,然后報錯
#org.apache.avro.AvroRuntimeException: Excessively large list #allocation request detected: 1863125517 items! Connection #closed”

運行flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/avro.conf -n agent -Dflume.root.logger=INFO,console

使用Telnet連接測試
telnet localhost 6200
flume如何安裝并整合kafka      
查看
flume如何安裝并整合kafka

exec監(jiān)控本地文件

cd ${FLUME_HOME}/conf/
添加配置文件
vim exec.conf

#example.conf: A single-node Flume configuration

#Name the components on this agent
agentexec.sources = avroexec
agentexec.sinks = sinkexec
agentexec.channels = channelexec

#Describe/configure the sources

#Describe/configure the source
agentexec.sources.avroexec.bind = localhost
agentexec.sources.avroexec.port = 630000
agentexec.sources.avroexec.type = exec
agentexec.sources.avroexec.command = tail -F /tmp/testexec.log
#Describe the sink
agentexec.sinks.sinkexec.type = logger

#Use a channel which buffers events in memory
agentexec.channels.channelexec.type = memory
agentexec.channels.channelexec.capacity = 100000
agentexec.channels.channelexec.transactionCapacity = 10000

#Bind the source and sink to the channel
agentexec.sources.avroexec.channels = channelexec
agentexec.sinks.sinkexec.channel = channelexec

運行flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/exec.conf --name agentexec -Dflume.root.logger=INFO,console

測試
flume如何安裝并整合kafka
flume如何安裝并整合kafka
尷尬,只獲取到了一部分(暫時沒有占到解決方法)

spooldir整合kafka監(jiān)控日志

前提:安裝kafka集群
cd ${FLUME_HOME}/conf/
添加配置文件
vim single_agent.conf

#agent name a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1

#set source
#“測試使用將數(shù)據(jù)放在了/tmp目錄下,注意設置”
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir=/tmp/spooldir
a11.sources.source1.fileHeader = false

#set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= spooldir

#set channel
#“測試使用將數(shù)據(jù)放在了/tmp目錄下,注意設置”
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /tmp/flume_data/checkpoint
a1.channels.channel1.dataDirs= /tmp/flume_data/data

#bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

創(chuàng)建文件存放目錄

mkdir -pv /tmp/spooldir
mkdir -pv /tmp/flume_data/checkpoint
mkdir -pv /tmp/flume_data/data

(所有節(jié)點)啟動kafka集群

kafka-server-start.sh  /data/kafka_2.11-1.0.0/config/server.properties

創(chuàng)建kafka的topic

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic spooldir --replication-factor 1 --partitions 3

查看topic

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

創(chuàng)建kafka的consumer

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir --from-beginning

(新窗口)啟動flume的agent

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console

寫入測試
[root@master conf]# echo "hello ,test flume spooldir source" >> /tmp/spooldir/spool.txt
flume-ng信息
flume如何安裝并整合kafka
kafka信息
flume如何安裝并整合kafka

將日志信息寫入hbase

前提:安裝hbase集群
cd ${FLUME_HOME}/conf/
mkdir hbase && cd hbase
添加配置文件,這里需要兩個agent端
hbase-back.conf用于收集本地數(shù)據(jù),hbase-front.conf用于將數(shù)據(jù)寫入hbase
vim hbase-back.conf

agent.sources =backsrc
agent.channels=memoryChannel
agent.sinks =remotesink
#Describe the sources
agent.sources.backsrc.type = exec
agent.sources.backsrc.command = tail -F /tmp/test/data/data.txt
agent.sources.backsrc.checkperiodic = 1000
agent.sources.backsrc.channels=memoryChannel
#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000
#Describe the sinks
agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = master
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel= memoryChannel

vim hbase-front.conf

agent.sources = frontsrc
agent.channels = memoryChannel
agent.sinks = fileSink
#Describe the sources
agent.sources.frontsrc.type = avro
agent.sources.frontsrc.bind = master
agent.sources.frontsrc.port = 9999
agent.sources.frontsrc.channels = memoryChannel
#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity =1000
#Describe the sinks
agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel=memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = t
agent.sinks.fileSink.batchSize= 50
agent.sinks.fileSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.fileSink.zookeeperQuorum = master:2181,slave1:2181,slave2:2181
agent.sinks.fileSink.znodeParent = /hbase
agent.sinks.fileSink.timeout = 90000

創(chuàng)建本地文件和目錄
mkdir -pv /tmp/test/data && touch /tmp/test/data/data.txt
創(chuàng)建hbase中的表
hbase shell
創(chuàng)建表
create 'access_log','t'
查看
list
flume如何安裝并整合kafka
啟動back agent

 flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent  -Dflume.root.logger=INFO,console

啟動后會報錯

18/01/22 22:29:28 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: 192.168.3.58, port: 9999
org.apache.flume.FlumeException: NettyAvroRpcClient { host: master, port: 9999 }: RPC connection error

這是因為avro連接沒有完成,現(xiàn)在只啟動了sink端,沒有source端,等啟動了front后就會顯示連接上了
flume如何安裝并整合kafka
啟動front agent

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-front.conf --name agent -Dflume.root.logger=INFO,console

向本地文件中追加內(nèi)容,然后在hbase中查看

echo "hello ,test flush to hbase">>/tmp/test/data/data.txt

寫入的過程中兩個agent不會打印日志
查看hbase中的數(shù)據(jù)

hbase shell
scan "access_log"

flume向hbase中寫入日志會有一定時間的延遲
flume如何安裝并整合kafka

將日志寫入hadoop

原理和寫入hbase一樣,理解了hbase寫入流程就很好理解寫入其它服務了,詳細配置參考官方文檔。
前提:安裝hadoop集群
cd ${FLUME_HOME}/conf/
mkdir hdfs && cd hdfs
添加配置文件,這里需要兩個agent端
hadoop-back.conf用于收集本地數(shù)據(jù),hadoop-front.conf用于將數(shù)據(jù)寫入hadoop
vim hadoop-back.conf

#Namethe components
hadoop.sources= backsrc
hadoop.sinks= fileSink
hadoop.channels= memoryChannel
#Source
hadoop.sources.backsrc.type= spooldir
hadoop.sources.backsrc.spoolDir= /tmp/data/hadoop
hadoop.sources.backsrc.channels= memoryChannel
hadoop.sources.backsrc.fileHeader = true
#Channel
hadoop.channels.memoryChannel.type= memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000
#Sink
hadoop.sinks.fileSink.type= avro
hadoop.sinks.fileSink.hostname= master
hadoop.sinks.fileSink.port= 10000
hadoop.sinks.fileSink.channel= memoryChannel

vim hadoop-front.conf

#Namethe components
hadoop.sources= frontsrc
hadoop.channels= memoryChannel
hadoop.sinks= remotesink
#Source
hadoop.sources.frontsrc.type= avro
hadoop.sources.frontsrc.bind= master
hadoop.sources.frontsrc.port= 10000
hadoop.sources.frontsrc.channels= memoryChannel
#Channel
hadoop.channels.memoryChannel.type= memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity =1000
#Sink
hadoop.sinks.remotesink.type= hdfs
hadoop.sinks.remotesink.hdfs.path=hdfs://master/flume
hadoop.sinks.remotesink.hdfs.rollInterval = 0
hadoop.sinks.remotesink.hdfs.idleTimeout = 10000
hadoop.sinks.remotesink.hdfs.fileType= DataStream
hadoop.sinks.remotesink.hdfs.writeFormat= Text
hadoop.sinks.remotesink.hdfs.threadsPoolSize = 20
hadoop.sinks.remotesink.channel= memoryChannel

創(chuàng)建本地目錄并修改權(quán)限

mkdir -pv /tmp/data/hadoop && chmod -R 777 /tmp/data/

創(chuàng)建hdfs中的目錄并修改權(quán)限

hadoop fs -mkdir /flume
hadoop fs -chmod 777 /flume
hadoop fs -ls /

flume如何安裝并整合kafka
向本地目錄中寫入文件

echo "hello, test hadoop" >> /tmp/data/hadoop/hadoop.log
echo "hello, test flume" >> /tmp/data/hadoop/flume.log
echo "hello, test helloworld" >> /tmp/data/hadoop/helloworld.log

查看hdfs中的文件和文件信息

hadoop fs -ls /flume
hadoop fs -cat /flume/FlumeData.1516634328510.tmp

flume如何安裝并整合kafka

感謝各位的閱讀!關(guān)于“flume如何安裝并整合kafka”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI