溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java線程池ThreadPoolExecutor拒絕策略有哪些

發(fā)布時(shí)間:2021-11-01 14:30:44 來源:億速云 閱讀:130 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“Java線程池ThreadPoolExecutor拒絕策略有哪些”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

池化設(shè)計(jì)思想

池話設(shè)計(jì)應(yīng)該不是一個(gè)新名詞。我們常見的如java線程池、jdbc連接池、redis連接池等就是這類設(shè)計(jì)的代表實(shí)現(xiàn)。這種設(shè)計(jì)會初始預(yù)設(shè)資源,解決的問題就是抵消每次獲取資源的消耗,如創(chuàng)建線程的開銷,獲取遠(yuǎn)程連接的開銷等。就好比你去食堂打飯,打飯的大媽會先把飯盛好幾份放那里,你來了就直接拿著飯盒加菜即可,不用再臨時(shí)又盛飯又打菜,效率就高了。除了初始化資源,池化設(shè)計(jì)還包括如下這些特征:池子的初始值、池子的活躍值、池子的最大值等,這些特征可以直接映射到j(luò)ava線程池和數(shù)據(jù)庫連接池的成員屬性中。

線程池觸發(fā)拒絕策略的時(shí)機(jī)

和數(shù)據(jù)源連接池不一樣,線程池除了初始大小和池子最大值,還多了一個(gè)阻塞隊(duì)列來緩沖。數(shù)據(jù)源連接池一般請求的連接數(shù)超過連接池的最大值的時(shí)候就會觸發(fā)拒絕策略,策略一般是阻塞等待設(shè)置的時(shí)間或者直接拋異常。而線程池的觸發(fā)時(shí)機(jī)如下圖:

Java線程池ThreadPoolExecutor拒絕策略有哪些

如圖,想要了解線程池什么時(shí)候觸發(fā)拒絕粗略,需要明確上面三個(gè)參數(shù)的具體含義,是這三個(gè)參數(shù)總體協(xié)調(diào)的結(jié)果,而不是簡單的超過最大線程數(shù)就會觸發(fā)線程拒絕粗略,當(dāng)提交的任務(wù)數(shù)大于corePoolSize時(shí),會優(yōu)先放到隊(duì)列緩沖區(qū),只有填滿了緩沖區(qū)后,才會判斷當(dāng)前運(yùn)行的任務(wù)是否大于maxPoolSize,小于時(shí)會新建線程處理。大于時(shí)就觸發(fā)了拒絕策略,總結(jié)就是:當(dāng)前提交任務(wù)數(shù)大于(maxPoolSize  + queueCapacity)時(shí)就會觸發(fā)線程池的拒絕策略了。

JDK內(nèi)置4種線程池拒絕策略

拒絕策略接口定義

在分析JDK自帶的線程池拒絕策略前,先看下JDK定義的 拒絕策略接口,如下:

public interface RejectedExecutionHandler {  void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }

接口定義很明確,當(dāng)觸發(fā)拒絕策略時(shí),線程池會調(diào)用你設(shè)置的具體的策略,將當(dāng)前提交的任務(wù)以及線程池實(shí)例本身傳遞給你處理,具體作何處理,不同場景會有不同的考慮,下面看JDK為我們內(nèi)置了哪些實(shí)現(xiàn):

CallerRunsPolicy(調(diào)用者運(yùn)行策略)

public static class CallerRunsPolicy implements RejectedExecutionHandler {  public CallerRunsPolicy() { }  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  if (!e.isShutdown()) {  r.run();  }  }  }
  • 功能:當(dāng)觸發(fā)拒絕策略時(shí),只要線程池沒有關(guān)閉,就由提交任務(wù)的當(dāng)前線程處理。

  • 使用場景:一般在不允許失敗的、對性能要求不高、并發(fā)量較小的場景下使用,因?yàn)榫€程池一般情況下不會關(guān)閉,也就是提交的任務(wù)一定會被運(yùn)行,但是由于是調(diào)用者線程自己執(zhí)行的,當(dāng)多次提交任務(wù)時(shí),就會阻塞后續(xù)任務(wù)執(zhí)行,性能和效率自然就慢了。

AbortPolicy(中止策略)

public static class AbortPolicy implements RejectedExecutionHandler {  public AbortPolicy() { }  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  throw new RejectedExecutionException("Task " + r.toString() +  " rejected from " +  e.toString());  }  }
  • 功能:當(dāng)觸發(fā)拒絕策略時(shí),直接拋出拒絕執(zhí)行的異常,中止策略的意思也就是打斷當(dāng)前執(zhí)行流程

  • 使用場景:這個(gè)就沒有特殊的場景了,但是一點(diǎn)要正確處理拋出的異常。ThreadPoolExecutor中默認(rèn)的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因?yàn)槎紱]有顯示的設(shè)置拒絕策略,所以默認(rèn)的都是這個(gè)。但是請注意,ExecutorService中的線程池實(shí)例隊(duì)列都是無界的,也就是說把內(nèi)存撐爆了都不會觸發(fā)拒絕策略。當(dāng)自己自定義線程池實(shí)例時(shí),使用這個(gè)策略一定要處理好觸發(fā)策略時(shí)拋的異常,因?yàn)樗麜驍喈?dāng)前的執(zhí)行流程。

DiscardPolicy(丟棄策略)

public static class DiscardPolicy implements RejectedExecutionHandler {  public DiscardPolicy() { }  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  }  }
  • 功能:直接靜悄悄的丟棄這個(gè)任務(wù),不觸發(fā)任何動作

  • 使用場景:如果你提交的任務(wù)無關(guān)緊要,你就可以使用它 。因?yàn)樗褪莻€(gè)空實(shí)現(xiàn),會悄無聲息的吞噬你的的任務(wù)。所以這個(gè)策略基本上不用了

