您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么連接JAVA高并發(fā)的線程和線程池”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么連接JAVA高并發(fā)的線程和線程池”吧!
1 JAVA線程的實(shí)現(xiàn)原理
java的線程是基于操作系統(tǒng)原生的線程模型(非用戶態(tài)),通過系統(tǒng)調(diào)用,將程序的線程交給系統(tǒng)調(diào)度執(zhí)行
java線程擁有屬于自己的虛擬機(jī)棧,當(dāng)JVM將棧、程序計(jì)數(shù)器、工作內(nèi)存等準(zhǔn)備好后,會(huì)分配一個(gè)系統(tǒng)原生線程來執(zhí)行。Java線程結(jié)束,原生線程隨之被回收
原生線程初始化完畢,會(huì)調(diào)Java線程的run方法。當(dāng)JAVA線程結(jié)束時(shí),則釋放原生線程和Java線程的所有資源
java方法的執(zhí)行對應(yīng)虛擬機(jī)棧的一個(gè)棧幀,用于存儲局部變量、操作數(shù)棧、動(dòng)態(tài)鏈接、方法出口等
2 JAVA線程的生命周期
New(新建狀態(tài)):用new關(guān)鍵字創(chuàng)建線程之后,該線程處于新建狀態(tài),此時(shí)僅由JVM為其分配內(nèi)存,并初始化其成員變量
Runnable(就緒狀態(tài)):當(dāng)調(diào)用Thread.start方法后,該線程處于就緒狀態(tài)。JVM會(huì)為其分配虛擬機(jī)棧等,然后等待系統(tǒng)調(diào)度
running(運(yùn)行狀態(tài)):處于就緒狀態(tài)的線程獲得CPU,執(zhí)行run方法時(shí),則線程處于運(yùn)行狀態(tài)
Terminated(線程死亡):線程正常run結(jié)束、或拋出一個(gè)未捕獲的Throwable、調(diào)用Thread.stop來結(jié)束該線程,都會(huì)導(dǎo)致線程的死亡
3 JAVA線程的幾種常用方法
「線程啟動(dòng)函數(shù)」
//Thread.java //調(diào)用start啟動(dòng)線程,進(jìn)入Runnable狀態(tài),等待系統(tǒng)調(diào)度執(zhí)行 public synchronized void start(){//synchronized同步執(zhí)行 if (threadStatus != 0) //0 代表new狀態(tài),非0則拋出錯(cuò)誤 throw new IllegalThreadStateException(); ... start0(); //本地方法方法 private native void start0() ... } //Running狀態(tài),新線程執(zhí)行的代碼方法,可被子類重寫 public void run() { if (target != null) { //target是Runnable,new Thread(Runnable)時(shí)傳入 target.run(); } }
「線程終止函數(shù)」
//Thread.java @Deprecated public final void stop(); //中斷線程 public void interrupt() //判斷的是當(dāng)前線程是否處于中斷狀態(tài) public static boolean interrupted()
用stop會(huì)強(qiáng)行終止線程,導(dǎo)致線程所持有的全部鎖突然釋放(不可控制),而被鎖突同步的邏輯遭到破壞。不建議使用
interrupt函數(shù)中斷線程,但它不一定會(huì)讓線程退出的。它比stop函數(shù)優(yōu)雅,可控制
當(dāng)線程處于調(diào)用sleep、wait的阻塞狀態(tài)時(shí),會(huì)拋出InterruptedException,代碼內(nèi)部捕獲,然后結(jié)束線程
線程處于非阻塞狀態(tài),則需要程序自己調(diào)用interrupted()判斷,再?zèng)Q定是否退出
其他常用方法
//Thread.java //阻塞等待其他線程 public final synchronized void join(final long millis) //暫時(shí)讓出CPU執(zhí)行 public static native void yield(); //休眠一段時(shí)間 public static native void sleep(long millis) throws InterruptedException;
start與run方法的區(qū)別
start是Thread類的方法,從線程的生命周期來看,start的執(zhí)行并不意味著新線程的執(zhí)行,而是讓JVM分配虛擬機(jī)棧,進(jìn)入Runnable狀態(tài),start的執(zhí)行還是在舊線程上
run則是新線程被系統(tǒng)調(diào)度,獲取CPU時(shí),執(zhí)行的方法,必須是繼承Thread或者是實(shí)現(xiàn)Runnable接口
Thread.sleep與Object.wait區(qū)別
Thread.sleep需要指定休眠時(shí)間,時(shí)間一到繼續(xù)運(yùn)行;和鎖機(jī)制無關(guān),不能加鎖也不用釋放鎖
Object.wait需要在synchronized中調(diào)用,否則報(bào)IllegalMonitorStateException錯(cuò)誤。wait方法會(huì)釋放鎖,需要調(diào)用相同鎖對象Object.notify來喚醒線程
4 線程池及其優(yōu)點(diǎn)
線程的每次使用時(shí)創(chuàng)建,結(jié)束再銷毀,是非常巨大的開銷。若用緩存的策略(線程池),暫存曾經(jīng)創(chuàng)建的線程,復(fù)用這些線程,可以減少程序的消耗,提高線程的利用率
降低資源消耗:重復(fù)利用線程可降低線程創(chuàng)建和銷毀造成的消耗
提高響應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),不需要等待線程創(chuàng)建就能立即執(zhí)行
提高線程的可管理性:使用線程池可以進(jìn)行統(tǒng)一的分配,監(jiān)控和調(diào)優(yōu)
5 JDK封裝的線程池
//ThreadPoolExecutor.java public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1 corePoolSize:核心線程數(shù),線程池維持的線程數(shù)量
2 maximumPoolSize:最大的線程數(shù),當(dāng)阻塞隊(duì)列不可再接受任務(wù)時(shí)且maximumPoolSize大于corePoolSize則會(huì)創(chuàng)建非核心線程來執(zhí)行。無任務(wù)執(zhí)行時(shí),會(huì)被銷毀
3 keepAliveTime:非核心線程在閑暇間的存活時(shí)間
4 TimeUnit:和keepAliveTime配合使用,表示keepAliveTime參數(shù)的時(shí)間單位
5 workQueue:正在執(zhí)行的任務(wù)數(shù)超過corePoolSize時(shí),任務(wù)的等待阻塞隊(duì)列
6 threadFactory:線程的創(chuàng)建工廠
7 handler:拒絕策略,線程數(shù)達(dá)到了maximumPoolSize,還有任務(wù)提交則使用拒絕策略處理
6 線程池原理之執(zhí)行流程
//ThreadPoolExecutor.java public void execute(Runnable command) { ... if (workerCountOf(c) < corePoolSize) { //plan A if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //plan B int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //addWorker(command, false) false代表可創(chuàng)建非核心線程執(zhí)行任務(wù) else if (!addWorker(command, false)) //plan C reject(command); // //plan D }
plan A:任務(wù)的execute,先判斷核心線程數(shù)量達(dá)到上限;否,則創(chuàng)建核心線程來執(zhí)行任務(wù);是,則執(zhí)行plan B
plan B:當(dāng)任務(wù)數(shù)大于核心數(shù)時(shí),任務(wù)被加入阻塞隊(duì)列,如果超過阻塞隊(duì)列的容量上限,執(zhí)行C
plan C: 阻塞隊(duì)列不能接受任務(wù)時(shí),且設(shè)置的maximumPoolSize大于corePoolSize,創(chuàng)建新的非核心線程執(zhí)行任務(wù)
plan D:當(dāng)plan A、B、C都無能為力時(shí),使用拒絕策略處理
7 阻塞隊(duì)列的簡單了解
隊(duì)列的阻塞插入:當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿
隊(duì)列的阻塞移除:當(dāng)隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?/p>
BlockingQueue提供的方法如下,其中put和take是阻塞操作
操作方法 | 拋出異常 | 阻塞線程 | 返回特殊值 | 超時(shí)退出 |
---|---|---|---|---|
插入元素 | add(e) | put(e) | offer(e) | offer(e, timeout, unit) |
移除元素 | remove() | take() | poll() | pull(timeout, unit) |
檢查 | element() | peek() | 無 | 無 |
「ArrayBlockingQueue」
ArrayBlockingQueue是用數(shù)組實(shí)現(xiàn)的「有界阻塞隊(duì)列」,必須指定隊(duì)列大小,先進(jìn)先出(FIFO)原則排隊(duì)
「LinkedBlockingQueue」
是用鏈表實(shí)現(xiàn)的「有界阻塞隊(duì)列」,如果構(gòu)造LinkedBlockingQueue時(shí)沒有指定大小,則默認(rèn)是Integer.MAX_VALUE,無限大
該隊(duì)列生產(chǎn)端和消費(fèi)端使用獨(dú)立的鎖來控制數(shù)據(jù)操作,以此來提高隊(duì)列的并發(fā)性
「PriorityBlockingQueue」
public PriorityBlockingQueue(int initialCapacity, Comparator comparator)基于數(shù)組,元素具有優(yōu)先級的「無界阻塞隊(duì)列」,優(yōu)先級由Comparator決定
PriorityBlockingQueue不會(huì)阻塞生產(chǎn)者,卻會(huì)在沒有可消費(fèi)的任務(wù)時(shí),阻塞消費(fèi)者
「DelayQueue」
支持延時(shí)獲取元素的「無界阻塞隊(duì)列」,基于PriorityQueue實(shí)現(xiàn)
元素必須實(shí)現(xiàn)Delayed接口,指定多久才能從隊(duì)列中獲取該元素。
可用于緩存系統(tǒng)的設(shè)計(jì)、定時(shí)任務(wù)調(diào)度等場景的使用
「SynchronousQueue」
SynchronousQueue是一種無緩沖的等待隊(duì)列,「添加一個(gè)元素必須等待被取走后才能繼續(xù)添加元素」
「LinkedTransferQueue」
由鏈表組成的TransferQueue「無界阻塞隊(duì)列」,相比其他隊(duì)列多了tryTransfer和transfer函數(shù)
transfer:當(dāng)前有消費(fèi)者正在等待元素,則直接傳給消費(fèi)者,「否則存入隊(duì)尾,并阻塞等待元素被消費(fèi)才返回」
tryTransfer:試探傳入的元素是否能直接傳給消費(fèi)者。如果沒消費(fèi)者等待消費(fèi)元素,元素加入隊(duì)尾,返回false
「LinkedBlockingDeque」
LinkedBlockingDeque是由鏈表構(gòu)建的雙向阻塞隊(duì)列,多了一端可操作入隊(duì)出隊(duì),少了一半的競爭,提高并發(fā)性
8 Executors的四種線程池淺析
「newFixedThreadPool」
指定核心線程數(shù),隊(duì)列是LinkedBlockingQueue無界阻塞隊(duì)列,永遠(yuǎn)不可能拒絕任務(wù);適合用在穩(wěn)定且固定的并發(fā)場景,建議線程設(shè)置為CPU核數(shù)
//Executors.java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
「newCachedThreadPool」
核心池大小為0,線程池最大線程數(shù)為最大整型,任務(wù)提交先加入到阻塞隊(duì)列中,非核心線程60s沒任務(wù)執(zhí)行則銷毀,阻塞隊(duì)列為SynchronousQueue。newCachedThreadPool會(huì)不斷的創(chuàng)建新線程來執(zhí)行任務(wù),不建議用
//Executors.java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
「newScheduledThreadPool」
ScheduledThreadPoolExecutor(STPE)其實(shí)是ThreadPoolExecutor的子類,可指定核心線程數(shù),隊(duì)列是STPE的內(nèi)部類DelayedWorkQueue?!窼TPE的好處是 A 延時(shí)可執(zhí)行任務(wù),B 可執(zhí)行帶有返回值的任務(wù)」
//Executors.java public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); } //指定延遲執(zhí)行時(shí)間 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
「newSingleThreadExecutor」
和newFixedThreadPool構(gòu)造方法一致,不過線程數(shù)被設(shè)置為1了。SingleThreadExecutor比new個(gè)線程的好處是;「線程運(yùn)行時(shí)拋出異常的時(shí)候會(huì)有新的線程加入線程池完成接下來的任務(wù);阻塞隊(duì)列可以保證任務(wù)按FIFO執(zhí)行」
//Executors.java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); //無界隊(duì)列 }
9 如果優(yōu)雅地關(guān)閉線程池線程池
的關(guān)閉,就要先關(guān)閉池中的線程,上文第三點(diǎn)有提,暴力強(qiáng)制性stop線程會(huì)導(dǎo)致同步數(shù)據(jù)的不一致,因此我們要調(diào)用interrupt關(guān)閉線程
而線程池提供了兩個(gè)關(guān)閉方法,shutdownNow和shuwdown
shutdownNow:線程池拒接收新任務(wù),同時(shí)立馬關(guān)閉線程池(執(zhí)行中的會(huì)繼續(xù)執(zhí)行完),隊(duì)列的任務(wù)不再執(zhí)行,返回未執(zhí)行任務(wù)List
public List<Runnable> shutdownNow() { ... final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //加鎖 try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); //interrupt關(guān)閉線程 tasks = drainQueue(); //未執(zhí)行任務(wù) ...
shuwdown:線程池拒接收新任務(wù),同時(shí)等待線程池里的任務(wù)執(zhí)行完畢后關(guān)閉線程池,代碼和shutdownNow類似就不貼了
10 線程池為什么使用的是阻塞隊(duì)列
先考慮下為啥線程池的線程不會(huì)被釋放,它是怎么管理線程的生命周期的呢
//ThreadPoolExecutor.Worker.class final void runWorker(Worker w) { ... //工作線程會(huì)進(jìn)入一個(gè)循環(huán)獲取任務(wù)執(zhí)行的邏輯 while (task != null || (task = getTask()) != null) ... } private Runnable getTask(){ ... Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //線程會(huì)阻塞掛起等待任務(wù), ... }
可以看出,無任務(wù)執(zhí)行時(shí),線程池其實(shí)是利用阻塞隊(duì)列的take方法掛起,從而維持核心線程的存活
11 線程池的worker繼承AQS的意義
//Worker class,一個(gè)worker一個(gè)線程 Worker(Runnable firstTask) { //禁止新線程未開始就被中斷 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } final void runWorker(Worker w) { .... //對應(yīng)構(gòu)造Worker是的setState(-1) w.unlock(); // allow interrupts boolean completedAbruptly = true; .... w.lock(); //加鎖同步 .... try { ... task.run(); afterExecute(task, null); } finally { .... w.unlock(); //釋放鎖 }
worker繼承AQS的意義:A 禁止線程未開始就被中斷;B 同步runWorker方法的處理邏輯
12 拒絕策略
AbortPolicy 「丟棄任務(wù)并拋出RejectedExecutionException異?!?/p>
DiscardOldestPolicy 「丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)」
DiscardPolicy 「丟棄任務(wù),但是不拋出異?!?/p>
CallerRunsPolicy
?A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.?
如果任務(wù)被拒絕了,則由「提交任務(wù)的線程」執(zhí)行此任務(wù)
13 ForkJoinPool了解一波
ForkJoinPool和ThreadPoolExecutor不同,它適合執(zhí)行可以分解子任務(wù)的任務(wù),如樹的遍歷,歸并排序等一些遞歸場景
ForkJoinPool每個(gè)線程有一個(gè)對應(yīng)的雙端隊(duì)列deque;當(dāng)線程中的任務(wù)被fork分裂,分裂出來的子任務(wù)會(huì)放入線程自己的deque,減少線程的競爭
work-stealing工作竊取算法
當(dāng)線程執(zhí)行完自己deque的任務(wù),且其他線程deque還有多的任務(wù),則會(huì)啟動(dòng)竊取策略,從其他線程deque隊(duì)尾獲取線程
使用RecursiveTask實(shí)現(xiàn)forkjoin流程demo
public class ForkJoinPoolTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); for (int i = 0; i < 10; i++) { ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i)); System.out.println(task.get()); } } static class Fibonacci extends RecursiveTask<Integer> { int n; public Fibonacci(int n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) { return n; } Fibonacci fib1 = new Fibonacci(n - 1); fib1.fork(); //相當(dāng)于開啟新線程執(zhí)行 Fibonacci fib2 = new Fibonacci(n - 2); fib2.fork(); //相當(dāng)于開啟新線程執(zhí)行 return fib1.join() + fib2.join(); //合并阻塞返回結(jié)果 } } }
感謝各位的閱讀,以上就是“怎么連接JAVA高并發(fā)的線程和線程池”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么連接JAVA高并發(fā)的線程和線程池這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。