您好,登錄后才能下訂單哦!
本篇文章為大家展示了Java中怎么使用Executor框架,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
引入線程池
任務是一組邏輯工作單元,線程則是使任務異步執(zhí)行的機制。當存在大量并發(fā)任務時,創(chuàng)建、銷毀線程需要很大的開銷,運用線程池可以大大減小開銷。
Executor框架
說明:
Executor 執(zhí)行器接口,該接口定義執(zhí)行Runnable任務的方式。
ExecutorService 該接口定義提供對Executor的服務。
ScheduledExecutorService 定時調(diào)度接口。
AbstractExecutorService 執(zhí)行框架抽象類。
ThreadPoolExecutor JDK中線程池的具體實現(xiàn)。
Executors 線程池工廠類。
ThreadPoolExecutor線程池類
線程池是一個復雜的任務調(diào)度工具,它涉及到任務、線程池等的生命周期問題。要配置一個線程池是比較復雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優(yōu)的。
JDK中的線程池均由ThreadPoolExecutor類實現(xiàn)。其構造方法如下:
參數(shù)說明:
corePoolSize:核心線程數(shù)。
maximumPoolSize:最大線程數(shù)。
keepAliveTime:線程存活時間。當線程數(shù)大于core數(shù),那么超過該時間的線程將會被終結。
unit:keepAliveTime的單位。java.util.concurrent.TimeUnit類存在靜態(tài)靜態(tài)屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS
workQueue:Runnable的阻塞隊列。若線程池已經(jīng)被占滿,則該隊列用于存放無法再放入線程池中的Runnable。
另一個構造方法:
復制代碼 代碼如下:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
該方法在下面的擴展部分有更深入的講解。其中handler表示線程池對拒絕任務的處理策略。
ThreadPoolExecutor的使用需要注意以下概念:
若線程池中的線程數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來處理被添加的任務。
若線程池中的線程數(shù)量等于 corePoolSize且緩沖隊列 workQueue未滿,則任務被放入緩沖隊列。
若線程池中線程的數(shù)量大于corePoolSize且緩沖隊列workQueue滿,且線程池中的數(shù)量小于maximumPoolSize,則建新的線程來處理被添加的任務。
若線程池中線程的數(shù)量大于corePoolSize且緩沖隊列workQueue滿,且線程池中的數(shù)量等于maximumPoolSize,那么通過 handler所指定的策略來處理此任務。
當線程池中的線程數(shù)量大于corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。
Executors 工廠方法
JDK內(nèi)部提供了五種最常見的線程池。由Executors類的五個靜態(tài)工廠方法創(chuàng)建。
newFixedThreadPool(...)
newSingleThreadExecutor(...)
newCachedThreadPool(...)
newScheduledThreadPool(...)
newSingleThreadScheduledExecutor()
單線程的線程池newSingleThreadExecutor
這個線程池只有一個線程在工作,也就是相當于單線程串行執(zhí)行所有任務。
返回單線程的Executor,將多個任務交給此Exector時,這個線程處理完一個任務后接著處理下一個任務,若該線程出現(xiàn)異常,將會有一個新的線程來替代。此線程池保證所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。
說明:LinkedBlockingQueue會無限的添加需要執(zhí)行的Runnable。
創(chuàng)建固定大小的線程池newFixedThreadPool
每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結束,那么線程池會補充一個新線程。
public static ExecutorSevice newFixedThreadPool()
返回一個包含指定數(shù)目線程的線程池,如果任務數(shù)量多于線程數(shù)目,那么沒有沒有執(zhí)行的任務必須等待,直到有任務完成為止。
可緩存的線程池newCachedThreadPool
如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務)的線程,當任務數(shù)增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。
newCachedThreadPool方法創(chuàng)建的線程池可以自動的擴展線程池的容量。核心線程數(shù)量為0。
SynchronousQueue是個特殊的隊列。SynchronousQueue隊列的容量為0。當試圖為SynchronousQueue添加Runnable,則執(zhí)行會失敗。只有當一邊從SynchronousQueue取數(shù)據(jù),一邊向SynchronousQueue添加數(shù)據(jù)才可以成功。SynchronousQueue僅僅起到數(shù)據(jù)交換的作用,并不保存線程。但newCachedThreadPool()方法沒有線程上限。Runable添加到SynchronousQueue會被立刻取出。
根據(jù)用戶的任務數(shù)創(chuàng)建相應的線程來處理,該線程池不會對線程數(shù)目加以限制,完全依賴于JVM能創(chuàng)建線程的數(shù)量,可能引起內(nèi)存不足。
定時任務調(diào)度的線程池newScheduledThreadPool
創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務的需求。
例:
public class ScheduledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); } }, 1000, 2000, TimeUnit.MILLISECONDS); } }
單線程的定時任務調(diào)度線程池newSingleThreadScheduledExecutor
此線程池支持定時以及周期性執(zhí)行任務的需求。
Executor接口
Executor是一個線程執(zhí)行接口。任務執(zhí)行的主要抽象不是Thead,而是Executor。
public interface Executor{ void executor(Runnable command); }
Executor將任務的提交過程與執(zhí)行過程解耦,并用Runnable來表示任務。執(zhí)行的任務放入run方法中即可,將Runnable接口的實現(xiàn)類交給線程池的execute方法,作為它的一個參數(shù)。如果需要給任務傳遞參數(shù),可以通過創(chuàng)建一個Runnable接口的實現(xiàn)類來完成。
Executor可以支持多種不同類型的任務執(zhí)行策略。
Executor基于生產(chǎn)者消費者模式,提交任務的操作相當于生產(chǎn)者,執(zhí)行任務的線程則相當于消費者。
ExecutorService接口
線程池接口。ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:
Future<?> submit(Runnable task) <T> Future<T> submit(Callable<T> task)
這兩個方法都是向線程池中提交任務,它們的區(qū)別在于Runnable在執(zhí)行完畢后沒有結果,Callable執(zhí)行完畢后有一個結果。這在多個線程中傳遞狀態(tài)和結果是非常有用的。另外他們的相同點在于都返回一個Future對象。Future對象可以阻塞線程直到運行完畢(獲取結果,如果有的話),也可以取消任務執(zhí)行,當然也能夠檢測任務是否被取消或者是否執(zhí)行完畢。
在沒有Future之前我們檢測一個線程是否執(zhí)行完畢通常使用Thread.join()或者用一個死循環(huán)加狀態(tài)位來描述線程執(zhí)行完畢。現(xiàn)在有了更好的方法能夠阻塞線程,檢測任務執(zhí)行完畢甚至取消執(zhí)行中或者未開始執(zhí)行的任務。
ScheduledExecutorService接口
ScheduledExecutorService描述的功能和Timer/TimerTask類似,解決那些需要任務重復執(zhí)行的問題。這包括延遲時間一次性執(zhí)行、延遲時間周期性執(zhí)行以及固定延遲時間周期性執(zhí)行等。當然了繼承ExecutorService的ScheduledExecutorService擁有ExecutorService的全部特性。
線程池生命周期
線程是有多種執(zhí)行狀態(tài)的,同樣管理線程的線程池也有多種狀態(tài)。JVM會在所有線程(非后臺daemon線程)全部終止后才退出,為了節(jié)省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候無法正確的關閉線程池,將會阻止JVM的結束。
線程池Executor是異步的執(zhí)行任務,因此任何時刻不能夠直接獲取提交的任務的狀態(tài)。這些任務有可能已經(jīng)完成,也有可能正在執(zhí)行或者還在排隊等待執(zhí)行。因此關閉線程池可能出現(xiàn)一下幾種情況:
平緩關閉:已經(jīng)啟動的任務全部執(zhí)行完畢,同時不再接受新的任務。
立即關閉:取消所有正在執(zhí)行和未執(zhí)行的任務。
另外關閉線程池后對于任務的狀態(tài)應該有相應的反饋信息。
啟動線程池
線程池在構造前(new操作)是初始狀態(tài),一旦構造完成線程池就進入了執(zhí)行狀態(tài)RUNNING。嚴格意義上講線程池構造完成后并沒有線程被立即啟動,只有進行"預啟動"或者接收到任務的時候才會啟動線程。
線程池是處于運行狀態(tài),隨時準備接受任務來執(zhí)行。
關閉線程池
線程池運行中可以通過shutdown()和shutdownNow()來改變運行狀態(tài)。
shutdown():平緩的關閉線程池。線程池停止接受新的任務,同時等待已經(jīng)提交的任務執(zhí)行完畢,包括那些進入隊列還沒有開始的任務。shutdown()方法執(zhí)行過程中,線程池處于SHUTDOWN狀態(tài)。
shutdownNow():立即關閉線程池。線程池停止接受新的任務,同時線程池取消所有執(zhí)行的任務和已經(jīng)進入隊列但是還沒有執(zhí)行的任務。shutdownNow()方法執(zhí)行過程中,線程池處于STOP狀態(tài)。shutdownNow方法本質(zhì)是調(diào)用Thread.interrupt()方法。但我們知道該方法僅僅是讓線程處于interrupted狀態(tài),并不會讓線程真正的停止!所以若只調(diào)用或只調(diào)用一次shutdownNow()方法,不一定會讓線程池中的線程都關閉掉,線程中必須要有處理interrupt事件的機制。
線程池結束
一旦shutdown()或者shutdownNow()執(zhí)行完畢,線程池就進入TERMINATED狀態(tài),即線程池就結束了。
isTerminating() 如果關閉后所有任務都已完成,則返回true。
isShutdown() 如果此執(zhí)行程序已關閉,則返回true。
例:使用固定大小的線程池。并將任務添加到線程池。
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class JavaThreadPool {
public static void main(String[] args) {
// 創(chuàng)建一個可重用固定線程數(shù)的線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 創(chuàng)建實現(xiàn)了Runnable接口對象,Thread對象當然也實現(xiàn)了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 將線程放入池中進行執(zhí)行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 關閉線程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行。。。");
}
}
Java線程池擴展
ThreadPoolExecutor線程池的執(zhí)行監(jiān)控
ThreadPoolExecutor中定義了三個空方法,用于監(jiān)控線程的執(zhí)行情況。
ThreadPoolExecutor源碼:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
例:使用覆蓋方法,定義新的線程池。
public class ExtThreadPoolTest { static class MyTask implements Runnable { public String name; public MyTask(String name) { super(); this.name = name; } @Override public void run() { try { Thread.sleep(500); System.out.println("執(zhí)行中:"+this.name); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執(zhí)行:" + ((MyTask)r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執(zhí)行完成:" + ((MyTask)r).name); } @Override protected void terminated() { System.out.println("執(zhí)行退出"); } }; for(int i=0;i<5;i++){ MyTask task = new MyTask("Task-"+i); es.execute(task); } Thread.sleep(10); // 等待terminated()執(zhí)行 es.shutdown(); // 若無該方法,主線程不會結束。 } }
ThreadPoolExecutor的拒絕策略
線程池不可能處理無限多的線程。所以一旦線程池中中需要執(zhí)行的任務過多,線程池對于某些任務就無法處理了。拒絕策略即對這些無法處理的任務進行處理??赡軄G棄掉這些不能處理的任務,也可能用其他方式。
ThreadPoolExecutor類還有另一個構造方法。該構造方法中的RejectedExecutionHandler用于定義拒絕策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ..... }
JDK內(nèi)部已經(jīng)提供一些拒絕策略。
AbortPolicy一旦線程不能處理,則拋出異常。
AbortPolicy源碼:
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy 一旦線程不能處理,則丟棄任務。
DiscardPolicy源碼:
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
CallerRunsPolicy 一旦線程不能處理,則將任務返回給提交任務的線程處理。
CallerRunsPolicy源碼:
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardOldestPolicy 一旦線程不能處理,丟棄掉隊列中最老的任務。
DiscardOldestPolicy源碼:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
例:自定義拒絕策略。打印并丟棄無法處理的任務。
public class RejectedPolicyHandleTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 打印并丟棄。 System.out.println(r.toString()+" is discard"); } }); for(int i=0;i<Integer.MAX_VALUE;i++){ MyTask task = new MyTask("Task-"+i); es.execute(task); Thread.sleep(10); } es.shutdown(); // 若無該方法,主線程不會結束。 } }
ThreadFactory 線程工廠
ThreadPoolExecutor類構造器的參數(shù)其中之一即為ThreadFactory線程工廠。
ThreadFactory用于創(chuàng)建線程池中的線程。
public interface ThreadFactory { Thread newThread(Runnable r); }
ThreadFactory的實現(xiàn)類中一般定義線程了線程組,線程數(shù)與線程名稱。
DefaultThreadFactory源碼:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
上述內(nèi)容就是Java中怎么使用Executor框架,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。