DiscardOldestPolicy(棄老策略)

public static class DiscardOldestPolicy implements RejectedExecutionHandler {  public DiscardOldestPolicy() { }  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  if (!e.isShutdown()) {  e.getQueue().poll();  e.execute(r);  }  }  }
  • 功能:如果線程池未關(guān)閉,就彈出隊(duì)列頭部的元素,然后嘗試執(zhí)行

  • 使用場景:這個(gè)策略還是會丟棄任務(wù),丟棄時(shí)也是毫無聲息,但是特點(diǎn)是丟棄的是老的未執(zhí)行的任務(wù),而且是待執(zhí)行優(yōu)先級較高的任務(wù)?;谶@個(gè)特性,我能想到的場景就是,發(fā)布消息,和修改消息,當(dāng)消息發(fā)布出去后,還未執(zhí)行,此時(shí)更新的消息又來了,這個(gè)時(shí)候未執(zhí)行的消息的版本比現(xiàn)在提交的消息版本要低就可以被丟棄了。因?yàn)殛?duì)列中還有可能存在消息版本更低的消息會排隊(duì)執(zhí)行,所以在真正處理消息的時(shí)候一定要做好消息的版本比較

第三方實(shí)現(xiàn)的拒絕策略

dubbo中的線程拒絕策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {  protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);  private final String threadName;  private final URL url;  private static volatile long lastPrintTime = 0;  private static Semaphore guard = new Semaphore(1);  public AbortPolicyWithReport(String threadName, URL url) {  this.threadName = threadName;  this.url = url;  }  @Override  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  String msg = String.format("Thread pool is EXHAUSTED!" +  " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +  " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",  threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),  e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),  url.getProtocol(), url.getIp(), url.getPort());  logger.warn(msg);  dumpJStack();  throw new RejectedExecutionException(msg);  }  private void dumpJStack() {  //省略實(shí)現(xiàn)  } }

可以看到,當(dāng)dubbo的工作線程觸發(fā)了線程拒絕后,主要做了三個(gè)事情,原則就是盡量讓使用者清楚觸發(fā)線程拒絕策略的真實(shí)原因

  • 輸出了一條警告級別的日志,日志內(nèi)容為線程池的詳細(xì)設(shè)置參數(shù),以及線程池當(dāng)前的狀態(tài),還有當(dāng)前拒絕任務(wù)的一些詳細(xì)信息??梢哉f,這條日志,使用dubbo的有過生產(chǎn)運(yùn)維經(jīng)驗(yàn)的或多或少是見過的,這個(gè)日志簡直就是日志打印的典范,其他的日志打印的典范還有spring。得益于這么詳細(xì)的日志,可以很容易定位到問題所在

  • 輸出當(dāng)前線程堆棧詳情,這個(gè)太有用了,當(dāng)你通過上面的日志信息還不能定位問題時(shí),案發(fā)現(xiàn)場的dump線程上下文信息就是你發(fā)現(xiàn)問題的救命稻草。

  • 繼續(xù)拋出拒絕執(zhí)行異常,使本次任務(wù)失敗,這個(gè)繼承了JDK默認(rèn)拒絕策略的特性

Netty中的線程池拒絕策略

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {  NewThreadRunsPolicy() {  super();  }  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  try {  final Thread t = new Thread(r, "Temporary task executor");  t.start();  } catch (Throwable e) {  throw new RejectedExecutionException(  "Failed to start a new thread", e);  }  }  }

Netty中的實(shí)現(xiàn)很像JDK中的CallerRunsPolicy,舍不得丟棄任務(wù)。不同的是,CallerRunsPolicy是直接在調(diào)用者線程執(zhí)行的任務(wù)。而  Netty是新建了一個(gè)線程來處理的。所以,Netty的實(shí)現(xiàn)相較于調(diào)用者執(zhí)行策略的使用面就可以擴(kuò)展到支持高效率高性能的場景了。但是也要注意一點(diǎn),Netty的實(shí)現(xiàn)里,在創(chuàng)建線程時(shí)未做任何的判斷約束,也就是說只要系統(tǒng)還有資源就會創(chuàng)建新的線程來處理,直到new不出新的線程了,才會拋創(chuàng)建線程失敗的異常

activeMq中的線程池拒絕策略

new RejectedExecutionHandler() {  @Override  public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {  try {  executor.getQueue().offer(r, 60, TimeUnit.SECONDS);  } catch (InterruptedException e) {  throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");  }  throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");  }  });

activeMq中的策略屬于最大努力執(zhí)行任務(wù)型,當(dāng)觸發(fā)拒絕策略時(shí),在嘗試一分鐘的時(shí)間重新將任務(wù)塞進(jìn)任務(wù)隊(duì)列,當(dāng)一分鐘超時(shí)還沒成功時(shí),就拋出異常

pinpoint中的線程池拒絕策略

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {  private final RejectedExecutionHandler[] handlerChain;  public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {  Objects.requireNonNull(chain, "handlerChain must not be null");  RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);  return new RejectedExecutionHandlerChain(handlerChain);  }  private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {  this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");  }  @Override  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {  rejectedExecutionHandler.rejectedExecution(r, executor);  }  } }

pinpoint的拒絕策略實(shí)現(xiàn)很有特點(diǎn),和其他的實(shí)現(xiàn)都不同。他定義了一個(gè)拒絕策略鏈,包裝了一個(gè)拒絕策略列表,當(dāng)觸發(fā)拒絕策略時(shí),會將策略鏈中的rejectedExecution依次執(zhí)行一遍

“Java線程池ThreadPoolExecutor拒絕策略有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI