您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關(guān)Java中ThreadPoolExecutor線程池的使用方法,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
Executors
Executors 是一個Java中的工具類. 提供工廠方法來創(chuàng)建不同類型的線程池.
從上圖中也可以看出, Executors的創(chuàng)建線程池的方法, 創(chuàng)建出來的線程池都實現(xiàn)了 ExecutorService接口. 常用方法有以下幾個:
newFixedThreadPool(int Threads): 創(chuàng)建固定數(shù)目線程的線程池, 超出的線程會在隊列中等待.
newCachedThreadPool(): 創(chuàng)建一個可緩存線程池, 如果線程池長度超過處理需要, 可靈活回收空閑線程(60秒), 若無可回收,則新建線程.
newSingleThreadExecutor(): 創(chuàng)建一個單線程化的線程池, 它只會用唯一的工作線程來執(zhí)行任務(wù), 保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行. 如果某一個任務(wù)執(zhí)行出錯, 將有另一個線程來繼續(xù)執(zhí)行.
newScheduledThreadPool(int corePoolSize): 創(chuàng)建一個支持定時及周期性的任務(wù)執(zhí)行的線程池, 多數(shù)情況下可用來替代Timer類.
Executors 例子
newCachedThreadPool
線程最大數(shù)為 Integer.MAX_VALUE, 當我們往線程池添加了 n 個任務(wù), 這 n 個任務(wù)都是一起執(zhí)行的.
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); cachedThreadPool.execute(new Runnable() { @Override public void run() { for (;;) { try { Thread.currentThread().sleep(1000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }); cachedThreadPool.execute(new Runnable() { @Override public void run() { for (;;) { try { Thread.currentThread().sleep(1000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } });
ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1); cachedThreadPool.execute(new Runnable() { @Override public void run() { for (;;) { try { Thread.currentThread().sleep(1000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }); cachedThreadPool.execute(new Runnable() { @Override public void run() { for (;;) { try { Thread.currentThread().sleep(1000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } });
三秒執(zhí)行一次, 只有執(zhí)行完這一次后, 才會執(zhí)行.
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.schedule(new Runnable() { @Override public void run() { for (;;) { try { Thread.currentThread().sleep(2000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } }, 3, TimeUnit.SECONDS);
順序執(zhí)行各個任務(wù), 第一個任務(wù)執(zhí)行完, 才會執(zhí)行下一個.
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { @Override public void run() { for (;;) { try { System.out.println(Thread.currentThread().getName()); Thread.currentThread().sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); executorService.execute(new Runnable() { @Override public void run() { for (;;) { try { System.out.println(Thread.currentThread().getName()); Thread.currentThread().sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } });
Executors存在什么問題
在阿里巴巴Java開發(fā)手冊中提到,使用Executors創(chuàng)建線程池可能會導(dǎo)致OOM(OutOfMemory ,內(nèi)存溢出),但是并沒有說明為什么,那么接下來我們就來看一下到底為什么不允許使用Executors?
我們先來一個簡單的例子,模擬一下使用Executors導(dǎo)致OOM的情況.
/** * @author Hollis */ public class ExecutorsDemo { private static ExecutorService executor = Executors.newFixedThreadPool(15); public static void main(String[] args) { for (int i = 0; i < Integer.MAX_VALUE; i++) { executor.execute(new SubThread()); } } } class SubThread implements Runnable { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { //do nothing } } }
通過指定JVM參數(shù):-Xmx8m -Xms8m 運行以上代碼,會拋出OOM:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
以上代碼指出,ExecutorsDemo.java 的第16行,就是代碼中的 executor.execute(new SubThread());
Java中的 BlockingQueue 主要有兩種實現(xiàn), 分別是 ArrayBlockingQueue 和 LinkedBlockingQueue.
ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界阻塞隊列, 必須設(shè)置容量.
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue 是一個用鏈表實現(xiàn)的有界阻塞隊列, 容量可以選擇進行設(shè)置, 不設(shè)置的話, 將是一個無邊界的阻塞隊列, 最大長度為 Integer.MAX_VALUE.
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
這里的問題就出在如果我們不設(shè)置 LinkedBlockingQueue 的容量的話, 其默認容量將會是 Integer.MAX_VALUE.
而 newFixedThreadPool 中創(chuàng)建 LinkedBlockingQueue 時, 并未指定容量. 此時, LinkedBlockingQueue 就是一個無邊界隊列, 對于一個無邊界隊列來說, 是可以不斷的向隊列中加入任務(wù)的, 這種情況下就有可能因為任務(wù)過多而導(dǎo)致內(nèi)存溢出問題.
newCachedThreadPool 和 newScheduledThreadPool 這兩種方式創(chuàng)建的最大線程數(shù)可能是Integer.MAX_VALUE, 而創(chuàng)建這么多線程, 必然就有可能導(dǎo)致OOM.
ThreadPoolExecutor 創(chuàng)建線程池
避免使用 Executors 創(chuàng)建線程池, 主要是避免使用其中的默認實現(xiàn), 那么我們可以自己直接調(diào)用 ThreadPoolExecutor 的構(gòu)造函數(shù)來自己創(chuàng)建線程池. 在創(chuàng)建的同時, 給 BlockQueue 指定容量就可以了.
ExecutorService executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));
這種情況下, 一旦提交的線程數(shù)超過當前可用線程數(shù)時, 就會拋出 java.util.concurrent.RejectedExecutionException, 這是因為當前線程池使用的隊列是有邊界隊列, 隊列已經(jīng)滿了便無法繼續(xù)處理新的請求.
除了自己定義 ThreadPoolExecutor 外. 還有其他方法. 如apache和guava等.
四個構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
int corePoolSize => 該線程池中核心線程數(shù)最大值
線程池新建線程的時候,如果當前線程總數(shù)小于corePoolSize, 則新建的是核心線程, 如果超過corePoolSize, 則新建的是非核心線程
核心線程默認情況下會一直存活在線程池中, 即使這個核心線程啥也不干(閑置狀態(tài)).
如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 這個屬性為 true, 那么核心線程如果不干活(閑置狀態(tài))的話, 超過一定時間(時長下面參數(shù)決定), 就會被銷毀掉
很好理解吧, 正常情況下你不干活我也養(yǎng)你, 因為我總有用到你的時候, 但有時候特殊情況(比如我自己都養(yǎng)不起了), 那你不干活我就要把你干掉了
int maximumPoolSize
該線程池中線程總數(shù)最大值
線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù).
long keepAliveTime
該線程池中非核心線程閑置超時時長
一個非核心線程, 如果不干活(閑置狀態(tài))的時長超過這個參數(shù)所設(shè)定的時長, 就會被銷毀掉
如果設(shè)置 allowCoreThreadTimeOut = true, 則會作用于核心線程
TimeUnit unit
keepAliveTime的單位, TimeUnit是一個枚舉類型, 其包括:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
BlockingQueue workQueue
一個阻塞隊列, 用來存儲等待執(zhí)行的任務(wù). 也就是說現(xiàn)在有10個任務(wù), 核心線程 有四個, 非核心線程有六個, 那么這六個線程會被添加到 workQueue 中, 等待執(zhí)行.
這個參數(shù)的選擇也很重要, 會對線程池的運行過程產(chǎn)生重大影響, 一般來說, 這里的阻塞隊列有以下幾種選擇:
SynchronousQueue: 這個隊列接收到任務(wù)的時候, 會直接提交給線程處理, 而不保留它, 如果所有線程都在工作怎么辦? 那就*新建一個線程來處理這個任務(wù)!所以為了保證不出現(xiàn)<線程數(shù)達到了maximumPoolSize而不能新建線程>的錯誤, 使用這個類型隊列的時候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即無限大.
LinkedBlockingQueue: 這個隊列接收到任務(wù)的時候, 如果當前線程數(shù)小于核心線程數(shù), 則核心線程處理任務(wù); 如果當前線程數(shù)等于核心線程數(shù), 則進入隊列等待. 由于這個隊列最大值為 Integer.MAX_VALUE , 即所有超過核心線程數(shù)的任務(wù)都將被添加到隊列中,這也就導(dǎo)致了 maximumPoolSize 的設(shè)定失效, 因為總線程數(shù)永遠不會超過 corePoolSize.
ArrayBlockingQueue: 可以限定隊列的長度, 接收到任務(wù)的時候, 如果沒有達到 corePoolSize 的值, 則核心線程執(zhí)行任務(wù), 如果達到了, 則入隊等候, 如果隊列已滿, 則新建線程(非核心線程)執(zhí)行任務(wù), 又如果總線程數(shù)到了maximumPoolSize, 并且隊列也滿了, 則發(fā)生錯誤.
DelayQueue: 隊列內(nèi)元素必須實現(xiàn) Delayed 接口, 這就意味著你傳進去的任務(wù)必須先實現(xiàn)Delayed接口. 這個隊列接收到任務(wù)時, 首先先入隊, 只有達到了指定的延時時間, 才會執(zhí)行任務(wù).
ThreadFactory threadFactory
它是ThreadFactory類型的變量, 用來創(chuàng)建新線程.
默認使用 Executors.defaultThreadFactory()
來創(chuàng)建線程. 使用默認的 ThreadFactory
來創(chuàng)建線程時, 會使新創(chuàng)建的線程具有相同的 NORM_PRIORITY
優(yōu)先級并且是非守護線程, 同時也設(shè)置了線程的名稱.
RejectedExecutionHandler handler
表示當拒絕處理任務(wù)時的策略, 有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常(默認). ThreadPoolExecutor.DiscardPolicy:直接丟棄任務(wù), 但是不拋出異常. ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù), 然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù).
關(guān)于Java中ThreadPoolExecutor線程池的使用方法就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。