溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度

發(fā)布時間:2021-10-20 17:07:37 來源:億速云 閱讀:510 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

簡介

在 java8 中 添加了流Stream,可以讓你以一種聲明的方式處理數(shù)據(jù)。使用起來非常簡單優(yōu)雅。ParallelStream 則是一個并行執(zhí)行的流,采用 ForkJoinPool 并行執(zhí)行任務(wù),提高執(zhí)行速度。<br>     <br>    下面我們看看2個簡單的示例:

示例1 (list)

Arrays.asList(1,2,3,4,5,6)
	.parallelStream()
	.forEach((value) -&gt; {
		String name = Thread.currentThread().getName();
		System.out.println("示例1 Thread:" + name + " value:" + value);
	});

<a name="951e1e2c"></a>

示例2 (array)

Stream.of(1,2,3,4,5,6)
	.parallel()
	.forEach((value) -&gt; {
		String name = Thread.currentThread().getName();
		System.out.println("示例2 Thread:" + name + " value:" + value);
	});

問題引出

    筆者最近在做一些爬蟲相關(guān)的業(yè)務(wù),其核心工具已開源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,經(jīng)過2個版本的迭代已經(jīng)發(fā)展成了一個強(qiáng)大非賬號爬蟲利器,趕緊來試試吧。

如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度

如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度

我們采集了大量的代理 ip 用來供爬蟲使用,其中有個定時任務(wù)每 5 分鐘去檢測代理是否失效,代理 ip 檢測比較費時,我們給每個檢測的請求 設(shè)定了 2s 的超時,這樣單線程的話 1000 個 ip 就得消耗半個多小時,當(dāng)然筆者在校驗的時候采用的 parallel Stream 簡化開發(fā)。

然后發(fā)現(xiàn)效果并不明顯,代理 ip 數(shù)量上來之后 5 分鐘完全檢測不完,導(dǎo)致任務(wù)堆積。明明用了并發(fā)流為什么沒有明顯的提高執(zhí)行速度呢?

如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度

下面我們來看看剛剛的“示例”打印出的信息:

示例1 Thread:main value:4
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1
示例1 Thread:main value:6
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例1 Thread:main value:3
示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2
示例2 Thread:main value:4
示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3
示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1
示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2
示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6

我們可以看到 Parallel Stream,默認(rèn)采用的是一個 ForkJoinPool.commonPool 的線程池,這樣我們就算使用了 Parallel Stream, 整個 jvm 共用一個 common pool 線程池,一不小心就任務(wù)堆積了,在校驗代理 ip 的時候我們還有采集代理等其他的任務(wù)中也大量使用了并發(fā)流, 這樣也就印證了為什么會任務(wù)堆積了。

解決問題

使用自定義 ForkJoinPool 執(zhí)行速度。示例代碼如下:

// 示例:自定義線程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

// 這里是從數(shù)據(jù)庫里查出來的一批代理 ip
List<proxylist> records = new ArrayList&lt;&gt;();

// 找出失效的代理 ip
List<string> needDeleteList = forkJoinPool.submit(() -&gt; records.parallelStream()
	.map(ProxyList::getIpPort)
	.filter(IProxyListTask::isFailed)
	.collect(Collectors.toList())
).join();

// 刪除失效的代理

整個代碼依然比較優(yōu)雅,在使用自定義的 ForkJoin 線程池之后,執(zhí)行速度有了明顯的提升。以前 5 分鐘執(zhí)行不完的任務(wù)現(xiàn)在 2 分鐘之內(nèi)就能全部執(zhí)行完畢。

結(jié)論

java8 的并發(fā)流在大批量數(shù)據(jù)處理時可簡化多線程的使用,在遇到耗時業(yè)務(wù)或者重度使用并發(fā)流不妨根據(jù)業(yè)務(wù)情況采用自定義線程池來提示處理速度。


看完上述內(nèi)容,你們對如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI