溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ThreadPoolExecutor參數(shù)含義及源碼執(zhí)行流程是什么

發(fā)布時間:2022-11-09 09:31:02 來源:億速云 閱讀:139 作者:iii 欄目:開發(fā)技術(shù)

今天小編給大家分享一下ThreadPoolExecutor參數(shù)含義及源碼執(zhí)行流程是什么的相關(guān)知識點,內(nèi)容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

    典型回答

    ThreadPoolExecutor 的核心參數(shù)指的是它在構(gòu)建時需要傳遞的參數(shù),其構(gòu)造方法如下所示:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            // maximumPoolSize 必須大于 0,且必須大于 corePoolSize
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    第 1 個參數(shù):corePoolSize 表示線程池的常駐核心線程數(shù)。如果設(shè)置為 0,則表示在沒有任何任務(wù)時,銷毀線程池;如果大于 0,即使沒有任務(wù)時也會保證線程池的線程數(shù)量等于此值。但需要注意,此值如果設(shè)置的比較小,則會頻繁的創(chuàng)建和銷毀線程(創(chuàng)建和銷毀的原因會在本課時的下半部分講到);如果設(shè)置的比較大,則會浪費系統(tǒng)資源,所以開發(fā)者需要根據(jù)自己的實際業(yè)務(wù)來調(diào)整此值。

    第 2 個參數(shù):maximumPoolSize 表示線程池在任務(wù)最多時,最大可以創(chuàng)建的線程數(shù)。官方規(guī)定此值必須大于 0,也必須大于等于 corePoolSize,此值只有在任務(wù)比較多,且不能存放在任務(wù)隊列時,才會用到。

    第 3 個參數(shù):keepAliveTime 表示線程的存活時間,當線程池空閑時并且超過了此時間,多余的線程就會銷毀,直到線程池中的線程數(shù)量銷毀的等于 corePoolSize 為止,如果 maximumPoolSize 等于 corePoolSize,那么線程池在空閑的時候也不會銷毀任何線程。

    第 4 個參數(shù):unit 表示存活時間的單位,它是配合 keepAliveTime 參數(shù)共同使用的。

    第 5 個參數(shù):workQueue 表示線程池執(zhí)行的任務(wù)隊列,當線程池的所有線程都在處理任務(wù)時,如果來了新任務(wù)就會緩存到此任務(wù)隊列中排隊等待執(zhí)行。

    第 6 個參數(shù):threadFactory 表示線程的創(chuàng)建工廠,此參數(shù)一般用的比較少,我們通常在創(chuàng)建線程池時不指定此參數(shù),它會使用默認的線程創(chuàng)建工廠的方法來創(chuàng)建線程,源代碼如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        // Executors.defaultThreadFactory() 為默認的線程創(chuàng)建工廠
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    // 默認的線程創(chuàng)建工廠,需要實現(xiàn) ThreadFactory 接口
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        // 創(chuàng)建線程
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon()) 
                t.setDaemon(false); // 創(chuàng)建一個非守護線程
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY); // 線程優(yōu)先級設(shè)置為默認值
            return t;
        }
    }

    我們也可以自定義一個線程工廠,通過實現(xiàn) ThreadFactory 接口來完成,這樣就可以自定義線程的名稱或線程執(zhí)行的優(yōu)先級了。

    第 7 個參數(shù):RejectedExecutionHandler 表示指定線程池的拒絕策略,當線程池的任務(wù)已經(jīng)在緩存隊列 workQueue 中存儲滿了之后,并且不能創(chuàng)建新的線程來執(zhí)行此任務(wù)時,就會用到此拒絕策略,它屬于一種限流保護的機制。

    線程池的工作流程要從它的執(zhí)行方法 execute() 說起,源碼如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 當前工作的線程數(shù)小于核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            // 創(chuàng)建新的線程執(zhí)行此任務(wù)
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 檢查線程池是否處于運行狀態(tài),如果是則把任務(wù)添加到隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再出檢查線程池是否處于運行狀態(tài),防止在第一次校驗通過后線程池關(guān)閉
            // 如果是非運行狀態(tài),則將剛加入隊列的任務(wù)移除
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果線程池的線程數(shù)為 0 時(當 corePoolSize 設(shè)置為 0 時會發(fā)生)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false); // 新建線程執(zhí)行任務(wù)
        }
        // 核心線程都在忙且隊列都已爆滿,嘗試新啟動一個線程執(zhí)行失敗
        else if (!addWorker(command, false)) 
            // 執(zhí)行拒絕策略
            reject(command);
    }

    其中 addWorker(Runnable firstTask, boolean core) 方法的參數(shù)說明如下:

    • firstTask,線程應(yīng)首先運行的任務(wù),如果沒有則可以設(shè)置為 null;

    • core,判斷是否可以創(chuàng)建線程的閥值(最大值),如果等于 true 則表示使用 corePoolSize 作為閥值,false 則表示使用maximumPoolSize 作為閥值。

    考點分析

    線程池任務(wù)執(zhí)行的主要流程,可以參考以下流程圖:

    ThreadPoolExecutor參數(shù)含義及源碼執(zhí)行流程是什么

    與 ThreadPoolExecutor 相關(guān)的面試題還有以下幾個:

    • ThreadPoolExecutor 的執(zhí)行方法有幾種?它們有什么區(qū)別?

    • 什么是線程的拒絕策略?

    • 拒絕策略的分類有哪些?

    • 如何自定義拒絕策略?

    • ThreadPoolExecutor 能不能實現(xiàn)擴展?如何實現(xiàn)擴展?

    知識拓展

    execute() VS submit()

    execute() 和 submit() 都是用來執(zhí)行線程池任務(wù)的,它們最主要的區(qū)別是,submit() 方法可以接收線程池執(zhí)行的返回值,而 execute() 不能接收返回值。

    來看兩個方法的具體使用:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
            TimeUnit.SECONDS, new LinkedBlockingQueue(20));
    // execute 使用
    executor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Hello, execute.");
        }
    });
    // submit 使用
    Future<String> future = executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("Hello, submit.");
            return "Success";
        }
    });
    System.out.println(future.get());

    以上程序執(zhí)行結(jié)果如下:

    Hello, submit.
    Hello, execute.
    Success

    從以上結(jié)果可以看出 submit() 方法可以配合 Futrue 來接收線程執(zhí)行的返回值。它們的另一個區(qū)別是 execute() 方法屬于 Executor 接口的方法,而 submit() 方法則是屬于 ExecutorService 接口的方法,它們的繼承關(guān)系如下圖所示:

    ThreadPoolExecutor參數(shù)含義及源碼執(zhí)行流程是什么

    線程池的拒絕策略

    當線程池中的任務(wù)隊列已經(jīng)被存滿,再有任務(wù)添加時會先判斷當前線程池中的線程數(shù)是否大于等于線程池的最大值,如果是,則會觸發(fā)線程池的拒絕策略。

    Java 自帶的拒絕策略有 4 種:

    • AbortPolicy,終止策略,線程池會拋出異常并終止執(zhí)行,它是默認的拒絕策略;

    • CallerRunsPolicy,把任務(wù)交給當前線程來執(zhí)行;

    • DiscardPolicy,忽略此任務(wù)(最新的任務(wù));

    • DiscardOldestPolicy,忽略最早的任務(wù)(最先加入隊列的任務(wù))。

    例如,我們來演示一個 AbortPolicy 的拒絕策略,代碼如下:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
            new ThreadPoolExecutor.AbortPolicy()); // 添加 AbortPolicy 拒絕策略
    for (int i = 0; i < 6; i++) {
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
    }

    以上程序的執(zhí)行結(jié)果:

    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-2
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
     at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)
     at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)

    可以看出當?shù)?6 個任務(wù)來的時候,線程池則執(zhí)行了AbortPolicy 拒絕策略,拋出了異常。因為隊列最多存儲 2 個任務(wù),最大可以創(chuàng)建 3 個線程來執(zhí)行任務(wù)(2+3=5),所以當?shù)?6 個任務(wù)來的時候,此線程池就“忙”不過來了。

    自定義拒絕策略

    自定義拒絕策略只需要新建一個 RejectedExecutionHandler 對象,然后重寫它的 rejectedExecution() 方法即可,如下代碼所示:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
            new RejectedExecutionHandler() {  // 添加自定義拒絕策略
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    // 業(yè)務(wù)處理方法
                    System.out.println("執(zhí)行自定義拒絕策略");
                }
            });
    for (int i = 0; i < 6; i++) {
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
    }

    以上代碼執(zhí)行的結(jié)果如下:

    執(zhí)行自定義拒絕策略
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-2

    可以看出線程池執(zhí)行了自定義的拒絕策略,我們可以在 rejectedExecution 中添加自己業(yè)務(wù)處理的代碼。

    ThreadPoolExecutor 擴展

    ThreadPoolExecutor 的擴展主要是通過重寫它的 beforeExecute() 和 afterExecute() 方法實現(xiàn)的,我們可以在擴展方法中添加日志或者實現(xiàn)數(shù)據(jù)統(tǒng)計,比如統(tǒng)計線程的執(zhí)行時間,如下代碼所示:

    public class ThreadPoolExtend {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 線程池擴展調(diào)用
            MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 4, 10,
                    TimeUnit.SECONDS, new LinkedBlockingQueue());
            for (int i = 0; i < 3; i++) {
                executor.execute(() -> {
                    Thread.currentThread().getName();
                });
            }
        }
       /**
         * 線程池擴展
         */
        static class MyThreadPoolExecutor extends ThreadPoolExecutor {
            // 保存線程執(zhí)行開始時間
            private final ThreadLocal<Long> localTime = new ThreadLocal<>();
            public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
            /**
             * 開始執(zhí)行之前
             * @param t 線程
             * @param r 任務(wù)
             */
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                Long sTime = System.nanoTime(); // 開始時間 (單位:納秒)
                localTime.set(sTime);
                System.out.println(String.format("%s | before | time=%s",
                        t.getName(), sTime));
                super.beforeExecute(t, r);
            }
            /**
             * 執(zhí)行完成之后
             * @param r 任務(wù)
             * @param t 拋出的異常
             */
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                Long eTime = System.nanoTime(); // 結(jié)束時間 (單位:納秒)
                Long totalTime = eTime - localTime.get(); // 執(zhí)行總時間
                System.out.println(String.format("%s | after | time=%s | 耗時:%s 毫秒",
                        Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));
                super.afterExecute(r, t);
            }
        }
    }

    以上程序的執(zhí)行結(jié)果如下所示:

    pool-1-thread-1 | before | time=4570298843700
    pool-1-thread-2 | before | time=4570298840000
    pool-1-thread-1 | after | time=4570327059500 | 耗時:28.2158 毫秒
    pool-1-thread-2 | after | time=4570327138100 | 耗時:28.2981 毫秒
    pool-1-thread-1 | before | time=4570328467800
    pool-1-thread-1 | after | time=4570328636800 | 耗時:0.169 毫秒

    以上就是“ThreadPoolExecutor參數(shù)含義及源碼執(zhí)行流程是什么”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關(guān)注億速云行業(yè)資訊頻道。

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI