溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么連接JAVA高并發(fā)的線程和線程池

發(fā)布時(shí)間:2021-10-29 16:25:02 來源:億速云 閱讀:127 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“怎么連接JAVA高并發(fā)的線程和線程池”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么連接JAVA高并發(fā)的線程和線程池”吧!

1 JAVA線程的實(shí)現(xiàn)原理

怎么連接JAVA高并發(fā)的線程和線程池
  • java的線程是基于操作系統(tǒng)原生的線程模型(非用戶態(tài)),通過系統(tǒng)調(diào)用,將程序的線程交給系統(tǒng)調(diào)度執(zhí)行

  • java線程擁有屬于自己的虛擬機(jī)棧,當(dāng)JVM將棧、程序計(jì)數(shù)器、工作內(nèi)存等準(zhǔn)備好后,會(huì)分配一個(gè)系統(tǒng)原生線程來執(zhí)行。Java線程結(jié)束,原生線程隨之被回收

  • 原生線程初始化完畢,會(huì)調(diào)Java線程的run方法。當(dāng)JAVA線程結(jié)束時(shí),則釋放原生線程和Java線程的所有資源

  • java方法的執(zhí)行對應(yīng)虛擬機(jī)棧的一個(gè)棧幀,用于存儲局部變量、操作數(shù)棧、動(dòng)態(tài)鏈接、方法出口等

2 JAVA線程的生命周期

怎么連接JAVA高并發(fā)的線程和線程池
怎么連接JAVA高并發(fā)的線程和線程池
  • New(新建狀態(tài)):用new關(guān)鍵字創(chuàng)建線程之后,該線程處于新建狀態(tài),此時(shí)僅由JVM為其分配內(nèi)存,并初始化其成員變量

  • Runnable(就緒狀態(tài)):當(dāng)調(diào)用Thread.start方法后,該線程處于就緒狀態(tài)。JVM會(huì)為其分配虛擬機(jī)棧等,然后等待系統(tǒng)調(diào)度

  • running(運(yùn)行狀態(tài)):處于就緒狀態(tài)的線程獲得CPU,執(zhí)行run方法時(shí),則線程處于運(yùn)行狀態(tài)

  • Terminated(線程死亡):線程正常run結(jié)束、或拋出一個(gè)未捕獲的Throwable、調(diào)用Thread.stop來結(jié)束該線程,都會(huì)導(dǎo)致線程的死亡

3 JAVA線程的幾種常用方法

「線程啟動(dòng)函數(shù)」

//Thread.java //調(diào)用start啟動(dòng)線程,進(jìn)入Runnable狀態(tài),等待系統(tǒng)調(diào)度執(zhí)行 public synchronized void start(){//synchronized同步執(zhí)行     if (threadStatus != 0) //0 代表new狀態(tài),非0則拋出錯(cuò)誤             throw new IllegalThreadStateException();     ...     start0(); //本地方法方法 private native void start0()     ... } //Running狀態(tài),新線程執(zhí)行的代碼方法,可被子類重寫 public void run() {     if (target != null) {         //target是Runnable,new Thread(Runnable)時(shí)傳入         target.run();      } }

「線程終止函數(shù)」

//Thread.java @Deprecated public final void stop(); //中斷線程 public void interrupt() //判斷的是當(dāng)前線程是否處于中斷狀態(tài) public static boolean interrupted()
  • 用stop會(huì)強(qiáng)行終止線程,導(dǎo)致線程所持有的全部鎖突然釋放(不可控制),而被鎖突同步的邏輯遭到破壞。不建議使用

  • interrupt函數(shù)中斷線程,但它不一定會(huì)讓線程退出的。它比stop函數(shù)優(yōu)雅,可控制

    • 當(dāng)線程處于調(diào)用sleep、wait的阻塞狀態(tài)時(shí),會(huì)拋出InterruptedException,代碼內(nèi)部捕獲,然后結(jié)束線程

    • 線程處于非阻塞狀態(tài),則需要程序自己調(diào)用interrupted()判斷,再?zèng)Q定是否退出

  • 其他常用方法

//Thread.java //阻塞等待其他線程 public final synchronized void join(final long millis) //暫時(shí)讓出CPU執(zhí)行 public static native void yield(); //休眠一段時(shí)間 public static native void sleep(long millis) throws InterruptedException;

start與run方法的區(qū)別

  • start是Thread類的方法,從線程的生命周期來看,start的執(zhí)行并不意味著新線程的執(zhí)行,而是讓JVM分配虛擬機(jī)棧,進(jìn)入Runnable狀態(tài),start的執(zhí)行還是在舊線程上

  • run則是新線程被系統(tǒng)調(diào)度,獲取CPU時(shí),執(zhí)行的方法,必須是繼承Thread或者是實(shí)現(xiàn)Runnable接口

    • Thread.sleep與Object.wait區(qū)別

  • Thread.sleep需要指定休眠時(shí)間,時(shí)間一到繼續(xù)運(yùn)行;和鎖機(jī)制無關(guān),不能加鎖也不用釋放鎖

  • Object.wait需要在synchronized中調(diào)用,否則報(bào)IllegalMonitorStateException錯(cuò)誤。wait方法會(huì)釋放鎖,需要調(diào)用相同鎖對象Object.notify來喚醒線程

4  線程池及其優(yōu)點(diǎn)

  • 線程的每次使用時(shí)創(chuàng)建,結(jié)束再銷毀,是非常巨大的開銷。若用緩存的策略(線程池),暫存曾經(jīng)創(chuàng)建的線程,復(fù)用這些線程,可以減少程序的消耗,提高線程的利用率

  • 降低資源消耗:重復(fù)利用線程可降低線程創(chuàng)建和銷毀造成的消耗

  • 提高響應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),不需要等待線程創(chuàng)建就能立即執(zhí)行

  • 提高線程的可管理性:使用線程池可以進(jìn)行統(tǒng)一的分配,監(jiān)控和調(diào)優(yōu)

5 JDK封裝的線程池

怎么連接JAVA高并發(fā)的線程和線程池

//ThreadPoolExecutor.java public ThreadPoolExecutor(     int corePoolSize,      int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue<Runnable> workQueue,     ThreadFactory threadFactory,     RejectedExecutionHandler handler)
  • 1 corePoolSize:核心線程數(shù),線程池維持的線程數(shù)量

  • 2  maximumPoolSize:最大的線程數(shù),當(dāng)阻塞隊(duì)列不可再接受任務(wù)時(shí)且maximumPoolSize大于corePoolSize則會(huì)創(chuàng)建非核心線程來執(zhí)行。無任務(wù)執(zhí)行時(shí),會(huì)被銷毀

  • 3 keepAliveTime:非核心線程在閑暇間的存活時(shí)間

  • 4 TimeUnit:和keepAliveTime配合使用,表示keepAliveTime參數(shù)的時(shí)間單位

  • 5 workQueue:正在執(zhí)行的任務(wù)數(shù)超過corePoolSize時(shí),任務(wù)的等待阻塞隊(duì)列

  • 6 threadFactory:線程的創(chuàng)建工廠

  • 7 handler:拒絕策略,線程數(shù)達(dá)到了maximumPoolSize,還有任務(wù)提交則使用拒絕策略處理

6 線程池原理之執(zhí)行流程

怎么連接JAVA高并發(fā)的線程和線程池

//ThreadPoolExecutor.java public void execute(Runnable command) {     ...     if (workerCountOf(c) < corePoolSize) { //plan A         if (addWorker(command, true))               return;         c = ctl.get();     }     if (isRunning(c) && workQueue.offer(command)) { //plan B         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))             reject(command);         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     //addWorker(command, false) false代表可創(chuàng)建非核心線程執(zhí)行任務(wù)     else if (!addWorker(command, false)) //plan C         reject(command);    // //plan D }

plan A:任務(wù)的execute,先判斷核心線程數(shù)量達(dá)到上限;否,則創(chuàng)建核心線程來執(zhí)行任務(wù);是,則執(zhí)行plan B

plan B:當(dāng)任務(wù)數(shù)大于核心數(shù)時(shí),任務(wù)被加入阻塞隊(duì)列,如果超過阻塞隊(duì)列的容量上限,執(zhí)行C

plan C: 阻塞隊(duì)列不能接受任務(wù)時(shí),且設(shè)置的maximumPoolSize大于corePoolSize,創(chuàng)建新的非核心線程執(zhí)行任務(wù)

plan D:當(dāng)plan A、B、C都無能為力時(shí),使用拒絕策略處理

7 阻塞隊(duì)列的簡單了解

  • 隊(duì)列的阻塞插入:當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿

  • 隊(duì)列的阻塞移除:當(dāng)隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?/p>

  • BlockingQueue提供的方法如下,其中put和take是阻塞操作

操作方法拋出異常阻塞線程返回特殊值超時(shí)退出
插入元素add(e)put(e)offer(e)offer(e, timeout, unit)
移除元素remove()take()poll()pull(timeout, unit)
檢查element()peek()

「ArrayBlockingQueue」

  • ArrayBlockingQueue是用數(shù)組實(shí)現(xiàn)的「有界阻塞隊(duì)列」,必須指定隊(duì)列大小,先進(jìn)先出(FIFO)原則排隊(duì)

「LinkedBlockingQueue」

  • 是用鏈表實(shí)現(xiàn)的「有界阻塞隊(duì)列」,如果構(gòu)造LinkedBlockingQueue時(shí)沒有指定大小,則默認(rèn)是Integer.MAX_VALUE,無限大

  • 該隊(duì)列生產(chǎn)端和消費(fèi)端使用獨(dú)立的鎖來控制數(shù)據(jù)操作,以此來提高隊(duì)列的并發(fā)性

「PriorityBlockingQueue」

  • public PriorityBlockingQueue(int initialCapacity, Comparator  comparator)基于數(shù)組,元素具有優(yōu)先級的「無界阻塞隊(duì)列」,優(yōu)先級由Comparator決定

  • PriorityBlockingQueue不會(huì)阻塞生產(chǎn)者,卻會(huì)在沒有可消費(fèi)的任務(wù)時(shí),阻塞消費(fèi)者

「DelayQueue」

  • 支持延時(shí)獲取元素的「無界阻塞隊(duì)列」,基于PriorityQueue實(shí)現(xiàn)

  • 元素必須實(shí)現(xiàn)Delayed接口,指定多久才能從隊(duì)列中獲取該元素。

  • 可用于緩存系統(tǒng)的設(shè)計(jì)、定時(shí)任務(wù)調(diào)度等場景的使用

「SynchronousQueue」

  • SynchronousQueue是一種無緩沖的等待隊(duì)列,「添加一個(gè)元素必須等待被取走后才能繼續(xù)添加元素」

「LinkedTransferQueue」

  • 由鏈表組成的TransferQueue「無界阻塞隊(duì)列」,相比其他隊(duì)列多了tryTransfer和transfer函數(shù)

  • transfer:當(dāng)前有消費(fèi)者正在等待元素,則直接傳給消費(fèi)者,「否則存入隊(duì)尾,并阻塞等待元素被消費(fèi)才返回」

  • tryTransfer:試探傳入的元素是否能直接傳給消費(fèi)者。如果沒消費(fèi)者等待消費(fèi)元素,元素加入隊(duì)尾,返回false

「LinkedBlockingDeque」

  • LinkedBlockingDeque是由鏈表構(gòu)建的雙向阻塞隊(duì)列,多了一端可操作入隊(duì)出隊(duì),少了一半的競爭,提高并發(fā)性

8 Executors的四種線程池淺析

「newFixedThreadPool」

指定核心線程數(shù),隊(duì)列是LinkedBlockingQueue無界阻塞隊(duì)列,永遠(yuǎn)不可能拒絕任務(wù);適合用在穩(wěn)定且固定的并發(fā)場景,建議線程設(shè)置為CPU核數(shù)

//Executors.java public static ExecutorService newFixedThreadPool(int nThreads) {     return new ThreadPoolExecutor(nThreads, nThreads,                         0L, TimeUnit.MILLISECONDS,                         new LinkedBlockingQueue<Runnable>()); }

「newCachedThreadPool」

核心池大小為0,線程池最大線程數(shù)為最大整型,任務(wù)提交先加入到阻塞隊(duì)列中,非核心線程60s沒任務(wù)執(zhí)行則銷毀,阻塞隊(duì)列為SynchronousQueue。newCachedThreadPool會(huì)不斷的創(chuàng)建新線程來執(zhí)行任務(wù),不建議用

//Executors.java public static ExecutorService newCachedThreadPool() {     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                   60L, TimeUnit.SECONDS,                   new SynchronousQueue<Runnable>()); }

「newScheduledThreadPool」

ScheduledThreadPoolExecutor(STPE)其實(shí)是ThreadPoolExecutor的子類,可指定核心線程數(shù),隊(duì)列是STPE的內(nèi)部類DelayedWorkQueue?!窼TPE的好處是  A 延時(shí)可執(zhí)行任務(wù),B 可執(zhí)行帶有返回值的任務(wù)」

//Executors.java public ScheduledThreadPoolExecutor(int corePoolSize,                                    ThreadFactory threadFactory) {     super(corePoolSize, Integer.MAX_VALUE,           DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,           new DelayedWorkQueue(), threadFactory); } //指定延遲執(zhí)行時(shí)間     public <V> ScheduledFuture<V>  schedule(Callable<V> callable, long delay, TimeUnit unit)

「newSingleThreadExecutor」

和newFixedThreadPool構(gòu)造方法一致,不過線程數(shù)被設(shè)置為1了。SingleThreadExecutor比new個(gè)線程的好處是;「線程運(yùn)行時(shí)拋出異常的時(shí)候會(huì)有新的線程加入線程池完成接下來的任務(wù);阻塞隊(duì)列可以保證任務(wù)按FIFO執(zhí)行」

//Executors.java public static ExecutorService newSingleThreadExecutor() {     return new FinalizableDelegatedExecutorService         (new ThreadPoolExecutor(1, 1,                   0L, TimeUnit.MILLISECONDS,                   new LinkedBlockingQueue<Runnable>())); //無界隊(duì)列 }

9  如果優(yōu)雅地關(guān)閉線程池線程池

  • 的關(guān)閉,就要先關(guān)閉池中的線程,上文第三點(diǎn)有提,暴力強(qiáng)制性stop線程會(huì)導(dǎo)致同步數(shù)據(jù)的不一致,因此我們要調(diào)用interrupt關(guān)閉線程

  • 而線程池提供了兩個(gè)關(guān)閉方法,shutdownNow和shuwdown

  • shutdownNow:線程池拒接收新任務(wù),同時(shí)立馬關(guān)閉線程池(執(zhí)行中的會(huì)繼續(xù)執(zhí)行完),隊(duì)列的任務(wù)不再執(zhí)行,返回未執(zhí)行任務(wù)List

public List<Runnable> shutdownNow() {     ...     final ReentrantLock mainLock = this.mainLock;     mainLock.lock(); //加鎖     try {         checkShutdownAccess();         advanceRunState(STOP);         interruptWorkers(); //interrupt關(guān)閉線程         tasks = drainQueue(); //未執(zhí)行任務(wù)     ...
  • shuwdown:線程池拒接收新任務(wù),同時(shí)等待線程池里的任務(wù)執(zhí)行完畢后關(guān)閉線程池,代碼和shutdownNow類似就不貼了

10 線程池為什么使用的是阻塞隊(duì)列

先考慮下為啥線程池的線程不會(huì)被釋放,它是怎么管理線程的生命周期的呢

//ThreadPoolExecutor.Worker.class final void runWorker(Worker w) {     ...     //工作線程會(huì)進(jìn)入一個(gè)循環(huán)獲取任務(wù)執(zhí)行的邏輯     while (task != null || (task = getTask()) != null)     ... }  private Runnable getTask(){     ...     Runnable r = timed ?          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)          : workQueue.take(); //線程會(huì)阻塞掛起等待任務(wù),     ...     }

可以看出,無任務(wù)執(zhí)行時(shí),線程池其實(shí)是利用阻塞隊(duì)列的take方法掛起,從而維持核心線程的存活

11 線程池的worker繼承AQS的意義

//Worker class,一個(gè)worker一個(gè)線程 Worker(Runnable firstTask) {     //禁止新線程未開始就被中斷     setState(-1); // inhibit interrupts until runWorker     this.firstTask = firstTask;     this.thread = getThreadFactory().newThread(this); }  final void runWorker(Worker w) {     ....     //對應(yīng)構(gòu)造Worker是的setState(-1)     w.unlock(); // allow interrupts     boolean completedAbruptly = true;         ....         w.lock(); //加鎖同步         ....         try {             ...             task.run();             afterExecute(task, null);         } finally {             ....             w.unlock(); //釋放鎖         }

worker繼承AQS的意義:A 禁止線程未開始就被中斷;B 同步runWorker方法的處理邏輯

12 拒絕策略

  • AbortPolicy 「丟棄任務(wù)并拋出RejectedExecutionException異?!?/p>

  • DiscardOldestPolicy 「丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)」

  • DiscardPolicy 「丟棄任務(wù),但是不拋出異?!?/p>

  • CallerRunsPolicy

?A handler for rejected tasks that runs the rejected task directly in the  calling thread of the {@code execute} method, unless the executor has been shut  down, in which case the task is discarded.?

如果任務(wù)被拒絕了,則由「提交任務(wù)的線程」執(zhí)行此任務(wù)

13 ForkJoinPool了解一波

怎么連接JAVA高并發(fā)的線程和線程池
  • ForkJoinPool和ThreadPoolExecutor不同,它適合執(zhí)行可以分解子任務(wù)的任務(wù),如樹的遍歷,歸并排序等一些遞歸場景

怎么連接JAVA高并發(fā)的線程和線程池
怎么連接JAVA高并發(fā)的線程和線程池
  • ForkJoinPool每個(gè)線程有一個(gè)對應(yīng)的雙端隊(duì)列deque;當(dāng)線程中的任務(wù)被fork分裂,分裂出來的子任務(wù)會(huì)放入線程自己的deque,減少線程的競爭

work-stealing工作竊取算法

怎么連接JAVA高并發(fā)的線程和線程池

當(dāng)線程執(zhí)行完自己deque的任務(wù),且其他線程deque還有多的任務(wù),則會(huì)啟動(dòng)竊取策略,從其他線程deque隊(duì)尾獲取線程

  • 使用RecursiveTask實(shí)現(xiàn)forkjoin流程demo

public class ForkJoinPoolTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         ForkJoinPool forkJoinPool = new ForkJoinPool();         for (int i = 0; i < 10; i++) {             ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));             System.out.println(task.get());         }     }     static class Fibonacci extends RecursiveTask<Integer> {         int n;         public Fibonacci(int n) {  this.n = n;  }         @Override         protected Integer compute() {             if (n <= 1) { return n; }             Fibonacci fib1 = new Fibonacci(n - 1);             fib1.fork(); //相當(dāng)于開啟新線程執(zhí)行             Fibonacci fib2 = new Fibonacci(n - 2);             fib2.fork(); //相當(dāng)于開啟新線程執(zhí)行             return fib1.join() + fib2.join(); //合并阻塞返回結(jié)果         }     } }

感謝各位的閱讀,以上就是“怎么連接JAVA高并發(fā)的線程和線程池”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么連接JAVA高并發(fā)的線程和線程池這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI