溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

掌握系列之并發(fā)編程-9.線程池

發(fā)布時間:2020-07-30 07:02:27 來源:網(wǎng)絡(luò) 閱讀:167 作者:學(xué)習(xí)Lr 欄目:編程語言

掌握高并發(fā)、高可用架構(gòu)

第二課 并發(fā)編程

從本課開始學(xué)習(xí)并發(fā)編程的內(nèi)容。主要介紹并發(fā)編程的基礎(chǔ)知識、鎖、內(nèi)存模型、線程池、各種并發(fā)容器的使用。

第九節(jié) 線程池

線程池 Executors Executor ExecutorService ThreadPoolExecutor

為何要使用線程池

有以下幾個原因:

  1. 線程池改進(jìn)了多線程應(yīng)用程序的響應(yīng)時間:由于線程池中的線程已經(jīng)準(zhǔn)備好且等待被分配任務(wù),可以直接拿來使用而不用新建一個線程
  2. 線程池節(jié)省了為每個短生命周期任務(wù)而創(chuàng)建一個完整的線程開銷,并且可以在任務(wù)完成后回收資源
  3. 線程池根據(jù)當(dāng)前在系統(tǒng)中運(yùn)行的進(jìn)程來優(yōu)化線程時間片
  4. 線程池運(yùn)行我們開啟多個任務(wù)而不用為每個線程單獨(dú)設(shè)置屬性
  5. 線程池允許我們?yōu)檎趫?zhí)行的任務(wù)的程序參數(shù)傳遞一個包含狀態(tài)信息的對象引用
  6. 線程池可以用來解決處理一個特定請求最大線程數(shù)量限制的問題

根本上說,我們使用線程池主要就是為了減少創(chuàng)建和銷毀線程的次數(shù),每個線程 都可以重復(fù)利用,可以執(zhí)行多個任務(wù),從而節(jié)省內(nèi)存(線程開的越多,消耗的內(nèi)存越大),提高了資源利用率。

線程池ThreadPoolExecutor

代碼起始是Executor,這是一個接口。

掌握系列之并發(fā)編程-9.線程池

抽象類AbstractExecutorService實(shí)現(xiàn)了接口ExecutorService,ExecutorService又繼承了Executor,AbstractExecutorService的默認(rèn)實(shí)現(xiàn)類是ThreadPoolExecutor,這個類就是線程池。

ThreadPoolExecutor有4個構(gòu)造函數(shù):

public ThreadPoolExecutor(int corePoolSize,
                          int maximunPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Running> workQueue);

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Running> workQueue,
                          ThreadFactory factory);

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Running> workQueue,
                          RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Running> workQueue,
                          ThreadFactory factory,
                          RejectedExecutionHandler handler);

corePoolSize:線程池中核心線程的最大數(shù)量

核心線程,線程池創(chuàng)建后,如果當(dāng)前線程總數(shù)小于corePoolSize,則新建的就是核心線程,如果超過corePoolSize,則新建的就是非核心線程。

核心線程默認(rèn)情況下會一直存活在線程池中,即使這個核心線程處于閑置狀態(tài)。

如果指定ThreadPoolExecutor的allowCoreThreadTimeOut為true,那么核心線程如果處于閑置狀態(tài)的話,超過一定時間(keepAliveTime)也會被銷毀。

maximumPoolSize: 線程池中線程總數(shù)的最大值

線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù)

keepAliveTime: 線程池中非核心線程閑置超時時長

默認(rèn)情況下,一個非核心線程,如果閑置時長超過該參數(shù)設(shè)置,就會被銷毀。如果設(shè)置參數(shù)allowCoreThreadTimeOut為true,則超時時長也會作用于核心線程。

TimeUnit unit: 時長單位

枚舉值,MILLSECONDS:毫秒;SECONDS:秒;MINUTS:分鐘;HOURS:小時;DAYS:天

workQueue: 阻塞隊(duì)列

線程池中的任務(wù)隊(duì)列,維護(hù)著等待執(zhí)行的Runnable對象。當(dāng)所有的核心線程都在干活時,新添加的任務(wù)會被添加到這個隊(duì)列中等待處理,如果這個隊(duì)列滿了,則會新建非核心線程來執(zhí)行任務(wù)。

public interface BlockingQueue<E> extends Queue<E> {
    // 將指定元素添加到隊(duì)列,成功返回true,否則拋出異常;如果是給限定了長度的隊(duì)列中添加元素,推薦offer
    boolean add(E e);
    // 將指定的元素添加的隊(duì)列,如果成功則返回true,否則返回false;元素不能為空,否則拋出NPE
    boolean offer(E e);
    // 將元素添加到隊(duì)列,如果隊(duì)列沒有多余空間,方法會一直阻塞,直到隊(duì)列有多余空間
    boolean put(E e) throws InterruptedException;
    // 將元素在指定的時間內(nèi)添加到隊(duì)列中,成功返回true,否則返回false
    boolean off(E e, long timeout, TimeUnit unit) throws InterruptedException;
    // 從隊(duì)列中獲取值,如果隊(duì)列沒有元素,方法會一直阻塞,直到隊(duì)列有值
    E take() throws InterruptedException;
    // 在給定的時間內(nèi)獲取隊(duì)列中的值,沒有獲取到會拋出異常
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    // 獲取隊(duì)列的剩余空間
    int remainingCapacity();
    // 移除指定的值
    boolean remove(Object o);
    // 判斷隊(duì)列是否包含值
    boolean contains(Object o);
    // 將隊(duì)列中的所有元素都移除,并設(shè)置到指定集合中
    int drainTo(Collection<? extends E> c);
}

一般來說,workQueue有以下四種隊(duì)列類型:

SynchronousQueue:同步隊(duì)列,這種隊(duì)列在接收到任務(wù)時,會直接提交給線程處理,而不會保留它。假如所有線程都在忙碌,則會新建線程來處理這個任務(wù)。所以為了防止出現(xiàn)線程數(shù)達(dá)到maximumPoolSize而不能新建線程的錯誤,當(dāng)使用這種隊(duì)列時,需要把maximumPoolSize指定為Integer.MAX_VALUE。

LinkedBlockingQueue:×××鏈表阻塞隊(duì)列,這種隊(duì)列接收到任務(wù)時,如果當(dāng)前線程數(shù)少于核心線程數(shù),則會新建核心線程來處理任務(wù);如果當(dāng)前線程數(shù)達(dá)到核心線程數(shù),則會保持到該隊(duì)列中;由于隊(duì)列×××,即所有超過核心線程數(shù)的任務(wù)都會存入隊(duì)列,所以會導(dǎo)致maximumPoolSize失效。

ArrayBlockingQueue:有界數(shù)組阻塞隊(duì)列,接收到任務(wù)時,如果當(dāng)前線程數(shù)少于核心線程數(shù),則會新建核心線程來處理任務(wù);如果當(dāng)前線程數(shù)達(dá)到核心線程數(shù),則會保持到該隊(duì)列中;如果隊(duì)列已滿,則新建線程來執(zhí)行任務(wù),如果隊(duì)列已滿,而且線程數(shù)已達(dá)到maximumPoolSize指定的數(shù)量,則會報錯。

DelayQueue:延遲隊(duì)列,隊(duì)列元素必須實(shí)現(xiàn)Delayed接口,接收到任務(wù)時,先入隊(duì)列,只有達(dá)到了指定時間,才會執(zhí)行任務(wù)。

ThreadFactory factory: 創(chuàng)建線程的方式

這是一個接口,通過調(diào)用它的方法: Thread newThread(Runnable r)來創(chuàng)建線程

RejectedExecutionHandler handler: 線程池?zé)o法創(chuàng)建線程時,如何拋出異常

一般是當(dāng)線程池中的線程數(shù)量已經(jīng)達(dá)到最大值,或者線程池已經(jīng)關(guān)閉時,會拋出一個RejectedExecutionException

既然線程池新添加了任務(wù),那么線程池是如何處理這些批量任務(wù)?

  1. 如果線程數(shù)量未達(dá)到corePoolSize,則新建一個線程(核心線程)執(zhí)行任務(wù)
  2. 如果線程數(shù)量達(dá)到了corePoolSize,則將任務(wù)移入隊(duì)列等待
  3. 如果隊(duì)列已滿,新建線程(非核心線程)執(zhí)行任務(wù)
  4. 如果隊(duì)列已滿,總線程數(shù)又達(dá)到了maximumPoolSize,就會由RejectedExecutionHandler拋出異常
四種線程池

1 newFixedThreadPool:定長的線程池,可控制線程的最大并發(fā)數(shù),超出的任務(wù)會在隊(duì)列中等待。

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThread) {
        return new ThreadPoolExecutor(nThread, nThread, 0L, TimeUnit.MILLSECONDS, new LinkedBlockingQueue<Runnable>());
    }
}

2 newCachedThreadPool:可緩存的線程池,如果線程池長度超過需要,可以靈活回收空閑線程,如無可回收線程,則會新建線程來處理任務(wù)。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

3 newScheduledThreadPool:定長任務(wù)線程池,支持定時和周期性任務(wù)執(zhí)行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

4 newSingleThreadExecutor:一個 單線程的線程池,只有一個線程來執(zhí)行任務(wù)。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLSECONDS, new LinkedBlockingQueue<Runnable>());
    );
}
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI