溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

CountDownLatch的簡單應(yīng)用和實(shí)現(xiàn)原理

發(fā)布時(shí)間:2020-05-23 13:22:48 來源:網(wǎng)絡(luò) 閱讀:119 作者:ckllf 欄目:編程語言

  CountDownLatch的簡單實(shí)現(xiàn)

  業(yè)務(wù)背景假設(shè):現(xiàn)在一個(gè)前端頁面的展示需要調(diào)用3個(gè)外部電商平臺接口的數(shù)據(jù),所以在我們提供給前端的這個(gè)接口上,我們要調(diào)用3個(gè)外部電商接口,最后需要對所有的數(shù)據(jù)做一個(gè)整合,方便前端展示。

  一般情況我們都是順序調(diào)用3個(gè)電商接口,得到數(shù)據(jù)后調(diào)用整合方法,假設(shè)每個(gè)電商接口調(diào)用時(shí)間為2秒,如下:

  public static void main(String[] args) throws Exception{

  long startTime = new Date().getTime();

  //調(diào)用第一個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread.sleep(2000);

  System.out.println("獲取電商平臺1的數(shù)據(jù)");

  //調(diào)用第二個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread.sleep(2000);

  System.out.println("獲取電商平臺2的數(shù)據(jù)");

  //調(diào)用第三個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread.sleep(2000);

  System.out.println("獲取電商平臺3的數(shù)據(jù)");

  System.out.println("對三個(gè)電商平臺的數(shù)據(jù)進(jìn)行合并");

  long endTime = new Date().getTime();

  long time = endTime - startTime;

  System.out.println("總耗時(shí)" + time);

  }

  調(diào)用后耗時(shí)6s,如下:

  

CountDownLatch的簡單應(yīng)用和實(shí)現(xiàn)原理


  以上方法耗時(shí)太長了,需要優(yōu)化,優(yōu)化思路:因?yàn)?個(gè)接口沒有先后關(guān)系,所以完全可以并行執(zhí)行,之后再做數(shù)據(jù)的整合,這樣設(shè)計(jì)接口耗時(shí)肯定會節(jié)省很多

  使用CountDownLatch來實(shí)現(xiàn)以上優(yōu)化思路

  CountDownLatch是什么:CountDownLatch是java.util.concurrent包下的類,它在多線程并發(fā)編程里充當(dāng)這計(jì)數(shù)器的功能,通過構(gòu)造函數(shù)維護(hù)一個(gè)int類型的初始值,如果一個(gè)線程調(diào)用await()方法,那么該線程就會進(jìn)入阻塞狀態(tài),直到初始值變?yōu)?后,調(diào)用await()方法的阻塞線程將會被喚醒,執(zhí)行后續(xù)操作,而通過countDown()這個(gè)方法,我們就能夠?qū)崿F(xiàn)初始值的減法,每調(diào)用一次,初始值減一。

  具體實(shí)現(xiàn)代碼如下:

  public static void main(String[] args) throws Exception {

  CountDownLatch countDownLatch = new CountDownLatch(3);

  long startTime = new Date().getTime();

  //調(diào)用第一個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread1 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺一的數(shù)據(jù)");

  countDownLatch.countDown();

  });

  //調(diào)用第二個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread2 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺二的數(shù)據(jù)");

  countDownLatch.countDown();

  });

  //調(diào)用第二個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread3 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺三的數(shù)據(jù)");

  countDownLatch.countDown();

  });

  thread1.start();

  thread2.start();

  thread3.start();

  countDownLatch.await();

  System.out.println("對三個(gè)電商平臺的數(shù)據(jù)進(jìn)行合并");

  long endTime = new Date().getTime();

  long time = endTime - startTime;

  System.out.println("總耗時(shí)" + time);

  }

  因?yàn)殡娚探涌谟?個(gè),所以CountDownLatch的初始值設(shè)為3,之后多線程執(zhí)行3個(gè)電商接口,每執(zhí)行完一個(gè),調(diào)用countDown()方法把初始值減一,同時(shí)主線程調(diào)用await()進(jìn)入阻塞狀態(tài),直到初始值減為0,就被重新喚醒,開始執(zhí)行數(shù)據(jù)的合并邏輯。

  執(zhí)行效果如下:

  

CountDownLatch的簡單應(yīng)用和實(shí)現(xiàn)原理


  可以看到總耗時(shí)節(jié)省了約三分之二

  CountDownLatch的實(shí)現(xiàn)原理

  先看CountDownLatch的構(gòu)造方法:

  public CountDownLatch(int count) {

  if (count < 0) throw new IllegalArgumentException("count < 0");

  this.sync = new Sync(count);

  }

  可以看到它除了做個(gè)初始值的異常判斷外,實(shí)際上是構(gòu)造了一個(gè)Sync的對象,賦值給自己的屬性sync,那么看下Sync對象的源碼:

  private static final class Sync extends AbstractQueuedSynchronizer {

  private static final long serialVersionUID = 4982264981922014374L;

  //Sync對象的構(gòu)造方法

  Sync(int count) {

  setState(count);

  }

  }

  從以上源碼可以看出,Sync對象繼承了AQS,所以調(diào)用CountDownLatch的構(gòu)造方法實(shí)際上就是調(diào)用Sync對象的構(gòu)造方法,然后通過setState(count)方法設(shè)置AQS的state值。

  public abstract class AbstractQueuedSynchronizer

  extends AbstractOwnableSynchronizer

  implements java.io.Serializable {

  private volatile int state;

  protected final void setState(int newState) {

  state = newState;

  }

  }

  再看countDown()方法的實(shí)現(xiàn):

  public void countDown() {

  sync.releaseShared(1);

  }

  實(shí)際上是調(diào)用了Sync對象的releaseShared()方法,參數(shù)固定為1

  public abstract class AbstractQueuedSynchronizer

  extends AbstractOwnableSynchronizer

  implements java.io.Serializable {

  public final boolean releaseShared(int arg) {

  if (tryReleaseShared(arg)) {

  doReleaseShared();

  return true;

  }

  return false;

  }

  //嘗試釋放共享模式的鎖

  protected boolean tryReleaseShared(int arg) {

  throw new UnsupportedOperationException();

  }

  private void doReleaseShared() {

  for (;;) {

  Node h = head;

  if (h != null && h != tail) {

  int ws = h.waitStatus;

  if (ws == Node.SIGNAL) {

  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  continue; // loop to recheck cases

  unparkSuccessor(h);

  }

  else if (ws == 0 &&

  !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  continue; // loop on failed CAS

  }

  if (h == head) // loop if head changed

  break;

  }

  }

  }

  其中方法tryReleaseShared()的具體實(shí)現(xiàn)是在CountDownLatch類,如下:

  public class CountDownLatch {

  protected boolean tryReleaseShared(int releases) {

  // Decrement count; signal when transition to zero

  for (;;) {

  int c = getState();

  if (c == 0)

  return false;

  int nextc = c-1;

  if (compareAndSetState(c, nextc))

  return nextc == 0;

  }

  }

  }

  //獲取計(jì)數(shù)器的值

  protected final int getState() {

  return state;

  }

  }

  通過循環(huán)和compareAndSetState()方法我們可以看出這是一個(gè)自旋的CAS(Compare And Set)操作,先獲取state的值,為0則返回false,否則執(zhí)行減1操作,失敗就重試,直到減為0,則返回true,之后執(zhí)行doReleaseShared()方法

  await()方法的實(shí)現(xiàn)

  public class CountDownLatch {

  public void await() throws InterruptedException {

  sync.acquireSharedInterruptibly(1);

  }

  }

  可以看到await()實(shí)際是調(diào)用Sync對象的acquireSharedInterruptibly()方法:

  public abstract class AbstractQueuedSynchronizer

  extends AbstractOwnableSynchronizer

  implements java.io.Serializable {

  public final void acquireSharedInterruptibly(int arg)

  throws InterruptedException {

  if (Thread.interrupted())

  throw new InterruptedException();

  if (tryAcquireShared(arg) < 0)

  doAcquireSharedInterruptibly(arg);

  }

  private void doAcquireSharedInterruptibly(int arg)

  throws InterruptedException {

  final Node node = addWaiter(Node.SHARED);

  boolean failed = true;

  try {

  for (;;) {

  final Node p = node.predecessor();

  if (p == head) {

  int r = tryAcquireShared(arg);

  if (r >= 0) {

  setHeadAndPropagate(node, r);

  p.next = null; // help GC

  failed = false;

  return;

  }

  }

  if (shouldParkAfterFailedAcquire(p, node) &&

  parkAndCheckInterrupt())

  throw new InterruptedException();

  }

  } finally {

  if (failed)

  cancelAcquire(node);

  }

  }

  }

  其中tryAcquireShared()方法的具體實(shí)現(xiàn)是在CountDownLatch類:

  public class CountDownLatch {

  protected int tryAcquireShared(int acquires) {

  return (getState() == 0) ? 1 : -1;

  }

  }

  通過該方法可以判斷出如果計(jì)數(shù)器值為0則返回1,否則返回-1,然后為0則會執(zhí)行之后的方法,如果繼續(xù)跟下去,最后會發(fā)現(xiàn)還是調(diào)用到了AQS的doReleaseShared()方法,所有阻塞的線程會被放開。

  CountDownLatch和.join()的使用區(qū)別

  CountDownLatch和.join()方法的作用其實(shí)很像,join()方法的使用可參考Java多線程中join()方法的使用,不過CountDownLatch使用起來會比join()方法更有靈活性。假設(shè)電商接口調(diào)用其實(shí)有兩個(gè)步驟,在每個(gè)接口的第一步獲取完數(shù)據(jù)后,還要做個(gè)數(shù)據(jù)記錄,耗時(shí)也是2s,下面給出示例代碼:

  使用join()方法:鄭州人流多少錢 http://mobile.sgyy029.com/

  public static void main(String[] args) throws Exception {

  CountDownLatch countDownLatch = new CountDownLatch(3);

  long startTime = new Date().getTime();

  //調(diào)用第一個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread1 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺一的數(shù)據(jù)");

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺一的數(shù)據(jù)后做個(gè)記錄");

  });

  //調(diào)用第二個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread2 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺二的數(shù)據(jù)");

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺二的數(shù)據(jù)后做個(gè)記錄");

  });

  //調(diào)用第三個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread3 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺三的數(shù)據(jù)");

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺三的數(shù)據(jù)后做個(gè)記錄");

  });

  thread1.start();

  thread2.start();

  thread3.start();

  thread1.join();

  thread2.join();

  thread3.join();

  System.out.println("對三個(gè)電商平臺的數(shù)據(jù)進(jìn)行合并");

  long endTime = new Date().getTime();

  long time = endTime - startTime;

  System.out.println("總耗時(shí)" + time);

  }

  耗時(shí)效果如下:

  可以發(fā)現(xiàn),使用join()方法,必須得等到每個(gè)線程都結(jié)束后才會接著執(zhí)行之后的主線程,這樣總耗時(shí)就會被數(shù)據(jù)記錄的方法拖慢,達(dá)到4311ms

  使用CountDownLatch,在獲取數(shù)據(jù)后就對初始值減1,而不是等到記錄方法完成才減1,如下:

  public static void main(String[] args) throws Exception {

  CountDownLatch countDownLatch = new CountDownLatch(3);

  long startTime = new Date().getTime();

  //調(diào)用第一個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread1 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺一的數(shù)據(jù)");

  countDownLatch.countDown();

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺一的數(shù)據(jù)后做個(gè)記錄");

  });

  //調(diào)用第二個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread2 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺二的數(shù)據(jù)");

  countDownLatch.countDown();

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺二的數(shù)據(jù)后做個(gè)記錄");

  });

  //調(diào)用第二個(gè)電商平臺的接口取得訂單數(shù),用時(shí)2s

  Thread thread3 = new Thread(() -> {

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺三的數(shù)據(jù)");

  countDownLatch.countDown();

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  }

  System.out.println("獲取電商平臺三的數(shù)據(jù)后做個(gè)記錄");

  });

  thread1.start();

  thread2.start();

  thread3.start();

  countDownLatch.await();

  System.out.println("對三個(gè)電商平臺的數(shù)據(jù)進(jìn)行合并");

  long endTime = new Date().getTime();

  long time = endTime - startTime;

  System.out.println("總耗時(shí)" + time);

  }

  耗時(shí)效果如下:

  可以發(fā)現(xiàn)耗時(shí)才2185ms

  以上就是CountDownLatch和join()方法的使用區(qū)別,相比起join()方法要等線程都執(zhí)行完才會執(zhí)行阻塞的線程,CountDownLatch就能夠靈活控制阻塞線程的執(zhí)行時(shí)機(jī),耗時(shí)可以更少。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI