您好,登錄后才能下訂單哦!
這篇文章主要介紹“Springboot異步消息處理的方法”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Springboot異步消息處理的方法”文章能幫助大家解決問(wèn)題。
在工作中經(jīng)常會(huì)碰到需要進(jìn)行異步消息處理的業(yè)務(wù)場(chǎng)景,根據(jù)消息性質(zhì)的不同有完全不同的處理方式。
1、消息不獨(dú)立
不獨(dú)立的消息通常是有順序依賴關(guān)系,這時(shí)消息處理機(jī)制將退化為線性隊(duì)列處理模式,只能由一個(gè)消費(fèi)者去單線程處理消息。
2、消息完全獨(dú)立
完全獨(dú)立的消息,可以由多個(gè)消費(fèi)者(線程)并發(fā)同時(shí)處理,可以達(dá)到最大的并發(fā)處理能力。
3、消息不完全獨(dú)立
通常這種情況是,同源消息(來(lái)自同一生產(chǎn)者)要求有序,異源消息順序無(wú)關(guān)。
這個(gè)場(chǎng)景的消息處理會(huì)相對(duì)復(fù)雜點(diǎn),為了保證同源消息有序,很容易想到對(duì)同一來(lái)源的消息綁定固定的消費(fèi)者線程,這樣做很簡(jiǎn)單但存在很大問(wèn)題。
如果生產(chǎn)者數(shù)量很大,綁定線程數(shù)可能不夠,當(dāng)然可以復(fù)用線程資源,同一線程綁定多個(gè)消息來(lái)源進(jìn)行處理,這樣做又會(huì)有另一個(gè)問(wèn)題:消息源之間的相互影響。
考慮以下場(chǎng)景:
生產(chǎn)者P1產(chǎn)生大量消息進(jìn)入隊(duì)列后被分配給消費(fèi)線程C1處理(C1可能需要處理很長(zhǎng)時(shí)間),這時(shí)生產(chǎn)者P2產(chǎn)生了一個(gè)消息,不幸的是也被分配給了消費(fèi)線程C1處理
那么生產(chǎn)者P2的消息處理將被P1的大量消息給阻塞住,導(dǎo)致了P1和P2之間的相互影響,而且也不能充分利用其它消費(fèi)線程導(dǎo)致不均衡。
所以,我們必須考慮避免這樣的問(wèn)題。做到消費(fèi)處理的及時(shí)性(盡快)、隔離性(避免相互干擾)、均衡性(最大化并發(fā)處理)
在實(shí)現(xiàn)中,會(huì)有兩種模式,比較容易想到的是線程派發(fā)模型(PUSH方式),具體做法通常如下:
1. 有一個(gè)全局消息派發(fā)者,輪詢隊(duì)列取出消息。
2. 根據(jù)消息來(lái)源,派發(fā)給合適的消費(fèi)線程處理。
派發(fā)的算法機(jī)制簡(jiǎn)單的可以類似像基于消息來(lái)源的Hash,復(fù)雜的可以根據(jù)各個(gè)消費(fèi)線程的當(dāng)前負(fù)載,等待隊(duì)列長(zhǎng)短、消息的復(fù)雜度進(jìn)行綜合分析選擇派發(fā)。
簡(jiǎn)單Hash肯定會(huì)碰到上述場(chǎng)景描述的問(wèn)題,但復(fù)雜派發(fā)計(jì)算很明顯實(shí)現(xiàn)起來(lái)非常麻煩和復(fù)雜,效率也不一定好,在均衡性方面也很難做到十分平衡。
第二種模式采用PULL方式,線程按需拉取,具體做法如下:
1. 消息源直接將產(chǎn)生的消息放入對(duì)應(yīng)該源的臨時(shí)隊(duì)列中(如下所示每個(gè)session代表一個(gè)不同的消息來(lái)源),再將session置入一個(gè)阻塞隊(duì)列通知線程處理
2. 多個(gè)消費(fèi)線程同時(shí)輪詢隊(duì)列,爭(zhēng)搶消息(保證只有一個(gè)線程取到
3. 檢查隊(duì)列指示器是否正被其他線程處理(實(shí)現(xiàn)時(shí)需要在線程級(jí)別基于同源消息的檢測(cè)同步)
4. 若未被其他線程處理,則在同步區(qū)置處理中指示狀態(tài),退出同步區(qū)后對(duì)臨時(shí)隊(duì)列中的消息進(jìn)行處理
5. 處理完成后,最后再次進(jìn)入同步區(qū)置處理指示狀態(tài)為空閑
下面用一段代碼來(lái)描述下消費(fèi)線程處理流程:
public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
關(guān)于“Springboot異步消息處理的方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。