您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“什么是并行流”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
并行流
認(rèn)識(shí)開啟并行流
并行流是什么?是把一個(gè)流內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同線程分別處理每個(gè)不同數(shù)據(jù)塊的流。例如,有下面一個(gè)例子,在List中,需要對(duì)List數(shù)據(jù)進(jìn)行分別計(jì)算,其代碼如下所示:
List<Apple> appleList = new ArrayList<>(); // 假裝數(shù)據(jù)是從庫里查出來的 for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); }
在這里,時(shí)間復(fù)雜度為O(list.size),隨著list的增加,耗時(shí)也在增加。并行流可以解決這個(gè)問題,代碼如下所示:
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
這里通過調(diào)parallelStream()說明當(dāng)前流為并行流,然后進(jìn)行并行執(zhí)行。并行流內(nèi)部使用了默認(rèn)的ForkJoinPool線程池,默認(rèn)線程數(shù)為處理器的核心數(shù)。
測試并行流
普通代碼如下所示:
public static void main(String[] args) throws InterruptedException { List<Apple> appleList = initAppleList(); Date begin = new Date(); for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); Thread.sleep(1000); } Date end = new Date(); log.info("蘋果數(shù)量:{}個(gè), 耗時(shí):{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000); }
輸出的內(nèi)容為耗時(shí)4s。
并行代碼如下所示:
List<Apple> appleList = initAppleList(); Date begin = new Date(); appleList.parallelStream().forEach(apple -> { apple.setPrice(5.0 * apple.getWeight() / 1000); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } ); Date end = new Date(); log.info("蘋果數(shù)量:{}個(gè), 耗時(shí):{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
輸出結(jié)果為耗時(shí)1s??梢钥吹胶臅r(shí)大大提升了3s。
并行流拆分會(huì)影響流的速度
對(duì)于并行流來說需要注意以下幾點(diǎn):
對(duì)于 iterate 方法來處理的前 n 個(gè)數(shù)字來說,不管并行與否,它總是慢于循環(huán)的,
而對(duì)于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個(gè)痛點(diǎn)了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數(shù)字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此并行狀態(tài)下的 rangeClosed() 是快于 for 循環(huán)外部迭代的
代碼如下所示:
package lambdasinaction.chap7; import java.util.stream.*; public class ParallelStreams { public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; } public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get(); } public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get(); } public static long rangedSum(long n) { return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong(); } public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong(); } } package lambdasinaction.chap7; import java.util.concurrent.*; import java.util.function.*; public class ParallelStreamsHarness { public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(); public static void main(String[] args) { System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs"); System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs"); System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" ); System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs"); System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" ); } public static <T, R> long measurePerf(Function<T, R> f, T input) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); R result = f.apply(input); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + result); if (duration < fastest) fastest = duration; } return fastest; } }
共享變量會(huì)造成數(shù)據(jù)出現(xiàn)問題
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } }
并行流的注意
盡量使用 LongStream / IntStream / DoubleStream 等原始數(shù)據(jù)流代替 Stream 來處理數(shù)字,以避免頻繁拆裝箱帶來的額外開銷
要考慮流的操作流水線的總計(jì)算成本,假設(shè) N 是要操作的任務(wù)總數(shù),Q 是每次操作的時(shí)間。N * Q 就是操作的總時(shí)間,Q 值越大就意味著使用并行流帶來收益的可能性越大
對(duì)于較少的數(shù)據(jù)量,不建議使用并行流
容易拆分成塊的流數(shù)據(jù),建議使用并行流
“什么是并行流”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。