您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
在 java8 中 添加了流Stream,可以讓你以一種聲明的方式處理數(shù)據(jù)。使用起來非常簡單優(yōu)雅。ParallelStream 則是一個并行執(zhí)行的流,采用 ForkJoinPool 并行執(zhí)行任務(wù),提高執(zhí)行速度。<br> <br> 下面我們看看2個簡單的示例:
Arrays.asList(1,2,3,4,5,6) .parallelStream() .forEach((value) -> { String name = Thread.currentThread().getName(); System.out.println("示例1 Thread:" + name + " value:" + value); });
<a name="951e1e2c"></a>
Stream.of(1,2,3,4,5,6) .parallel() .forEach((value) -> { String name = Thread.currentThread().getName(); System.out.println("示例2 Thread:" + name + " value:" + value); });
筆者最近在做一些爬蟲相關(guān)的業(yè)務(wù),其核心工具已開源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,經(jīng)過2個版本的迭代已經(jīng)發(fā)展成了一個強(qiáng)大非賬號爬蟲利器,趕緊來試試吧。
我們采集了大量的代理 ip 用來供爬蟲使用,其中有個定時任務(wù)每 5 分鐘去檢測代理是否失效,代理 ip 檢測比較費時,我們給每個檢測的請求 設(shè)定了 2s 的超時,這樣單線程的話 1000 個 ip 就得消耗半個多小時,當(dāng)然筆者在校驗的時候采用的 parallel Stream 簡化開發(fā)。
然后發(fā)現(xiàn)效果并不明顯,代理 ip 數(shù)量上來之后 5 分鐘完全檢測不完,導(dǎo)致任務(wù)堆積。明明用了并發(fā)流為什么沒有明顯的提高執(zhí)行速度呢?
下面我們來看看剛剛的“示例”打印出的信息:
示例1 Thread:main value:4 示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1 示例1 Thread:main value:6 示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5 示例1 Thread:main value:3 示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2 示例2 Thread:main value:4 示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3 示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5 示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1 示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2 示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6
我們可以看到 Parallel Stream,默認(rèn)采用的是一個 ForkJoinPool.commonPool 的線程池,這樣我們就算使用了 Parallel Stream, 整個 jvm 共用一個 common pool 線程池,一不小心就任務(wù)堆積了,在校驗代理 ip 的時候我們還有采集代理等其他的任務(wù)中也大量使用了并發(fā)流, 這樣也就印證了為什么會任務(wù)堆積了。
使用自定義 ForkJoinPool 執(zhí)行速度。示例代碼如下:
// 示例:自定義線程池 ForkJoinPool forkJoinPool = new ForkJoinPool(8); // 這里是從數(shù)據(jù)庫里查出來的一批代理 ip List<proxylist> records = new ArrayList<>(); // 找出失效的代理 ip List<string> needDeleteList = forkJoinPool.submit(() -> records.parallelStream() .map(ProxyList::getIpPort) .filter(IProxyListTask::isFailed) .collect(Collectors.toList()) ).join(); // 刪除失效的代理
整個代碼依然比較優(yōu)雅,在使用自定義的 ForkJoin 線程池之后,執(zhí)行速度有了明顯的提升。以前 5 分鐘執(zhí)行不完的任務(wù)現(xiàn)在 2 分鐘之內(nèi)就能全部執(zhí)行完畢。
java8 的并發(fā)流在大批量數(shù)據(jù)處理時可簡化多線程的使用,在遇到耗時業(yè)務(wù)或者重度使用并發(fā)流不妨根據(jù)業(yè)務(wù)情況采用自定義線程池來提示處理速度。
看完上述內(nèi)容,你們對如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。