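First off, this part has very little filler; most of it is examples.
Avro Source listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink of another Flume agent, it can create a tiered collection topology. The official wording is rather convoluted (and my translation is weak), but it simply means that Flume agents can be chained in multiple tiers, with Avro connecting one agent to the next.
Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are channels, type, bind, and port; I won't explain the descriptions.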
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | | |
selector.* | | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
ssl | FALSE | Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”. |
keystore | – | This is the path to a Java keystore file. Required for SSL. |
keystore-password | – | The password for the Java keystore. Required for SSL. |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. |
ipFilter | FALSE | Set this to true to enable ipFiltering for netty |
ipFilter.rules | – | Define N netty ipFilter pattern rules with this config. |
I'll skip the official example and show a real one instead.
# Config file avro_case2.conf; identical to pull.conf from Part 2
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.port = 55555
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
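# Run the command
flume-ng agent -c conf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console
I won't show the startup output; it is the same as for pull.conf in Part 2.
# Then test from another terminal
flume-ng avro-client -c conf -H 192.168.233.128 -p 55555 -F /tmp/logs/test.log
This simulates the push agent from Part 2 sending data to the pull agent, except that here we test straight from the command line without writing a config file.
The events are sent successfully. Unlike the push agent, no spooling directory is used here, so the log file is not renamed.
Check the output in the receiving terminal.
OK, the data was sent successfully.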
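Thrift Source is essentially the same as Avro Source. Just change the source type to thrift, for example a1.sources.r1.type = thrift
It is simple enough that I won't go into detail.
Exec Source is configured with a Unix (Linux) command and reads the data that this command continuously writes out. If the process exits, the Exec Source exits with it and produces no further data.
Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are channels, type, and command; I won't explain the descriptions.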
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | FALSE | Whether the executed cmd should be restarted if it dies |
logStdErr | FALSE | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
Below is a real example.
# Config file: exec_case3.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/logs/test.log
a1.sources.r1.channels = c1
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
Here we use tail -F to keep reading from the end of the log.
# Run the command
flume-ng agent -c conf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console
This displays all the data read from the log.
The figure above shows the log; now we keep appending data to it:
echo "looklook5" >> test.log
The terminal then prints the new data as well.
The official documentation says: JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ.
In short, the official JMS Source has only been tested with ActiveMQ and with no other provider. Below is the official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=jms
a1.sources.r1.channels=c1
a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory=GenericConnectionFactory
a1.sources.r1.providerURL=tcp://mqserver:61616
a1.sources.r1.destinationName=BUSINESS_DATA
a1.sources.r1.destinationType=QUEUE
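Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are channels, type, initialContextFactory, connectionFactory, providerURL, destinationName, and destinationType; I won't explain the descriptions.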
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be jms |
initialContextFactory | – | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
messageSelector | – | Message selector to use when creating the consumer |
userName | – | Username for the destination/provider |
passwordFile | – | File containing the password for the destination/provider |
batchSize | 100 | Number of messages to consume in one batch |
converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
converter.* | – | Converter properties. |
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
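Since this source is not yet mature, let's wait until it matures before studying it; I'm cutting a corner here.
Spooling Directory Source was already covered in Part 2, so here is a recap: it watches the configured directory for newly added files and reads the data out of them. There are two things to watch out for with the spool source. First, a file copied into the spool directory must not be opened and edited afterwards. Second, the spool directory must not contain subdirectories. Its main use is near-real-time monitoring of logs.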
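One common way to respect the first caveat is to finish writing a file somewhere else and only then move it into the spool directory. The following is a minimal Python sketch under stated assumptions: the staging directory and the spool directory are on the same filesystem (so the rename is atomic), and the function name deliver_to_spool and all paths are hypothetical, not part of Flume.

```python
import os
import tempfile


def deliver_to_spool(lines, spool_dir, final_name, staging_dir):
    """Write lines to a staging file, then move the finished file into spool_dir.

    Assumption: staging_dir and spool_dir are on the same filesystem, so
    os.replace() is an atomic rename and a watcher of spool_dir never sees
    a half-written file.
    """
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(spool_dir, exist_ok=True)

    # Write the complete content outside the watched directory.
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        for line in lines:
            handle.write(line + "\n")

    # Move the finished file into the spool directory in a single step.
    target = os.path.join(spool_dir, final_name)
    os.replace(tmp_path, target)
    return target
```

Because of the second caveat, the staging directory in this sketch should be a sibling of the spool directory rather than a subdirectory of it.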
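Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are channels, type, and spoolDir. There are too many optional ones, so I list only fileSuffix, the suffix appended to a file once it has been read; it can be changed.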
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files from. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
An example follows; it is similar to push.conf from Part 2.
# Config file: spool_case4.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =spooldir
a1.sources.r1.spoolDir =/tmp/logs
a1.sources.r1.fileHeader= true
a1.sources.r1.channels =c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
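Here we monitor the log directory /tmp/logs.
# Run the command
flume-ng agent -c conf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console
The terminal displays all the data. Now look at the monitored log directory /tmp/logs.
The files that were read now carry the suffix, which indicates that they have been fully ingested.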
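Netcat Source listens on a given port and turns each line of text into an event, so the data is delimited by newlines. It works like the command nc -k -l [host] [port]. In other words, it opens the specified port, listens for data, turns each line of text into a Flume event, and sends it through the connected channel.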
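The line-per-event rule can be pictured from the client side. The sketch below is a hedged illustration, not an official client: the function names are hypothetical, the 512-byte default mirrors the max-line-length property, and I am assuming that the limit applies to the body without the trailing newline and that the listener replies with one short line (such as "OK") per event.

```python
import socket


def encode_line(text, max_line_length=512):
    """Return text as one newline-terminated UTF-8 line (one future event)."""
    if "\n" in text:
        # An embedded newline would be read as the end of an event.
        raise ValueError("text must not contain a newline")
    body = text.encode("utf-8")
    # Assumption: the limit applies to the body without the trailing newline.
    if len(body) > max_line_length:
        raise ValueError("line is longer than max-line-length")
    return body + b"\n"


def send_lines(host, port, lines, timeout=5.0):
    """Send every string as its own line and collect one reply line per event."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as conn, \
            conn.makefile("r", encoding="utf-8", newline="\n") as reader:
        for text in lines:
            conn.sendall(encode_line(text))
            # Assumption: the listener acknowledges each event with one line.
            replies.append(reader.readline().strip())
    return replies
```

For instance, send_lines("192.168.233.128", 44444, ["hello", "world"]) would deliver two events, assuming a Netcat source is bound to that hypothetical address and port.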
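Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are channels, type, bind, and port.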
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be netcat |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
ack-every-event | TRUE | Respond with an “OK” for every event received |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
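interceptors.* | | |
As for a real example, the first example in Part 2 is a Netcat source, so I won't demonstrate it here.
Sequence Generator Source is a simple sequence generator that continuously generates events with a counter that starts at 0 and increments by 1. It is mainly useful for testing (according to the official documentation), so I won't go into detail here either.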
Syslog Sources read syslog data and generate Flume events. There are three kinds: Syslog TCP Source, Multiport Syslog TCP Source, and Syslog UDP Source. The TCP sources create a new event for each string delimited by a newline (\n), whereas the UDP source treats an entire message as a single event.
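The difference in framing can be pictured with a small Python sketch. It only illustrates the rule just described and is not Flume's actual parsing code; the function names are hypothetical, and dropping empty lines is my own assumption.

```python
def split_tcp_stream(buffer):
    """Split a TCP byte stream into (complete_events, remainder).

    Every newline-terminated string becomes one event body; bytes after the
    last newline are not yet a complete event and are returned as remainder.
    """
    *complete, remainder = buffer.split(b"\n")
    # Assumption: empty lines do not produce events.
    return [chunk for chunk in complete if chunk], remainder


def udp_event(datagram):
    """A UDP datagram is taken as exactly one event body, newlines included."""
    return datagram


events, rest = split_tcp_stream(b"<13>first\n<13>second\n<13>par")
print(events)  # [b'<13>first', b'<13>second']
print(rest)    # b'<13>par'
```

The same bytes sent as one UDP datagram would stay together as a single event body under this model.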
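Syslog TCP Source is the original syslog source.
Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are channels, type, host, and port; I have left out the optional ones.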
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be syslogtcp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
Official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=syslogtcp
a1.sources.r1.port=5140
a1.sources.r1.host=localhost
a1.sources.r1.channels=c1
Below is a real example.
# Config file: syslog_case5.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =syslogtcp
a1.sources.r1.port =50000
a1.sources.r1.host =192.168.233.128
a1.sources.r1.channels =c1
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
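Here the listening address is 192.168.233.128, port 50000.
# Run the command
flume-ng agent -c conf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console
Once the agent has started,
open another terminal and send data to the listening port:
echo "hello looklook5" | nc 192.168.233.128 50000
Then check the first terminal; it shows the event that was received.
The data has arrived.
Multiport Syslog TCP Source is a newer, faster version of Syslog TCP Source with multi-port support. It can monitor not just one port but several. The official configuration is much the same, just with more optional settings.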
Property Name | Default | Description |
channels | – | |
type | – | The component type name, needs to be multiport_syslogtcp |
host | – | Host name or IP address to bind to. |
ports | – | Space-separated list (one or more) of ports to bind to. |
portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
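Note that the ports setting here replaces the port setting of Syslog TCP Source; be sure to keep this in mind. In addition, portHeader can be combined with interceptors and channel selectors (covered later) to build custom routing logic.
Below is the official example: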
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=multiport_syslogtcp
a1.sources.r1.channels=c1
a1.sources.r1.host=0.0.0.0
a1.sources.r1.ports=10001 10002 10003
a1.sources.r1.portHeader=port
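To picture what routing on the port header could look like, here is a small Python model of a selector that picks channels from a single header value. It imitates the idea of a multiplexing channel selector (a header name, a value-to-channels mapping, and a default) and is not Flume's implementation; the function name, the channel names c1 to c3, and the mapping are hypothetical.

```python
def select_channels(headers, header_name, mapping, default):
    """Return the channel names for one event, chosen by a single header value."""
    value = headers.get(header_name)
    return mapping.get(value, default)


# The header name "port" matches portHeader=port in the official example.
# Flume event headers are string-to-string, so the port is kept as a string.
mapping = {"10001": ["c1"], "10002": ["c2"]}
default = ["c3"]

print(select_channels({"port": "10001"}, "port", mapping, default))  # ['c1']
print(select_channels({"port": "10003"}, "port", mapping, default))  # ['c3']
```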
Below is a real example.
# Config file: syslog_case6.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.ports = 50000 60000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
Here we listen on two ports of 192.168.233.128: 50000 and 60000.
# Run the command
flume-ng agent -c conf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console
Once the agent has started,
open another terminal and send data to the listening ports:
echo "hello looklook5" | nc 192.168.233.128 50000
echo "hello looklook6" | nc 192.168.233.128 60000
Then check the first terminal; it shows the events that were received.
The data from both ports has arrived.
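The official documentation hardly bothers to describe Syslog UDP Source; it is simply the same source over UDP instead of TCP.
The official configuration is the same as for TCP, so I'll skip it. Below is the official example: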
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=syslogudp
a1.sources.r1.port=5140
a1.sources.r1.host=localhost
a1.sources.r1.channels=c1
Below is a real example.
# Config file: syslog_case7.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
# Run the command
flume-ng agent -c conf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console
Once the agent has started,
open another terminal and send data to the listening port:
echo "hello looklook5" | nc -u 192.168.233.128 50000
# Check the console output in the terminal where the agent was started
OK, the data has arrived.
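8. HTTP Source
HTTP Source accepts event data sent via HTTP POST and GET; the official documentation says GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable "handler", which must implement the HTTPSourceHandler interface. The handler takes an HttpServletRequest and returns a list of Flume events.
All events sent in one POST request are treated as one batch and inserted into the Flume channel in a single transaction.
Below is the source configuration from the official documentation. The required parameters (shown in bold on the official site) are type and port.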
Property Name | Default | Description |
type | – | The component type name, needs to be http |
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
Official example:
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=5140
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.example.rest.RestHandler
a1.sources.r1.handler.nickname=random props
Below is a real example:
# Config file: http_case8.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= http
a1.sources.r1.port= 50000
a1.sources.r1.channels= c1
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
# Run the command
flume-ng agent -c conf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console
Once the agent has started,
# send data with a POST request in JSON format
curl -X POST -d '[{"headers" :{"looklook1" : "looklook1 is header","looklook2" : "looklook2 is header"},"body" : "hello looklook5"}]' http://192.168.233.128:50000
# Check the console output in the terminal where the agent was started
Both the headers and the body are printed correctly.
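The same request can be built programmatically. Below is a minimal Python sketch using only the standard library; it assumes the default JSON handler is in use and that the payload is a JSON array of objects with "headers" and "body" keys, as in the curl call. The function names build_payload and post_events are hypothetical.

```python
import json
import urllib.request


def build_payload(events):
    """Serialize (headers, body) pairs into a JSON array of event objects."""
    return json.dumps(
        [{"headers": dict(headers), "body": body} for headers, body in events]
    ).encode("utf-8")


def post_events(url, events, timeout=5.0):
    """POST all events in a single request (one batch); return the HTTP status."""
    request = urllib.request.Request(
        url,
        data=build_payload(events),
        headers={"Content-Type": "application/json; charset=utf-8"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for error responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

A call such as post_events("http://192.168.233.128:50000", [({"looklook1": "looklook1 is header"}, "hello looklook5")]) would send one event; putting several pairs in the list sends them together in one POST.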
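9. Twitter 1% Firehose Source (experimental)
The official documentation warns: use with caution, as it may be gone in the next version.
This experimental source connects to the streaming API and pulls event data from Twitter's 1% sample feed. It requires a Twitter developer account. Well, since we cannot reach that site anyway, there is no way for us to use it, so I won't explain further.
10. Custom Source
A custom source is simply an implementation of the Source interface. When starting the Flume agent, the custom source class and the JARs it depends on must be on the agent's classpath. The type of a custom source is the fully qualified class name of our Source implementation.
This will be covered in detail in a later part, so I won't elaborate here.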