溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kafka集群優(yōu)化的方法是什么

發(fā)布時(shí)間:2022-01-11 16:16:02 來(lái)源:億速云 閱讀:167 作者:iii 欄目:網(wǎng)絡(luò)管理

這篇文章主要介紹了Kafka集群優(yōu)化的方法是什么的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Kafka集群優(yōu)化的方法是什么文章都會(huì)有所收獲,下面我們一起來(lái)看看吧。

背景

個(gè)推作為專業(yè)的數(shù)據(jù)智能服務(wù)商,已經(jīng)成功服務(wù)了數(shù)十萬(wàn)APP,每日的消息下發(fā)量達(dá)百億級(jí)別,由此產(chǎn)生了海量日志數(shù)據(jù)。為了應(yīng)對(duì)業(yè)務(wù)上的各種需求,我們需要采集并集中化日志進(jìn)行計(jì)算,為此個(gè)推選用了高可用的、高可靠的、分布式的Flume系統(tǒng)以對(duì)海量日志進(jìn)行采集、聚合和傳輸。此外,個(gè)推也不斷對(duì)Flume進(jìn)行迭代升級(jí),以實(shí)現(xiàn)自己對(duì)日志的特定需求。

原有的異地機(jī)房日志匯聚方式,整個(gè)流程相對(duì)來(lái)說比較簡(jiǎn)單,A機(jī)房業(yè)務(wù)產(chǎn)生的日志通過多種方式寫入該機(jī)房Kafka集群,然后B機(jī)房的Flume通過網(wǎng)絡(luò)專線實(shí)時(shí)消費(fèi)A機(jī)房Kafka的日志數(shù)據(jù)后寫入本機(jī)房的Kafka集群,所有機(jī)房的數(shù)據(jù)就是通過相同方式在B機(jī)房Kakfa集群中集中化管理。如圖一所示:

Kafka集群優(yōu)化的方法是什么

圖一:原有異地日志傳輸模式

但是隨著業(yè)務(wù)量的不斷增加,日志數(shù)據(jù)在逐漸增多的過程中對(duì)帶寬要求變高,帶寬的瓶頸問題日益凸顯。按照1G的專線帶寬成本2~3w/月來(lái)計(jì)算,一個(gè)異地機(jī)房一年僅專線帶寬擴(kuò)容成本就高達(dá)30w以上。對(duì)此,如何找到一種成本更加低廉且符合當(dāng)前業(yè)務(wù)預(yù)期的傳輸方案呢?Avro有快速壓縮的二進(jìn)制數(shù)據(jù)形式,并能有效節(jié)約數(shù)據(jù)存儲(chǔ)空間和網(wǎng)絡(luò)傳輸帶寬,從而成為優(yōu)選方案。

優(yōu)化思路

Avro簡(jiǎn)介

Avro是一個(gè)數(shù)據(jù)序列化系統(tǒng)。它是Hadoop的一個(gè)子項(xiàng)目,也是Apache的一個(gè)獨(dú)立的項(xiàng)目,其主要特點(diǎn)如下:

● 豐富的數(shù)據(jù)結(jié)構(gòu);

● 可壓縮、快速的二進(jìn)制數(shù)據(jù)類型;

● 可持久化存儲(chǔ)的文件類型;

● 遠(yuǎn)程過程調(diào)用(RPC);

● 提供的機(jī)制使動(dòng)態(tài)語(yǔ)言可以方便地處理數(shù)據(jù)。

具體可參考官方網(wǎng)站:http://avro.apache.org/

Flume Avro方案

Flume的RPC Source是Avro Source,它被設(shè)計(jì)為高擴(kuò)展的RPC服務(wù)端,能從其他Flume Agent 的Avro Sink或者Flume SDK客戶端,接收數(shù)據(jù)到Flume Agent中,具體流程如圖二所示:

Kafka集群優(yōu)化的方法是什么

圖二:Avro Source流程

針對(duì)該模式,我們的日志傳輸方案計(jì)劃變更為A機(jī)房部署Avro Sink用以消費(fèi)該機(jī)房Kafka集群的日志數(shù)據(jù),壓縮后發(fā)送到B機(jī)房的Avro Source,然后解壓寫入B機(jī)房的Kafka集群,具體的傳輸模式如圖三所示:

Kafka集群優(yōu)化的方法是什么

圖三:Flume Avro傳輸模式

可能存在的問題

我們預(yù)估可能存在的問題主要有以下三點(diǎn):

● 當(dāng)專線故障的時(shí)候,數(shù)據(jù)是否能保證完整性;

● 該模式下CPU和內(nèi)存等硬件的消耗評(píng)估;

● 傳輸性能問題。

驗(yàn)證情況

針對(duì)以上的幾個(gè)問題,我們做了幾項(xiàng)對(duì)比實(shí)驗(yàn)。

環(huán)境準(zhǔn)備情況說明:

1. 兩臺(tái)服務(wù)器192.168.10.81和192.168.10.82,以及每臺(tái)服務(wù)器上對(duì)應(yīng)一個(gè)Kakfa集群,模擬A機(jī)房和B機(jī)房;

2.   兩個(gè)Kafka集群中對(duì)應(yīng)topicA(源端)和topicB(目標(biāo)端)。在topicA中寫入合計(jì)大小11G的日志數(shù)據(jù)用來(lái)模擬原始端日志數(shù)據(jù)。

3.   192.168.10.82上部署一個(gè)Flume,模擬原有傳輸方式。

4.  192.168.10.81服務(wù)器部署Avro Sink,192.168.10.82部署Avro Source,模擬Flume Avro傳輸模式。

原有Flume模式驗(yàn)證(非Avro)

 監(jiān)控Kafka消費(fèi)情況:

Kafka集群優(yōu)化的方法是什么

 81流量統(tǒng)計(jì):

Kafka集群優(yōu)化的方法是什么

  82流量統(tǒng)計(jì):

Kafka集群優(yōu)化的方法是什么

消費(fèi)全部消息耗時(shí):20min

消費(fèi)總?cè)罩緱l數(shù)統(tǒng)計(jì):129,748,260

總流量:13.5G

Avro模式驗(yàn)證

 配置說明:

Avro Sink配置:

#kafkasink 是kafkatokafka的sinks的名字,可配多個(gè),空格分開kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSourcekafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect =192.168.10.81:2181kafkatokafka.sources.kafka_dmc_bullet.topic = topicAkafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms =150000kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms =10000kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavrokafkatokafka.sources.kafka_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet的配置,可配置多個(gè)sink提高壓縮傳輸效率kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSinkkafkatokafka.sinks.kafkasink_dmc_bullet.hostname =192.168.10.82kafkatokafka.sinks.kafkasink_dmc_bullet.port =55555//與source的rpc端口一一對(duì)應(yīng)kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate//壓縮模式kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level =6//壓縮率1~9kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet配的channel,只配一個(gè)kafkatokafka.channels.channel_dmc_bullet.type = memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000#kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000#kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =5000kafkatokafka.channels.channel_dmc_bullet.keep-alive =60

Avro Source配置:

#kafkasink 是kafkatokafka的sinks的名字,可配多個(gè),空格分開kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type= avrokafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.bind =0.0.0.0kafkatokafka.sources.kafka_dmc_bullet.port =55555//rpc端口綁定kafkatokafka.sources.kafka_dmc_bullet.compression-type= deflate//壓縮模式kafkatokafka.sources.kafka_dmc_bullet.batchSize =100#source kafkasink_dmc_bullet的配置kafkatokafka.sinks.kafkasink_dmc_bullet.type= org.apache.flume.sink.kafka.KafkaSinkkafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitionerkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicBkafkatokafka.sinks.kafkasink_dmc_bullet.brokerList =192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =500kafkatokafka.channels.channel_dmc_bullet.type= memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =1000

監(jiān)控Kafka消費(fèi)情況

Kafka集群優(yōu)化的方法是什么

81流量統(tǒng)計(jì):

Kafka集群優(yōu)化的方法是什么

82流量統(tǒng)計(jì):

Kafka集群優(yōu)化的方法是什么

消費(fèi)全部消息耗時(shí):26min

消費(fèi)總?cè)罩緱l數(shù)統(tǒng)計(jì):129,748,260

總流量:1.69G

故障模擬

1.   模擬專線故障,在A、B兩機(jī)房不通的情況下,Avro Sink報(bào)錯(cuò)如下:

Kafka集群優(yōu)化的方法是什么

2. 監(jiān)控Kafka消費(fèi)情況,發(fā)現(xiàn)消費(fèi)者已停止消費(fèi):

Kafka集群優(yōu)化的方法是什么

3. 故障處理恢復(fù)后繼續(xù)消費(fèi)剩余日志,經(jīng)統(tǒng)計(jì),總?cè)罩緱l數(shù)為:129,747,255。

結(jié)論

1.   當(dāng)專線發(fā)生故障時(shí),正在網(wǎng)絡(luò)傳輸中的通道外數(shù)據(jù)可能會(huì)有少部分丟失,其丟失原因?yàn)榫W(wǎng)絡(luò)原因,與Avro模式無(wú)關(guān);故障后停止消費(fèi)的數(shù)據(jù)不會(huì)有任何的丟失問題,由于網(wǎng)絡(luò)原因丟失的數(shù)據(jù)需要評(píng)估其重要性以及是否需要補(bǔ)傳。

2.   流量壓縮率達(dá)80%以上,同時(shí)我們也測(cè)試了等級(jí)為1~9的壓縮率,6跟9非常接近,CPU和內(nèi)存的使用率與原有傳輸模式相差不大,帶寬的優(yōu)化效果比較明顯。

3.   傳輸性能由于壓縮的原因適當(dāng)變?nèi)?,單Sink由原先20分鐘延長(zhǎng)至26分鐘,可適當(dāng)增加Sink的個(gè)數(shù)來(lái)提高傳輸速率。

生產(chǎn)環(huán)境實(shí)施結(jié)果

Kafka集群優(yōu)化的方法是什么

實(shí)施結(jié)果如下:

1.   由于還有其它業(yè)務(wù)的帶寬占用,總帶寬使用率節(jié)省了50%以上,現(xiàn)階段高峰期帶寬速率不超過400Mbps;

2.   每個(gè)Sink傳輸速率的極限大概是3000條每秒,壓縮傳輸速率問題通過增加Sink的方式解決,但會(huì)適當(dāng)增加CPU和內(nèi)存的損耗。

關(guān)于“Kafka集群優(yōu)化的方法是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Kafka集群優(yōu)化的方法是什么”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI