溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

線程池的原理與使用場(chǎng)景

發(fā)布時(shí)間:2020-06-28 19:51:15 來源:網(wǎng)絡(luò) 閱讀:974 作者:大傻來了 欄目:開發(fā)技術(shù)

1、線程池簡(jiǎn)介:
多線程技術(shù)主要解決處理器單元內(nèi)多個(gè)線程執(zhí)行的問題,它可以顯著減少處理器單元的閑置時(shí)間,增加處理器單元的吞吐能力。
假設(shè)一個(gè)服務(wù)器完成一項(xiàng)任務(wù)所需時(shí)間為:T1 創(chuàng)建線程時(shí)間,T2 在線程中執(zhí)行任務(wù)的時(shí)間,T3 銷毀線程時(shí)間。

如果:T1 + T3 遠(yuǎn)大于 T2,則可以采用線程池,以提高服務(wù)器性能。
            一個(gè)線程池包括以下四個(gè)基本組成部分:
            1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池,包括 創(chuàng)建線程池,銷毀線程池,添加新任務(wù);
            2、工作線程(PoolWorker):線程池中線程,在沒有任務(wù)時(shí)處于等待狀態(tài),可以循環(huán)的執(zhí)行任務(wù);
            3、任務(wù)接口(Task):每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的執(zhí)行,它主要規(guī)定了任務(wù)的入口,任務(wù)執(zhí)行完后的收尾工作,任務(wù)的執(zhí)行狀態(tài)等;
            4、任務(wù)隊(duì)列(taskQueue):用于存放沒有處理的任務(wù)。提供一種緩沖機(jī)制。

線程池技術(shù)正是關(guān)注如何縮短或調(diào)整T1,T3時(shí)間的技術(shù),從而提高服務(wù)器程序性能的。它把T1,T3分別安排在服務(wù)器程序的啟動(dòng)和結(jié)束的時(shí)間段或者一些空閑的時(shí)間段,這樣在服務(wù)器程序處理客戶請(qǐng)求時(shí),不會(huì)有T1,T3的開銷了。
線程池不僅調(diào)整T1,T3產(chǎn)生的時(shí)間段,而且它還顯著減少了創(chuàng)建線程的數(shù)目,看一個(gè)例子:
假設(shè)一個(gè)服務(wù)器一天要處理50000個(gè)請(qǐng)求,并且每個(gè)請(qǐng)求需要一個(gè)單獨(dú)的線程完成。在線程池中,線程數(shù)一般是固定的,所以產(chǎn)生線程總數(shù)不會(huì)超過線程池中線程的數(shù)目,而如果服務(wù)器不利用線程池來處理這些請(qǐng)求則線程總數(shù)為50000。一般線程池大小是遠(yuǎn)小于50000。所以利用線程池的服務(wù)器程序不會(huì)為了創(chuàng)建50000而在處理請(qǐng)求時(shí)浪費(fèi)時(shí)間,從而提高效率。

代碼實(shí)現(xiàn)中并沒有實(shí)現(xiàn)任務(wù)接口,而是把Runnable對(duì)象加入到線程池管理器(ThreadPool),然后剩下的事情就由線程池管理器(ThreadPool)來完成了

復(fù)制代碼
復(fù)制代碼
package mine.util.thread;

import java.util.LinkedList;  
import java.util.List;  

/** 
 * 線程池類,線程管理器:創(chuàng)建線程,執(zhí)行任務(wù),銷毀線程,獲取線程基本信息 
 */  
public final class ThreadPool {  
    // 線程池中默認(rèn)線程的個(gè)數(shù)為5  
    private static int worker_num = 5;  
    // 工作線程  
    private WorkThread[] workThrads;  
    // 未處理的任務(wù)  
    private static volatile int finished_task = 0;  
    // 任務(wù)隊(duì)列,作為一個(gè)緩沖,List線程不安全  
    private List<Runnable> taskQueue = new LinkedList<Runnable>();  
    private static ThreadPool threadPool;  

    // 創(chuàng)建具有默認(rèn)線程個(gè)數(shù)的線程池  
    private ThreadPool() {  
        this(5);  
    }  

    // 創(chuàng)建線程池,worker_num為線程池中工作線程的個(gè)數(shù)  
    private ThreadPool(int worker_num) {  
        ThreadPool.worker_num = worker_num;  
        workThrads = new WorkThread[worker_num];  
        for (int i = 0; i < worker_num; i++) {  
            workThrads[i] = new WorkThread();  
            workThrads[i].start();// 開啟線程池中的線程  
        }  
    }  

    // 單態(tài)模式,獲得一個(gè)默認(rèn)線程個(gè)數(shù)的線程池  
    public static ThreadPool getThreadPool() {  
        return getThreadPool(ThreadPool.worker_num);  
    }  

    // 單態(tài)模式,獲得一個(gè)指定線程個(gè)數(shù)的線程池,worker_num(>0)為線程池中工作線程的個(gè)數(shù)  
    // worker_num<=0創(chuàng)建默認(rèn)的工作線程個(gè)數(shù)  
    public static ThreadPool getThreadPool(int worker_num1) {  
        if (worker_num1 <= 0)  
            worker_num1 = ThreadPool.worker_num;  
        if (threadPool == null)  
            threadPool = new ThreadPool(worker_num1);  
        return threadPool;  
    }  

    // 執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器覺定  
    public void execute(Runnable task) {  
        synchronized (taskQueue) {  
            taskQueue.add(task);  
            taskQueue.notify();  
        }  
    }  

    // 批量執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器覺定  
    public void execute(Runnable[] task) {  
        synchronized (taskQueue) {  
            for (Runnable t : task)  
                taskQueue.add(t);  
            taskQueue.notify();  
        }  
    }  

    // 批量執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器覺定  
    public void execute(List<Runnable> task) {  
        synchronized (taskQueue) {  
            for (Runnable t : task)  
                taskQueue.add(t);  
            taskQueue.notify();  
        }  
    }  

    // 銷毀線程池,該方法保證在所有任務(wù)都完成的情況下才銷毀所有線程,否則等待任務(wù)完成才銷毀  
    public void destroy() {  
        while (!taskQueue.isEmpty()) {// 如果還有任務(wù)沒執(zhí)行完成,就先睡會(huì)吧  
            try {  
                Thread.sleep(10);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
        // 工作線程停止工作,且置為null  
        for (int i = 0; i < worker_num; i++) {  
            workThrads[i].stopWorker();  
            workThrads[i] = null;  
        }  
        threadPool=null;  
        taskQueue.clear();// 清空任務(wù)隊(duì)列  
    }  

    // 返回工作線程的個(gè)數(shù)  
    public int getWorkThreadNumber() {  
        return worker_num;  
    }  

    // 返回已完成任務(wù)的個(gè)數(shù),這里的已完成是只出了任務(wù)隊(duì)列的任務(wù)個(gè)數(shù),可能該任務(wù)并沒有實(shí)際執(zhí)行完成  
    public int getFinishedTasknumber() {  
        return finished_task;  
    }  

    // 返回任務(wù)隊(duì)列的長(zhǎng)度,即還沒處理的任務(wù)個(gè)數(shù)  
    public int getWaitTasknumber() {  
        return taskQueue.size();  
    }  

    // 覆蓋toString方法,返回線程池信息:工作線程個(gè)數(shù)和已完成任務(wù)個(gè)數(shù)  
    @Override  
    public String toString() {  
        return "WorkThread number:" + worker_num + "  finished task number:"  
                + finished_task + "  wait task number:" + getWaitTasknumber();  
    }  

    /** 
     * 內(nèi)部類,工作線程 
     */  
    private class WorkThread extends Thread {  
        // 該工作線程是否有效,用于結(jié)束該工作線程  
        private boolean isRunning = true;  

        /* 
         * 關(guān)鍵所在啊,如果任務(wù)隊(duì)列不空,則取出任務(wù)執(zhí)行,若任務(wù)隊(duì)列空,則等待 
         */  
        @Override  
        public void run() {  
            Runnable r = null;  
            while (isRunning) {// 注意,若線程無效則自然結(jié)束run方法,該線程就沒用了  
                synchronized (taskQueue) {  
                    while (isRunning && taskQueue.isEmpty()) {// 隊(duì)列為空  
                        try {  
                            taskQueue.wait(20);  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                    }  
                    if (!taskQueue.isEmpty())  
                        r = taskQueue.remove(0);// 取出任務(wù)  
                }  
                if (r != null) {  
                    r.run();// 執(zhí)行任務(wù)  
                }  
                finished_task++;  
                r = null;  
            }  
        }  

        // 停止工作,讓該線程自然執(zhí)行完run方法,自然結(jié)束  
        public void stopWorker() {  
            isRunning = false;  
        }  
    }  
}  

復(fù)制代碼
復(fù)制代碼

復(fù)制代碼
復(fù)制代碼
package mine.util.thread;

//測(cè)試線程池  
public class TestThreadPool {  
    public static void main(String[] args) {  
        // 創(chuàng)建3個(gè)線程的線程池  
        ThreadPool t = ThreadPool.getThreadPool(3);  
        t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
        t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
        System.out.println(t);  
        t.destroy();// 所有線程都執(zhí)行完成才destory  
        System.out.println(t);  
    }  

    // 任務(wù)類  
    static class Task implements Runnable {  
        private static volatile int i = 1;  

        @Override  
        public void run() {// 執(zhí)行任務(wù)  
            System.out.println("任務(wù) " + (i++) + " 完成");  
        }  
    }  
}  

復(fù)制代碼
復(fù)制代碼
運(yùn)行結(jié)果:

WorkThread number:3 finished task number:0 wait task number:6
任務(wù) 1 完成
任務(wù) 2 完成
任務(wù) 3 完成
任務(wù) 4 完成
任務(wù) 5 完成
任務(wù) 6 完成
WorkThread number:3 finished task number:6 wait task number:0

分析:由于并沒有任務(wù)接口,傳入的可以是自定義的任何任務(wù),所以線程池并不能準(zhǔn)確的判斷該任務(wù)是否真正的已經(jīng)完成(真正完成該任務(wù)是這個(gè)任務(wù)的run方法執(zhí)行完畢),只能知道該任務(wù)已經(jīng)出了任務(wù)隊(duì)列,正在執(zhí)行或者已經(jīng)完成。

2、Java類庫中提供的線程池簡(jiǎn)介:

 java提供的線程池更加強(qiáng)大,相信理解線程池的工作原理,看類庫中的線程池就不會(huì)感到陌生了。

文章2:

Java線程池使用說明

一簡(jiǎn)介
線程的使用在java中占有極其重要的地位,在jdk1.4極其之前的jdk版本中,關(guān)于線程池的使用是極其簡(jiǎn)陋的。在jdk1.5之后這一情況有了很大的改觀。Jdk1.5之后加入了java.util.concurrent包,這個(gè)包中主要介紹java中線程以及線程池的使用。為我們?cè)陂_發(fā)中處理線程的問題提供了非常大的幫助。

二:線程池
線程池的作用:

線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。
根據(jù)系統(tǒng)的環(huán)境情況,可以自動(dòng)或手動(dòng)設(shè)置線程數(shù)量,達(dá)到運(yùn)行的最佳效果;少了浪費(fèi)了系統(tǒng)資源,多了造成系統(tǒng)擁擠效率不高。用線程池控制線程數(shù)量,其他線程排隊(duì)等候。一個(gè)任務(wù)執(zhí)行完畢,再從隊(duì)列的中取最前面的任務(wù)開始執(zhí)行。若隊(duì)列中沒有等待進(jìn)程,線程池的這一資源處于等待。當(dāng)一個(gè)新任務(wù)需要運(yùn)行時(shí),如果線程池中有等待的工作線程,就可以開始運(yùn)行了;否則進(jìn)入等待隊(duì)列。

為什么要用線程池:

1.減少了創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)。

2.可以根據(jù)系統(tǒng)的承受能力,調(diào)整線程池中工作線線程的數(shù)目,防止因?yàn)橄倪^多的內(nèi)存,而把服務(wù)器累趴下(每個(gè)線程需要大約1MB內(nèi)存,線程開的越多,消耗的內(nèi)存也就越大,最后死機(jī))。

Java里面線程池的頂級(jí)接口是Executor,但是嚴(yán)格意義上講Executor并不是一個(gè)線程池,而只是一個(gè)執(zhí)行線程的工具。真正的線程池接口是ExecutorService。

比較重要的幾個(gè)類:

ExecutorService

真正的線程池接口。

ScheduledExecutorService

能和Timer/TimerTask類似,解決那些需要任務(wù)重復(fù)執(zhí)行的問題。

ThreadPoolExecutor

ExecutorService的默認(rèn)實(shí)現(xiàn)。

ScheduledThreadPoolExecutor

繼承ThreadPoolExecutor的ScheduledExecutorService接口實(shí)現(xiàn),周期性任務(wù)調(diào)度的類實(shí)現(xiàn)。

要配置一個(gè)線程池是比較復(fù)雜的,尤其是對(duì)于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優(yōu)的,因此在Executors類里面提供了一些靜態(tài)工廠,生成一些常用的線程池。

  1. newSingleThreadExecutor

創(chuàng)建一個(gè)單線程的線程池。這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。

2.newFixedThreadPool

創(chuàng)建固定大小的線程池。每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。

  1. newCachedThreadPool

創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,

那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。

4.newScheduledThreadPool

創(chuàng)建一個(gè)大小無限的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。

實(shí)例

1:newSingleThreadExecutor

MyThread.java

publicclassMyThread extends Thread {

@Override

publicvoid run() {

    System.out.println(Thread.currentThread().getName() + "正在執(zhí)行。。。");

}

}

TestSingleThreadExecutor.java

publicclassTestSingleThreadExecutor {

publicstaticvoid main(String[] args) {

    //創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池

    ExecutorService pool = Executors. newSingleThreadExecutor();

    //創(chuàng)建實(shí)現(xiàn)了Runnable接口對(duì)象,Thread對(duì)象當(dāng)然也實(shí)現(xiàn)了Runnable接口

    Thread t1 = new MyThread();

    Thread t2 = new MyThread();

    Thread t3 = new MyThread();

    Thread t4 = new MyThread();

    Thread t5 = new MyThread();

    //將線程放入池中進(jìn)行執(zhí)行

    pool.execute(t1);

    pool.execute(t2);

    pool.execute(t3);

    pool.execute(t4);

    pool.execute(t5);

    //關(guān)閉線程池

    pool.shutdown();

}

}

輸出結(jié)果

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

2newFixedThreadPool

TestFixedThreadPool.Java

publicclass TestFixedThreadPool {

publicstaticvoid main(String[] args) {

    //創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池

    ExecutorService pool = Executors.newFixedThreadPool(2);

    //創(chuàng)建實(shí)現(xiàn)了Runnable接口對(duì)象,Thread對(duì)象當(dāng)然也實(shí)現(xiàn)了Runnable接口

    Thread t1 = new MyThread();

    Thread t2 = new MyThread();

    Thread t3 = new MyThread();

    Thread t4 = new MyThread();

    Thread t5 = new MyThread();

    //將線程放入池中進(jìn)行執(zhí)行

    pool.execute(t1);

    pool.execute(t2);

    pool.execute(t3);

    pool.execute(t4);

    pool.execute(t5);

    //關(guān)閉線程池

    pool.shutdown();

}

}

輸出結(jié)果

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-2正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-2正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

3 newCachedThreadPool

TestCachedThreadPool.java

publicclass TestCachedThreadPool {

publicstaticvoid main(String[] args) {

    //創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池

    ExecutorService pool = Executors.newCachedThreadPool();

    //創(chuàng)建實(shí)現(xiàn)了Runnable接口對(duì)象,Thread對(duì)象當(dāng)然也實(shí)現(xiàn)了Runnable接口

    Thread t1 = new MyThread();

    Thread t2 = new MyThread();

    Thread t3 = new MyThread();

    Thread t4 = new MyThread();

    Thread t5 = new MyThread();

    //將線程放入池中進(jìn)行執(zhí)行

    pool.execute(t1);

    pool.execute(t2);

    pool.execute(t3);

    pool.execute(t4);

    pool.execute(t5);

    //關(guān)閉線程池

    pool.shutdown();

}

}

輸出結(jié)果:

pool-1-thread-2正在執(zhí)行。。。

pool-1-thread-4正在執(zhí)行。。。

pool-1-thread-3正在執(zhí)行。。。

pool-1-thread-1正在執(zhí)行。。。

pool-1-thread-5正在執(zhí)行。。。

4newScheduledThreadPool

TestScheduledThreadPoolExecutor.java

publicclass TestScheduledThreadPoolExecutor {

publicstaticvoid main(String[] args) {

    ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);

    exec.scheduleAtFixedRate(new Runnable() {//每隔一段時(shí)間就觸發(fā)異常

                  @Override

                  publicvoid run() {

                       //throw new RuntimeException();

                       System.out.println("================");

                  }

              }, 1000, 5000, TimeUnit.MILLISECONDS);

    exec.scheduleAtFixedRate(new Runnable() {//每隔一段時(shí)間打印系統(tǒng)時(shí)間,證明兩者是互不影響的

                  @Override

                  publicvoid run() {

                       System.out.println(System.nanoTime());

                  }

              }, 1000, 2000, TimeUnit.MILLISECONDS);

}

}

輸出結(jié)果

================

8384644549516

8386643829034

8388643830710

================

8390643851383

8392643879319

8400643939383

三:ThreadPoolExecutor詳解
ThreadPoolExecutor的完整構(gòu)造方法的簽名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .

corePoolSize - 池中所保存的線程數(shù),包括空閑線程。

maximumPoolSize-池中允許的最大線程數(shù)。

keepAliveTime - 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。

unit - keepAliveTime 參數(shù)的時(shí)間單位。

workQueue - 執(zhí)行前用于保持任務(wù)的隊(duì)列。此隊(duì)列僅保持由 execute方法提交的 Runnable任務(wù)。

threadFactory - 執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠。

handler - 由于超出線程范圍和隊(duì)列容量而使執(zhí)行被阻塞時(shí)所使用的處理程序。

ThreadPoolExecutor是Executors類的底層實(shí)現(xiàn)。

在JDK幫助文檔中,有如此一段話:

“強(qiáng)烈建議程序員使用較為方便的Executors工廠方法Executors.newCachedThreadPool()(***線程池,可以進(jìn)行自動(dòng)線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個(gè)后臺(tái)線程)

它們均為大多數(shù)使用場(chǎng)景預(yù)定義了設(shè)置?!?/p>

下面介紹一下幾個(gè)類的源碼:

ExecutorService newFixedThreadPool (int nThreads):固定大小線程池。

可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實(shí)際上,后面會(huì)介紹,如果使用***queue的話maximumPoolSize參數(shù)是沒有意義的),keepAliveTime和unit的設(shè)值表名什么?-就是該實(shí)現(xiàn)不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個(gè)特點(diǎn),他是***的。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {

  2. return new ThreadPoolExecutor(nThreads, nThreads,

  3. 0L, TimeUnit.MILLISECONDS,

  4. new LinkedBlockingQueue<Runnable>());

  5. }

ExecutorService newSingleThreadExecutor():?jiǎn)尉€程

  1. public static ExecutorService newSingleThreadExecutor() {

  2. return new FinalizableDelegatedExecutorService

  3. (new ThreadPoolExecutor(1, 1,

  4. 0L, TimeUnit.MILLISECONDS,

  5. new LinkedBlockingQueue<Runnable>()));

  6. }

ExecutorService newCachedThreadPool():***線程池,可以進(jìn)行自動(dòng)線程回收

這個(gè)實(shí)現(xiàn)就有意思了。首先是***的線程池,所以我們可以發(fā)現(xiàn)maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue??赡軐?duì)于該BlockingQueue有些陌生,簡(jiǎn)單說:該QUEUE中,每個(gè)插入操作必須等待另一個(gè)線程的對(duì)應(yīng)移除操作。

  1. public static ExecutorService newCachedThreadPool() {

  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

  3. 60L, TimeUnit.SECONDS,

  4. new SynchronousQueue<Runnable>());

    }
    先從BlockingQueue<Runnable> workQueue這個(gè)入?yún)㈤_始說起。在JDK中,其實(shí)已經(jīng)說得很清楚了,一共有三種類型的queue。

所有BlockingQueue 都可用于傳輸和保持提交的任務(wù)。可以使用此隊(duì)列與池大小進(jìn)行交互:

如果運(yùn)行的線程少于 corePoolSize,則 Executor始終首選添加新的線程,而不進(jìn)行排隊(duì)。(如果當(dāng)前運(yùn)行的線程小于corePoolSize,則任務(wù)根本不會(huì)存放,添加到queue中,而是直接抄家伙(thread)開始運(yùn)行)

如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor始終首選將請(qǐng)求加入隊(duì)列,而不添加新的線程。

如果無法將請(qǐng)求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。

queue上的三種類型。

排隊(duì)有三種通用策略:

直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會(huì)構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。直接提交通常要求*** maximumPoolSizes 以避免拒絕新提交的任務(wù)。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許***線程具有增長(zhǎng)的可能性。

***隊(duì)列。使用***隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用***隊(duì)列;例如,在 Web頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許***線程具有增長(zhǎng)的可能性。

有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會(huì)降低吞吐量。

BlockingQueue的選擇。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是***的,也就是說他存數(shù)任務(wù)的能力是沒有限制的,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加。在這里不是核心線程便是新創(chuàng)建的線程,但是我們?cè)囅胍粯酉?,下面的?chǎng)景。

我們使用一下參數(shù)構(gòu)造ThreadPoolExecutor:

  1. new ThreadPoolExecutor(

  2. 2, 3, 30, TimeUnit.SECONDS,

  3. new SynchronousQueue<Runnable>(),

  4. new RecorderThreadFactory("CookieRecorderPool"),

        new ThreadPoolExecutor.CallerRunsPolicy());  

    new ThreadPoolExecutor(

    2, 3, 30, TimeUnit.SECONDS,

    new SynchronousQueue<Runnable>(),

    new RecorderThreadFactory("CookieRecorderPool"),

    new ThreadPoolExecutor.CallerRunsPolicy());

    當(dāng)核心線程已經(jīng)有2個(gè)正在運(yùn)行.

此時(shí)繼續(xù)來了一個(gè)任務(wù)(A),根據(jù)前面介紹的“如果運(yùn)行的線程等于或多于 corePoolSize,則Executor始終首選將請(qǐng)求加入隊(duì)列,而不添加新的線程?!?所以A被添加到queue中。
又來了一個(gè)任務(wù)(B),且核心2個(gè)線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由于使用的SynchronousQueue,所以一定無法加入進(jìn)去。
此時(shí)便滿足了上面提到的“如果無法將請(qǐng)求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出maximumPoolSize,在這種情況下,任務(wù)將被拒絕?!?,所以必然會(huì)新建一個(gè)線程來運(yùn)行這個(gè)任務(wù)。
暫時(shí)還可以,但是如果這三個(gè)任務(wù)都還沒完成,連續(xù)來了兩個(gè)任務(wù),第一個(gè)添加入queue中,后一個(gè)呢?queue中無法插入,而線程數(shù)達(dá)到了maximumPoolSize,所以只好執(zhí)行異常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是***的,這樣就可以避免上述情況發(fā)生(如果希望限制就直接使用有界隊(duì)列)。對(duì)于使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。

什么意思?如果你的任務(wù)A1,A2有內(nèi)部關(guān)聯(lián),A1需要先運(yùn)行,那么先提交A1,再提交A2,當(dāng)使用SynchronousQueue我們可以保證,A1必定先被執(zhí)行,在A1么有被執(zhí)行前,A2不可能添加入queue中。

例子二:使用***隊(duì)列策略,即LinkedBlockingQueue

這個(gè)就拿newFixedThreadPool來說,根據(jù)前文提到的規(guī)則:

如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。那么當(dāng)任務(wù)繼續(xù)增加,會(huì)發(fā)生什么呢?

如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請(qǐng)求加入隊(duì)列,而不添加新的線程。OK,此時(shí)任務(wù)變加入隊(duì)列之中了,那什么時(shí)候才會(huì)添加新線程呢?

如果無法將請(qǐng)求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。這里就很有意思了,可能會(huì)出現(xiàn)無法加入隊(duì)列嗎?不像SynchronousQueue那樣有其自身的特點(diǎn),對(duì)于***隊(duì)列來說,總是可以加入的(資源耗盡,當(dāng)然另當(dāng)別論)。換句說,永遠(yuǎn)也不會(huì)觸發(fā)產(chǎn)生新的線程!corePoolSize大小的線程數(shù)會(huì)一直運(yùn)行,忙完當(dāng)前的,就從隊(duì)列中拿任務(wù)開始運(yùn)行。所以要防止任務(wù)瘋長(zhǎng),比如任務(wù)運(yùn)行的實(shí)行比較長(zhǎng),而添加任務(wù)的速度遠(yuǎn)遠(yuǎn)超過處理任務(wù)的時(shí)間,而且還不斷增加,不一會(huì)兒就爆了。

例子三:有界隊(duì)列,使用ArrayBlockingQueue。

這個(gè)是最為復(fù)雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點(diǎn)便是可以防止資源耗盡的情況發(fā)生。

舉例來說,請(qǐng)看如下構(gòu)造方法:

  1. new ThreadPoolExecutor(

  2. 2, 4, 30, TimeUnit.SECONDS,

  3. new ArrayBlockingQueue<Runnable>(2),

  4. new RecorderThreadFactory("CookieRecorderPool"),

  5. new ThreadPoolExecutor.CallerRunsPolicy());

new ThreadPoolExecutor(

2, 4, 30, TimeUnit.SECONDS,

new ArrayBlockingQueue<Runnable>(2),

new RecorderThreadFactory("CookieRecorderPool"),

new ThreadPoolExecutor.CallerRunsPolicy());

假設(shè),所有的任務(wù)都永遠(yuǎn)無法執(zhí)行完。

對(duì)于首先來的A,B來說直接運(yùn)行,接下來,如果來了C,D,他們會(huì)被放到queue中,如果接下來再來E,F,則增加線程運(yùn)行E,F(xiàn)。但是如果再來任務(wù),隊(duì)列無法再接受了,線程數(shù)也到達(dá)最大的限制了,所以就會(huì)使用拒絕策略來處理。

keepAliveTime

jdk中的解釋是:當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。

有點(diǎn)拗口,其實(shí)這個(gè)不難理解,在使用了“池”的應(yīng)用中,大多都有類似的參數(shù)需要配置。比如數(shù)據(jù)庫連接池,DBCP中的maxIdle,minIdle參數(shù)。

什么意思?接著上面的解釋,后來向老板派來的工人始終是“借來的”,俗話說“有借就有還”,但這里的問題就是什么時(shí)候還了,如果借來的工人剛完成一個(gè)任務(wù)就還回去,后來發(fā)現(xiàn)任務(wù)還有,那豈不是又要去借?這一來一往,老板肯定頭也大死了。

合理的策略:既然借了,那就多借一會(huì)兒。直到“某一段”時(shí)間后,發(fā)現(xiàn)再也用不到這些工人時(shí),便可以還回去了。這里的某一段時(shí)間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。

RejectedExecutionHandler

另一種情況便是,即使向老板借了工人,但是任務(wù)還是繼續(xù)過來,還是忙不過來,這時(shí)整個(gè)隊(duì)伍只好拒絕接受了。

RejectedExecutionHandler接口提供了對(duì)于拒絕任務(wù)的處理的自定方法的機(jī)會(huì)。在ThreadPoolExecutor中已經(jīng)默認(rèn)包含了4中策略,因?yàn)樵创a非常簡(jiǎn)單,這里直接貼出來。

CallerRunsPolicy:線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  2. if (!e.isShutdown()) {

  3. r.run();

  4. }

  5. }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

       if (!e.isShutdown()) {

           r.run();

       }

   }

這個(gè)策略顯然不想放棄執(zhí)行任務(wù)。但是由于池中已經(jīng)沒有任何資源了,那么就直接使用調(diào)用該execute的線程本身來執(zhí)行。

AbortPolicy:處理程序遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  2. throw new RejectedExecutionException();

  3. }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

       throw new RejectedExecutionException();

   }

這種策略直接拋出異常,丟棄任務(wù)。

DiscardPolicy:不能執(zhí)行的任務(wù)將被刪除

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  2. }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

   }

這種策略和AbortPolicy幾乎一樣,也是丟棄任務(wù),只不過他不拋出異常。

DiscardOldestPolicy:如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  2. if (!e.isShutdown()) {

  3. e.getQueue().poll();

  4. e.execute(r);

  5. }

    }  

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

       if (!e.isShutdown()) {
    
           e.getQueue().poll();
    
           e.execute(r);
    
       }

    }

該策略就稍微復(fù)雜一些,在pool沒有關(guān)閉的前提下首先丟掉緩存在隊(duì)列中的最早的任務(wù),然后重新嘗試運(yùn)行該任務(wù)。這個(gè)策略需要適當(dāng)小心。

設(shè)想:如果其他線程都還在運(yùn)行,那么新來任務(wù)踢掉舊任務(wù),緩存在queue中,再來一個(gè)任務(wù)又會(huì)踢掉queue中最老任務(wù)。

總結(jié):

keepAliveTime和maximumPoolSize及BlockingQueue的類型均有關(guān)系。如果BlockingQueue是***的,那么永遠(yuǎn)不會(huì)觸發(fā)maximumPoolSize,自然keepAliveTime也就沒有了意義。

反之,如果核心數(shù)較小,有界BlockingQueue數(shù)值又較小,同時(shí)keepAliveTime又設(shè)的很小,如果任務(wù)頻繁,那么系統(tǒng)就會(huì)頻繁的申請(qǐng)回收線程。

public static ExecutorService newFixedThreadPool(int nThreads) {

   return new ThreadPoolExecutor(nThreads, nThreads,

                                 0L, TimeUnit.MILLISECONDS,

                                 new LinkedBlockingQueue<Runnable>());

}

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI