
Getting Started with Flume


1 Flume Overview

1.1 Definition

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting large volumes of log data.

Flume is based on a streaming architecture and is flexible and simple.

1.2 Features

It can integrate with any storage process.

When the input data rate exceeds the rate at which data can be written to the destination storage, Flume buffers the data, reducing the pressure on HDFS.

Transactions in Flume are based on the Channel and use two transaction models (sender + receiver) to ensure that messages are delivered reliably.

Flume uses two separate transactions to handle event delivery from Source to Channel and from Channel to Sink. Only when all the data in a transaction has been successfully committed to the Channel does the Source consider the read complete; likewise, only data that has been successfully written out by the Sink is removed from the Channel.

1.3 Architecture

(Figure: Flume Agent architecture — Source, Channel, and Sink between the data source and the destination)

1.3.1 Agent

An Agent is a JVM process that delivers data from a source to a destination in the form of events.

An Agent consists mainly of a Source, a Channel, and a Sink.

1.3.2 Source

The Source is the component that receives data into the Agent. It can handle many data types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.

1.3.3 Channel

The Channel is a buffer between the Source and the Sink, so it allows the Source and the Sink to operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.

Flume ships with two Channel types:

Memory Channel: an in-memory queue. It is fast and suitable when losing data is not a concern.

File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.

1.3.4 Sink

The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them to another Flume Agent.

The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel; once the transaction is committed, the Channel removes those events from its internal buffer.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.

1.3.5 Event

An Event is the transport unit, the basic unit of data transfer in Flume; data travels from the source to the destination in the form of events.

An Event consists of an optional header and a byte array carrying the data. The header is a HashMap of key-value string pairs.

Usually one record is one Event, and an Event is split every 2048 bytes.
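
As a rough sketch of that structure, the example below builds an Event with the EventBuilder helper from flume-ng-core; the header key and body text are invented for illustration:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        // optional headers: key-value string pairs
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "hadoop102");

        // the body is a plain byte array carrying the payload
        Event event = EventBuilder.withBody("hello flume".getBytes(StandardCharsets.UTF_8), headers);

        System.out.println(event.getHeaders());
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}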

1.4 Topologies

(Figure: multiple Flume agents chained in sequence)

This mode chains multiple Flume agents in sequence, from the first Source to the storage system that the final Sink delivers to. It is not recommended to chain too many agents: too many agents not only lowers throughput, but if any agent in the chain goes down during transfer, the whole pipeline is affected.

(Figure: a single source replicated to multiple channels and sinks)

Flume supports fanning an event stream out to one or more destinations. In this mode the source data is replicated to multiple Channels; each Channel holds the same data, and each Sink can deliver it to a different destination.

(Figure: multiple sinks grouped into a sink group for load balancing and failover)

Flume supports logically grouping multiple Sinks into a single sink group; Flume distributes data across the different Sinks, which mainly addresses load balancing and failover.

(Figure: many collection agents fanning in to one central collector agent)

This is the most common and most practical mode. Web applications are usually spread across hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are cumbersome to handle. This Flume composition solves the problem well: each server runs one Flume agent to collect its logs and forwards them to a central log-collecting Flume agent, which then uploads them to HDFS, Hive, HBase, JMS, etc. for log analysis.

1.5 How an Agent Works

(Figure: internal event flow inside an Agent)

2 Deploying Flume

1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.

2. Rename apache-flume-1.7.0-bin to flume.

3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh and set JAVA_HOME in flume-env.sh.
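
A minimal shell sketch of these three steps (the JAVA_HOME path is only an example and depends on where your JDK is installed):

tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
mv /opt/module/apache-flume-1.7.0-bin /opt/module/flume
mv /opt/module/flume/conf/flume-env.sh.template /opt/module/flume/conf/flume-env.sh
# then edit flume-env.sh and set, for example:
# export JAVA_HOME=/opt/module/jdk1.8.0_144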

3 Enterprise Development Examples

3.1 Monitoring Port Data

Requirements:

The server listens on port 44444 of the local machine.

A client uses the netcat tool to send messages to port 44444.

Finally, the data is displayed on the console.

Steps:

1. Create the Agent configuration file flume-netcat-logger.conf in the job folder.

[djm@hadoop102 job]$ vim flume-netcat-logger.conf

2. Add the following:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Start the job

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

Parameter notes:

--conf conf/ (-c): the configuration files are stored in the conf/ directory.

--name a1 (-n): names this Agent a1.

--conf-file job/flume-netcat-logger.conf (-f): the configuration file Flume reads for this run is flume-netcat-logger.conf in the job folder.

-Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property at runtime and sets the console log level to INFO.
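
To verify the agent, open a second terminal and send data to port 44444 with netcat; the text typed is arbitrary, and the netcat source replies OK for each accepted line while the agent console logs the event:

[djm@hadoop102 ~]$ nc localhost 44444
hello
OK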

3.2 Streaming a Local File to HDFS in Real Time

Requirements:

Monitor the Hive log in real time and upload it to HDFS.

Steps:

1. Create the Agent configuration file flume-file-hdfs.conf in the job folder.

[djm@hadoop102 job]$ vim flume-file-hdfs.conf

2. Add the following:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# how many time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# the time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# how many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# how long (in seconds) before rolling a new file
a2.sinks.k2.hdfs.rollInterval = 60
# roll size for each file (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3. Start the job

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

Note:

To read a file on a Linux system, you have to follow the rules of Linux commands. Since the Hive log lives on a Linux system, the source type chosen for reading the file is exec: exec means execute, i.e. the file is read by executing a Linux command.
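
To check the result, generate some Hive log output (for example by running a query in the Hive CLI) and then list the target path on HDFS; the date/hour subdirectories depend on when the agent was running:

[djm@hadoop102 flume]$ hadoop fs -ls /flume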

3.3 Streaming Directory Files to HDFS in Real Time

Requirements:

Use Flume to monitor the files in an entire directory.

Steps:

1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder.

[djm@hadoop102 job]$ vim flume-dir-hdfs.conf

2. Add the following:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# how many time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# the time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# how many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# how long (in seconds) before rolling a new file
a3.sinks.k3.hdfs.rollInterval = 60
# roll size for each file, roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

3. Start the job

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf

Note:

Do not create and keep modifying files inside the monitored directory.
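
To test, write a file somewhere else first and then move it into the monitored directory in one step (which avoids the problem described above); once Flume has consumed it, the file is renamed with the .COMPLETED suffix. The file name below is made up:

[djm@hadoop102 ~]$ echo "hello flume" > /tmp/test.log
[djm@hadoop102 ~]$ mv /tmp/test.log /opt/module/flume/upload/
[djm@hadoop102 ~]$ ls /opt/module/flume/upload/
test.log.COMPLETED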

3.4 Single Source, Multiple Outputs (Selector)

Requirements:

Flume-1 monitors file changes and passes the changed content to Flume-2.

Flume-2 stores it in HDFS.

At the same time, Flume-1 passes the changed content to Flume-3, and Flume-3 writes it to the local file system.

Steps:

1. Create the Agent configuration file flume-file-flume.conf in the group1 folder.

[djm@hadoop102 group1]$ vim flume-file-flume.conf

2. Add the following:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# the avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 folder.

[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf

4. Add the following:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# the avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# how many time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# how many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# how long (in seconds) before rolling a new file
a2.sinks.k1.hdfs.rollInterval = 600
# roll size for each file, roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-dir.conf in the group1 folder.

[djm@hadoop102 group1]$ vim flume-flume-dir.conf

6. Add the following:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf

Notes:

Avro is a language-neutral data serialization and RPC framework.

The local output directory must already exist; if it does not, Flume will not create it.

Start the downstream jobs (the agents whose avro Source receives the data) before starting the agent whose avro Sink sends to them, as in the order shown above.
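
In line with the second note, the local output directory can be created up front; the path matches the file_roll sink configured above:

[djm@hadoop102 ~]$ mkdir -p /opt/module/data/flume3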

3.5 Single Source, Multiple Outputs (Sink Group)

Requirements:

Flume-1 monitors port data and passes it to Flume-2.

Flume-2 displays the data on the console.

At the same time, Flume-1 passes the data to Flume-3, and Flume-3 also displays it on the console.

Steps:

1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder.

2. Add the following:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
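
The group above uses the load_balance sink processor. If failover is wanted instead (one sink active at a time, with the other taking over when it fails), the processor block could be swapped for something along these lines; this is a sketch, and the priority values are illustrative:

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000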

3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder.

4. Add the following:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-console2.conf in the group2 folder.

6. Add the following:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
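
With all three agents running, send data to the netcat source to verify the group; because the sink processor uses round_robin load balancing, the events are spread across the two console agents rather than duplicated. The text typed is arbitrary:

[djm@hadoop102 ~]$ nc localhost 44444
hello
OK
world
OK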

3.6 Aggregating Multiple Data Sources

Requirements:

Flume-1 on hadoop103 monitors the file /opt/module/group.log.

Flume-2 on hadoop102 monitors the data stream on a port.

Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.

Steps:

1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder.

[djm@hadoop102 group3]$ vim flume1-logger-flume.conf

2. Add the following:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 folder.

[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf

4. Add the following:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder.

[djm@hadoop102 group3]$ vim flume3-flume-logger.conf

6. Add the following:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

7. Distribute the configuration files.

[djm@hadoop102 group3]$ xsync /opt/module/flume/job

8. Start the jobs

[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
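
To verify, append a line to the monitored file on hadoop103 and send a line to the netcat source on hadoop102; both should appear on the Flume-3 console on hadoop104. The sample text is arbitrary:

[djm@hadoop103 ~]$ echo "hello from hadoop103" >> /opt/module/group.log
[djm@hadoop102 ~]$ nc hadoop102 44444
hello from hadoop102
OK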

4 Deploying Ganglia

1. Install the httpd service and PHP:

yum -y install httpd php

2. Install other dependencies:

yum -y install rrdtool perl-rrdtool rrdtool-devel

3. Install Ganglia:

rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web

4. Edit the Ganglia configuration file:

vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#

Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location> 

Important: the following configuration does NOT take effect:

<Location /ganglia>
  Order deny,allow
  Allow from all
</Location> 

5. Edit the gmetad configuration file:

vim /etc/ganglia/gmetad.conf 
data_source "hadoop102" 192.168.10.102

6. Edit the gmond configuration file:

vim /etc/ganglia/gmond.conf 
cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel { 
#bind_hostname = yes # Highly recommended, soon to be default. 
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true 

# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
} 

7. Check the SELinux status:

sestatus

If it is not disabled, edit the following configuration file and set SELINUX=disabled:

vim /etc/selinux/config

Or temporarily turn SELinux off:

setenforce 0

8. Start Ganglia:

systemctl start httpd
systemctl start gmetad 
systemctl start gmond

9. Open it in a browser:

http://hadoop102/ganglia/

If you still get a permission error after completing the steps above, try changing the permissions on the /var/lib/ganglia directory:

chmod -R 777 /var/lib/ganglia
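
To have a Flume agent actually report its metrics to this Ganglia instance, the monitoring properties are usually passed through JAVA_OPTS in flume-env.sh (or directly on the flume-ng command line); this is a sketch, and the host/port should match the gmond configured above:

# in /opt/module/flume/conf/flume-env.sh
export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.10.102:8649"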

5 Custom Source

Requirements:

(Figure: a custom Source that generates prefixed, numbered events at a configurable interval)

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // fields to read from the agent configuration file
    private Long delay;

    private String field;

    /**
     * Receive data, wrap it into events, and write them to the channel
     * @return Status.READY, or Status.BACKOFF on failure
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the fields from the configuration file
     * @param context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}

3. Package and test

Package the project with Maven and upload the jar to the /opt/module/flume/lib directory.

Create the Agent configuration file mysource.conf in the job folder:

[djm@hadoop102 job]$ vim mysource.conf

Add the following:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

6 Custom Sink

Requirements:

(Figure: a custom Sink that logs each event with a configurable prefix and suffix)

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}

3. Package and test

Package the project with Maven and upload the jar to the /opt/module/flume/lib directory.

Create the Agent configuration file mysink.conf in the job folder:

[djm@hadoop102 job]$ vim mysink.conf

Add the following:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

7 Tuning Flume Parameters

7.1 Source

Increasing the number of Sources increases the capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure one Source per directory, so the Sources can keep up with newly produced data.

The batchSize parameter determines how many Events the Source ships to the Channel per batch; increasing it appropriately improves the throughput of moving Events into the Channel.

7.2 Channel

With the memory type, the Channel performs best, but data may be lost if the Flume process dies unexpectedly.

With the file type, the Channel is more fault-tolerant but slower than a Memory Channel; when using a File Channel, configuring dataDirs with directories on different disks improves performance.

The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number the Sink reads from the Channel per transaction; transactionCapacity must be no smaller than the batchSize of the Source and the Sink.

7.3 Sink

Adding more Sinks increases the capacity to consume Events, but more is not always better: enough is enough, because excess Sinks occupy system resources and waste them.

The batchSize parameter determines how many Events the Sink reads from the Channel per batch; increasing it appropriately improves the throughput of moving Events out of the Channel.
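
As a concrete, purely illustrative example of these knobs, the snippet below shows the relevant properties for a File Channel spread over two disks, with Source and Sink batch sizes kept no larger than transactionCapacity; the component names, paths, and values are made up, and the source batchSize applies only to source types that support it (such as exec or spooldir):

a1.sources.r1.batchSize = 500
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data1/flume/checkpoint
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.hdfs.batchSize = 1000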
