您好,登錄后才能下訂單哦!
這篇“Java ThreadPoolExecutor的拒絕策略怎么實現(xiàn)”文章的知識點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Java ThreadPoolExecutor的拒絕策略怎么實現(xiàn)”文章吧。
線程池的原理如下圖:
說明:
當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)。
運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)添加到隊列中。
當(dāng)任務(wù)隊列已滿,則在非corePool中創(chuàng)建新的線程來處理任務(wù)。
創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
線程池為我們提供了四種拒絕策略分別是:CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy
ThreadPoolExecutor中默認(rèn)的拒絕策略就是AbortPolicy直接拋出異常,具體實現(xiàn)如下
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
說明:這種策略非常簡單粗暴,直接拋出RejectedExecutionException異常,也不會執(zhí)行后續(xù)的任務(wù)。
示例說明:
public class ThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(1), new ThreadPoolExecutor.AbortPolicy()); //異步執(zhí)行 for(int i=0; i<10;i++) { System.out.println("添加第"+i+"個任務(wù)"); threadPoolExecutor.execute(new TestThread("線程"+i)); } } } public class TestThread implements Runnable { private String name; public TestThread(String name){ this.name=name; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread name:"+Thread.currentThread().getName()+",執(zhí)行:"+name); } }
執(zhí)行結(jié)果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.skywares.fw.juc.thread.TestThread@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.skywares.fw.juc.thread.ThreadPoolTest.main(ThreadPoolTest.java:26)
thread name:pool-1-thread-5,執(zhí)行:線程5
thread name:pool-1-thread-2,執(zhí)行:線程1
thread name:pool-1-thread-4,執(zhí)行:線程4
thread name:pool-1-thread-3,執(zhí)行:線程3
thread name:pool-1-thread-1,執(zhí)行:線程0
thread name:pool-1-thread-5,執(zhí)行:線程2
從執(zhí)行結(jié)果我們得知,采用AbortPolicy策略當(dāng)任務(wù)執(zhí)行到第七個任務(wù)時會直接報錯,導(dǎo)致后續(xù)的業(yè)務(wù)邏輯不會執(zhí)行。
CallerRunsPolicy在任務(wù)被拒絕添加后,會用調(diào)用execute函數(shù)的上層線程去執(zhí)行被拒絕的任務(wù)。
相關(guān)示例
public class ThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(1), new ThreadPoolExecutor.CallerRunsPolicy()); //異步執(zhí)行 for(int i=0; i<10;i++) { System.out.println("添加第"+i+"個任務(wù)"); threadPoolExecutor.execute(new TestThread("線程"+i)); } } }
執(zhí)行結(jié)果:
添加第0個任務(wù)
添加第1個任務(wù)
添加第2個任務(wù)
添加第3個任務(wù)
添加第4個任務(wù)
添加第5個任務(wù)
添加第6個任務(wù)
thread name:main,執(zhí)行:線程6
thread name:pool-1-thread-3,執(zhí)行:線程3
thread name:pool-1-thread-1,執(zhí)行:線程0
thread name:pool-1-thread-4,執(zhí)行:線程4
thread name:pool-1-thread-2,執(zhí)行:線程1
thread name:pool-1-thread-5,執(zhí)行:線程5
添加第7個任務(wù)
添加第8個任務(wù)
thread name:main,執(zhí)行:線程8
thread name:pool-1-thread-1,執(zhí)行:線程7
thread name:pool-1-thread-3,執(zhí)行:線程2
添加第9個任務(wù)
thread name:pool-1-thread-1,執(zhí)行:線程9
從執(zhí)行的結(jié)果我們可以得知,當(dāng)執(zhí)行到第7個任務(wù)時,由于線程池拒絕策略,此任務(wù)由主線程來執(zhí)行,當(dāng)線程池有空閑時,才繼續(xù)執(zhí)行其他的任務(wù)。所以此策略可能會阻塞主線程。
這種拒絕策略比較簡單,線程池拒絕的任務(wù)直接拋棄,不會拋異常也不會執(zhí)行
修改上述的代碼,將拒絕策略修改為DiscardPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
invoke dealStock success
goodsId:手機(jī)
thread name:pool-1-thread-1,執(zhí)行:線程0
thread name:pool-1-thread-4,執(zhí)行:線程4
thread name:pool-1-thread-5,執(zhí)行:線程5
thread name:pool-1-thread-3,執(zhí)行:線程3
thread name:pool-1-thread-2,執(zhí)行:線程1
thread name:pool-1-thread-1,執(zhí)行:線程2
從執(zhí)行的結(jié)果來看只執(zhí)行了6個任務(wù),其他的任務(wù)都被拋棄了。
DiscardOldestPolicy 當(dāng)任務(wù)拒絕添加時,會拋棄任務(wù)隊列中最先加入隊列的任務(wù),再把新任務(wù)添加進(jìn)去。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 2, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
執(zhí)行結(jié)果:
添加第0個任務(wù)
添加第1個任務(wù)
添加第2個任務(wù)
添加第3個任務(wù)
添加第4個任務(wù)
添加第5個任務(wù)
invoke dealStock success
goodsId:手機(jī)
thread name:pool-1-thread-2,執(zhí)行:線程3
thread name:pool-1-thread-1,執(zhí)行:線程0
thread name:pool-1-thread-1,執(zhí)行:線程2
thread name:pool-1-thread-2,執(zhí)行:線程1
當(dāng)線程池提供的拒絕策略無法滿足要求時,我們可以采用自定義的拒絕策略,只需要實現(xiàn)RejectedExecutionHandler接口即可
public class CustRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { new Thread(r,"線程:"+new Random().nextInt(10)).start(); } } ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 2, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(2), new CustRejectedExecutionHandler());
執(zhí)行結(jié)果:
thread name:客戶線程:6,執(zhí)行:線程5
thread name:pool-1-thread-1,執(zhí)行:線程0
thread name:客戶線程:8,執(zhí)行:線程4
thread name:pool-1-thread-2,執(zhí)行:線程3
thread name:pool-1-thread-1,執(zhí)行:線程1
thread name:pool-1-thread-2,執(zhí)行:線程2
從執(zhí)行的結(jié)果來看,被拒絕的任務(wù)都在客戶的新線程中執(zhí)行。
AbortPolicy:直接拋出異常,后續(xù)的任務(wù)不會執(zhí)行
CallerRunsPolicy:子任務(wù)執(zhí)行的時間過長,可能會阻塞主線程。
DiscardPolicy:不拋異常,任務(wù)直接丟棄
DiscardOldestPolicy;丟棄最先加入隊列的任務(wù)
以上就是關(guān)于“Java ThreadPoolExecutor的拒絕策略怎么實現(xiàn)”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。