Flume Overview
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data. Flume is based on a streaming architecture and is flexible and simple.
Flume can integrate with any storage process. When the incoming data rate exceeds the rate at which data can be written to the destination store, Flume buffers the data, reducing the pressure on HDFS.
Transactions in Flume are based on the Channel. Flume uses a two-transaction model (sender + receiver) to ensure that messages are delivered reliably.
Flume uses two independent transactions to handle event delivery from Source to Channel and from Channel to Sink. Only when all data in a transaction has been successfully committed to the Channel does the Source consider that data fully read; likewise, only data that has been successfully written out by the Sink is removed from the Channel.
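As a small illustration of the put side of this model (a sketch in Flume's Java API, not code from this article; channel and event stand for any Channel and Event instance), an event only becomes visible to the Sink after the transaction commits:
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class PutTransactionSketch {
    // Stage one event in the channel inside a transaction; the sink can
    // only see it after commit(), and a rollback leaves the channel unchanged.
    public static void putWithTransaction(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event);
            tx.commit();
        } catch (Exception e) {
            tx.rollback();
            throw new RuntimeException("put failed, event rolled back", e);
        } finally {
            tx.close();
        }
    }
}
The take side (Channel to Sink) follows the same begin/take/commit/rollback pattern, as the custom Sink example later in this article shows.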
Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events. An Agent consists mainly of a Source, a Channel, and a Sink.
Source
The Source is the component that receives data into the Agent. It can handle many input types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
Channel
The Channel is a buffer between the Source and the Sink, so the Source and the Sink can run at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels:
Memory Channel: an in-memory queue. It is fast and suitable when data loss is not a concern.
File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.
Sink
The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system or sends them to another Flume Agent.
The Sink is fully transactional. Before removing a batch of events from the Channel, each Sink starts a transaction with the Channel. Once the batch has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel, and once the transaction is committed the Channel removes those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.
Event
The Event is the transport unit, the basic unit of Flume data transfer; data travels from source to destination in the form of events.
An Event consists of an optional header and a byte array carrying the payload. The header is a HashMap of key-value string pairs.
Usually one record is one Event, and an Event is cut every 2048 bytes.
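As a small code illustration of this structure (a sketch, not from the article; the header key and body text are made-up examples), Flume's EventBuilder assembles an Event from a header map and a byte-array body:
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventSketch {
    public static void main(String[] args) {
        // Optional headers: key-value string pairs
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "hadoop102");   // example header only

        // The body is a plain byte array carrying the payload
        Event event = EventBuilder.withBody(
                "one log line".getBytes(StandardCharsets.UTF_8), headers);

        System.out.println(event.getHeaders() + " / " + new String(event.getBody()));
    }
}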
Chaining mode connects multiple Flume agents in sequence, from the initial Source all the way to the storage system that the final Sink delivers to. Bridging too many Flume agents is not recommended: too many agents not only lower the transfer rate, but if any Flume node along the path goes down, the whole transfer pipeline is affected.
Flume supports sending an event flow to one or more destinations. In this mode the data from the source is replicated into multiple Channels, each Channel holds the same data, and each Sink can deliver it to a different destination.
Flume supports grouping multiple Sinks logically into a Sink group. Flume distributes data across the Sinks in the group, which mainly addresses load balancing and failover.
This mode is the most common and very practical. A typical web application is spread across hundreds of servers, or even thousands or tens of thousands in large deployments, and the logs they produce are cumbersome to handle. This Flume combination solves the problem well: each server runs a Flume agent that collects its logs and sends them to a central log-collecting Flume, which then uploads them to hdfs, hive, hbase, jms, etc. for log analysis.
Agent Internals
Flume Deployment
1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.
2. Rename apache-flume-1.7.0-bin to flume.
3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh, and set JAVA_HOME in flume-env.sh.
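The three steps above correspond roughly to the following commands (a sketch; the tarball location and the JAVA_HOME value are assumptions that depend on your environment):
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
mv /opt/module/apache-flume-1.7.0-bin /opt/module/flume
cp /opt/module/flume/conf/flume-env.sh.template /opt/module/flume/conf/flume-env.sh
# then edit flume-env.sh and set, for example:
# export JAVA_HOME=/opt/module/jdk1.8.0_144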
Requirements:
The agent listens on local port 44444.
Messages are sent to port 44444 with the netcat tool.
The data is finally displayed on the console.
Implementation steps:
1. Create the Agent configuration file flume-netcat-logger.conf in the job folder:
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Parameter notes:
--conf conf/ indicates that the configuration files are stored in the conf/ directory.
--name a1 names the Agent a1.
--conf-file job/flume-netcat-logger.conf tells Flume which configuration file to read for this run, here the flume-netcat-logger.conf file in the job folder.
-Dflume.root.logger=INFO,console: -D dynamically overrides the flume.root.logger property when Flume runs, setting the console log level to INFO.
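To verify the setup, open another terminal and send a few lines to port 44444 with netcat (assuming the nc command is available on hadoop102); whatever is typed should show up as INFO-level events on the agent's console:
[djm@hadoop102 ~]$ nc localhost 44444
hello flume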
Real-Time File Monitoring to HDFS
Requirements:
Monitor the Hive log in real time and upload it to HDFS.
Implementation steps:
1. Create the Agent configuration file flume-file-hdfs.conf in the job folder:
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2. Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How long (seconds) before rolling a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size of each file
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf
Note:
To read a file on a Linux system, the command must follow Linux command conventions. Since the Hive log lives on a Linux system, the source type chosen for reading the file is exec, short for execute, meaning a Linux command is executed to read the file.
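Once the agent is running and new lines are appended to the Hive log, the date-partitioned output can be checked from any node with an HDFS client, for example:
[djm@hadoop102 flume]$ hadoop fs -ls -R /flume
Files prefixed with logs- should appear under the %Y%m%d/%H directories configured above.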
Real-Time Directory Monitoring to HDFS
Requirements:
Use Flume to monitor an entire directory of files.
Implementation steps:
1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder:
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# Redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How long (seconds) before rolling a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size of each file, roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
Note:
Do not create files in the monitored directory and keep modifying them.
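To exercise the spooling directory source, copy a finished file into the monitored directory instead of editing one in place (a minimal illustration; the source file path is a made-up example). After Flume consumes the file, it is renamed with the .COMPLETED suffix configured above:
[djm@hadoop102 flume]$ echo "hello spooldir" > /tmp/test.log
[djm@hadoop102 flume]$ cp /tmp/test.log /opt/module/flume/upload/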
Requirements:
Flume-1 monitors file changes and passes the changed content to Flume-2.
Flume-2 stores it in HDFS.
At the same time, Flume-1 passes the changed content to Flume-3, and Flume-3 writes it to the local file system.
1. Create the Agent configuration file flume-file-flume.conf in the group1 folder:
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# The avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 folder:
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# The avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How long (seconds) before rolling a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size of each file, roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-dir.conf in the group1 folder:
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
Note:
Avro is a language-independent data serialization and RPC framework.
The output local directory must already exist; if it does not, it will not be created automatically.
The jobs containing the Avro Sources (a2 and a3) should be started before the job containing the Avro Sinks (a1), as in the startup order above.
Load Balancing (Sink Group)
Requirements:
Flume-1 monitors port data and passes the changes to Flume-2.
Flume-2 displays the data on the console.
At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 also displays the data on the console.
Implementation steps:
1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder.
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder.
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-console2.conf in the group2 folder.
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
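The configuration above uses the load_balance sink processor. If failover is wanted instead, only the sink group processor lines in flume-netcat-flume.conf need to change; a sketch (the priority values are arbitrary examples, and the sink with the higher priority is preferred until it fails):
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000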
Requirements:
Flume-1 on hadoop103 monitors the file /opt/module/group.log.
Flume-2 on hadoop102 monitors a data stream on a port.
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
Implementation steps:
1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder:
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 folder:
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder:
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7. Distribute the configuration files:
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the jobs:
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
Ganglia Deployment
1. Install the httpd service and php:
yum -y install httpd php
2. Install other dependencies:
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia:
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4. Modify the ganglia configuration file:
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
# Require local
Require all granted
# Require ip 10.1.2.3
# Require host example.org
</Location>
Note in particular: the following configuration will not work:
<Location /ganglia>
Order deny,allow
Allow from all
</Location>
5. Modify the gmetad configuration file:
vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102
6. Modify the gmond configuration file:
vim /etc/ganglia/gmond.conf
cluster {
#name = "unspecified"
name = "hadoop102"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
#mcast_join = 239.2.11.71
host = 192.168.10.102
port = 8649
ttl = 1
}
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
#mcast_join = 239.2.11.71
port = 8649
#bind = 239.2.11.71
bind = 192.168.10.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7. Check the SELinux status:
sestatus
If it is not disabled, modify the following configuration file:
vim /etc/selinux/config
Or temporarily disable SELinux:
setenforce 0
8. Start ganglia:
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit:
http://hadoop102/ganglia/
If a permission error still appears after completing the steps above, try changing the permissions on the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
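To make a Flume agent report its metrics to this Ganglia setup, start it with Flume's Ganglia monitoring properties; a sketch reusing the netcat example from earlier (the host and port must match the gmond udp_recv_channel configured above):
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.10.102:8649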
Custom Source
Requirements: implement a Source that periodically generates its own events, whose body is a configurable field value plus a counter, emitted with a configurable delay.
Implementation:
1. Add the dependency:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code:
package com.djm.flume;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Fields read from the configuration file
    private Long delay;
    private String field;

    /**
     * Receive data, wrap it into events, and write them to the channel.
     * @return the source status
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the configuration.
     * @param context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}
3. Package and test:
Build the jar with Maven and upload it to the /opt/module/flume/lib directory.
Create the Agent configuration file mysource.conf in the job folder:
[djm@hadoop102 job]$ vim mysource.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
Custom Sink
Requirements: implement a Sink that takes events from the Channel and logs each event body with a configurable prefix and suffix.
Implementation:
1. Add the dependency:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code:
package com.djm.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}
3. Package and test:
Build the jar with Maven and upload it to the /opt/module/flume/lib directory.
Create the Agent configuration file mysink.conf in the job folder:
[djm@hadoop102 job]$ vim mysink.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
Flume Parameter Tuning
Source
Increasing the number of Sources increases the capacity to ingest data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so that the Sources can keep up with newly produced data.
The batchSize parameter determines how many Events the Source ships to the Channel in one batch; increasing it appropriately improves the throughput of moving Events from the Source to the Channel.
Channel
With type set to memory, the Channel performs best, but data may be lost if the Flume process dies unexpectedly.
With type set to file, the Channel is more fault-tolerant but slower than a Memory Channel; when using a File Channel, setting dataDirs to directories on several different disks improves performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number of Events the Sink reads from the Channel per transaction; transactionCapacity needs to be larger than the batchSize of the Source and the Sink.
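A File Channel with dataDirs spread over several disks might be configured like the sketch below (the directory paths are assumptions; point them at directories on different physical disks):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data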
Sink
Increasing the number of Sinks increases the capacity to consume Events, but more is not always better: enough is enough, because excess Sinks occupy system resources and cause unnecessary waste.
The batchSize parameter determines how many Events the Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of moving Events out of the Channel.
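Putting these rules together, a tuned memory channel and HDFS sink section might look like the sketch below (the numbers are illustrative assumptions, not recommendations from this article; the constraint to respect is batchSize <= transactionCapacity <= capacity):
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.hdfs.batchSize = 1000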