您好,登錄后才能下訂單哦!
怎樣正確使用Java 線程池,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
在日常的開發(fā)工作當(dāng)中,線程池往往承載著一個應(yīng)用中最重要的業(yè)務(wù)邏輯,因此我們有必要更多地去關(guān)注線程池的執(zhí)行情況,包括異常的處理和分析等。這里主要聚焦在如何正確使用線程池上,以及提供一些實用的建議。
UncaughtExceptionHandler
我們都知道Runnable接口中的run方法是不允許拋出異常的,因此派生出這個線程的主線程可能無法直接獲得該線程在執(zhí)行過程中的異常信息。如下例:
public static void main(String[] args) throws Exception { Thread thread = new Thread(() -> { Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); System.out.println(1 / 0); // 這行會導(dǎo)致報錯! }); thread.setUncaughtExceptionHandler((t, e) -> { e.printStackTrace(); //如果你把這一行注釋掉,這個程序?qū)⒉粫伋鋈魏萎惓? }); thread.start(); }
為什么會這樣呢?其實我們看一下Thread中的源碼就會發(fā)現(xiàn),Thread在執(zhí)行過程中如果遇到了異常,會先判斷當(dāng)前線程是否有設(shè)置UncaughtExceptionHandler,如果沒有,則會從線程所在的ThreadGroup中獲取。
注意:每個線程都有自己的ThreadGroup,即使你沒有指定,并且它實現(xiàn)了UncaughtExceptionHandler接口。
我們看下ThreadGroup中默認(rèn)的對UncaughtExceptionHandler接口的實現(xiàn):
public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } }
這個ThreadGroup如果有父ThreadGroup,則調(diào)用父ThreadGroup的uncaughtException,否則調(diào)用全局默認(rèn)的Thread.DefaultUncaughtExceptionHandler,如果全局的handler也沒有設(shè)置,則只是簡單地將異常信息定位到System.err中,這就是為什么我們應(yīng)當(dāng)在創(chuàng)建線程的時候,去實現(xiàn)它的UncaughtExceptionHandler接口的原因,這么做可以讓你更方便地去排查問題。
通過execute提交任務(wù)給線程池
回到線程池這個話題,如果我們向線程池提交的任務(wù)中,沒有對異常進行try...catch處理,并且運行的時候出現(xiàn)了異常,那會對線程池造成什么影響呢?答案是沒有影響,線程池依舊可以正常工作,但是異常卻被吞掉了。這通常來說不是一個好事情,因為我們需要拿到原始的異常對象去分析問題。
那么怎樣才能拿到原始的異常對象呢?我們從線程池的源碼著手開始研究這個問題。當(dāng)然網(wǎng)上關(guān)于線程池的源碼解析文章有很多,這里限于篇幅,直接給出最相關(guān)的部分代碼:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
這個方法就是真正去執(zhí)行提交給線程池的任務(wù)的代碼。
這里我們略去其中不相關(guān)的邏輯,重點關(guān)注第19行到第32行的邏輯,其中第23行是真正開始執(zhí)行提交給線程池的任務(wù),那么第20行是干什么的呢?其實就是在執(zhí)行提交給線程池的任務(wù)之前可以做一些前置工作,同樣的,我們看到第31行,這個是在執(zhí)行完提交的任務(wù)之后,可以做一些后置工作。
beforeExecute這個我們暫且不管,重點關(guān)注下afterExecute這個方法。我們可以看到,在執(zhí)行任務(wù)過程中,一旦拋出任何類型的異常,都會提交給afterExecute這個方法,然而查看線程池的源代碼我們可以發(fā)現(xiàn),默認(rèn)的afterExecute是個空實現(xiàn),因此,我們有必要繼承ThreadPoolExecutor去實現(xiàn)這個afterExecute方法。
看源碼我們可以發(fā)現(xiàn)這個afterExecute方法是protected類型的,從官方注釋上也可以看到,這個方法就是推薦子類去實現(xiàn)的。
當(dāng)然,這個方法不能隨意去實現(xiàn),需要遵循一定的步驟,具體的官方注釋也有講,這里摘抄如下:
* <pre> {@code * class ExtendedExecutor extends ThreadPoolExecutor { * // ... * protected void afterExecute(Runnable r, Throwable t) { * super.afterExecute(r, t); * if (t == null && r instanceof Future<?>) { * try { * Object result = ((Future<?>) r).get(); * } catch (CancellationException ce) { * t = ce; * } catch (ExecutionException ee) { * t = ee.getCause(); * } catch (InterruptedException ie) { * Thread.currentThread().interrupt(); // ignore/reset * } * } * if (t != null) * System.out.println(t); * } * }}</pre>
那么通過這種方式,就可以將原先可能被線程池吞掉的異常成功捕獲到,從而便于排查問題。
但是這里還有個小問題,我們注意到在runWorker方法中,執(zhí)行task.run();語句之后,各種類型的異常都被拋出了,那這些被拋出的異常去了哪里?事實上這里的異常對象最終會被傳入到Thread的dispatchUncaughtException方法中,源碼如下:
private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
可以看到它會去獲取UncaughtExceptionHandler的實現(xiàn)類,然后調(diào)用其中的uncaughtException方法,這也就回到了我們上一小節(jié)所分析的UncaughtExceptionHandler實現(xiàn)的具體邏輯。那么為了拿到最原始的異常對象,除了實現(xiàn)UncaughtExceptionHandler接口之外,也可以考慮實現(xiàn)afterExecute方法。
通過submit提交任務(wù)到線程池
這個同樣很簡單,我們還是先回到submit方法的源碼:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
這里的execute方法調(diào)用的是ThreadPoolExecutor中的execute方法,執(zhí)行邏輯跟通過execute提交任務(wù)到線程池是一樣的。我們先重點關(guān)注這里的newTaskFor方法,其源碼如下:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
可以看到提交的Callable對象用FutureTask封裝起來了。我們知道最終會執(zhí)行到上述runWorker這個方法中,并且最核心的執(zhí)行邏輯就是task.run();這行代碼。我們知道這里的task其實是FutureTask類型,因此我們有必要看一下FutureTask中的run方法的實現(xiàn):
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
可以看到這其中跟異常相關(guān)的最關(guān)鍵的代碼就在第17行,也就是setException(ex);這個地方。我們看一下這個地方的實現(xiàn):
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
這里最關(guān)鍵的地方就是將異常對象賦值給了outcome,outcome是FutureTask中的成員變量,我們通過調(diào)用submit方法,拿到一個Future對象之后,再調(diào)用它的get方法,其中最核心的方法就是report方法,下面給出每個方法的源碼:
首先是get方法:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
可以看到最終調(diào)用了report方法,其源碼如下:
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
上面是一些狀態(tài)判斷,如果當(dāng)前任務(wù)不是正常執(zhí)行完畢,或者被取消的話,那么這里的x其實就是原始的異常對象,可以看到會被ExecutionException包裝。因此在你調(diào)用get方法時,可能會拋出ExecutionException異常,那么調(diào)用它的getCause方法就可以拿到最原始的異常對象了。
綜上所述,針對提交給線程池的任務(wù)可能會拋出異常這一問題,主要有以下兩種處理思路:
在提交的任務(wù)當(dāng)中自行try...catch,但這里有個不好的地方就是如果你會提交多種類型的任務(wù)到線程池中,每種類型的任務(wù)都需要自行將異常try...catch住,比較繁瑣。而且如果你只是catch(Exception e),可能依然會漏掉一些包括Error類型的異常,那為了保險起見,可以考慮catch(Throwable t)。
自行實現(xiàn)線程池的afterExecute方法,或者實現(xiàn)Thread的UncaughtExceptionHandler接口。
下面給出我個人創(chuàng)建線程池的一個示例,供大家參考:
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE); statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, 60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder() .setThreadFactory(new ThreadFactory() { private int count = 0; private String prefix = "StatisticsTask"; @Override public Thread newThread(Runnable r) { return new Thread(r, prefix + "-" + count++); } }).setUncaughtExceptionHandler((t, e) -> { String threadName = t.getName(); logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e); }).build(), (r, executor) -> { if (!executor.isShutdown()) { logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! "); Uninterruptibles.putUninterruptibly(executor.getQueue(), r); } }) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; future.get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) { logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t); } } }; statisticsThreadPool.prestartAllCoreThreads();
我們知道任務(wù)一般有兩種:CPU密集型和IO密集型。那么面對CPU密集型的任務(wù),線程數(shù)不宜過多,一般選擇CPU核心數(shù)+1或者核心數(shù)的2倍是比較合理的一個值。因此我們可以考慮將corePoolSize設(shè)置為CPU核心數(shù)+1,maxPoolSize設(shè)置為核心數(shù)的2倍。
同樣的,面對IO密集型任務(wù)時,我們可以考慮以核心數(shù)乘以4倍作為核心線程數(shù),然后核心數(shù)乘以5倍作為最大線程數(shù)的方式去設(shè)置線程數(shù),這樣的設(shè)置會比直接拍腦袋設(shè)置一個值會更合理一些。
當(dāng)然總的線程數(shù)不宜過多,控制在100個線程以內(nèi)比較合理,否則線程數(shù)過多可能會導(dǎo)致頻繁地上下文切換,導(dǎo)致系統(tǒng)性能反不如前。
說到如何正確去關(guān)閉一個線程池,這里面也有點講究。為了實現(xiàn)優(yōu)雅停機的目標(biāo),我們應(yīng)當(dāng)先調(diào)用shutdown方法,調(diào)用這個方法也就意味著,這個線程池不會再接收任何新的任務(wù),但是已經(jīng)提交的任務(wù)還會繼續(xù)執(zhí)行,包括隊列中的。所以,之后你還應(yīng)當(dāng)調(diào)用awaitTermination方法,這個方法可以設(shè)定線程池在關(guān)閉之前的最大超時時間,如果在超時時間結(jié)束之前線程池能夠正常關(guān)閉,這個方法會返回true,否則,一旦超時,就會返回false。通常來說我們不可能無限制地等待下去,因此需要我們事先預(yù)估一個合理的超時時間,然后去使用這個方法。
如果awaitTermination方法返回false,你又希望盡可能在線程池關(guān)閉之后再做其他資源回收工作,可以考慮再調(diào)用一下shutdownNow方法,此時隊列中所有尚未被處理的任務(wù)都會被丟棄,同時會設(shè)置線程池中每個線程的中斷標(biāo)志位。shutdownNow并不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務(wù)能夠正確響應(yīng)中斷。到了這一步,可以考慮繼續(xù)調(diào)用awaitTermination方法,或者直接放棄,去做接下來要做的事情。
大家可能有留意到,我在創(chuàng)建線程池的時候,還調(diào)用了這個方法:prestartAllCoreThreads。這個方法有什么作用呢?我們知道一個線程池創(chuàng)建出來之后,在沒有給它提交任何任務(wù)之前,這個線程池中的線程數(shù)為0。有時候我們事先知道會有很多任務(wù)會提交給這個線程池,但是等它一個個去創(chuàng)建新線程開銷太大,影響系統(tǒng)性能,因此可以考慮在創(chuàng)建線程池的時候就將所有的核心線程全部一次性創(chuàng)建完畢,這樣系統(tǒng)起來之后就可以直接使用了。
其實線程池中還提供了其他一些比較有意思的方法。比如我們現(xiàn)在設(shè)想一個場景,當(dāng)一個線程池負(fù)載很高,快要撐爆導(dǎo)致觸發(fā)拒絕策略時,有沒有什么辦法可以緩解這一問題?其實是有的,因為線程池提供了設(shè)置核心線程數(shù)和最大線程數(shù)的方法,它們分別是setCorePoolSize方法和setMaximumPoolSize方法。是的,線程池創(chuàng)建完畢之后也是可以更改其線程數(shù)的!因此,面對線程池高負(fù)荷運行的情況,我們可以這么處理:
起一個定時輪詢線程(守護類型),定時檢測線程池中的線程數(shù),具體來說就是調(diào)用getActiveCount方法。
當(dāng)發(fā)現(xiàn)線程數(shù)超過了核心線程數(shù)大小時,可以考慮將CorePoolSize和MaximumPoolSize的數(shù)值同時乘以2,當(dāng)然這里不建議設(shè)置很大的線程數(shù),因為并不是線程越多越好的,可以考慮設(shè)置一個上限值,比如50、100之類的。
同時,去獲取隊列中的任務(wù)數(shù),具體來說是調(diào)用getQueue方法再調(diào)用size方法。當(dāng)隊列中的任務(wù)數(shù)少于隊列大小的二分之一時,我們可以認(rèn)為現(xiàn)在線程池的負(fù)載沒有那么高了,因此可以考慮在線程池先前有擴容過的情況下,將CorePoolSize和MaximumPoolSize還原回去,也就是除以2。
具體來說如下圖:
以上是我個人建議的一種使用線程池的方式。
線程池并非在任何情況下都是性能最優(yōu)的方案。如果是一個追求極致性能的場景,可以考慮使用Disruptor,這是一個高性能隊列。排除Disruptor不談,單純基于JDK的話會不會有更好的方案?答案是有的。
我們知道在一個線程池中,多個線程是共用一個隊列的,因此在任務(wù)很多的情況下,需要對這個隊列進行頻繁讀寫,為了防止沖突因此需要加鎖。事實上在閱讀線程池源代碼的時候就可以發(fā)現(xiàn),里面充斥著各種加鎖的代碼,那有沒有更好的實現(xiàn)方式呢?
其實我們可以考慮創(chuàng)建一個由單線程線程池構(gòu)成的列表,每個線程池都使用有界隊列這種方式去實現(xiàn)多線程。這么做的好處是,每個線程池中的隊列都只會被一個線程去操作,這樣就沒有競爭的問題。
其實這種用空間換時間的思路借鑒了Netty中EventLoop的實現(xiàn)機制。試想,如果線程池的性能真的有那么好,為什么Netty不用呢?
任何情況下都不應(yīng)該使用可伸縮線程池(線程的創(chuàng)建和銷毀開銷是很大的)。
任何情況下都不應(yīng)該使用無界隊列,單測除外。有界隊列常用的有ArrayBlockingQueue和LinkedBlockingQueue,前者基于數(shù)組實現(xiàn),后者基于鏈表。從性能表現(xiàn)上來看,LinkedBlockingQueue的吞吐量更高但是性能并不穩(wěn)定,實際情況下應(yīng)當(dāng)使用哪一種建議自行測試之后決定。順便說一句,Executors的newFixedThreadPool采用的是LinkedBlockingQueue。
推薦自行實現(xiàn)RejectedExecutionHandler,JDK自帶的都不是很好用,你可以在里面實現(xiàn)自己的邏輯。如果需要一些特定的上下文信息,可以在Runnable實現(xiàn)類中添加一些自己的東西,這樣在RejectedExecutionHandler中就可以直接使用了。
這里其實指的是一種特殊情況,就是比如突然遇到了一股流量尖峰,導(dǎo)致線程池負(fù)載已經(jīng)非常高了,即快要觸發(fā)拒絕策略的時候,我們可以怎么做來盡量防止提交的任務(wù)丟失。一般來說當(dāng)遇到這種情況的時候,應(yīng)當(dāng)盡快觸發(fā)報警通知研發(fā)人員來處理。之后不管是限流也好,還是增加機器也好,甚至是上Kafka、Redis甚至是數(shù)據(jù)庫用來暫存任務(wù)數(shù)據(jù)也是可以的,但畢竟遠(yuǎn)水救不了近火,如果我們希望在正式解決這個問題之前,先盡可能地緩解,可以考慮怎么做呢?
首先可以考慮的就是我前面提到的動態(tài)增大線程池中的線程數(shù),但是假如已經(jīng)擴容過了,此時不應(yīng)繼續(xù)擴容,否則可能導(dǎo)致系統(tǒng)的吞吐量更低。在這種情況下,應(yīng)當(dāng)自行實現(xiàn)RejectedExecutionHandler,具體來說就是在實現(xiàn)類中,單獨開一個單線程的線程池,然后調(diào)用原線程池的getQueue方法的put方法,將塞不進去的任務(wù)再次嘗試塞進去。當(dāng)然在隊列滿的時候是塞不進去的,但那至少也只是阻塞了這個單獨的線程而已,并不影響主流程。
當(dāng)然,這種方案是治標(biāo)不治本的,面對流量激增這種場景其實業(yè)界有很多成熟的做法,只是單純從線程池的角度來看的話,這種方式不失為一種臨時有效的解決方案。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。