溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

ZooKeeper實(shí)現(xiàn)生產(chǎn)-消費(fèi)者隊(duì)列

發(fā)布時(shí)間:2020-05-19 03:48:30 來(lái)源:網(wǎng)絡(luò) 閱讀:436 作者:Java_老男孩 欄目:編程語(yǔ)言

生產(chǎn)-消費(fèi)者隊(duì)列,用于多節(jié)點(diǎn)的分布式數(shù)據(jù)結(jié)構(gòu),生產(chǎn)和消費(fèi)數(shù)據(jù)。生產(chǎn)者創(chuàng)建一個(gè)數(shù)據(jù)對(duì)象,并放到隊(duì)列中;消費(fèi)者從隊(duì)列中取出一個(gè)數(shù)據(jù)對(duì)象并進(jìn)行處理。在ZooKeeper中,隊(duì)列可以使用一個(gè)容器節(jié)點(diǎn)下創(chuàng)建多個(gè)子節(jié)點(diǎn)來(lái)實(shí)現(xiàn);創(chuàng)建子節(jié)點(diǎn)時(shí),CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper會(huì)自動(dòng)在節(jié)點(diǎn)名稱(chēng)后面添加唯一序列號(hào)。EPHEMERAL_SEQUENTIAL也有同樣的特點(diǎn),區(qū)別在于會(huì)話結(jié)束后是否會(huì)自動(dòng)刪除。

敲小黑板:*_SEQUENTIAL是ZooKeeper的一個(gè)很重要的特性,分布式鎖、選舉制度都依靠這個(gè)特性實(shí)現(xiàn)的。

1????? 對(duì)前續(xù)代碼的重構(gòu)

之前的文章,我們已經(jīng)用實(shí)現(xiàn)了Watcher和Barrier,創(chuàng)建ZooKeeper連接的代碼已經(jīng)復(fù)制了一遍。后續(xù)還需要類(lèi)似的工作,因此先對(duì)原有代碼做一下重構(gòu),讓代碼味道干凈一點(diǎn)。

?ZooKeeper實(shí)現(xiàn)生產(chǎn)-消費(fèi)者隊(duì)列

以下是 process(WatchedEvent)的代碼

final public void process(WatchedEvent event) {

  if (Event.EventType.None.equals(event.getType())) {

    // 連接狀態(tài)發(fā)生變化

    if (Event.KeeperState.SyncConnected.equals(event.getState())) {

      // 連接建立成功

      connectedSemaphore.countDown();

    }

  } else if (Event.EventType.NodeCreated.equals(event.getType())) {

    processNodeCreated(event);

  } else if (Event.EventType.NodeDeleted.equals(event.getType())) {

    processNodeDeleted(event);

  } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {

    processNodeDataChanged(event);

  } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {

    processNodeChildrenChanged(event);

  }

}

以ZooKeeperBarrier為例,看看重構(gòu)之后的構(gòu)造函數(shù)和監(jiān)聽(tīng)Event的代碼

ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)

    throws IOException {

  super(address);

  this.tableSerial = createRootNode(tableSerial);

  this.tableCapacity = tableCapacity;

  this.customerName = customerName;

}

protected void processNodeChildrenChanged(WatchedEvent event) {

  log.info("{} 接收到了通知 : {}", customerName, event.getType());

  // 子節(jié)點(diǎn)有變化

  synchronized (mutex) {

    mutex.notify();

  }

}

2 隊(duì)列的生產(chǎn)者

生產(chǎn)者的關(guān)鍵代碼

String elementName = queueName + "/element";
ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;
CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;
getZooKeeper().create(elementName, value, ids, createMode);

注意,重點(diǎn)是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存儲(chǔ)直到有命令刪除,SEQUENTIAL表示自動(dòng)在后面添加自增的唯一序列號(hào)。這樣,盡管elementName都一樣,但實(shí)際生成的zNode名字在 “element”后面會(huì)添加格式為%010d的10個(gè)數(shù)字,如0000000001。如一個(gè)完整的zNode名可能為/queue/element0000000021。

3 隊(duì)列的消費(fèi)者

消費(fèi)者嘗試從子節(jié)點(diǎn)列表獲取zNode名最小的一個(gè)子節(jié)點(diǎn),如果隊(duì)列為空則等待NodeChildrenChanged事件。關(guān)鍵代碼

/** 隊(duì)列的同步信號(hào) */

private static Integer queueMutex = Integer.valueOf(1);

@Override

protected void processNodeChildrenChanged(WatchedEvent event) {

  synchronized (queueMutex) {

    queueMutex.notify();

  }

}

/**

 * 從隊(duì)列中刪除第一個(gè)對(duì)象

 *

 * @return

 * @throws KeeperException

 * @throws InterruptedException

 */

int consume() throws KeeperException, InterruptedException {

  while (true) {

    synchronized (queueMutex) {

      List<String> list = getZooKeeper().getChildren(queueName, true);

      if (list.size() == 0) {

        queueMutex.wait();

      } else {

        // 獲取第一個(gè)子節(jié)點(diǎn)的名稱(chēng)

        String firstNodeName = getFirstElementName(list);

        // 刪除節(jié)點(diǎn),并返回節(jié)點(diǎn)的值

        return deleteNodeAndReturnValue(firstNodeName);

      }

    }

  }

}

4 測(cè)試日志

把測(cè)試結(jié)果放源碼前面,免得大家被無(wú)聊的代碼晃暈。

測(cè)試代碼創(chuàng)建了兩個(gè)線程,一個(gè)線程是生產(chǎn)者,按隨機(jī)間隔往隊(duì)列中添加對(duì)象;一個(gè)線程是消費(fèi)者,隨機(jī)間隔嘗試從隊(duì)列中取出第一個(gè),如果當(dāng)時(shí)隊(duì)列為空,會(huì)等到直到新的數(shù)據(jù)。

兩個(gè)進(jìn)程都加上隨機(jī)間隔,是為了模擬生產(chǎn)可能比消費(fèi)更快的情況。以下是測(cè)試日志,為了更突出,生產(chǎn)和消費(fèi)的日志我增加了不同的文字樣式。

49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 開(kāi)始ZooKeeper隊(duì)列測(cè)試,本次將測(cè)試 10 個(gè)數(shù)據(jù)

49:48.076 [DEBUG] ZooKeeperQueue.log(201)

+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]

|-- elapsed time                   [開(kāi)始鏈接]   119.863 milliseconds.

|-- elapsed time           [等待連接成功的Event]    40.039 milliseconds.

|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]   159.911 milliseconds.

49:48.082 [DEBUG] ZooKeeperQueue.log(201)

+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]

|-- elapsed time                   [開(kāi)始鏈接]   103.795 milliseconds.

|-- elapsed time           [等待連接成功的Event]    65.899 milliseconds.

|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]   170.263 milliseconds.

49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 1 , 然后等待 1700 毫秒

49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 1 , 然后等待 4000 毫秒

49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 2 , 然后等待 900 毫秒

49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 3 , 然后等待 1300 毫秒

49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 4 , 然后等待 3700 毫秒

49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 2 , 然后等待 2800 毫秒

49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 3 , 然后等待 4500 毫秒

49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 5 , 然后等待 3500 毫秒

49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 6 , 然后等待 4200 毫秒

49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 4 , 然后等待 2400 毫秒

50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 5 , 然后等待 4900 毫秒

50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 7 , 然后等待 4500 毫秒

50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 6 , 然后等待 3600 毫秒

50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 8 , 然后等待 1900 毫秒

50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 9 , 然后等待 3200 毫秒

50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 7 , 然后等待 2900 毫秒

50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對(duì)象 : 10 , 然后等待 4900 毫秒

50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 8 , 然后等待 300 毫秒

50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 9 , 然后等待 4800 毫秒

50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消費(fèi)對(duì)象: 10 , 然后等待 2400 毫秒
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI