Big-data collection tool Flume: example configurations for various collection scenarios

Published: 2020-05-28 | Author: wx5d370d556f15a | Category: Big Data
The following is a round-up of Flume configurations for common collection scenarios; they can be used directly.

1. Source type: netcat
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
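Once the agent is running, it can be tested by sending a line from another terminal with the nc tool (the hostname and message below are examples); each line should appear in the agent's console through the logger sink:
echo "hello flume" | nc linux1 666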
2. Source type: spooldir
## name the agent's source, sink, and channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1
## watch a directory for new files
a1.sources.r1.type = spooldir
## the directory to collect from
a1.sources.r1.spoolDir = /root/flume
## add a header storing the absolute path of each ingested file
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = logger
## buffer events in memory
a1.channels.c1.type = memory
## maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
## maximum number of events per transaction
a1.channels.c1.transactionCapacity = 100
## wire the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ../bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console
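To test, drop a file into the spooled directory (the file name is an example); once a file has been fully ingested the source renames it with a .COMPLETED suffix, and files must not be modified after being placed there:
cp /root/test.log /root/flume/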
3. Source type: avro
Here the source is of type avro: the agent acts as a server that opens port 8088 in order to receive data.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
## the hostname/IP of this machine
a1.sources.r1.bind = linux1
a1.sources.r1.port = 8088
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ../bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console
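Flume ships with an avro-client that can send a file to this source for a quick test (the file path is an example):
./bin/flume-ng avro-client -H linux1 -p 8088 -F /root/test.log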
4. Source type: syslogtcp (syslog over TCP)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = linux1
a1.sources.r1.port = 8080

a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
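Assuming this configuration is saved as syslog.conf (the file name is an assumption), it can be started the same way as the previous examples and tested by sending a syslog-formatted line; the <13> prefix is a syslog priority header that the source parses into facility and severity:
./bin/flume-ng agent -n a1 -f ./conf/syslog.conf -Dflume.root.logger=INFO,console
echo "<13>hello flume" | nc linux1 8080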
5. Sink type: avro
This agent is the data sender: a client whose purpose is to forward data to the avro server above.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
## the hostname/IP of this machine
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Command: ./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console
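This client pairs with the avro source from section 3: start the server agent on linux1 first, then this agent on linux2, and finally feed data into the local netcat source (the message is an example); the events should show up in the logger sink on linux1:
echo "hello avro" | nc linux2 666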
6. Sink type: two avro sinks
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666

a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
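Assuming the configuration is saved as client2.conf (the file name is an assumption), it is started the same way as the previous examples; lines sent to port 666 or 777 on linux2 are then forwarded over avro to linux1:8088:
./bin/flume-ng agent -n a1 -f ./conf/client2.conf -Dflume.root.logger=INFO,console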
7. Sink type: hdfs
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666

## sink to HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
## filePrefix (default: FlumeData)
## prefix for file names written to HDFS; Flume's date escapes and
## %{host} expressions may be used here.
a1.sinks.k1.hdfs.filePrefix = events-
## rollInterval (default: 30)
## number of seconds after which the HDFS sink rolls the temporary
## file into the final target file; 0 disables time-based rolling.
## Note: "rolling" means the sink renames the temporary file to its
## final name and opens a new temporary file for writing.
a1.sinks.k1.hdfs.rollInterval = 30
## rollSize (default: 1024)
## roll the file once the temporary file reaches this size in bytes;
## 0 disables size-based rolling.
a1.sinks.k1.hdfs.rollSize = 0
## rollCount (default: 10)
## roll the file once this many events have been written;
## 0 disables event-count-based rolling.
a1.sinks.k1.hdfs.rollCount = 0
## batchSize (default: 100)
## number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1
## useLocalTimeStamp (default: false)
## use the local time instead of the event's timestamp header when
## substituting the escape sequences in hdfs.path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
## fileType (default: SequenceFile)
## DataStream writes plain text instead of a sequence file
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
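Assuming the configuration is saved as hdfs.conf (the file name is an assumption), start the agent as before; the date-based directories come from the %y-%m-%d/%H%M escapes in hdfs.path and can be listed afterwards:
./bin/flume-ng agent -n a1 -f ./conf/hdfs.conf -Dflume.root.logger=INFO,console
hdfs dfs -ls -R /flume/events/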
8. Sink type: kafka
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
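Delivery can be confirmed with the console consumer that ships with Kafka; the --bootstrap-server form assumes Kafka 0.10 or later (older releases use --zookeeper instead):
kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic Hellokafka --from-beginning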
9. Source type: MySQL (third-party flume-ng-sql-source plugin)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

########### source r1 #################

a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=10000
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status
a1.sources.r1.start.from = 0
a1.sources.r1.custom.query = select id,userName from user where id > $@$ order by id asc
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10

a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
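The SQLSource class above is not part of stock Flume; it comes from the third-party flume-ng-sql-source plugin by Keedio, and the $@$ placeholder in custom.query is replaced at run time with the last offset recorded in the status file. A minimal setup sketch, with placeholder version numbers:
# copy the plugin and the MySQL JDBC driver into Flume's lib directory
cp flume-ng-sql-source-<version>.jar $FLUME_HOME/lib/
cp mysql-connector-java-<version>.jar $FLUME_HOME/lib/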
