您好,登錄后才能下訂單哦!
這篇文章主要介紹了kafka的編程模型有哪些,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
1.kafka消費(fèi)者編程模型
分區(qū)消費(fèi)模型
組(group)消費(fèi)模型
1.1.1.分區(qū)消費(fèi)架構(gòu)圖,每個(gè)分區(qū)對(duì)應(yīng)一個(gè)消費(fèi)者。
1.1.2.分區(qū)消費(fèi)模型偽代碼描述
指定偏移量,用于從上次消費(fèi)的地方開(kāi)始消費(fèi).
提交offset ,java客戶端會(huì)自動(dòng)提交的集群,所以這一步可選。
1.2.1.組消費(fèi)模型架構(gòu)圖
每個(gè)組都消費(fèi)該topic的全量數(shù)據(jù),一條消息會(huì)發(fā)給groupA和groupB.
1.2.2.組消費(fèi)模型偽代碼:
流數(shù)N:表示一個(gè)consumer組里面有幾個(gè)consumer 實(shí)例,上例中組A創(chuàng)建2個(gè)流,組B創(chuàng)建4個(gè)流。
1.2.3.consumer分配算法
當(dāng)kafka的分區(qū)個(gè)數(shù)大于組A里consumer實(shí)例個(gè)數(shù)時(shí),怎么去分配,以下為分配步驟:
Partition消費(fèi)模型更加靈活但是:
(1)需要自己處理各種異常情況;
(2)需要自己管理offset(以實(shí)現(xiàn)消息傳遞的其他語(yǔ)義);
Group消費(fèi)模型更加簡(jiǎn)單,但是不靈活:
(1)不需要自己處理異常情況,不需要自己管理offset;
(2)只能實(shí)現(xiàn)kafka默認(rèn)的最少一次消息傳遞語(yǔ)義;
知識(shí)補(bǔ)充:消息傳遞的3中語(yǔ)義:
至少一次,(消息不會(huì)丟,消息者至少得到一次,但有可能會(huì)重復(fù),生產(chǎn)者向消費(fèi)者發(fā)送之后,會(huì)等待消費(fèi)者確認(rèn),沒(méi)收到確認(rèn)會(huì)再發(fā)) (kafka 默認(rèn)實(shí)現(xiàn)的語(yǔ)義)。
至多一次,(消息會(huì)丟)
有且只有一次。
fetchSize: 從服務(wù)器獲取單包大小;
bufferSize: kafka客戶端緩沖區(qū)大小;
group.id: 分組消費(fèi)時(shí)分組名 (指定的每個(gè)組將獲得全量的數(shù)據(jù))
同步生產(chǎn)模型
異步生產(chǎn)模型
至少成功一次 , 發(fā)送給kafka消費(fèi)者
打包發(fā)送給kafka broker。
main()
創(chuàng)建到kafka broker的連接:KafkaClient(host,port)
選擇或者自定義生產(chǎn)者負(fù)載均衡算法 partitioner (算法有:hash,輪詢,隨機(jī))
設(shè)置生產(chǎn)者參數(shù) (緩存隊(duì)列長(zhǎng)度,發(fā)送時(shí)間,同步/異步參數(shù)設(shè)置)
根據(jù)負(fù)載均衡算法和設(shè)置的生產(chǎn)者參數(shù)構(gòu)造Producer對(duì)象
while True
getMessage:從上游獲得一條消息
按照kafka要求的消息格式構(gòu)造kafka消息
根據(jù)分區(qū)算法得到分區(qū)
發(fā)送消息
處理異常
同步生產(chǎn)模型:
(1)低消息丟失率;
(2)高消息重復(fù)率(由于網(wǎng)絡(luò)原因,回復(fù)確認(rèn)未收到);
(3)高延遲 (每發(fā)一條消息需要確認(rèn))
(使用在不丟消息場(chǎng)景)
異步生產(chǎn)模型:
(1)低延遲;
(2)高發(fā)送性能;(每秒一個(gè)分區(qū)發(fā)50萬(wàn)條)
(3)高消息丟失率(無(wú)確認(rèn)機(jī)制,發(fā)送端隊(duì)列滿了,消息會(huì)丟掉;整個(gè)隊(duì)列發(fā)送給)
(使用在允許丟消息場(chǎng)景,偶爾丟一條)
//同步配置參數(shù):
默認(rèn)的序列化方式:字節(jié)序列化。
設(shè)定分區(qū)算法:默認(rèn)是對(duì)key進(jìn)行hash分區(qū)算法,可以自定義分區(qū)算法。
確認(rèn)機(jī)制 request.require.acks: 合理設(shè)置為1; 0: 絕不等確認(rèn) 1: leader的一個(gè)副本收到這條消息,并發(fā)回確認(rèn) -1: leader的所有副本都收到這條消息,并發(fā)回確認(rèn)
消息是以key-value的形式發(fā)送的,key必須要設(shè)置。
message.send.max.retries: 發(fā)送失敗重試次數(shù);
retry.backoff.ms :未接到確認(rèn),認(rèn)為發(fā)送失敗的時(shí)間;
producer.type: 同步發(fā)送或者異步發(fā)送;
batch.num.messages: 異步發(fā)送時(shí),累計(jì)最大消息數(shù);
queue.buffering.max.ms:異步發(fā)送時(shí),累計(jì)最大時(shí)間;
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“kafka的編程模型有哪些”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。