溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Kafka的知識(shí)點(diǎn)有哪些

發(fā)布時(shí)間:2021-11-16 16:37:27 來(lái)源:億速云 閱讀:176 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Kafka的知識(shí)點(diǎn)有哪些”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Kafka的知識(shí)點(diǎn)有哪些”吧!

  • kafka具有高吞吐量、低延時(shí)的主要原因有三個(gè):

    • 一是其在每次寫(xiě)入數(shù)據(jù)時(shí)只是將數(shù)據(jù)寫(xiě)入到操作系統(tǒng)的頁(yè)緩存中,這就相當(dāng)于只是在內(nèi)存中寫(xiě)入數(shù)據(jù),而繁雜的磁盤(pán)IO工作則交由操作系統(tǒng)自行進(jìn)行;

    • 二是kafka在寫(xiě)入數(shù)據(jù)的時(shí)候是采用追加的方式寫(xiě)入到磁盤(pán)中的,這種方式省略了磁頭的隨機(jī)移動(dòng)而產(chǎn)生的隨機(jī)IO,其效率甚至比內(nèi)存的隨機(jī)讀取都要高;

    • 三是在為kafka配置了較大的頁(yè)緩存時(shí),數(shù)據(jù)大部分的數(shù)據(jù)讀取和寫(xiě)入工作都直接在頁(yè)緩存中進(jìn)行了,讀取和寫(xiě)入的時(shí)候甚至不需要進(jìn)行磁盤(pán)的IO工作,而磁盤(pán)的IO也只在操作系統(tǒng)將數(shù)據(jù)從頁(yè)緩存寫(xiě)入到磁盤(pán)中才會(huì)進(jìn)行。

  • kafka使用的零拷貝技術(shù)是通過(guò)操作系統(tǒng)的sendfile指令來(lái)實(shí)現(xiàn)的。在正常情況下,數(shù)據(jù)從磁盤(pán)讀取然后發(fā)送到網(wǎng)卡中需要經(jīng)過(guò)如下步驟:

    在這個(gè)過(guò)程中,數(shù)據(jù)的復(fù)制總共發(fā)生了四次,其中與DMA相關(guān)的有兩次,分別是從磁盤(pán)上讀取文件數(shù)據(jù)到內(nèi)核緩存和將數(shù)據(jù)從socket相關(guān)的內(nèi)核緩存復(fù)制到網(wǎng)卡中,這兩次復(fù)制是不可避免的。但是在數(shù)據(jù)在內(nèi)核態(tài)與用戶(hù)態(tài)之間的兩次拷貝則非常的消耗CPU的資源,這兩次也是可以避免的,而零拷貝技術(shù)就是省去了這兩次的拷貝過(guò)程,從而將CPU給釋放出來(lái),以節(jié)約資源。sendfile指令指的是,在DMA將磁盤(pán)數(shù)據(jù)拷貝到內(nèi)核態(tài)緩存之后,其會(huì)將該緩存中數(shù)據(jù)的地址值直接當(dāng)做網(wǎng)絡(luò)相關(guān)的內(nèi)核態(tài)的緩存地址,從而直接從該緩存中通過(guò)DMA技術(shù)將數(shù)據(jù)寫(xiě)入到網(wǎng)卡中。通過(guò)這種方式就省去了數(shù)據(jù)在內(nèi)核態(tài)與用戶(hù)態(tài)之間的來(lái)回復(fù)制。

    • 首先通過(guò)DMA(Direct Memory Access)直接存儲(chǔ)器訪(fǎng)問(wèn)技術(shù)讀取磁盤(pán)上的文件數(shù)據(jù),將其存儲(chǔ)到操作系統(tǒng)內(nèi)核態(tài)緩存中;

    • 然后CPU會(huì)將內(nèi)核態(tài)的緩存讀取到應(yīng)用程序的緩存中;

    • 應(yīng)用程序獲取到數(shù)據(jù)之后,會(huì)將該數(shù)據(jù)寫(xiě)入到socket相關(guān)的內(nèi)核態(tài)緩存中;

    • 最后通過(guò)DMA技術(shù)將內(nèi)核態(tài)緩存中的數(shù)據(jù)發(fā)送到網(wǎng)卡中。

  • 有了消息的持久化,kafka實(shí)現(xiàn)了高可靠性;有了負(fù)載均衡和使用文件系統(tǒng)的獨(dú)特設(shè)計(jì),kafka實(shí)現(xiàn)了高吞吐量;有了故障轉(zhuǎn)移,kafka實(shí)現(xiàn)了高可用性。

  • kafka的topic本質(zhì)上指的是一類(lèi)消息,其可以理解為消息的目標(biāo)存儲(chǔ)地址,并且kafka并沒(méi)有采用topic-message的二級(jí)存儲(chǔ)結(jié)構(gòu),而是采用了topic-partition-message的三級(jí)存儲(chǔ)結(jié)構(gòu)。這樣做的原因在于能夠分散數(shù)據(jù)到不同的partition上,然后將partition分散到不同的物理機(jī)器上,這樣就達(dá)到了負(fù)載均衡提升系統(tǒng)吞吐量的目的。如下是topic-partition-message的三級(jí)存儲(chǔ)結(jié)構(gòu)的示意圖:

Kafka的知識(shí)點(diǎn)有哪些

  • kafka中每個(gè)消息都會(huì)被分配一個(gè)位移offset,這個(gè)位移指的是該消息在當(dāng)前partition中的一個(gè)偏移量,對(duì)于每個(gè)新增的數(shù)據(jù),其位移是依次增加的。在consumer端也有一個(gè)位移的概念,這個(gè)位移指的是該consumer在當(dāng)前partition中所消費(fèi)的一個(gè)位置,其一定是小于等于該partition中最新的消息的位移的。

  • 根據(jù)配置,kafka的每個(gè)partition都可以設(shè)置一個(gè)或多個(gè)副本,這些副本分為leader和follower,leader副本會(huì)對(duì)外提供寫(xiě)入和讀取消息的功能,而follower只會(huì)從leader副本上讀取數(shù)據(jù)并保存下來(lái),其不對(duì)外提供服務(wù)。另外,kafka會(huì)保證一個(gè)partition的多個(gè)副本被平均的分配到多個(gè)服務(wù)器上。通過(guò)這種方式,kafka就實(shí)現(xiàn)了服務(wù)的高可用,當(dāng)leaderpartition所在的機(jī)器宕機(jī)之后,kafka就會(huì)在follower副本中選舉出一個(gè)作為leader并對(duì)外提供服務(wù)。在副本中,還有一個(gè)ISR的概念,所謂的ISR就是in-sync-replica,也就是處于完全同步狀態(tài)的副本,有這個(gè)概念的原因主要是follower在同步leader的數(shù)據(jù)的時(shí)候,可能由于網(wǎng)絡(luò)等原因?qū)е聰?shù)據(jù)不是完全同步的,這個(gè)時(shí)候leader就會(huì)通過(guò)ISR來(lái)標(biāo)記哪些副本是處于完全同步狀態(tài)的,這樣在leader宕機(jī)的時(shí)候,就只會(huì)在處于ISR狀態(tài)的副本中選舉新的leader,從而保證數(shù)據(jù)的一致性。

  • 對(duì)于kafka硬盤(pán)的規(guī)劃建議點(diǎn):

    • 追求性?xún)r(jià)比的公司可以使用JBOD磁盤(pán);

    • 使用機(jī)械硬盤(pán)完全可以滿(mǎn)足kafka集群的使用,SSD更好。

  • 磁盤(pán)大小規(guī)劃的影響因素:

    • 新增消息數(shù)

    • 消息留存時(shí)間;

    • 平均消息大??;

    • 副本數(shù);

    • 是否啟用壓縮;

  • 對(duì)于內(nèi)存大小設(shè)置的建議:

    • 盡量分配更多的內(nèi)存給操作系統(tǒng)的page cache;

    • 不要為broker設(shè)置過(guò)大的堆內(nèi)存,最好不超過(guò)6GB;

    • page cache大小至少要大于一個(gè)日志段的大小;

  • 對(duì)CPU規(guī)劃的建議:

    • 使用多核系統(tǒng),CPU數(shù)最好大于8;

    • 如果使用kafka 0.10.0.0之前的版本或clients端與broker端消息版本不一致,則考慮多配置一些資源以防止消息解壓縮操作消耗過(guò)多的CPU。

  • 對(duì)帶寬資源規(guī)劃的建議:

    • 盡量使用高速網(wǎng)絡(luò);

    • 根據(jù)自身網(wǎng)絡(luò)條件和帶寬來(lái)評(píng)估kafka集群機(jī)器數(shù)量;

    • 避免使用跨機(jī)房網(wǎng)絡(luò)。

  • broker參數(shù)

    • broker.id:該參數(shù)指的是當(dāng)前broker的id,每個(gè)broker的id不能重復(fù),建議讀者自行指定該參數(shù),數(shù)值從0依次網(wǎng)上增長(zhǎng)即可,如果不指定該參數(shù),那么broker將會(huì)自動(dòng)生成一個(gè)隨機(jī)數(shù);

    • log.dirs:該參數(shù)指定了kafka持久化消息的目錄,建議讀者根據(jù)自己的磁盤(pán)數(shù)量來(lái)指定相應(yīng)的目錄數(shù),這是因?yàn)槊總€(gè)磁盤(pán)都有一個(gè)磁頭,指定相同數(shù)量的目錄之后,kafka寫(xiě)入日志就可以利用多個(gè)磁頭并行的寫(xiě)入;

    • zookeeper.connect:該參數(shù)指定了zookeeper服務(wù)器的地址,多個(gè)地址可以用逗號(hào)隔開(kāi),形如zk1:port1,zk2:port2,zk3:port3;

    • listeners:broker監(jiān)聽(tīng)器列表,格式為[協(xié)議]://[主機(jī)名]:[端口],[協(xié)議]://[主機(jī)名]:[端口]。該參數(shù)主要用于客戶(hù)端連接broker使用,可以認(rèn)為是broker端開(kāi)放給clients的監(jiān)聽(tīng)端口;

    • advertised.listeners:和listeners類(lèi)似,該參數(shù)也是用于發(fā)布給clientss的監(jiān)聽(tīng)器,不過(guò)該參數(shù)主要用于IaaS環(huán)境;

    • unclean.leader.election.enable:是否開(kāi)啟unclean leader選舉,所謂的unclean leader指的是在leader宕機(jī)時(shí),會(huì)從ISR中選舉一個(gè)follower作為新的leader,但是如果ISR中為空的,說(shuō)明所有的replica都處于未同步狀態(tài),而unclean leader選舉指的就是在這種情況下,是否使用這種未同步狀態(tài)的replica作為新的leader;

    • delete.topic.enable:是否允許kafka刪除topic,默認(rèn)是開(kāi)啟的,也建議開(kāi)啟,因?yàn)榭刂剖欠駝h除可以通過(guò)權(quán)限來(lái)進(jìn)行;

    • log.retention.{hours|minutes|ms}:該參數(shù)指定了每個(gè)partition的日志的留存時(shí)間,默認(rèn)是7天,超過(guò)7天的日志將會(huì)被自動(dòng)刪除,這個(gè)參數(shù)中的三個(gè)選項(xiàng)如果同時(shí)配置,那么優(yōu)先級(jí)是ms>minutes>hours;

    • log.retention.bytes:指定了每個(gè)消息日志最多保存多大的數(shù)據(jù),對(duì)于超過(guò)該參數(shù)的分區(qū)日志而言,其會(huì)被自動(dòng)刪除,該參數(shù)默認(rèn)值為-1;

    • min.insync.replicas:該參數(shù)指定了broker最少響應(yīng)client消息發(fā)送的最少副本數(shù)。需要注意的是,該參數(shù)不能設(shè)置得與當(dāng)前broker副本數(shù)一樣,比如當(dāng)前副本有3個(gè),如果設(shè)置為3,那么客戶(hù)端發(fā)送的消息必須在3個(gè)副本中都保存才算保存成功,此時(shí)如果某個(gè)副本宕機(jī)了,那么客戶(hù)端寫(xiě)入消息之后就始終不會(huì)有目標(biāo)數(shù)量的副本數(shù)響應(yīng),因而該消息始終無(wú)法寫(xiě)入成功。該參數(shù)還可以與客戶(hù)端的acks參數(shù)配合使用以達(dá)到消息持久保存,并且需要注意的是,該參數(shù)只有在客戶(hù)端的acks參數(shù)指定為-1時(shí)才有效;

    • num.network.threads:指定了broker在后臺(tái)用于處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù),默認(rèn)為3。需要注意的是,這里的“處理”其實(shí)只是負(fù)責(zé)轉(zhuǎn)發(fā)請(qǐng)求,它會(huì)將接收到的請(qǐng)求轉(zhuǎn)發(fā)到后面的處理線(xiàn)程中,在真實(shí)環(huán)境中可以通過(guò)NetworkProcessorAvgIdlePercent JMX指標(biāo)來(lái)監(jiān)控,如果該值持續(xù)低于0.3,建議適當(dāng)提高該參數(shù)的值;

    • num.io.threads:這個(gè)參數(shù)控制了broker端實(shí)際處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù),默認(rèn)值是8,可以通過(guò)Request HandlerAvgIdlePercent JMX指標(biāo)來(lái)監(jiān)控該數(shù)據(jù),如果持續(xù)低于0.3,則可以考慮適當(dāng)增加該參數(shù)值;

    • message.max.bytes:該參數(shù)指定了每條消息的最大字節(jié)數(shù),默認(rèn)為977KB,真實(shí)環(huán)境中一般沒(méi)有這么大的消息。

  • topic級(jí)別參數(shù)

    • delete.retention.ms:每個(gè)topic設(shè)置自己的日志留存時(shí)間,以覆蓋全局默認(rèn)值;

    • max.message.bytes:每個(gè)topic設(shè)置自己的消息最大字節(jié)數(shù),以覆蓋全局默認(rèn)值;

    • retention.bytes:每個(gè)topic設(shè)置自己的日志留存大小,以覆蓋全局默認(rèn)值。

  • GC參數(shù)

    • 建議使用G1 GC;

  • JVM參數(shù)

    • 主要是關(guān)于堆內(nèi)存大小的,因?yàn)閗afka主要使用的是堆外內(nèi)存,因而建議堆內(nèi)存不要超過(guò)6GB;

  • OS參數(shù)

    • 文件描述符限制:由于kafka對(duì)打開(kāi)大量的文件,因而建議將kafka的文件描述符限制設(shè)置一個(gè)比較大的值,命令如:ulimit -n 100000;

    • socket緩沖區(qū)大?。阂话愕膬?nèi)網(wǎng)環(huán)境的socket緩沖區(qū)大小為64KB,這對(duì)于內(nèi)網(wǎng)是完全足夠的,因?yàn)閮?nèi)網(wǎng)的往返時(shí)間RTT是非常短的,但是如果消息要經(jīng)過(guò)長(zhǎng)距離傳輸,那么建議提升該值,比如128KB,以防止數(shù)據(jù)堆積;

    • 最好使用Ext4或者XFS文件系統(tǒng):使用這種文件系統(tǒng)會(huì)提供更好的寫(xiě)入性能,尤其是XFS文件系統(tǒng);

    • 關(guān)閉swap:降低對(duì)swap空間的使用,命令為sysctl vm.swappiness=<一個(gè)較小的數(shù)>;

    • 設(shè)置更長(zhǎng)的flush時(shí)間:默認(rèn)情況下,OS的刷盤(pán)時(shí)間是5s,但是這個(gè)事件太短了,建議提升該值為2分鐘,以更大程度的提升OS物理寫(xiě)入操作的性能。

  • Producer在發(fā)送消息時(shí),其會(huì)根據(jù)一定的分區(qū)篩選策略來(lái)選擇將當(dāng)前消息發(fā)送到哪個(gè)分區(qū)。如果當(dāng)前消息中指定了key,那么就會(huì)根據(jù)這個(gè)key的hash值將該消息發(fā)送給某個(gè)分區(qū),也就是說(shuō)具有同一個(gè)key的所有消息將會(huì)發(fā)送到同一個(gè)分區(qū)。如果當(dāng)前消息沒(méi)有key,那么就會(huì)采用輪詢(xún)的方式,依次將消息均勻的發(fā)送給各個(gè)分區(qū)。

  • kafka的producer的工作流程如下:

    ![image-20190826202129576](/Users/zhangxufeng/Library/Application Support/typora-user-images/image-20190826202129576.png)

    首先,producer會(huì)將消息封裝到一個(gè)ProducerRecord中,然后根據(jù)設(shè)置的序列化器將其序列化為二進(jìn)制數(shù)據(jù),并且將其放置到producer端的一個(gè)消息緩沖池中;接著,另一個(gè)線(xiàn)程會(huì)不斷的從消息緩沖池中批量的讀取數(shù)據(jù),將其封裝在一起后一次性的發(fā)送給broker進(jìn)行處理;處理完成之后,由broker返回處理結(jié)果,如果某個(gè)消息處理失敗,那么就戶(hù)根據(jù)設(shè)置的重試次數(shù),對(duì)其進(jìn)行重試;如果沒(méi)有失敗,則將消息結(jié)果返回給客戶(hù)端線(xiàn)程。

  • 在發(fā)送消息的時(shí)候可以指定消息的時(shí)間戳,但是建議producer不要設(shè)置該參數(shù),因?yàn)閗afka保存消息是嚴(yán)格按照時(shí)間戳順序來(lái)排列的,如果隨意指定時(shí)間戳,那么可能會(huì)導(dǎo)致消息混亂,從而找不到消息,并且也可能會(huì)影響消息的保存策略。

  • Kafka的producer的異步回調(diào)函數(shù)中有一個(gè)異常參數(shù),該異常分為可重試異常和不可重試異常??芍卦嚠惓V饕幸韵聨最?lèi):

    不可重試的異常主要有以下幾類(lèi):

    • RecordTooLargeException:發(fā)送的消息尺寸過(guò)大,超過(guò)了規(guī)定的大小上線(xiàn),這種異常一般重試之后也是無(wú)法恢復(fù)的;

    • SerializationException:序列化異常,重試無(wú)法恢復(fù);

    • KafkaException:其他類(lèi)型的異常。

    • NetworkException:網(wǎng)絡(luò)瞬時(shí)故障引起的異常,可重試;

    • LeaderNotAvailableException:這種異常一般發(fā)生在leader換屆選舉的時(shí)候,一般重試之后就會(huì)恢復(fù);

    • NotControllerException:當(dāng)前的controller不可用,一般發(fā)生在controller換屆選舉的時(shí)候,重試之后可恢復(fù);

  • producer的關(guān)閉有兩種方法:不帶超時(shí)時(shí)間的和帶超時(shí)時(shí)間的close()方法。不帶超時(shí)時(shí)間的close()方法會(huì)等待之前發(fā)送的所有的消息都處理完畢之后再關(guān)閉,而帶超時(shí)時(shí)間的close()方法則需要指定一個(gè)超時(shí)時(shí)間,如果在超時(shí)時(shí)間結(jié)束了,消息還未處理完畢,那么就會(huì)終止所有的消息發(fā)送,這種情況下是可能丟失消息的。

  • producer端主要參數(shù):

    • 重試可能會(huì)造成消息的重復(fù)發(fā)送。比如某個(gè)消息已經(jīng)成功寫(xiě)入到了broker端,但是由于網(wǎng)絡(luò)抖動(dòng),導(dǎo)致producer端沒(méi)有接收到響應(yīng),或者響應(yīng)超時(shí),那么producer會(huì)嘗試重新發(fā)送該消息,這樣就會(huì)產(chǎn)生重復(fù)消息。在kafka 0.11.0.0版本中已經(jīng)開(kāi)始支持”精確一次“的處理語(yǔ)義;

    • 重試可能會(huì)造成消息的亂序。在producer發(fā)送消息的時(shí)候,默認(rèn)是會(huì)將5條消息作為一個(gè)批次進(jìn)行發(fā)送,但是如果其中某個(gè)消息寫(xiě)入失敗,而其余四條消息寫(xiě)入成功,此時(shí)該消息就會(huì)被重試,這種情況下,本來(lái)應(yīng)該在該消息后面的消息反而被保存在了該消息前面。

    • bootstrap.servers:該參數(shù)指定了當(dāng)前kafka服務(wù)器的地址,格式如:ip1:port1,ip2:port2,ip3:port3;

    • key.serializer:該參數(shù)指定了key的序列化器,必須是類(lèi)的全限定名;

    • value.serializer:該參數(shù)指定了value的序列化器,必須是類(lèi)的全限定名;

    • acks:這個(gè)參數(shù)的主要作用是控制producer發(fā)送消息后對(duì)消息可靠性的管理級(jí)別。若設(shè)置為0,則表示producer發(fā)送消息后,不管消息是否發(fā)送成功,即使發(fā)生任何異常也無(wú)法接收到該異常,這種情況下,消息的可靠性是最低的,但是吞吐量最高;若設(shè)置為1,則表示消息發(fā)送后,只要在接收消息的broker上成功寫(xiě)入該消息,并且返回響應(yīng)之后即可進(jìn)行后續(xù)的操作,這個(gè)情況下,消息的可靠性和吞吐量都適中,因?yàn)閷?xiě)入消息的broker如果宕機(jī),那么消息還是會(huì)丟失的;若設(shè)置為all或-1,則表示當(dāng)前消息會(huì)被寫(xiě)入到接收消息的broker的日志中,并且還會(huì)等待所有ISR集合中的副本都寫(xiě)入日志成功之后才返回響應(yīng),這種情況下,消息的可靠性是最高的,但是吞吐量最低。

    • buffer.memory:該參數(shù)指定了當(dāng)前producer所使用的發(fā)送緩沖區(qū)的大?。?/p>

    • compression.type:該參數(shù)指定了producer端壓縮消息的類(lèi)型,可選擇的有GZIP、Snappy和LZ4。這三種壓縮方式中,LZ4的效率是最好的。如果為消息設(shè)置了壓縮類(lèi)型,這將會(huì)顯著的減少網(wǎng)絡(luò)IO的開(kāi)銷(xiāo),但是會(huì)增加producer端的CPU負(fù)擔(dān)。另外需要注意的是,如果producer端和broker端設(shè)置的壓縮類(lèi)型不同,那么broker端就會(huì)對(duì)消息進(jìn)行解壓縮,然后又進(jìn)行壓縮后再保存,這將增加broker的CPU負(fù)擔(dān)。

    • retries:該參數(shù)指定了在producer端發(fā)生了諸如網(wǎng)絡(luò)抖動(dòng)或leader選舉等可重試異常時(shí)進(jìn)行重試的次數(shù)。該參數(shù)的默認(rèn)值為0,即不進(jìn)行重試,建議設(shè)置為一個(gè)比0大的數(shù),比如3。需要注意的是,設(shè)置了重試次數(shù)之后,可能會(huì)造成兩個(gè)問(wèn)題:

    • max.in.flight.requests.per.connection:指定了在給broker發(fā)送消息時(shí),同一時(shí)刻發(fā)送的請(qǐng)求數(shù)量,如果指定為1,那么producer一次就只會(huì)發(fā)送一個(gè)請(qǐng)求;

    • retry.backoff.ms:在進(jìn)行重試時(shí),producer會(huì)等待一段時(shí)間再進(jìn)行重試,以防止過(guò)多的重試對(duì)broker造成負(fù)擔(dān),該參數(shù)就是指定這個(gè)重試時(shí)間間隔的,默認(rèn)為100ms,建議設(shè)置得比當(dāng)前kafka集群中分區(qū)leader選舉的時(shí)間稍微長(zhǎng)一點(diǎn),因?yàn)檫@種選舉是最頻繁的;

    • batch.size:在producer發(fā)送消息的時(shí)候,會(huì)緩存一個(gè)批次的消息后再發(fā)送該消息,該參數(shù)就指定了這個(gè)緩沖區(qū)的大小的,默認(rèn)是16384,即16KB,這是一個(gè)比較保守的數(shù)字,建議稍微提高一些該參數(shù),因?yàn)橐粋€(gè)適當(dāng)?shù)木彌_區(qū)大小將會(huì)極大的提升系統(tǒng)的吞吐量;

    • linger.ms:在producer發(fā)送消息的時(shí)候,有的時(shí)候并不會(huì)等待一個(gè)batch都滿(mǎn)了才發(fā)送消息,因?yàn)檫@可能造成極大的延遲,而是會(huì)在等待一個(gè)時(shí)間間隔之后就直接發(fā)送這個(gè)batch的消息。默認(rèn)情況下,該參數(shù)值為0,表示消息需要被立即發(fā)送,但是這樣會(huì)拉低系統(tǒng)的吞吐量,為其設(shè)置一個(gè)適當(dāng)?shù)闹悼梢蕴嵘到y(tǒng)的吞吐量。

    • max.request.size:該參數(shù)指定了producer發(fā)送請(qǐng)求的大小,默認(rèn)為1048576;

    • request.timeout.ms:該參數(shù)指定了producer發(fā)送消息后等待broker響應(yīng)的超時(shí)時(shí)間,默認(rèn)為30S;

  • 自定義Partition的步驟如下:

    • 首先定義一個(gè)類(lèi)實(shí)現(xiàn)Partitioner接口;

    • 然后在Producer的Property中配置partitioner.class屬性,值為自定義Partitioner的類(lèi)的全限定名。

  • 自定義producer的serializer的步驟:

    • 定義數(shù)據(jù)對(duì)象格式;

    • 創(chuàng)建自定義序列化類(lèi),實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口,在serializer方法中實(shí)現(xiàn)序列化邏輯;

    • 在用于構(gòu)造KafkaProducer的Properties對(duì)象中設(shè)置key.serializervalue.serializer。

  • 自定義producer的攔截器使用步驟:

    • onSend():該方法會(huì)保證在消息被序列化以計(jì)算分區(qū)之前調(diào)用,主要是做一些消息發(fā)送前的處理工作;

    • onAcknowledgement():該方法主要是在消息被應(yīng)答之前或者消息發(fā)送失敗時(shí)調(diào)用,并且通常是在producer回調(diào)邏輯觸發(fā)之前調(diào)用;

    • close():該方法主要是關(guān)閉interceptor時(shí)觸發(fā),作用是進(jìn)行一些清理工作。

    • 創(chuàng)建攔截器類(lèi),實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,其有三個(gè)方法:

    • 在構(gòu)造KafkaProducer的Properties對(duì)象中設(shè)置interceptor.classes屬性,值為元素為ProducerInterceptor的list。

  • Producer端的消息可靠性保證。在Producer端發(fā)送消息的時(shí)候,kafka會(huì)將消息放到一個(gè)batch里,然后進(jìn)行整批數(shù)據(jù)進(jìn)行發(fā)送,那么這就會(huì)產(chǎn)生一個(gè)時(shí)間窗口,若在這個(gè)窗口內(nèi)producer宕機(jī),那么就有可能丟失消息。producer的這種批量發(fā)送的機(jī)制會(huì)產(chǎn)生兩個(gè)問(wèn)題:

    producer端的無(wú)消息丟失配置如下:

    block.on.buffer.full=true
    # 該參數(shù)的目的主要是在producer的發(fā)送緩沖區(qū)滿(mǎn)的時(shí)候,必須等到緩沖區(qū)的數(shù)據(jù)處理完畢才會(huì)開(kāi)始下一個(gè)批次的消息處理
    
    acks=all or -1
    # 該參數(shù)的作用是控制broker只有在所有的ISR中的副本都寫(xiě)入消息完畢才會(huì)發(fā)送響應(yīng)給producer
    
    retries=Integer.MAX_VALUE
    # 該參數(shù)指定了重試次數(shù),這樣可以保證可重試的消息一定會(huì)寫(xiě)入成功,而對(duì)于不可重試的異常,kafka會(huì)將其直接返回
    
    max.in.flight.requests.per.connection=1
    # 該參數(shù)指定了producer同一時(shí)刻只會(huì)給一個(gè)broker發(fā)送消息,并且會(huì)等待該消息處理完畢
    
    使用帶回調(diào)機(jī)制的send()發(fā)送消息,即KafkaProducer.send(record, callback);
    # 使用這種機(jī)制是因?yàn)榭梢酝ㄟ^(guò)這種方式來(lái)確保消息處理完成,這里一定不能使用不帶任何參數(shù)的send()方法,因?yàn)樵摲椒ㄊ且粋€(gè)異步方式,其無(wú)法保證消息的順序性
    
    Callback邏輯中顯示地立即關(guān)閉producer,使用close(0)
    # 在Callback中顯示的調(diào)用close(0)方法可以保證producer不會(huì)將未完成的消息發(fā)送出去
    
    unclean.leader.election.enable=false
    # 關(guān)閉該參數(shù)可以保證broker在宕機(jī)重新選舉時(shí),不會(huì)將沒(méi)有完全同步的broker副本選為新的leader,因?yàn)檫@樣會(huì)導(dǎo)致消息丟失
    
    replication.factor=3
    # 該參數(shù)指定了每個(gè)topic的副本數(shù)
    
    min.insync.replicas=2
    # 該參數(shù)指定了ISR中至少需要有多少個(gè)副本處于同步狀態(tài),其必須大于1,因?yàn)?個(gè)的時(shí)候表示只有l(wèi)eader副本,需要注意的是,只有producer的acks為-1或者all時(shí)該參數(shù)才有效
    
    replication.factor > min.insync.replicas
    # 這種配置可以保證當(dāng)前分區(qū)的副本數(shù)大于等于ISR中的副本數(shù),這樣就可以允許至少一個(gè)或多個(gè)副本宕機(jī),否則,如果某個(gè)副本宕機(jī),那么如果producer設(shè)置的acks為-1或all時(shí),始終沒(méi)法保證ISR中的數(shù)量達(dá)到目標(biāo)副本數(shù)
    
    enable.auto.commit=false
    # 設(shè)置自動(dòng)提交為false,這樣可以保證消費(fèi)者消費(fèi)完成之后再提交消費(fèi)信息


    • 如果在發(fā)送batch數(shù)據(jù)之前producer宕機(jī),那么就會(huì)丟失這一部分?jǐn)?shù)據(jù);

    • 如果在發(fā)送batch數(shù)據(jù)的過(guò)程中,某個(gè)數(shù)據(jù)發(fā)送失敗了,該消息就會(huì)被重試,而其后面成功發(fā)送的數(shù)據(jù)是不會(huì)重試的,因而如果重試后成功,那么這條本應(yīng)該在前面的消息,其就會(huì)排到后面了;

  • producer端的時(shí)間消耗主要發(fā)生在壓縮上,而壓縮的效率與batch的大小是有一定關(guān)系的。batch大小越大,壓縮時(shí)間就越長(zhǎng),不過(guò)時(shí)間的增長(zhǎng)不是線(xiàn)性的,而是越來(lái)越平緩的。如果發(fā)現(xiàn)壓縮很慢,說(shuō)明系統(tǒng)的瓶頸在用戶(hù)主進(jìn)程而不是IO線(xiàn)程,因此可以考慮增加多個(gè)用戶(hù)線(xiàn)程同時(shí)發(fā)送消息,這樣通常能顯著的提升producer的吞吐量。

  • 消費(fèi)者組:消費(fèi)者所使用一個(gè)消費(fèi)者組名(即group.id)來(lái)標(biāo)記自己,topic的每條消息都只會(huì)被發(fā)送到每個(gè)訂閱它的消費(fèi)者組的一個(gè)消費(fèi)者實(shí)例上。

  • 消費(fèi)者組的要點(diǎn):a. 一個(gè)consumer group可能有若干個(gè)consumer實(shí)例(一個(gè)group只有一個(gè)示例也是允許的);b. 對(duì)于同一個(gè)group而言,topic的每條消息只能被發(fā)送到group下的一個(gè)consumer實(shí)例上;③topic消息可以被發(fā)送到多個(gè)group中。

  • 對(duì)于使用consumer group的優(yōu)點(diǎn),其主要是實(shí)現(xiàn)消費(fèi)者的高伸縮性、容錯(cuò)性的目的。在正常情況下,消費(fèi)者組里的消費(fèi)者會(huì)被平均的分配各個(gè)partition以進(jìn)行消息的消費(fèi),當(dāng)某個(gè)消費(fèi)者宕機(jī)時(shí),consumer group會(huì)將已經(jīng)崩潰的消費(fèi)者所消費(fèi)的分區(qū)分配給其他的消費(fèi)者進(jìn)行消費(fèi),通過(guò)這種方式實(shí)現(xiàn)集群容錯(cuò)。如果某個(gè)topic的消息比較多,分區(qū)數(shù)也比消費(fèi)者數(shù)量多,此時(shí)如果消費(fèi)者處理速度比生產(chǎn)者生產(chǎn)消息的速度低,那么就會(huì)出現(xiàn)消息積壓的情況,此時(shí)就可以通過(guò)增加消費(fèi)者數(shù)量的方式提升消費(fèi)效率,只需要將新加入的消費(fèi)者實(shí)例的groupId指定為當(dāng)前產(chǎn)生積壓的group的id即可,consumer group就會(huì)自動(dòng)將多余的分區(qū)分配給該實(shí)例。這里說(shuō)的這兩種重分配partition的方式稱(chēng)為再平衡。

  • kafka將保存位移信息沒(méi)有保存在zookeeper的原因有兩點(diǎn):①zookeeper本質(zhì)上是一個(gè)分布式的服務(wù)協(xié)調(diào)工具,其不適合做數(shù)據(jù)存儲(chǔ)服務(wù);②zookeeper對(duì)于讀的性能是非常高的,但是寫(xiě)性能比較低,而對(duì)于kafka這種需要頻繁寫(xiě)入消費(fèi)進(jìn)度的場(chǎng)景,其是不適合的。在新版本中,kafka將每個(gè)topic的消費(fèi)進(jìn)度都保存在了一個(gè)特殊的topic下,即__consumer_offsets。

  • kafka將位移信息提交到__consumer_offsets這個(gè)topic下的原理如下:首先consumer會(huì)將其要提交的信息組成一個(gè)KV形式的消息,其key是groupId+topic+partition的一個(gè)字符串連接的形式,其值則為當(dāng)前的偏移量;然后在消息發(fā)送到該topic之后,這個(gè)topic會(huì)對(duì)消息進(jìn)行壓實(shí)處理,也就是說(shuō),會(huì)取每個(gè)key下面的最大的value,而只保存該value。通過(guò)這種方式,在__consumer_offsets下就為每個(gè)group的某個(gè)topic下的某個(gè)分區(qū)唯一保存了一條數(shù)據(jù),這條數(shù)據(jù)就是最新的offset。

感謝各位的閱讀,以上就是“Kafka的知識(shí)點(diǎn)有哪些”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Kafka的知識(shí)點(diǎn)有哪些這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI