溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么進(jìn)行kafka的異步模式分析

發(fā)布時(shí)間:2021-12-15 09:30:12 來(lái)源:億速云 閱讀:206 作者:柒染 欄目:大數(shù)據(jù)

本篇文章為大家展示了怎么進(jìn)行kafka的異步模式分析,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

啥是異步模式

kafka的生產(chǎn)者可以選擇使用異步方式發(fā)送數(shù)據(jù),所謂異步方式,就是我們調(diào)用 send() 方法,并指定一個(gè)回調(diào)函數(shù), 服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)。

kafka在客戶端里暴露了兩個(gè)send方法,我們可以自己選擇同步或者異步模式。我們來(lái)看一個(gè)kafka的生產(chǎn)者發(fā)送示例,有個(gè)直觀的感受。這個(gè)示例是一個(gè)同步的模式。

ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “測(cè)試”);//Topic Key Value
try{
Future future = producer.send(record);
future.get();//獲取執(zhí)行結(jié)果
} catch(Exception e) {
e.printStackTrace();
}
 

我們從源碼層面來(lái)繼續(xù)看下。

首先kafka定義了一個(gè)接口,

怎么進(jìn)行kafka的異步模式分析

然后KafkaProducer實(shí)現(xiàn)了這兩個(gè)方法,我們看下異步方法的實(shí)現(xiàn)邏輯。

怎么進(jìn)行kafka的異步模式分析

可以看到最終是調(diào)用doSend方法,調(diào)用的時(shí)候傳入一個(gè)回調(diào)。這個(gè)回調(diào)就是監(jiān)聽方法的執(zhí)行結(jié)果的。

 

異步模式也會(huì)阻塞的

很多人會(huì)認(rèn)為,既然是異步模式,不管結(jié)果是成功還是失敗,肯定方法調(diào)用會(huì)馬上返回的。那我只能告訴你,不好意思,不一定是這樣。我自己就曾經(jīng)踩過這個(gè)坑。

我們當(dāng)時(shí)有個(gè)業(yè)務(wù)流程需要在執(zhí)行完成后發(fā)送kakfa消息給某個(gè)業(yè)務(wù)方,為了盡量減少影響我這個(gè)主流程的執(zhí)行時(shí)間,采用了異步方式發(fā)送kafka消息。在使用中,因?yàn)榕溴e(cuò)了kafka的TOPIC信息,發(fā)現(xiàn)流程阻塞發(fā)送消息這里長(zhǎng)達(dá)6秒(kafka默認(rèn)的發(fā)送超時(shí)時(shí)間)。

究竟為啥異步方式還會(huì)阻塞呢?我們繼續(xù)看源碼。

怎么進(jìn)行kafka的異步模式分析

不管是同步模式還是異步模式,最終都會(huì)調(diào)用到doSend方法,注意看上圖中的waitOnMetadata方法,我上面說(shuō)的阻塞的情況就是阻塞在這個(gè)方法里。那我們繼續(xù)看這個(gè)方法。

怎么進(jìn)行kafka的異步模式分析

通過代碼中的注釋我們大概能了解這個(gè)方法的功能,不過我這里還是要解釋下。(防止有人看不懂英文,哈哈)

waitOnMetadata獲取當(dāng)前的集群元數(shù)據(jù)信息,如果緩存有,并且分區(qū)沒有超過指定分區(qū)范圍則緩存返回,否則觸發(fā)更新,等待新的metadata。這個(gè)等待的操作在下面這行代碼:

metadata.awaitUpdate(version, remainingWaitMs);
 

然后就繼續(xù)跟嘍,

怎么進(jìn)行kafka的異步模式分析

這個(gè)方法很好理解,就是一直在等一個(gè)條件,這個(gè)條件達(dá)到了就返回,否則一直等待超時(shí)退出。而這個(gè)條件就是當(dāng)前的版本號(hào)要大于上個(gè)版本號(hào)。

那么誰(shuí)來(lái)更新版本號(hào)呢?就是我們前面提到的sender線程。當(dāng)我們的topic配置錯(cuò)誤的時(shí)候?qū)е耺etadata一直無(wú)法更新,然后一直等到超時(shí)。

上述內(nèi)容就是怎么進(jìn)行kafka的異步模式分析,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI