溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java并發(fā)系列之CyclicBarrier源碼分析

發(fā)布時間:2020-10-14 00:49:38 來源:腳本之家 閱讀:160 作者:勞夫子 欄目:編程語言

現(xiàn)實生活中我們經(jīng)常會遇到這樣的情景,在進行某個活動前需要等待人全部都齊了才開始。例如吃飯時要等全家人都上座了才動筷子,旅游時要等全部人都到齊了才出發(fā),比賽時要等運動員都上場后才開始。在JUC包中為我們提供了一個同步工具類能夠很好的模擬這類場景,它就是CyclicBarrier類。利用CyclicBarrier類可以實現(xiàn)一組線程相互等待,當所有線程都到達某個屏障點后再進行后續(xù)的操作。下圖演示了這一過程。

Java并發(fā)系列之CyclicBarrier源碼分析

在CyclicBarrier類的內(nèi)部有一個計數(shù)器,每個線程在到達屏障點的時候都會調(diào)用await方法將自己阻塞,此時計數(shù)器會減1,當計數(shù)器減為0的時候所有因調(diào)用await方法而被阻塞的線程將被喚醒。這就是實現(xiàn)一組線程相互等待的原理,下面我們先看看CyclicBarrier有哪些成員變量。

//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
//線程攔截器
private final Condition trip = lock.newCondition();
//每次攔截的線程數(shù)
private final int parties;
//換代前執(zhí)行的任務
private final Runnable barrierCommand;
//表示柵欄的當前代
private Generation generation = new Generation();
//計數(shù)器
private int count;

//靜態(tài)內(nèi)部類Generation
private static class Generation {
  boolean broken = false;
}

上面貼出了CyclicBarrier所有的成員變量,可以看到CyclicBarrier內(nèi)部是通過條件隊列trip來對線程進行阻塞的,并且其內(nèi)部維護了兩個int型的變量parties和count,parties表示每次攔截的線程數(shù),該值在構造時進行賦值。count是內(nèi)部計數(shù)器,它的初始值和parties相同,以后隨著每次await方法的調(diào)用而減1,直到減為0就將所有線程喚醒。CyclicBarrier有一個靜態(tài)內(nèi)部類Generation,該類的對象代表柵欄的當前代,就像玩游戲時代表的本局游戲,利用它可以實現(xiàn)循環(huán)等待。barrierCommand表示換代前執(zhí)行的任務,當count減為0時表示本局游戲結束,需要轉(zhuǎn)到下一局。在轉(zhuǎn)到下一局游戲之前會將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過指定barrierCommand來執(zhí)行自己的任務。接下來我們看看它的構造器。

//構造器1
public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}

//構造器2
public CyclicBarrier(int parties) {
  this(parties, null);
}

CyclicBarrier有兩個構造器,其中構造器1是它的核心構造器,在這里你可以指定本局游戲的參與者數(shù)量(要攔截的線程數(shù))以及本局結束時要執(zhí)行的任務,還可以看到計數(shù)器count的初始值被設置為parties。CyclicBarrier類最主要的功能就是使先到達屏障點的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。

//非定時等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}

//定時等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

可以看到不管是定時等待還是非定時等待,它們都調(diào)用了dowait方法,只不過是傳入的參數(shù)不同而已。下面我們就來看看dowait方法都做了些什么。

//核心等待方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    //檢查當前柵欄是否被打翻
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    //檢查當前線程是否被中斷
    if (Thread.interrupted()) {
      //如果當前線程被中斷會做以下三件事
      //1.打翻當前柵欄
      //2.喚醒攔截的所有線程
      //3.拋出中斷異常
      breakBarrier();
      throw new InterruptedException();
    }
    //每次都將計數(shù)器的值減1
    int index = --count;
    //計數(shù)器的值減為0則需喚醒所有線程并轉(zhuǎn)換到下一代
    if (index == 0) {
      boolean ranAction = false;
      try {
        //喚醒所有線程前先執(zhí)行指定的任務
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        //喚醒所有線程并轉(zhuǎn)到下一代
        nextGeneration();
        return 0;
      } finally {
        //確保在任務未成功執(zhí)行時能將所有線程喚醒
        if (!ranAction) {
          breakBarrier();
        }
      }
    }

    //如果計數(shù)器不為0則執(zhí)行此循環(huán)
    for (;;) {
      try {
        //根據(jù)傳入的參數(shù)來決定是定時等待還是非定時等待
        if (!timed) {
          trip.await();
        }else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        //若當前線程在等待期間被中斷則打翻柵欄喚醒其他線程
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          //若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待, 則直接調(diào)用中斷操作
          Thread.currentThread().interrupt();
        }
      }
      //如果線程因為打翻柵欄操作而被喚醒則拋出異常
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      //如果線程因為換代操作而被喚醒則返回計數(shù)器的值
      if (g != generation) {
        return index;
      }
      //如果線程因為時間到了而被喚醒則打翻柵欄并拋出異常
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

上面貼出的代碼中注釋都比較詳細,我們只挑一些重要的來講??梢钥吹皆赿owait方法中每次都將count減1,減完后立馬進行判斷看看是否等于0,如果等于0的話就會先去執(zhí)行之前指定好的任務,執(zhí)行完之后再調(diào)用nextGeneration方法將柵欄轉(zhuǎn)到下一代,在該方法中會將所有線程喚醒,將計數(shù)器的值重新設為parties,最后會重新設置柵欄代次,在執(zhí)行完nextGeneration方法之后就意味著游戲進入下一局。如果計數(shù)器此時還不等于0的話就進入for循環(huán),根據(jù)參數(shù)來決定是調(diào)用trip.awaitNanos(nanos)還是trip.await()方法,這兩方法對應著定時和非定時等待。如果在等待過程中當前線程被中斷就會執(zhí)行breakBarrier方法,該方法叫做打破柵欄,意味著游戲在中途被掐斷,設置generation的broken狀態(tài)為true并喚醒所有線程。同時這也說明在等待過程中有一個線程被中斷整盤游戲就結束,所有之前被阻塞的線程都會被喚醒。線程醒來后會執(zhí)行下面三個判斷,看看是否因為調(diào)用breakBarrier方法而被喚醒,如果是則拋出異常;看看是否是正常的換代操作而被喚醒,如果是則返回計數(shù)器的值;看看是否因為超時而被喚醒,如果是的話就調(diào)用breakBarrier打破柵欄并拋出異常。這里還需要注意的是,如果其中有一個線程因為等待超時而退出,那么整盤游戲也會結束,其他線程都會被喚醒。下面貼出nextGeneration方法和breakBarrier方法的具體代碼。

//切換柵欄到下一代
private void nextGeneration() {
  //喚醒條件隊列所有線程
  trip.signalAll();
  //設置計數(shù)器的值為需要攔截的線程數(shù)
  count = parties;
  //重新設置柵欄代次
  generation = new Generation();
}

//打翻當前柵欄
private void breakBarrier() {
  //將當前柵欄狀態(tài)設置為打翻
  generation.broken = true;
  //設置計數(shù)器的值為需要攔截的線程數(shù)
  count = parties;
  //喚醒所有線程
  trip.signalAll();
}

上面我們已經(jīng)通過源碼將CyclicBarrier的原理基本都講清楚了,下面我們就通過一個賽馬的例子來深入掌握它的使用。

class Horse implements Runnable {
  
  private static int counter = 0;
  private final int id = counter++;
  private int strides = 0;
  private static Random rand = new Random(47);
  private static CyclicBarrier barrier;
  
  public Horse(CyclicBarrier b) { barrier = b; }
  
  @Override
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          //賽馬每次隨機跑幾步
          strides += rand.nextInt(3);
        }
        barrier.await();
      }
    } catch(Exception e) {
      e.printStackTrace();
    }
  }
  
  public String tracks() {
    StringBuilder s = new StringBuilder();
    for(int i = 0; i < getStrides(); i++) {
      s.append("*");
    }
    s.append(id);
    return s.toString();
  }
  
  public synchronized int getStrides() { return strides; }
  public String toString() { return "Horse " + id + " "; }
  
}

public class HorseRace implements Runnable {
  
  private static final int FINISH_LINE = 75;
  private static List<Horse> horses = new ArrayList<Horse>();
  private static ExecutorService exec = Executors.newCachedThreadPool();
  
  @Override
  public void run() {
    StringBuilder s = new StringBuilder();
    //打印賽道邊界
    for(int i = 0; i < FINISH_LINE; i++) {
      s.append("=");
    }
    System.out.println(s);
    //打印賽馬軌跡
    for(Horse horse : horses) {
      System.out.println(horse.tracks());
    }
    //判斷是否結束
    for(Horse horse : horses) {
      if(horse.getStrides() >= FINISH_LINE) {
        System.out.println(horse + "won!");
        exec.shutdownNow();
        return;
      }
    }
    //休息指定時間再到下一輪
    try {
      TimeUnit.MILLISECONDS.sleep(200);
    } catch(InterruptedException e) {
      System.out.println("barrier-action sleep interrupted");
    }
  }
  
  public static void main(String[] args) {
    CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
    for(int i = 0; i < 7; i++) {
      Horse horse = new Horse(barrier);
      horses.add(horse);
      exec.execute(horse);
    }
  }
  
}

該賽馬程序主要是通過在控制臺不停的打印各賽馬的當前軌跡,以此達到動態(tài)顯示的效果。整場比賽有多個輪次,每一輪次各個賽馬都會隨機走上幾步然后調(diào)用await方法進行等待,當所有賽馬走完一輪的時候?qū)?zhí)行任務將所有賽馬的當前軌跡打印到控制臺上。這樣每一輪下來各賽馬的軌跡都在不停的增長,當其中某個賽馬的軌跡最先增長到指定的值的時候?qū)Y束整場比賽,該賽馬成為整場比賽的勝利者!程序的運行結果如下:

Java并發(fā)系列之CyclicBarrier源碼分析

至此我們難免會將CyclicBarrier與CountDownLatch進行一番比較。這兩個類都可以實現(xiàn)一組線程在到達某個條件之前進行等待,它們內(nèi)部都有一個計數(shù)器,當計數(shù)器的值不斷的減為0的時候所有阻塞的線程將會被喚醒。有區(qū)別的是CyclicBarrier的計數(shù)器由自己控制,而CountDownLatch的計數(shù)器則由使用者來控制,在CyclicBarrier中線程調(diào)用await方法不僅會將自己阻塞還會將計數(shù)器減1,而在CountDownLatch中線程調(diào)用await方法只是將自己阻塞而不會減少計數(shù)器的值。另外,CountDownLatch只能攔截一輪,而CyclicBarrier可以實現(xiàn)循環(huán)攔截。一般來說用CyclicBarrier可以實現(xiàn)CountDownLatch的功能,而反之則不能,例如上面的賽馬程序就只能使用CyclicBarrier來實現(xiàn)??傊?,這兩個類的異同點大致如此,至于何時使用CyclicBarrier,何時使用CountDownLatch,還需要讀者自己去拿捏。

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持億速云。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI