溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么理解Java 8并行流

發(fā)布時(shí)間:2021-10-29 17:30:19 來源:億速云 閱讀:125 作者:iii 欄目:編程語言

本篇內(nèi)容主要講解“怎么理解Java 8并行流”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“怎么理解Java 8并行流”吧!

并行流

認(rèn)識(shí)和開啟并行流

什么是并行流: 并行流就是將一個(gè)流的內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)不同數(shù)據(jù)塊的流。例如有這么一個(gè)需求:

有一個(gè) List 集合,而 list 中每個(gè) apple 對(duì)象只有重量,我們也知道 apple 的單價(jià)是 5元/kg,現(xiàn)在需要計(jì)算出每個(gè) apple 的單價(jià),傳統(tǒng)的方式是這樣:

List<Apple> appleList = new ArrayList<>(); // 假裝數(shù)據(jù)是從庫里查出來的  for (Apple apple : appleList) {      apple.setPrice(5.0 * apple.getWeight() / 1000);  }

我們通過迭代器遍歷 list 中的 apple 對(duì)象,完成了每個(gè) apple 價(jià)格的計(jì)算。而這個(gè)算法的時(shí)間復(fù)雜度是 O(list.size()) 隨著 list 大小的增加,耗時(shí)也會(huì)跟著線性增加。并行流可以大大縮短這個(gè)時(shí)間。

并行流處理該集合的方法如下:

appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));

和普通流的區(qū)別是這里調(diào)用的 parallelStream() 方法。當(dāng)然也可以通過 stream.parallel() 將普通流轉(zhuǎn)換成并行流。推薦看下:Java 8 創(chuàng)建 Stream 的 10 種方式,更多可以關(guān)注Java技術(shù)棧公眾號(hào)回復(fù)java獲取系列教程。

并行流也能通過 sequential() 方法轉(zhuǎn)換為順序流,但要注意:流的并行和順序轉(zhuǎn)換不會(huì)對(duì)流本身做任何實(shí)際的變化,僅僅是打了個(gè)標(biāo)記而已。并且在一條流水線上對(duì)流進(jìn)行多次并行 / 順序的轉(zhuǎn)換,生效的是最后一次的方法調(diào)用

并行流如此方便,它的線程從那里來呢?有多少個(gè)?怎么配置呢?

并行流內(nèi)部使用了默認(rèn)的 ForkJoinPool 線程池。默認(rèn)的線程數(shù)量就是處理器的核心數(shù),而配置系統(tǒng)核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變線程池大小。不過該值是全局變量。

改變他會(huì)影響所有并行流。目前還無法為每個(gè)流配置專屬的線程數(shù)。一般來說采用處理器核心數(shù)是不錯(cuò)的選擇

測試并行流的性能

為了更容易的測試性能,我們?cè)诿看斡?jì)算完蘋果價(jià)格后,讓線程睡 1s,表示在這期間執(zhí)行了其他 IO 相關(guān)的操作,并輸出程序執(zhí)行耗時(shí),順序執(zhí)行的耗時(shí):

public static void main(String[] args) throws InterruptedException {      List<Apple> appleList = initAppleList();      Date begin = new Date();      for (Apple apple : appleList) {          apple.setPrice(5.0 * apple.getWeight() / 1000);          Thread.sleep(1000);      }      Date end = new Date();      log.info("蘋果數(shù)量:{}個(gè), 耗時(shí):{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);  }

怎么理解Java 8并行流

并行版本

List<Apple> appleList = initAppleList();  Date begin = new Date();  appleList.parallelStream()  .forEach(apple ->           {               apple.setPrice(5.0 * apple.getWeight() / 1000);               try {                   Thread.sleep(1000);               } catch (InterruptedException e) {                   e.printStackTrace();               }           }          );  Date end = new Date();  log.info("蘋果數(shù)量:{}個(gè), 耗時(shí):{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);

耗時(shí)情況

怎么理解Java 8并行流

跟我們的預(yù)測一致,我的電腦是 四核I5 處理器,開啟并行后四個(gè)處理器每人執(zhí)行一個(gè)線程,最后 1s 完成了任務(wù)!

并行流可以隨便用嗎?

可拆分性影響流的速度

通過上面的測試,有的人會(huì)輕易得到一個(gè)結(jié)論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內(nèi)部迭代來實(shí)現(xiàn)了。

事實(shí)真的是這樣嗎?并行流真的如此完美嗎?答案當(dāng)然是否定的。大家可以復(fù)制下面的代碼,在自己的電腦上測試。測試完后可以發(fā)現(xiàn),并行流并不總是最快的處理方式。

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2.  對(duì)于 iterate 方法來處理的前 n 個(gè)數(shù)字來說,不管并行與否,它總是慢于循環(huán)的,非并行版本可以理解為流化操作沒有循環(huán)更偏向底層導(dǎo)致的慢??刹⑿邪姹臼菫槭裁绰兀窟@里有兩個(gè)需要注意的點(diǎn):

    2.  iterate 生成的是裝箱的對(duì)象,必須拆箱成數(shù)字才能求和

    3.  我們很難把 iterate 分成多個(gè)獨(dú)立的塊來并行執(zhí)行

        這個(gè)問題很有意思,我們必須意識(shí)到某些流操作比其他操作更容易并行化。對(duì)于 iterate 來說,每次應(yīng)用這個(gè)函數(shù)都要依賴于前一次應(yīng)用的結(jié)果。因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理。反而還因?yàn)椴⑿谢俅卧黾恿碎_支。

    4.  而對(duì)于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個(gè)痛點(diǎn)了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數(shù)字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此并行狀態(tài)下的 rangeClosed() 是快于 for 循環(huán)外部迭代的

package lambdasinaction.chap7;  import java.util.stream.*;  public class ParallelStreams {      public static long iterativeSum(long n) {          long result = 0;          for (long i = 0; i <= n; i++) {              result += i;          }          return result;      }      public static long sequentialSum(long n) {          return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();      }      public static long parallelSum(long n) {          return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();      }      public static long rangedSum(long n) {          return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();      }      public static long parallelRangedSum(long n) {          return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();      }  }  package lambdasinaction.chap7;  import java.util.concurrent.*;  import java.util.function.*;  public class ParallelStreamsHarness {      public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();      public static void main(String[] args) {          System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");          System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");          System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );          System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");          System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );      }      public static <T, R> long measurePerf(Function<T, R> f, T input) {          long fastest = Long.MAX_VALUE;          for (int i = 0; i < 10; i++) {              long start = System.nanoTime();              R result = f.apply(input);              long duration = (System.nanoTime() - start) / 1_000_000;              System.out.println("Result: " + result);              if (duration < fastest) fastest = duration;          }          return fastest;      }  }

共享變量修改的問題

并行流雖然輕易的實(shí)現(xiàn)了多線程,但是仍未解決多線程中共享變量的修改問題。下面代碼中存在共享變量 total,分別使用順序流和并行流計(jì)算前n個(gè)自然數(shù)的和

public static long sideEffectSum(long n) {      Accumulator accumulator = new Accumulator();      LongStream.rangeClosed(1, n).forEach(accumulator::add);      return accumulator.total;  }  public static long sideEffectParallelSum(long n) {      Accumulator accumulator = new Accumulator();      LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);      return accumulator.total;  }  public static class Accumulator {      private long total = 0;      public void add(long value) {          total += value;      }  }

順序執(zhí)行每次輸出的結(jié)果都是:50000005000000,而并行執(zhí)行的結(jié)果卻五花八門了。這是因?yàn)槊看卧L問 totle 都會(huì)存在數(shù)據(jù)競爭,關(guān)于數(shù)據(jù)競爭的原因,大家可以看看關(guān)于 volatile 的博客。因此當(dāng)代碼中存在修改共享變量的操作時(shí),是不建議使用并行流的。

并行流的使用注意

在并行流的使用上有下面幾點(diǎn)需要注意:

  •  盡量使用 LongStream / IntStream / DoubleStream 等原始數(shù)據(jù)流代替 Stream 來處理數(shù)字,以避免頻繁拆裝箱帶來的額外開銷

  •  要考慮流的操作流水線的總計(jì)算成本,假設(shè) N 是要操作的任務(wù)總數(shù),Q 是每次操作的時(shí)間。N * Q 就是操作的總時(shí)間,Q 值越大就意味著使用并行流帶來收益的可能性越大

例如:前端傳來幾種類型的資源,需要存儲(chǔ)到數(shù)據(jù)庫。每種資源對(duì)應(yīng)不同的表。我們可以視作類型數(shù)為 N,存儲(chǔ)數(shù)據(jù)庫的網(wǎng)絡(luò)耗時(shí) + 插入操作耗時(shí)為 Q。一般情況下網(wǎng)絡(luò)耗時(shí)都是比較大的。因此該操作就比較適合并行處理。當(dāng)然當(dāng)類型數(shù)目大于核心數(shù)時(shí),該操作的性能提升就會(huì)打一定的折扣了。更好的優(yōu)化方法在日后的博客會(huì)為大家奉上

  •  對(duì)于較少的數(shù)據(jù)量,不建議使用并行流

  •  容易拆分成塊的流數(shù)據(jù),建議使用并行流

以下是一些常見的集合框架對(duì)應(yīng)流的可拆分性能表:

可拆分性
ArrayList極佳
LinkedList
IntStream.range極佳
Stream.iterate
HashSet
TreeSet

到此,相信大家對(duì)“怎么理解Java 8并行流”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI