溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JDK 7中的Fork或Join模式是什么

發(fā)布時間:2021-10-29 09:41:15 來源:億速云 閱讀:134 作者:柒染 欄目:編程語言

本篇文章為大家展示了JDK 7中的Fork或Join模式是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

介  紹

隨著多核芯片逐漸成為主流,大多數(shù)軟件開發(fā)人員不可避免地需要了解并行編程的知識。而同時,主流程序語言正在將越來越多的并行特性合并到標準庫或者語言本身之中。我們可以看到,JDK 在這方面同樣走在潮流的前方。在 JDK 標準版 5 中,由 Doug Lea 提供的并行框架成為了標準庫的一部分(JSR-166)。隨后,在 JDK 6 中,一些新的并行特性,例如并行 collection 框架,合并到了標準庫中(JSR-166x)。直到今天,盡管 Java SE 7 還沒有正式發(fā)布,一些并行相關的新特性已經出現(xiàn)在 JSR-166y 中:

1.Fork/Join 模式;

2.TransferQueue,它繼承自 BlockingQueue 并能在隊列滿時阻塞“生產者”;

3.ArrayTasks/ListTasks,用于并行執(zhí)行某些數(shù)組/列表相關任務的類;

4.IntTasks/LongTasks/DoubleTasks,用于并行處理數(shù)字類型數(shù)組的工具類,提供了排序、查找、求和、求最小值、求最大值等功能;

其中,對 Fork/Join 模式的支持可能是對開發(fā)并行軟件來說最通用的新特性。在 JSR-166y 中,Doug Lea 實現(xiàn)ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 時就大量的用到了 Fork/Join 模式。讀者還需要注意一點,因為 JDK 7 還沒有正式發(fā)布,因此本文涉及到的功能和發(fā)布版本有可能不一樣。

Fork/Join 模式有自己的適用范圍。如果一個應用能被分解成多個子任務,并且組合多個子任務的結果就能夠獲得最終的答案,那么這個應用就適合用 Fork/Join 模式來解決。圖 1 給出了一個 Fork/Join 模式的示意圖,位于圖上部的 Task 依賴于位于其下的 Task 的執(zhí)行,只有當所有的子任務都完成之后,調用者才能獲得 Task 0 的返回結果。

圖 1. Fork/Join 模式示意圖

JDK 7中的Fork或Join模式是什么

可以說,F(xiàn)ork/Join 模式能夠解決很多種類的并行問題。通過使用 Doug Lea 提供的 Fork/Join 框架,軟件開發(fā)人員只需要關注任務的劃分和中間結果的組合就能充分利用并行平臺的優(yōu)良性能。其他和并行相關的諸多難于處理的問題,例如負載平衡、同步等,都可以由框架采用統(tǒng)一的方式解決。這樣,我們就能夠輕松地獲得并行的好處而避免了并行編程的困難且容易出錯的缺點。

使用 Fork/Join 模式

在開始嘗試 Fork/Join 模式之前,我們需要從 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下載 JSR-166y 的源代碼,并且我們還需要安裝最新版本的 JDK 6(下載網址請參閱 參考資源)。Fork/Join 模式的使用方式非常直觀。首先,我們需要編寫一個 ForkJoinTask 來完成子任務的分割、中間結果的合并等工作。隨后,我們將這個 ForkJoinTask 交給 ForkJoinPool 來完成應用的執(zhí)行。

通常我們并不直接繼承 ForkJoinTask,它包含了太多的抽象方法。針對特定的問題,我們可以選擇 ForkJoinTask 的不同子類來完成任務。RecursiveAction 是 ForkJoinTask 的一個子類,它代表了一類最簡單的 ForkJoinTask:不需要返回值,當子任務都執(zhí)行完畢之后,不需要進行中間結果的組合。如果我們從 RecursiveAction 開始繼承,那么我們只需要重載 protected void compute() 方法。下面,我們來看看怎么為快速排序算法建立一個 ForkJoinTask 的子類:

清單 1. ForkJoinTask 的子類

classSortTaskextendsRecursiveAction{  finallong[]array;  finalintlo;  finalinthi;  privateintTHRESHOLD=30;   publicSortTask(long[]array){  this.array=array;  this.lo=0;  this.hi=array.length-1;  }   publicSortTask(long[]array,intlo,inthi){  this.array=array;  this.lo=lo;  this.hi=hi;  }   protectedvoidcompute(){  if(hi-lo<THRESHOLD)  sequentiallySort(array,lo,hi);  else{  intpivot=partition(array,lo,hi);  coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,  pivot+1,hi));  }  }   privateintpartition(long[]array,intlo,inthi){  longx=array[hi];  inti=lo-1;  for(intj=lo;j<hi;j++){  if(array[j]<=x){  i++;  swap(array,i,j);  }  }  swap(array,i+1,hi);  returni+1;  }   privatevoidswap(long[]array,inti,intj){  if(i!=j){  longtemp=array[i];  array[i]=array[j];  array[j]=temp;  }  }   privatevoidsequentiallySort(long[]array,intlo,inthi){  Arrays.sort(array,lo,hi+1);  }  }

在清單1中,SortTask 首先通過 partition() 方法將數(shù)組分成兩個部分。隨后,兩個子任務將被生成并分別排序數(shù)組的兩個部分。當子任務足夠小時,再將其分割為更小的任務反而引起性能的降低。因此,這里我們使用一個 THRESHOLD,限定在子任務規(guī)模較小時,使用直接排序,而不是再將其分割成為更小的任務。其中,我們用到了 RecursiveAction 提供的方法 coInvoke()。它表示:啟動所有的任務,并在所有任務都正常結束后返回。如果其中一個任務出現(xiàn)異常,則其它所有的任務都取消。coInvoke() 的參數(shù)還可以是任務的數(shù)組。

現(xiàn)在剩下的工作就是將 SortTask 提交到 ForkJoinPool 了。ForkJoinPool() 默認建立具有與 CPU 可使用線程數(shù)相等線程個數(shù)的線程池。我們在一個 JUnit 的 test 方法中將 SortTask 提交給一個新建的 ForkJoinPool:

清單 2. 新建的 ForkJoinPool

@Test publicvoidtestSort()throwsException{  ForkJoinTasksort=newSortTask(array);  ForkJoinPoolfjpool=newForkJoinPool();  fjpool.submit(sort);  fjpool.shutdown();   fjpool.awaitTermination(30,TimeUnit.SECONDS);   assertTrue(checkSorted(array));  }

在上面的代碼中,我們用到了 ForkJoinPool 提供的如下函數(shù):

1. submit():將 ForkJoinTask 類的對象提交給 ForkJoinPool,F(xiàn)orkJoinPool 將立刻開始執(zhí)行 ForkJoinTask。

2. shutdown():執(zhí)行此方法之后,F(xiàn)orkJoinPool 不再接受新的任務,但是已經提交的任務可以繼續(xù)執(zhí)行。如果希望立刻停止所有的任務,可以嘗試 shutdownNow() 方法。

3. awaitTermination():阻塞當前線程直到 ForkJoinPool 中所有的任務都執(zhí)行結束。

并行快速排序的完整代碼如下所示:

清單 3. 并行快速排序的完整代碼

packagetests;  importstaticorg.junit.Assert.*;  importjava.util.Arrays;  importjava.util.Random;  importjava.util.concurrent.TimeUnit;  importjsr166y.forkjoin.ForkJoinPool;  importjsr166y.forkjoin.ForkJoinTask;  importjsr166y.forkjoin.RecursiveAction;  importorg.junit.Before;  importorg.junit.Test;  classSortTaskextendsRecursiveAction{  finallong[]array;  finalintlo;  finalinthi;  privateintTHRESHOLD=0;//Fordemoonly  publicSortTask(long[]array){  this.array=array;  this.lo=0;  this.hi=array.length-1;  }  publicSortTask(long[]array,intlo,inthi){  this.array=array;  this.lo=lo;  this.hi=hi;  }  protectedvoidcompute(){  if(hi-lo<THRESHOLD)  sequentiallySort(array,lo,hi);  else{  intpivot=partition(array,lo,hi);  System.out.println(" pivot="+pivot+",low="+lo+",high="+hi);  System.out.println("array"+Arrays.toString(array));  coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,  pivot+1,hi));  }  }  privateintpartition(long[]array,intlo,inthi){  longx=array[hi];  inti=lo-1;  for(intj=lo;j<hi;j++){  if(array[j]<=x){  i++;  swap(array,i,j);  }  }  swap(array,i+1,hi);  returni+1;  }  privatevoidswap(long[]array,inti,intj){  if(i!=j){  longtemp=array[i];  array[i]=array[j];  array[j]=temp;  }  }  privatevoidsequentiallySort(long[]array,intlo,inthi){  Arrays.sort(array,lo,hi+1);  }  }  publicclassTestForkJoinSimple{  privatestaticfinalintNARRAY=16;//Fordemoonly  long[]array=newlong[NARRAY];  Randomrand=newRandom();  @Before publicvoidsetUp(){  for(inti=0;i<array.length;i++){  array[i]=rand.nextLong()%100;//Fordemoonly  }  System.out.println("InitialArray:"+Arrays.toString(array));  }  @Test publicvoidtestSort()throwsException{  ForkJoinTasksort=newSortTask(array);  ForkJoinPoolfjpool=newForkJoinPool();  fjpool.submit(sort);  fjpool.shutdown();  fjpool.awaitTermination(30,TimeUnit.SECONDS);  assertTrue(checkSorted(array));  }  booleancheckSorted(long[]a){  for(inti=0;i<a.length-1;i++){  if(a[i]>(a[i+1])){  returnfalse;  }  }  returntrue;  }  }

運行以上代碼,我們可以得到以下結果:

InitialArray:[46,-12,74,-67,76,-13,-91,-96]   pivot=0,low=0,high=7 array[-96,-12,74,-67,76,-13,-91,46]   pivot=5,low=1,high=7 array[-96,-12,-67,-13,-91,46,76,74]   pivot=1,low=1,high=4 array[-96,-91,-67,-13,-12,46,74,76]   pivot=4,low=2,high=4 array[-96,-91,-67,-13,-12,46,74,76]   pivot=3,low=2,high=3 array[-96,-91,-67,-13,-12,46,74,76]   pivot=2,low=2,high=2 array[-96,-91,-67,-13,-12,46,74,76]   pivot=6,low=6,high=7 array[-96,-91,-67,-13,-12,46,74,76]   pivot=7,low=7,high=7 array[-96,-91,-67,-13,-12,46,74,76]

Fork/Join 模式高級特性

使用 RecursiveTask

除了 RecursiveAction,F(xiàn)ork/Join 框架還提供了其他 ForkJoinTask 子類:帶有返回值的 RecursiveTask,使用 finish() 方法顯式中止的 AsyncAction 和 LinkedAsyncAction,以及可使用 TaskBarrier 為每個任務設置不同中止條件的 CyclicAction。

從 RecursiveTask 繼承的子類同樣需要重載 protected void compute() 方法。與 RecursiveAction 稍有不同的是,它可使用泛型指定一個返回值的類型。下面,我們來看看如何使用 RecursiveTask 的子類。

清單 4. RecursiveTask 的子類

classFibonacciextendsRecursiveTask<Integer>{  finalintn;   Fibonacci(intn){  this.n=n;  }   privateintcompute(intsmall){  finalint[]results={1,1,2,3,5,8,13,21,34,55,89};  returnresults[small];  }   publicIntegercompute(){  if(n<=10){  returncompute(n);  }  Fibonaccif1=newFibonacci(n-1);  Fibonaccif2=newFibonacci(n-2);  f1.fork();  f2.fork();  returnf1.join()+f2.join();  }  }

在清單4 中,F(xiàn)ibonacci 的返回值為 Integer 類型。其 compute() 函數(shù)首先建立兩個子任務,啟動子任務執(zhí)行,阻塞以等待子任務的結果返回,相加后得到最終結果。同樣,當子任務足夠小時,通過查表得到其結果,以減小因過多地分割任務引起的性能降低。其中,我們用到了 RecursiveTask 提供的方法 fork() 和 join()。它們分別表示:子任務的異步執(zhí)行和阻塞等待結果完成。

現(xiàn)在剩下的工作就是將 Fibonacci 提交到 ForkJoinPool 了,我們在一個 JUnit 的 test 方法中作了如下處理:

清單 5. 將 Fibonacci 提交到 ForkJoinPool

@Test publicvoidtestFibonacci()throwsInterruptedException,ExecutionException{  ForkJoinTask<Integer>fjt=newFibonacci(45);  ForkJoinPoolfjpool=newForkJoinPool();  Future<Integer>result=fjpool.submit(fjt);   //dosomething  System.out.println(result.get());  }

使用 CyclicAction 來處理循環(huán)任務

CyclicAction 的用法稍微復雜一些。如果一個復雜任務需要幾個線程協(xié)作完成,并且線程之間需要在某個點等待所有其他線程到達,那么我們就能方便的用 CyclicAction 和 TaskBarrier 來完成。圖 2 描述了使用 CyclicAction 和 TaskBarrier 的一個典型場景。

圖 2. 使用 CyclicAction 和 TaskBarrier 執(zhí)行多線程任務

JDK 7中的Fork或Join模式是什么

繼承自 CyclicAction 的子類需要 TaskBarrier 為每個任務設置不同的中止條件。從 CyclicAction 繼承的子類需要重載 protected void compute() 方法,定義在 barrier 的每個步驟需要執(zhí)行的動作。compute() 方法將被反復執(zhí)行直到 barrier 的 isTerminated() 方法返回 True。TaskBarrier 的行為類似于 CyclicBarrier。下面,我們來看看如何使用 CyclicAction 的子類。

清單 6. 使用 CyclicAction 的子類

classConcurrentPrintextendsRecursiveAction{  protectedvoidcompute(){  TaskBarrierb=newTaskBarrier(){  protectedbooleanterminate(intcycle,intregisteredParties){  System.out.println("Cycleis"+cycle+";" +registeredParties+"parties");  returncycle>=10;  }  };  intn=3;  CyclicAction[]actions=newCyclicAction[n];  for(inti=0;i<n;++i){  finalintindex=i;  actions[i]=newCyclicAction(b){  protectedvoidcompute(){  System.out.println("I'mworking"+getCycle()+"" +index);  try{  Thread.sleep(500);  }catch(InterruptedExceptione){  e.printStackTrace();  }  }  };  }  for(inti=0;i<n;++i)  actions[i].fork();  for(inti=0;i<n;++i)  actions[i].join();  }  }

在清單6中,CyclicAction[] 數(shù)組建立了三個任務,打印各自的工作次數(shù)和序號。而在 b.terminate() 方法中,我們設置的中止條件表示重復 10 次計算后中止?,F(xiàn)在剩下的工作就是將 ConcurrentPrint 提交到 ForkJoinPool 了。我們可以在 ForkJoinPool 的構造函數(shù)中指定需要的線程數(shù)目,例如 ForkJoinPool(4) 就表明線程池包含 4 個線程。我們在一個 JUnit 的 test 方法中運行 ConcurrentPrint 的這個循環(huán)任務:

清單 7. 運行 ConcurrentPrint 循環(huán)任務

@Test publicvoidtestBarrier()throwsInterruptedException,ExecutionException{  ForkJoinTaskfjt=newConcurrentPrint();  ForkJoinPoolfjpool=newForkJoinPool(4);  fjpool.submit(fjt);  fjpool.shutdown();  }

RecursiveTask 和 CyclicAction 兩個例子的完整代碼如下所示:

清單 8. RecursiveTask 和 CyclicAction 兩個例子的完整代碼

packagetests;   importjava.util.concurrent.ExecutionException;  importjava.util.concurrent.Future;   importjsr166y.forkjoin.CyclicAction;  importjsr166y.forkjoin.ForkJoinPool;  importjsr166y.forkjoin.ForkJoinTask;  importjsr166y.forkjoin.RecursiveAction;  importjsr166y.forkjoin.RecursiveTask;  importjsr166y.forkjoin.TaskBarrier;   importorg.junit.Test;   classFibonacciextendsRecursiveTask<Integer>{  finalintn;   Fibonacci(intn){  this.n=n;  }   privateintcompute(intsmall){  finalint[]results={1,1,2,3,5,8,13,21,34,55,89};  returnresults[small];  }   publicIntegercompute(){  if(n<=10){  returncompute(n);  }  Fibonaccif1=newFibonacci(n-1);  Fibonaccif2=newFibonacci(n-2);  System.out.println("forknewthreadfor"+(n-1));  f1.fork();  System.out.println("forknewthreadfor"+(n-2));  f2.fork();  returnf1.join()+f2.join();  }  }   classConcurrentPrintextendsRecursiveAction{  protectedvoidcompute(){  TaskBarrierb=newTaskBarrier(){  protectedbooleanterminate(intcycle,intregisteredParties){  System.out.println("Cycleis"+cycle+";" +registeredParties+"parties");  returncycle>=10;  }  };  intn=3;  CyclicAction[]actions=newCyclicAction[n];  for(inti=0;i<n;++i){  finalintindex=i;  actions[i]=newCyclicAction(b){  protectedvoidcompute(){  System.out.println("I'mworking"+getCycle()+"" +index);  try{  Thread.sleep(500);  }catch(InterruptedExceptione){  e.printStackTrace();  }  }  };  }  for(inti=0;i<n;++i)  actions[i].fork();  for(inti=0;i<n;++i)  actions[i].join();  }  }   publicclassTestForkJoin{  @Test publicvoidtestBarrier()throwsInterruptedException,ExecutionException{  System.out.println(" testingTaskBarrier...");  ForkJoinTaskfjt=newConcurrentPrint();  ForkJoinPoolfjpool=newForkJoinPool(4);  fjpool.submit(fjt);  fjpool.shutdown();  }   @Test publicvoidtestFibonacci()throwsInterruptedException,ExecutionException{  System.out.println(" testingFibonacci...");  finalintnum=14;//Fordemoonly  ForkJoinTask<Integer>fjt=newFibonacci(num);  ForkJoinPoolfjpool=newForkJoinPool();  Future<Integer>result=fjpool.submit(fjt);   //dosomething  System.out.println("Fibonacci("+num+")="+result.get());  }  }

運行以上代碼,我們可以得到以下結果:

testingTaskBarrier...  I'mworking02  I'mworking00  I'mworking01  Cycleis0;3parties  I'mworking12  I'mworking10  I'mworking11  Cycleis1;3parties  I'mworking20  I'mworking21  I'mworking22  Cycleis2;3parties  I'mworking30  I'mworking32  I'mworking31  Cycleis3;3parties  I'mworking42  I'mworking40  I'mworking41  Cycleis4;3parties  I'mworking51  I'mworking50  I'mworking52  Cycleis5;3parties  I'mworking60  I'mworking62  I'mworking61  Cycleis6;3parties  I'mworking72  I'mworking70  I'mworking71  Cycleis7;3parties  I'mworking81  I'mworking80  I'mworking82  Cycleis8;3parties  I'mworking90  I'mworking92   testingFibonacci...  forknewthreadfor13  forknewthreadfor12  forknewthreadfor11  forknewthreadfor10  forknewthreadfor12  forknewthreadfor11  forknewthreadfor10  forknewthreadfor9  forknewthreadfor10  forknewthreadfor9  forknewthreadfor11  forknewthreadfor10  forknewthreadfor10  forknewthreadfor9  Fibonacci(14)=610

結  論

從以上的例子中可以看到,通過使用 Fork/Join 模式,軟件開發(fā)人員能夠方便地利用多核平臺的計算能力。盡管還沒有做到對軟件開發(fā)人員完全透明,F(xiàn)ork/Join 模式已經極大地簡化了編寫并發(fā)程序的瑣碎工作。對于符合 Fork/Join 模式的應用,軟件開發(fā)人員不再需要處理各種并行相關事務,例如同步、通信等,以難以調試而聞名的死鎖和 data race 等錯誤也就不會出現(xiàn),提升了思考問題的層次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,僅僅關注如何劃分任務和組合中間結果,將剩下的事情丟給 Fork/Join 框架。

在實際工作中利用 Fork/Join 模式,可以充分享受多核平臺為應用帶來的免費午餐。

上述內容就是JDK 7中的Fork或Join模式是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI