溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Redis中的分布式Java隊(duì)列的應(yīng)用

發(fā)布時(shí)間:2020-06-17 15:24:58 來(lái)源:億速云 閱讀:411 作者:元一 欄目:編程語(yǔ)言

Redis是一個(gè)開(kāi)源的使用ANSI C語(yǔ)言編寫(xiě)、支持網(wǎng)絡(luò)、可基于內(nèi)存亦可持久化的日志型、Key-Value數(shù)據(jù)庫(kù),并提供多種語(yǔ)言的API。從2010年3月15日起,Redis的開(kāi)發(fā)工作由VMware主持。

在Redis中使用隊(duì)列

像任何消息代理一樣,Redis需要以正確的順序發(fā)送消息。 可以根據(jù)消息的年齡或某些其他預(yù)定義的優(yōu)先級(jí)等級(jí)發(fā)送消息。

為了存儲(chǔ)這些未決消息,Redis開(kāi)發(fā)人員需要隊(duì)列數(shù)據(jù)結(jié)構(gòu)。 Redisson是使用Redis和Java進(jìn)行分布式編程的框架,它提供了許多分布式數(shù)據(jù)結(jié)構(gòu)(包括隊(duì)列)的實(shí)現(xiàn)。

Redisson通過(guò)提供Java API使Redis開(kāi)發(fā)更加容易。 Redisson不需要開(kāi)發(fā)人員學(xué)習(xí)Redis命令,而是包括所有眾所周知的Java接口,例如Queue和BlockingQueue。 Redisson還處理Redis中繁瑣的幕后工作,例如連接管理,故障轉(zhuǎn)移處理和數(shù)據(jù)序列化。

基于Redis的分布式Java隊(duì)列

Redisson提供了Java中基本隊(duì)列數(shù)據(jù)結(jié)構(gòu)的多個(gè)基于Redis的實(shí)現(xiàn),每種實(shí)現(xiàn)都有不同的功能。 這使你可以選擇最適合你目的的隊(duì)列類(lèi)型。

下面,我們將使用Redisson Java框架討論六種不同類(lèi)型的基于Redis的分布式隊(duì)列。

隊(duì)列
Redisson中的RQueue對(duì)象實(shí)現(xiàn)了java.util.Queue接口。 隊(duì)列用于需要從最早的最早的元素開(kāi)始處理(也稱(chēng)為“先進(jìn)先出”或FIFO)的情況。

與普通Java一樣,可以使用peek()方法檢查RQueue的第一個(gè)元素,或者使用poll()方法檢查和刪除RQueue的第一個(gè)元素:

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

阻塞隊(duì)列

Redisson中的RBlockingQueue對(duì)象實(shí)現(xiàn)了java.util.BlockingQueue接口。

BlockingQueues是阻塞線程的隊(duì)列,這些線程試圖從空隊(duì)列中進(jìn)行輪詢(xún),或者試圖在已滿(mǎn)的隊(duì)列中插入元素。 該線程將被阻塞,直到另一個(gè)線程將一個(gè)元素插入到空隊(duì)列中,或從完整隊(duì)列中輪詢(xún)第一個(gè)元素為止。

下面的示例代碼演示了RBlockingQueue的正確實(shí)例化和使用。 特別是,你可以使用參數(shù)指定對(duì)象將等待線程變得可用的時(shí)間來(lái)調(diào)用poll()方法:

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

在故障轉(zhuǎn)移或重新連接到Redis服務(wù)器的過(guò)程中,將自動(dòng)重新預(yù)訂poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。

BoundedBlockingQueue
Redisson中的RBoundedBlockingQueue

對(duì)象實(shí)現(xiàn)了有界的阻塞隊(duì)列結(jié)構(gòu)。 有界阻塞隊(duì)列是容量已受限制(即有限)的阻塞隊(duì)列。

以下代碼演示了如何在Redisson中實(shí)例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用于嘗試設(shè)置阻塞隊(duì)列的容量。 trySetCapacity()返回布爾值“ true”或“ false”,這取決于是否成功設(shè)置了容量或是否已經(jīng)設(shè)置了容量:

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

延遲排隊(duì)

Redisson中的RDelayedQueue對(duì)象允許你在Redis中實(shí)現(xiàn)延遲隊(duì)列。 當(dāng)使用諸如指數(shù)補(bǔ)償?shù)牟呗詫⑾鬟f給消費(fèi)者時(shí),這可能會(huì)很有用。 每次嘗試發(fā)送郵件失敗后,重試之間的時(shí)間將成倍增加。

在與元素一起指定的延遲之后,延遲隊(duì)列中的每個(gè)元素將被轉(zhuǎn)移到目標(biāo)隊(duì)列。 此目標(biāo)隊(duì)列可以是實(shí)現(xiàn)RQueue接口的任何隊(duì)列,例如RBlockingQueue或RBoundedBlockingQueue。

RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在不再需要隊(duì)列之后,通過(guò)使用destroy()方法銷(xiāo)毀延遲的隊(duì)列是一個(gè)好主意。 但是,如果要關(guān)閉Redisson,則沒(méi)有必要。
PriorityQueue

Redisson中的RPriorityQueue對(duì)象實(shí)現(xiàn)了java.util.Queue接口。 優(yōu)先級(jí)隊(duì)列是不是按元素的使用期限而是按照與每個(gè)元素相關(guān)聯(lián)的優(yōu)先級(jí)排序的隊(duì)列。
如下面的示例代碼所示,RPriorityQueue使用比較器對(duì)隊(duì)列中的元素進(jìn)行排序:

RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();

PriorityBlockingQueue
Redisson中的RPriorityBlockingQueue對(duì)象結(jié)合了RPriorityQueue和RBlockingQueue的功能。 與RPriorityQueue一樣,RPriorityBlockingQueue也使用Comparator對(duì)隊(duì)列中的元素進(jìn)行排序。

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();

在故障轉(zhuǎn)移或重新連接到Redis服務(wù)器的過(guò)程中,將自動(dòng)重新預(yù)訂poll(),pollLastAndOfferFirstTo()和take()Java方法。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI