溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

java線程池的狀態(tài)有幾種

發(fā)布時(shí)間:2021-07-29 19:22:19 來源:億速云 閱讀:163 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“java線程池的狀態(tài)有幾種”,在日常操作中,相信很多人在java線程池的狀態(tài)有幾種問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”java線程池的狀態(tài)有幾種”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

線程池5種狀態(tài)
// 高3位存放狀態(tài) 低29位存線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

RUNNING = ‐1 << COUNT_BITS; //高3位為111
SHUTDOWN = 0 << COUNT_BITS; //高3位為000
STOP = 1 << COUNT_BITS; //高3位為001
TIDYING = 2 << COUNT_BITS; //高3位為010
TERMINATED = 3 << COUNT_BITS; //高3位為011

RUNNING 初始化以后就是這種狀態(tài) SHUTDOWN 不再接收新任務(wù) 把已有任務(wù)完成后就變狀態(tài) STOP 不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)。 TIDYING(收拾/整理) 沒有任務(wù)沒有線程 有鉤子方法 terminated() TERMINATED 線程池的具體實(shí)現(xiàn) ThreadPoolExecutor 默認(rèn)線程池 ScheduledThreadPoolExecutor 定時(shí)線程池 提交任務(wù)

public void execute() //提交任務(wù)無返回值
public Future<?> submit() //任務(wù)執(zhí)行完成后有返回值

###參數(shù)解釋

corePoolSize

線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì) 提前創(chuàng)建并啟動(dòng)所有核心線程。

maximumPoolSize 線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線

程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;

keepAliveTime

線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí) 候,如果這時(shí)沒有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待 的時(shí)間超過了keepAliveTime; unit keepAliveTime的單位;

workQueue 用來保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供

了如下阻塞隊(duì)列: 1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù); 2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞 吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到 另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQuene; 4、priorityBlockingQuene:具有優(yōu)先級(jí)的無界阻塞隊(duì)列; threadFactory 它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用 Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程 時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程,同時(shí)也設(shè) 置了線程的名稱。

handler

線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必 須采取一種策略處理該任務(wù),線程池提供了4種策略: 1、AbortPolicy:直接拋出異常,默認(rèn)策略; 2、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù); 3、DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù); 4、DiscardPolicy:直接丟棄任務(wù); 上面的4種策略都是ThreadPoolExecutor的內(nèi)部類。 當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如 記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

線程池監(jiān)控
public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)
public long getCompletedTaskCount() //已完成的任務(wù)數(shù)
public int getPoolSize() //線程池當(dāng)前的線程數(shù)
public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量

源碼分析

execute方法
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /**
         * 記錄著線程池狀態(tài)和線程數(shù)量
         */
        int c = ctl.get();
        /**
         * 如果當(dāng)前線程數(shù)小于核心線程數(shù) 創(chuàng)建線程
         */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

如果在執(zhí)行execute方法一直處于running狀態(tài) 執(zhí)行流程如下

  1. 如果wokeCount < corePoolSize 創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)

  2. 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添 加到該阻塞隊(duì)列中;

  3. 如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新 提交的任務(wù);

  4. 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根 據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。 注意一下addWorker(null,false) 創(chuàng)建線程但是沒有傳入任務(wù),因?yàn)橹耙呀?jīng)把任務(wù)放到隊(duì)里里面了

addWorker方法

firstTask參數(shù) 用 于指定新增的線程執(zhí)行的第一個(gè)任務(wù)

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    // 如果設(shè)置成功 跳出第一個(gè)循環(huán)
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    // 如果狀態(tài)變了 回到的第一個(gè)循環(huán)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 用任務(wù)創(chuàng)建了worker對(duì)象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 放入線程不安全的set里
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 設(shè)置最大的池?cái)?shù)量
                            largestPoolSize = s;
                        // 標(biāo)記添加worker成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 開始執(zhí)行任務(wù) 因?yàn)閠hread是利用woker創(chuàng)建的 所以t.start實(shí)際就是調(diào)用的worker的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Worker類

線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是一組 Worker對(duì)象.

Worker類繼承了AQS,并實(shí)現(xiàn)了Runnable接口,注意其中的firstTask和thread屬 性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng) 建的線程,是用來處理任務(wù)的線程。

在調(diào)用構(gòu)造方法時(shí),需要把任務(wù)傳入,這里通過 getThreadFactory().newThread(this); 來 新 建 一 個(gè) 線 程 , newThread 方 法 傳 入 的 參 數(shù) 是 this,因?yàn)閃orker本身繼承了Runnable接口,也就是一個(gè)線程,所以一個(gè)Worker對(duì)象在 啟動(dòng)的時(shí)候會(huì)調(diào)用Worker類中的run方法。

Worker繼承了AQS,使用AQS來實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用ReentrantLock來 實(shí)現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:

  1. lock方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中;

  2. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;

  3. 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù), 這時(shí)可以對(duì)該線程進(jìn)行中斷;

  4. 線程池在執(zhí)行shutdown方法或tryTerminate方法時(shí)會(huì)調(diào)用interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers方法會(huì)使用tryLock方法來判斷線程 池中的線程是否是空閑狀態(tài);

  5. 之所以設(shè)置為不可重入,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像setCorePoolSize這樣的 線程池控制方法時(shí)重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果 在任務(wù)中調(diào)用了如setCorePoolSize這類線程池控制的方法,會(huì)中斷正在運(yùn)行的線 程。 所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。 此外,在構(gòu)造方法中執(zhí)行了setState(-1);,把state變量設(shè)置為-1,為什么這么做呢? 是因?yàn)锳QS中默認(rèn)的state是0,如果剛創(chuàng)建了一個(gè)Worker對(duì)象,還沒有執(zhí)行任務(wù)時(shí),這時(shí) 就不應(yīng)該被中斷,看一下tryAquire方法:

// tryAcquire方法是根據(jù)state是否是0來判斷的,所以,   將state設(shè)置為-1是 為了禁止在執(zhí)行任務(wù)前對(duì)線程進(jìn)行中斷。
//正因?yàn)槿绱耍趓unWorker方法中會(huì)先調(diào)用Worker對(duì)象的unlock方法將state設(shè)置為 0。

protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
runWorker方法

在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的代碼如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts 允許被打斷
        // 是否異常退出循環(huán)
        boolean completedAbruptly = true;
        try {
            // 如果task是null 從隊(duì)列里拿數(shù)據(jù)
            while (task != null || (task = getTask()) != null) {
                // 為什么不用ReentrantLock 因?yàn)橐龅讲豢芍厝?
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                /**
                 * 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài);
                 * 如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài);
                 * Thread.interrupted() 獲得當(dāng)前的中斷狀態(tài) 復(fù)位
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 鉤子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 鉤子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // gettask結(jié)果是null 就會(huì)走這 runWorker 結(jié)束 銷毀線程
            processWorkerExit(w, completedAbruptly);
        }
    }

總結(jié)一下runWorker方法的執(zhí)行過程:

  1. while循環(huán)不斷地通過getTask()方法獲取任務(wù);

  2. getTask()方法從阻塞隊(duì)列中取任務(wù);

  3. 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是 中斷狀態(tài);

  4. 調(diào)用task.run()執(zhí)行任務(wù);

  5. 如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;

  6. runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。 這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給 子類來實(shí)現(xiàn)。

getTask方法
private Runnable getTask() {
        // 判斷從阻塞隊(duì)列里獲取任務(wù)是否超時(shí)
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果線程池是非run狀態(tài)
            // 如果>= stop 說明線程正在停止 或者隊(duì)列是空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 減少任務(wù)數(shù)
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

到此,關(guān)于“java線程池的狀態(tài)有幾種”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI