您好,登錄后才能下訂單哦!
隊(duì)列在數(shù)據(jù)結(jié)構(gòu)中是一種線性表,從一端插入數(shù)據(jù),然后從另一端刪除數(shù)據(jù)。本文目的不是講解各種隊(duì)列算法,而是在應(yīng)用層面講述使用隊(duì)列能解決哪些場(chǎng)景問(wèn)題。
在我開發(fā)過(guò)的系統(tǒng)中,不是所有的業(yè)務(wù)都必須實(shí)時(shí)處理、不是所有的請(qǐng)求都必須實(shí)時(shí)反饋結(jié)果給用戶、不是所有的請(qǐng)求/處理都必須100%處理成功、不知道誰(shuí)依賴“我”的處理結(jié)果、不關(guān)心其他系統(tǒng)如何處理后續(xù)業(yè)務(wù)、不需要強(qiáng)一致性,只需保證最終一致性即可、想要保證數(shù)據(jù)處理的有序性;此時(shí)你應(yīng)該考慮使用隊(duì)列來(lái)解決這些問(wèn)題。在實(shí)際開發(fā)時(shí)我們經(jīng)常使用隊(duì)列進(jìn)行異步處理、系統(tǒng)解耦、數(shù)據(jù)同步、流量削峰、緩沖、限流等。
應(yīng)用場(chǎng)景
異步處理:使用隊(duì)列的一個(gè)主要原因是進(jìn)行異步處理,比如用戶注冊(cè)成功后需要發(fā)送注冊(cè)成功郵件/新用戶積分/優(yōu)惠券等等、緩存過(guò)期時(shí)先返回老的數(shù)據(jù),然后異步更新緩存、異步寫日志等;通過(guò)異步處理,可以提升主流程響應(yīng)速度,而非主流程/非重要業(yè)務(wù)可以異步集中處理,這樣還可以將任務(wù)聚合然后批量處理;因此可以使用消息隊(duì)列/任務(wù)隊(duì)列來(lái)進(jìn)行異步處理。下載
系統(tǒng)解耦:比如用戶成功支付完成訂單后,需要通知生產(chǎn)配貨系統(tǒng)、發(fā)票系統(tǒng)、庫(kù)存系統(tǒng)、推薦系統(tǒng)、搜索系統(tǒng)、風(fēng)控系統(tǒng)等進(jìn)行業(yè)務(wù)處理;而未來(lái)需要添加/支持哪些業(yè)務(wù)是不清楚的,而且這些業(yè)務(wù)處理不需要實(shí)時(shí)處理、不需要強(qiáng)一致,只需要最終一致性即可,因此可以通過(guò)消息隊(duì)列/任務(wù)隊(duì)列進(jìn)行系統(tǒng)解耦。
數(shù)據(jù)同步:比如想把Mysql變更的數(shù)據(jù)同步到Redis、或者將Mysql數(shù)據(jù)同步到Mongodb、或者機(jī)房間數(shù)據(jù)同步、或者主從數(shù)據(jù)同步等,此時(shí)可以考慮使用如databus、canal、otter。使用數(shù)據(jù)總線隊(duì)列進(jìn)行數(shù)據(jù)同步的好處是可以保證數(shù)據(jù)修改的有序性。下載
流量削峰:系統(tǒng)瓶頸一般在數(shù)據(jù)庫(kù)上,比如扣減庫(kù)存、下單等;此時(shí)可以考慮使用隊(duì)列將變更請(qǐng)求暫時(shí)放入隊(duì)列,通過(guò)緩存+隊(duì)列暫存的方式將數(shù)據(jù)庫(kù)流量削峰;還有如秒殺系統(tǒng),下單服務(wù)會(huì)是該系統(tǒng)的瓶頸,此時(shí)會(huì)使用隊(duì)列進(jìn)行排隊(duì)和限流,從而保護(hù)下單服務(wù)。通過(guò)隊(duì)列暫存或者隊(duì)列限流來(lái)削峰。
比如減庫(kù)存,可以考慮這樣設(shè)計(jì):下載
直接在Redis中扣減,然后記錄下扣減日志(FIFO隊(duì)列),通過(guò)Worker去同步到DB。
實(shí)際隊(duì)列的應(yīng)用場(chǎng)景還是非常多的,本文列舉了筆者遇到過(guò)比較多的場(chǎng)景。
緩沖區(qū)隊(duì)列
典型的如Log4j的日志緩沖區(qū),當(dāng)我們使用log4j記錄日志時(shí),可以配置字節(jié)緩沖區(qū),字節(jié)緩存區(qū)滿時(shí)會(huì)立即同步到磁盤(flush操作)。Log4j使用BufferedWriter實(shí)現(xiàn)的;此模式不是異步寫,在緩沖區(qū)滿的時(shí)候還是會(huì)阻塞主線程。如果需要異步模式可以使用AsyncAppender,然后通過(guò)bufferSize控制日志事件緩沖區(qū)大小。
通過(guò)緩沖區(qū)隊(duì)列可以實(shí)現(xiàn):批量處理、異步處理。
任務(wù)隊(duì)列
使用任務(wù)隊(duì)列將一些不需要與主線程同步執(zhí)行的任務(wù)扔到任務(wù)隊(duì)列異步處理即可;筆者用的最多的是線程池任務(wù)隊(duì)列(默認(rèn)LinkedBlockingQueue)和Disruptor任務(wù)隊(duì)列(RingBuffer)。如刷數(shù)據(jù)時(shí),將任務(wù)扔到隊(duì)列異步處理即可,處理成功后再異步通知用戶;還有如刪除SKU操作,用戶請(qǐng)求時(shí)直接將任務(wù)分解并扔到隊(duì)列,異步處理,處理成功后異步通知用戶即可;還有如查詢聚合,將多個(gè)可并行處理的任務(wù)扔到隊(duì)列然后等待最慢的一個(gè)返回。如果使用的是內(nèi)存任務(wù)隊(duì)列請(qǐng)記住可能存在系統(tǒng)重啟等問(wèn)題造成的數(shù)據(jù)丟失。下載
通過(guò)任務(wù)隊(duì)列可以實(shí)現(xiàn):異步處理、任務(wù)分解/聚合處理。
注:JDK7提供了ExecutorService的新的實(shí)現(xiàn)ForkJoinPool,其提供了Work-stealing機(jī)制,可以更好地提升并發(fā)效率。
在使用Executors.newFixedThreadPool時(shí),其沒有設(shè)置隊(duì)列大?。J(rèn)Integer.MAX_VALUE),如果有大量任務(wù)被緩存到LinkedBlockingQueue中等待線程執(zhí)行,會(huì)出現(xiàn)GC慢等問(wèn)題,造成系統(tǒng)響應(yīng)慢甚至OOM。因此在使用線程池時(shí)候,要指定隊(duì)列大小并設(shè)置合理的RejectedExecutionHandler;要記錄請(qǐng)求來(lái)源的參數(shù)方便定位引發(fā)問(wèn)題的源頭。下載
消息隊(duì)列
筆者所在公司使用的是自研的JMQ;開源的有ActiveMQ、Kafka、Redis。使用消息隊(duì)列存儲(chǔ)各業(yè)務(wù)數(shù)據(jù),其他系統(tǒng)根據(jù)需要訂閱即可。常見的模式是:點(diǎn)對(duì)點(diǎn)(一個(gè)消息只有一個(gè)消費(fèi)者)、發(fā)布訂閱(一個(gè)消息可以有多個(gè)消費(fèi)者);而常用的是發(fā)布訂閱模式。
比如用戶注冊(cè)成功、修改商品數(shù)據(jù)、訂單狀態(tài)變更等都應(yīng)該將變更發(fā)送到消息隊(duì)列,從而其他系統(tǒng)根據(jù)需要訂閱該消息,然后按照自己的需求進(jìn)行業(yè)務(wù)邏輯開發(fā)。
在添加新功能時(shí),消息消費(fèi)者只需要訂閱該消息,然后開發(fā)相應(yīng)的業(yè)務(wù)邏輯,消息生產(chǎn)者根本不關(guān)心你怎么使用消息和你做什么業(yè)務(wù)處理。
同步調(diào)用,添加什么新功能都需要到用戶系統(tǒng)提需求。其中一個(gè)服務(wù)出現(xiàn)問(wèn)題了,整個(gè)服務(wù)就不可用了。
消息隊(duì)列,用戶系統(tǒng)只需要發(fā)布用戶注冊(cè)成功的消息即可,相關(guān)系統(tǒng)訂閱該消息,然后執(zhí)行相關(guān)的業(yè)務(wù)邏輯。相關(guān)服務(wù)出問(wèn)題不影響到注冊(cè)主流程。下載
通過(guò)消息隊(duì)列可以實(shí)現(xiàn):異步處理、系統(tǒng)解耦。
請(qǐng)求隊(duì)列
請(qǐng)求隊(duì)列是指如在Web環(huán)境下對(duì)用戶請(qǐng)求排隊(duì),從而進(jìn)行一些特殊控制:流量控制、請(qǐng)求分級(jí)、請(qǐng)求隔離;如將請(qǐng)求按照功能劃分到不同的隊(duì)列,從而使得不同的隊(duì)列出現(xiàn)問(wèn)題后相互不影響;還可以對(duì)請(qǐng)求分級(jí),一些重要請(qǐng)求可以優(yōu)先處理(發(fā)展到一定程度應(yīng)將功能物理分離);還有服務(wù)器處理能力有限,在接近服務(wù)器瓶頸時(shí)需要考慮限流,最簡(jiǎn)單的限流時(shí)丟棄處理不了的請(qǐng)求,此時(shí)可以使用隊(duì)列進(jìn)行流量控制。
數(shù)據(jù)總線隊(duì)列
一般消息隊(duì)列中的消息都是業(yè)務(wù)維度的,比如業(yè)務(wù)鍵或者業(yè)務(wù)狀態(tài)等,比如哪個(gè)SKU變更了,而有些訂閱者需要再查一遍來(lái)獲取最新的修改數(shù)據(jù)(比如緩存同步);通過(guò)現(xiàn)有的消息隊(duì)列方式的缺點(diǎn)是很難只進(jìn)行修改部分的推送和保證數(shù)據(jù)有序性。而此種場(chǎng)景比較適合使用數(shù)據(jù)總線隊(duì)列實(shí)現(xiàn)。如數(shù)據(jù)庫(kù)數(shù)據(jù)修改后需要同步數(shù)據(jù)到緩存,或者需要將一個(gè)機(jī)房數(shù)據(jù)同步到另一個(gè)機(jī)房,只是數(shù)據(jù)維度的同步,此時(shí)應(yīng)該使用數(shù)據(jù)總線隊(duì)列如canal、otter、databus;使用數(shù)據(jù)總線隊(duì)列的好處是可以保證數(shù)據(jù)的有序性。
混合隊(duì)列
下載
此處MQ是使用京東自研的JMQ,消息是可靠持久化存儲(chǔ)的;應(yīng)用會(huì)按照不同的維度發(fā)布消息到JMQ;下游應(yīng)用接收到該消息后會(huì)放入到Redis,使用Redis List來(lái)存儲(chǔ)這些任務(wù);應(yīng)用將Redis消息消費(fèi)處理后,會(huì)按照不同的維度聚合商品消息然后再次發(fā)送出去。
使用Redis隊(duì)列的主要原因是想提升消息堆積能力和并發(fā)處理能力。另外在使用Redis構(gòu)建消息隊(duì)列時(shí)需要考慮網(wǎng)絡(luò)抖動(dòng)造成的消息丟失問(wèn)題,因?yàn)镽edis是沒有回滾事務(wù)的,或者說(shuō)是確認(rèn)機(jī)制。我們使用如下方式防止消息丟失:
try {
id = queueRedis.opsForList().rightPopAndLeftPush(queueName, processingQueueName);
} catch (Exception e) {
//發(fā)生了網(wǎng)絡(luò)異常,需要把processing中的id再放回到waiting queue中
String msg = queueName + " to " + processingQueueName + " rpoplpush error";
LOG.error(msg, e);
//報(bào)警代碼
}
而對(duì)于失敗我們會(huì)進(jìn)行重試三次,重試失敗后放入失敗隊(duì)列,而失敗隊(duì)列是具有防重功能的(從本地隊(duì)列和失敗隊(duì)列排重),使用的是Redis Lua腳本實(shí)現(xiàn):下載
static EventQueueScript ADD_TO_FAIL_QUEUE_REDIS_SCRIPT = new EventQueueScript(
"redis.call('lrem', KEYS[1], 1, ARGV[1]) redis.call('lrem', KEYS[2], 1, ARGV[1]) return redis.call('lpush', KEYS[2], ARGV[1])"
);
Redis作者Antirez開發(fā)的內(nèi)存分布式消息隊(duì)列Disque是未來(lái)更好的內(nèi)存消息隊(duì)列選擇。
其他
優(yōu)先級(jí)隊(duì)列:在實(shí)際開發(fā)時(shí)肯定有些任務(wù)是緊急的,此時(shí)應(yīng)該優(yōu)先處理緊急的任務(wù);所以請(qǐng)考慮對(duì)隊(duì)列進(jìn)行分級(jí)。
副本隊(duì)列:在進(jìn)行一些系統(tǒng)重構(gòu)或者上新的功能時(shí),如果沒有足夠的信心保證業(yè)務(wù)邏輯正確,可以考慮存儲(chǔ)一份隊(duì)列的副本(比如1小時(shí)、1天的),從而當(dāng)業(yè)務(wù)出現(xiàn)問(wèn)題時(shí)可以對(duì)這些消息進(jìn)行回放。
鏡像隊(duì)列:每個(gè)隊(duì)列不會(huì)無(wú)限制訂閱數(shù)量,一定會(huì)有一個(gè)極限的;當(dāng)?shù)竭_(dá)極限時(shí)請(qǐng)考慮使用鏡像隊(duì)列方式解決該問(wèn)題。
隊(duì)列并發(fā)數(shù):不同隊(duì)列實(shí)現(xiàn),隊(duì)列服務(wù)端并發(fā)連接數(shù)是不一樣的;一定不是增大隊(duì)列并發(fā)連接數(shù)消費(fèi)能力也隨著增加;也不會(huì)因?yàn)樵黾恿讼M(fèi)服務(wù)器消費(fèi)并發(fā)能力也隨著增加,需要根據(jù)實(shí)際情況來(lái)設(shè)置合理的并發(fā)連接數(shù)。下載
推還是拉:消息體內(nèi)容不是越全越好,需要根據(jù)具體業(yè)務(wù)設(shè)計(jì)消息體;如有些系統(tǒng)依賴商品變更消息(只有一個(gè)SKU)、有些系統(tǒng)依賴商品狀態(tài)消息(SKU、狀態(tài))、有些系統(tǒng)依賴商品屬性變更消息(SKU、變更的屬性)等,如果讓所有系統(tǒng)都消費(fèi)商品變更消息,那么這些系統(tǒng)都會(huì)調(diào)用商品查詢服務(wù)拉一下最新的商品信息然后進(jìn)行處理。因此要根據(jù)實(shí)際情況來(lái)決定是使用推送方式(將系統(tǒng)需要的所有信息推過(guò)去)還是拉取方式(只推送ID,然后再查一遍)。
消息合并:如果消息寫入量非常大,應(yīng)該考慮將消息合并寫,可以"寫應(yīng)用本地磁盤隊(duì)列"-->“同步本地磁盤隊(duì)列到消息中間件”;同步時(shí)可以根據(jù)需求制定同步策略,如1秒同步1次。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。