溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flume NG 學(xué)習(xí)筆記(四)Source配置

發(fā)布時(shí)間:2020-06-20 19:49:09 來源:網(wǎng)絡(luò) 閱讀:2143 作者:jackwxh 欄目:開發(fā)技術(shù)

首先、這節(jié)水的東西就比較少了,大部分是例子。

一、Avro Source與Thrift Source

Avro端口監(jiān)聽并接收來自外部的Avro客戶流的事件。當(dāng)內(nèi)置Avro 去Sinks另一個(gè)配對(duì)Flume代理,它就可以創(chuàng)建分層采集的拓?fù)浣Y(jié)構(gòu)。官網(wǎng)說的比較繞,當(dāng)然我的翻譯也很弱,其實(shí)就是flume可以多級(jí)代理,然后代理與代理之間用Avro去連接

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選,描述就不解釋了。

Property Name

Default

Description

channels


type

The component type name, needs to be avro

bind

hostname or IP address to listen on

port

Port # to bind to

threads

Maximum number of worker threads to spawn

selector.type



selector.*



interceptors

Space-separated list of interceptors

interceptors.*



compression-type

none

This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource

ssl

FALSE

Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.

keystore

This is the path to a Java keystore file. Required for SSL.

keystore-password

The password for the Java keystore. Required for SSL.

keystore-type

JKS

The type of the Java keystore. This can be “JKS” or “PKCS12”.

ipFilter

FALSE

Set this to true to enable ipFiltering for netty

ipFilter.rules

Define N netty ipFilter pattern rules with this config.

 

官網(wǎng)的例子就不放了,這邊用實(shí)際例子顯示。


[html] view plain copy

  1. #配置文件avro_case2.conf 其實(shí)和第二節(jié)的pull.conf 一模一樣  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = avro  

  9. a1.sources.r1.channels = c1  

  10. a1.sources.r1.bind = 192.168.233.128  

  11. a1.sources.r1.port = 55555  

  12.    

  13. #Describe the sink  

  14. a1.sinks.k1.typelogger  

  15. a1.sinks.k1.channelc1  

  16.    

  17. #Use a channel which buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  



#敲命令

flume-ng agent -cconf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console

成功與否就不說明,與第二節(jié)的pull.conf 同。。。

 

#然后在另一個(gè)終端進(jìn)行測(cè)試

flume-ng avro-client -cconf -H 192.168.233.128 -p 44444 -F /tmp/logs/test.log

這個(gè)就是模擬第二節(jié)push代理費(fèi)pull代理發(fā)數(shù)據(jù),這里不寫配置直接命令方式測(cè)試。

Flume NG 學(xué)習(xí)筆記(四)Source配置


發(fā)送事件成功,這里和push代理不一樣的是沒有用spool,所以日志文件名不會(huì)被改名稱。

看接受終端顯示

Flume NG 學(xué)習(xí)筆記(四)Source配置


ok數(shù)據(jù)發(fā)送成功。

ThriftSource 與Avro Source 基本一致。只要把source的類型改成thrift即可,例如a1.sources.r1.type = thrift

比較簡單,不做贅述。

二、Exec Source

ExecSource的配置就是設(shè)定一個(gè)Unix(Linux)命令,然后通過這個(gè)命令不斷輸出數(shù)據(jù)。如果進(jìn)程退出,Exec Source也一起退出,不會(huì)產(chǎn)生進(jìn)一步的數(shù)據(jù)。

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選,描述就不解釋了。

Property Name

Default

Description

channels


type

The component type name, needs to be exec

command

The command to execute

shell

A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.

restartThrottle

10000

Amount of time (in millis) to wait before attempting a restart

restart

FALSE

Whether the executed cmd should be restarted if it dies

logStdErr

FALSE

Whether the command’s stderr should be logged

batchSize

20

The max number of lines to read and send to the channel at a time

selector.type

replicating

replicating or multiplexing

selector.*


Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*



 

下面是實(shí)際例子顯示。


[html] view plain copy

  1. #配置文件exec_case3.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = exec  

  9. a1.sources.r1.command = tail -F /tmp/logs/test.log  

  10. a1.sources.r1.channels = c1  

  11.    

  12. #Describe the sink  

  13. a1.sinks.k1.typelogger  

  14. a1.sinks.k1.channelc1  

  15.    

  16. #Use a channel which buffers events in memory  

  17. a1.channels.c1.typememory  

  18. a1.channels.c1.capacity1000  

  19. a1.channels.c1.transactionCapacity100  



這里我們用tail –F命令去一直都日志的尾部。

#敲命令

flume-ng agent -cconf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console

這邊會(huì)顯示讀取日志的所有數(shù)據(jù)

Flume NG 學(xué)習(xí)筆記(四)Source配置

Flume NG 學(xué)習(xí)筆記(四)Source配置



上圖是日志,這邊我們繼續(xù)往日志里添加數(shù)據(jù)

echo"looklook5" >>  test.log ,會(huì)發(fā)現(xiàn)終端也在輸出數(shù)據(jù)。

 

三、JMS Source

官網(wǎng)說:JMS Sourcereads messages from a JMS destination such as a queue or topic. Being a JMSapplication it should work with any JMS provider but has only been tested withActiveMQ.

簡單說的,官網(wǎng)JMSsource 就測(cè)試了ActiveMQ,其他的還沒有。下面是官網(wǎng)的例子:

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=jms

a1.sources.r1.channels=c1

a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory

a1.sources.r1.connectionFactory=GenericConnectionFactory

a1.sources.r1.providerURL=tcp://mqserver:61616

a1.sources.r1.destinationName=BUSINESS_DATA

a1.sources.r1.destinationType=QUEUE

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選,描述就不解釋了

Property Name

Default

Description

channels


type

The component type name, needs to be jms

initialContextFactory

Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactory

The JNDI name the connection factory shoulld appear as

providerURL

The JMS provider URL

destinationName

Destination name

destinationType

Destination type (queue or topic)

messageSelector

Message selector to use when creating the consumer

userName

Username for the destination/provider

passwordFile

File containing the password for the destination/provider

batchSize

100

Number of messages to consume in one batch

converter.type

DEFAULT

Class to use to convert messages to flume events. See below.

converter.*

Converter properties.

converter.charset

UTF-8

Default converter only. Charset to use when converting JMS TextMessages to byte arrays.

 

介于這個(gè)源目前還不成熟,那我們等他成熟了再來研究吧,這里偷點(diǎn)懶。

 

四、Spooling Directory Source

Spooling Directory Source在第二節(jié)的時(shí)候已經(jīng)講過,這里復(fù)述一下:監(jiān)測(cè)配置的目錄下新增的文件,并將文件中的數(shù)據(jù)讀取出來。其中,Spool Source有2個(gè)注意地方,第一個(gè)是拷貝到spool目錄下的文件不可以再打開編輯,第二個(gè)是spool目錄下不可包含相應(yīng)的子目錄。這個(gè)主要用途作為對(duì)日志的準(zhǔn)實(shí)時(shí)監(jiān)控。

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選??蛇x項(xiàng)太多,這邊就加一個(gè)fileSuffix,即文件讀取后添加的后綴名,這個(gè)是可以更改。

Property Name

Default

Description

channels


type

The component type name, needs to be spooldir.

spoolDir

The directory from which to read files from.

fileSuffix

.COMPLETED

Suffix to append to completely ingested files

下面給出例子,這個(gè)與第二節(jié)的push.conf 相似


[html] view plain copy

  1. #配置文件:spool_case4.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type =spooldir  

  9. a1.sources.r1.spoolDir =/tmp/logs  

  10. a1.sources.r1.fileHeadertrue  

  11. a1.sources.r1.channels =c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.type = logger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  



這里我們監(jiān)控日志目錄/tmp/logs

#敲命令

flume-ng agent -cconf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console

Flume NG 學(xué)習(xí)筆記(四)Source配置


終端將數(shù)據(jù)都顯示出來了。我們查看監(jiān)控日志目錄/tmp/logs

Flume NG 學(xué)習(xí)筆記(四)Source配置


被讀取的文件已經(jīng)被加上后綴名,表示已經(jīng)完成讀取。

 

五、NetCat Source

Netcat source 在某一端口上進(jìn)行偵聽,它將每一行文字變成一個(gè)事件源,也就是數(shù)據(jù)是基于換行符分隔。它的工作就像命令nc -k -l [host] [port] 換句話說,它打開一個(gè)指定端口,偵聽數(shù)據(jù)將每一行文字變成Flume事件,并通過連接通道發(fā)送。

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選

Property Name

Default

Description

channels


type

The component type name, needs to be netcat

bind

Host name or IP address to bind to

port

Port # to bind to

max-line-length

512

Max line length per event body (in bytes)

ack-every-event

TRUE

Respond with an “OK” for every event received

selector.type

replicating

replicating or multiplexing

selector.*


Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*



實(shí)際例子話,第二節(jié)的第一個(gè)例子就是Netcat source,這里不演示了。

 

六、Sequence Generator Source

一個(gè)簡單的序列發(fā)生器,不斷產(chǎn)成與事件計(jì)數(shù)器0和1的增量開始。主要用于測(cè)試(官網(wǎng)說),這里也不做贅述。

 

七、Syslog Sources

讀取syslog數(shù)據(jù),并生成Flume 事件。 這個(gè)Source分成三類SyslogTCP Source、

Multiport Syslog TCP Source(多端口)與SyslogUDP Source。其中TCP Source為每一個(gè)用回車(\ n)來分隔的字符串創(chuàng)建一個(gè)新的事件。而UDP Source將整個(gè)消息作為一個(gè)單一的事件。

 

7.1、Syslog TCPSource

這個(gè)是最初的Syslog Sources

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選,這里可選我省略了。

Property Name

Default

Description

channels


type

The component type name, needs to be syslogtcp

host

Host name or IP address to bind to

port

Port # to bind to

官網(wǎng)案例

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=syslogtcp

a1.sources.r1.port=5140

a1.sources.r1.host=localhost

a1.sources.r1.channels=c1

下面是實(shí)際的例子


[html] view plain copy

  1. #配置文件:syslog_case5.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type =syslogtcp  

  9. a1.sources.r1.port =50000  

  10. a1.sources.r1.host =192.168.233.128  

  11. a1.sources.r1.channels =c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.type = logger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  




這里我們?cè)O(shè)置的偵聽端口為192.168.233.128 50000

#敲命令

flume-ng agent -cconf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

打開另一個(gè)終端輸入,往偵聽端口送數(shù)據(jù)

echo "hellolooklook5" | nc 192.168.233.128 50000

然后看之前的終端,將會(huì)有如下顯示:

Flume NG 學(xué)習(xí)筆記(四)Source配置


數(shù)據(jù)已經(jīng)發(fā)送過來了。

 

7.2 Multiport Syslog TCP Source

這是一個(gè)更新,更快,支持多端口版本的SyslogTCP Source。他不僅僅監(jiān)控一個(gè)端口,還可以監(jiān)控多個(gè)端口。官網(wǎng)配置基本差不多,就是可選配置比較多

Property Name

Default

Description

channels


type

The component type name, needs to be multiport_syslogtcp

host

Host name or IP address to bind to.

ports

Space-separated list (one or more) of ports to bind to.

portHeader

If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.

這里說明下需要注意的是這里ports設(shè)置已經(jīng)取代tcp 的port,這個(gè)千萬注意。還有portHeader這個(gè)可以與后面的interceptors 與 channel selectors自定義邏輯路由使用。

下面是官網(wǎng)例子:

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=multiport_syslogtcp

a1.sources.r1.channels=c1

a1.sources.r1.host=0.0.0.0

a1.sources.r1.ports=10001 10002 10003

a1.sources.r1.portHeader=port

下面是實(shí)際例子


[html] view plain copy

  1. #配置文件:syslog_case6.conf  

  2. # Name thecomponents on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = multiport_syslogtcp  

  9. a1.sources.r1.ports = 50000 60000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe thesink  

  14. a1.sinks.k1.typelogger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channelwhich buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  




這里我們偵探192.168.233.128的2個(gè)端口50000與60000

#敲命令

flume-ng agent -cconf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

打開另一個(gè)終端輸入,往偵聽端口送數(shù)據(jù)

echo "hellolooklook5" | nc 192.168.233.128 50000

echo "hello looklook6"| nc 192.168.233.128 60000

然后看之前的終端,將會(huì)有如下顯示:

Flume NG 學(xué)習(xí)筆記(四)Source配置


2個(gè)端口的數(shù)據(jù)已經(jīng)發(fā)送過來了。

 

7.3 Syslog UDP Source

關(guān)于這個(gè)官網(wǎng)都懶的介紹了,其實(shí)就是與TCP不同的協(xié)議而已。

官網(wǎng)配置與TCP一致,就不說了。下面是官網(wǎng)例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=syslogudp

a1.sources.r1.port=5140

a1.sources.r1.host=localhost

a1.sources.r1.channels=c1

下面是實(shí)際例子


[html] view plain copy

  1. #配置文件:syslog_case7.conf  

  2. # Name thecomponents on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = syslogudp  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe thesink  

  14. a1.sinks.k1.typelogger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channelwhich buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  



#敲命令

flume-ng agent -cconf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

打開另一個(gè)終端輸入,往偵聽端口送數(shù)據(jù)

echo "hellolooklook5" | nc –u 192.168.233.128 50000

#在啟動(dòng)的終端查看console輸出

Flume NG 學(xué)習(xí)筆記(四)Source配置


Ok,數(shù)據(jù)已經(jīng)發(fā)送過來了

 

 

八、HTTP Source

HTTP Source是HTTP POST和GET來發(fā)送事件數(shù)據(jù)的,官網(wǎng)說GET應(yīng)只用于實(shí)驗(yàn)。Flume 事件使用一個(gè)可插拔的“handler”程序來實(shí)現(xiàn)轉(zhuǎn)換,它必須實(shí)現(xiàn)的HTTPSourceHandler接口。此處理程序需要一個(gè)HttpServletRequest和返回一個(gè)flume 事件列表。

所有在一個(gè)POST請(qǐng)求發(fā)送的事件被認(rèn)為是在一個(gè)事務(wù)里,一個(gè)批量插入flume 通道的行為。

下面是官網(wǎng)給出的source的配置,加粗的參數(shù)是必選

Property Name

Default

Description

type


The component type name, needs to be http

port

The port the source should bind to.

bind

0.0.0.0

The hostname or IP address to listen on

handler

org.apache.flume.source.http.JSONHandler

The FQCN of the handler class.

官網(wǎng)例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=http

a1.sources.r1.port=5140

a1.sources.r1.channels=c1

a1.sources.r1.handler=org.example.rest.RestHandler

a1.sources.r1.handler.nickname=random props

下面是實(shí)際用例:



[html] view plain copy

  1. #配置文件:http_case8.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.typehttp  

  9. a1.sources.r1.port50000  

  10. a1.sources.r1.channelsc1  

  11.    

  12. #Describe the sink  

  13. a1.sinks.k1.typelogger  

  14.  a1.sinks.k1.channel = c1  

  15.    

  16. #Use a channel which buffers events in memory  

  17. a1.channels.c1.typememory  

  18. a1.channels.c1.capacity1000  

  19. a1.channels.c1.transactionCapacity100  



#敲命令

flume-ng agent -cconf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

#我們用生成JSON 格式的POSTrequest發(fā)數(shù)據(jù)

curl -X POST -d '[{"headers" :{"looklook1" : "looklook1 isheader","looklook2" : "looklook2 isheader"},"body" : "hello looklook5"}]' http://192.168.233.128:50000

#在啟動(dòng)的終端查看console輸出

Flume NG 學(xué)習(xí)筆記(四)Source配置


這里headersbody都正常輸出。

 

九、Twitter 1%firehose Source(實(shí)驗(yàn)的)

官網(wǎng)警告,慎用,說不定下個(gè)版本就木有了

這個(gè)實(shí)驗(yàn)source 是通過時(shí)搜索服務(wù),從Twitter1%樣本信息中獲取事件數(shù)據(jù)。需要Twitter開發(fā)者賬號(hào)。好吧,對(duì)于400網(wǎng)站,我們無可奈何用不到,就不多解釋了。

 


十、自定義Source

一個(gè)自定義 Source其實(shí)是對(duì)Source接口的實(shí)現(xiàn)。當(dāng)我們開始flume代理的時(shí)候必須將自定義 Source和相依賴的jar包放到代理的classpath下面。自定義 Sourcetype就是我們實(shí)現(xiàn)Source接口對(duì)應(yīng)的類全路徑。

這里后面的內(nèi)容里會(huì)詳細(xì)介紹,這里不做贅述。


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI