溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java線程池execute()方法怎么用

發(fā)布時(shí)間:2022-03-23 16:09:27 來(lái)源:億速云 閱讀:2260 作者:iii 欄目:開(kāi)發(fā)技術(shù)

本文小編為大家詳細(xì)介紹“Java線程池execute()方法怎么用”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“Java線程池execute()方法怎么用”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。

先理解線程池到底有什么作用

* Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.

線程池處理了兩個(gè)不同的問(wèn)題,線程池通過(guò)減少線程正式調(diào)用之前的開(kāi)銷來(lái)給大量異步任務(wù)更優(yōu)秀的表現(xiàn),與此同時(shí)給出了一系列綁定管理任務(wù)線程的一種手段。每個(gè)線程池都包含了一些基本信息,比如內(nèi)部完成的任務(wù)數(shù)量。

先看ThreadPoolExecutor類的一系列代表狀態(tài)的

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl作為AtomicInteger類存放了類中的兩種信息,在其中由高3位來(lái)保存線程池的狀態(tài),后29位來(lái)保存此時(shí)線程池中的Woker類線程數(shù)量(由此可知,線程池中的線程數(shù)量最高可以接受大約在五億左右)。由此可見(jiàn)給出的runStateOf()和workerCountOf()方法分別給出了查看線程狀態(tài)和線程數(shù)量的方法。

該類一共給出了五種狀態(tài)

讓我們看作者給出的注釋

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
  • RUNNING狀態(tài)可以接受新進(jìn)來(lái)的任務(wù),同時(shí)也會(huì)執(zhí)行隊(duì)列里的任務(wù)。

  • SHUTDOWN 狀態(tài)已經(jīng)不會(huì)再接受新任務(wù),但仍舊會(huì)處理隊(duì)列中的任務(wù)。

  • STOP狀態(tài)在之前的基礎(chǔ)上,不會(huì)處理隊(duì)列中的人物,在執(zhí)行的任務(wù)也會(huì)直接被打斷。

  • TIDYING狀態(tài)在之前的基礎(chǔ)上,所有任務(wù)都已經(jīng)終止,池中的Worker線程都已經(jīng)為0,也就是stop狀態(tài)在清理完所有工作線程之后就會(huì)進(jìn)入該狀態(tài),同時(shí)在shutdown狀態(tài)在隊(duì)列空以及工作線程清理完畢之后也會(huì)直接進(jìn)入這個(gè)階段,這一階段會(huì)循環(huán)執(zhí)行terminated()方法。

  • TERMINATED 狀態(tài)作為最后的狀態(tài),在之前的基礎(chǔ)上terminated()方法也業(yè)已執(zhí)行完畢,才會(huì)從上個(gè)狀態(tài)進(jìn)入這個(gè)狀態(tài),代表線程池已經(jīng)完全停止。

由于線程池的狀態(tài)都是通過(guò)AtomicInteger來(lái)保存的,可以通過(guò)比較的方式簡(jiǎn)單的得到當(dāng)前線程狀態(tài)。

private final BlockingQueue<Runnable> workQueue; 
private final ReentrantLock mainLock = new ReentrantLock(); 
private final HashSet<Worker> workers = new HashSet<Worker>(); 
private final Condition termination = mainLock.newCondition(); 
private int largestPoolSize; 
private long completedTaskCount; 
private volatile ThreadFactory threadFactory; 
private volatile RejectedExecutionHandler handler; 
private volatile long keepAliveTime; 
private volatile boolean allowCoreThreadTimeOut; 
private volatile int corePoolSize; 
private volatile int maximumPoolSize;

接下來(lái)是線程池的幾個(gè)有關(guān)工作線程的變量

  • corePoolSize表示線程池中允許存活最少的工作線程數(shù)量,但值得注意的是如果allowCoreThreadTimeOut一旦設(shè)置true(默認(rèn)false),每個(gè)線程的存活時(shí)間只有keepAliveTime也就是說(shuō)在allowCoreThreadTimeOut為true的時(shí)候,該線程池最小的工作線程數(shù)量為0;maximumPoolSize代表線程池中最大的工作線程數(shù)量。

  • keepAliveTime為線程池中工作線程數(shù)量大于corePoolSize時(shí),每個(gè)工作線程的在等待工作時(shí)最長(zhǎng)的等待時(shí)間。

  • workQueue作為線程池的任務(wù)等待隊(duì)列,這個(gè)將在接下來(lái)的execute()里詳細(xì)解釋。

  • Workers作為存放線程池中存放工作線程的容器。

  • largestPoolSize用來(lái)記錄線程池中存在過(guò)的最大的工作線程數(shù)量。

  • completedTaskCount用來(lái)記錄線程池完成的任務(wù)的總數(shù)。

  • Handler作為線程池中在不能接受任務(wù)的時(shí)候的拒絕策略,我們可以實(shí)現(xiàn)自己的拒絕策略,在實(shí)現(xiàn)了RejectedExecutionHandler接口的前提下。下面是線程池的默認(rèn)拒絕策略,

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

threadFactory作為線程池生產(chǎn)線程的工廠類

下面是線程池默認(rèn)的線程工廠的生產(chǎn)線程方法

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

我們可以先看我們最常調(diào)用的execute()方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute()內(nèi)部的調(diào)用邏輯非常清晰。

如果當(dāng)前線程池的工作線程數(shù)量小于corePoolSize,那么直接調(diào)用addWoker(),來(lái)添加工作線程。

下面是addWorker()的具體方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
           throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這段方法比較長(zhǎng),但整體的邏輯還是清晰的。

首先判斷當(dāng)前線程池的狀態(tài),如果已經(jīng)狀態(tài)不是shutdown或者running,或者已經(jīng)為shutdown但是工作隊(duì)列已經(jīng)為空,那么這個(gè)時(shí)候直接返回添加工作失敗。接下來(lái)是對(duì)線程池線程數(shù)量的判斷,根據(jù)調(diào)用時(shí)的core的值來(lái)判斷是跟corePoolSize還是 maximumPoolSize判斷。

在確認(rèn)了線程池狀態(tài)以及線程池中工作線程數(shù)量之后,才真正開(kāi)始添加工作線程。

新建立一個(gè)worker類(線程池的內(nèi)部類,具體的工作線程),將要執(zhí)行的具體線程做為構(gòu)造方法中的參數(shù)傳遞進(jìn)去,接下來(lái)將其加入線程池的工作線程容器workers,并且更新工作線程最大量,最后調(diào)用worker工作線程的start()方法,就完成了工作線程的建立與啟動(dòng)。

讓我們回到execute()方法,如果我們?cè)谝婚_(kāi)始的線程數(shù)量就大于corePoolSize,或者我們?cè)谡{(diào)用addworker()方法的過(guò)程中出現(xiàn)了問(wèn)題導(dǎo)致添加工作線程數(shù)量失敗,那么我們會(huì)繼續(xù)執(zhí)行接下來(lái)的邏輯。

在判斷完畢線程池的狀態(tài)后,則會(huì)將任務(wù)通過(guò)workQueue.offer())方法試圖加進(jìn)任務(wù)隊(duì)列。Offer()方法的具體實(shí)現(xiàn)會(huì)根據(jù)在線程池構(gòu)造方法中選取的任務(wù)隊(duì)列種類而產(chǎn)生變化。

但是如果成功加入了任務(wù)隊(duì)列,仍舊需要注意判斷如果線程池的狀態(tài)如果已經(jīng)不是running那么會(huì)拒絕執(zhí)行這一任務(wù)并執(zhí)行相應(yīng)的拒絕策略。在最后需要記得成功加入隊(duì)列成功后如果線程池中如果已經(jīng)沒(méi)有了工作線程,需要重新建立一個(gè)工作線程去執(zhí)行仍舊在任務(wù)隊(duì)列中等待執(zhí)行的任務(wù)。

如果在之前的前提下加入任務(wù)隊(duì)列也失敗了(比如任務(wù)隊(duì)列已滿),則會(huì)在不超過(guò)線程池最大線程數(shù)量的前提下建立一個(gè)工作線程來(lái)處理。

如果在最后的建立工作線程也失敗了,那么我們只有很遺憾的執(zhí)行任務(wù)的拒絕策略了。

在之前的過(guò)程中我們建立了工作線程Worker()類,那么我們現(xiàn)在看看worker類的內(nèi)部實(shí)現(xiàn),也可以說(shuō)是線程池的核心部分。

Worker類作為線程池的內(nèi)部類

接下來(lái)是Worker()類的成員

final Thread thread;
 
Runnable firstTask;
 
volatile long completedTasks;
  • thread作為worker的工作線程空間,由線程池中所設(shè)置的線程工廠生成。

  • firstTask則是worker在構(gòu)造方法中所接受到的所要執(zhí)行的任務(wù)。

  • completedTasks作為該worker類所執(zhí)行完畢的任務(wù)總數(shù)。

接下來(lái)我們可以看最重要的,也就是我們之前建立完Worker類之后立馬調(diào)用的run()方法了

public void run() {
    runWorker(this);
}

run()方法實(shí)現(xiàn)的很簡(jiǎn)單

我們可以繼續(xù)追蹤下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果這個(gè)worker還沒(méi)有執(zhí)行過(guò)在構(gòu)造方法就傳入的任務(wù),那么在這個(gè)方法中,會(huì)直接執(zhí)行這一任務(wù),如果沒(méi)有,則會(huì)嘗試去從任務(wù)隊(duì)列當(dāng)中去取的新的任務(wù)。

但是在真正調(diào)用任務(wù)之前,仍舊會(huì)判斷線程池的狀態(tài),如果已經(jīng)不是running亦或是shutdwon,則會(huì)直接確保線程被中斷。如果沒(méi)有,將會(huì)繼續(xù)執(zhí)行并確保不被中斷。

接下來(lái)可見(jiàn),我們所需要的任務(wù),直接在工作線程中直接以run()方式以非線程的方式所調(diào)用,這里也就是我們所需要的任務(wù)真正執(zhí)行的地方。

在執(zhí)行完畢后,工作線程的使命并沒(méi)有真正宣告段落。在while部分worker仍舊會(huì)通過(guò)getTask()方法試圖取得新的任務(wù)。

下面是getTask()的實(shí)現(xiàn)

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
 
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍舊會(huì)判斷線程池的狀態(tài)是否是running還是shutdown以及stop狀態(tài)下隊(duì)列是否仍舊有需要等待執(zhí)行的任務(wù)。如果狀態(tài)沒(méi)有問(wèn)題,則會(huì)跟據(jù)allowCoreThreadTimeOut和corePoolSize的值通過(guò)對(duì)前面這兩個(gè)屬性解釋的方式來(lái)選擇從任務(wù)隊(duì)列中獲得任務(wù)的方式(是否設(shè)置timeout)。其中的timedOut保證了確認(rèn)前一次試圖取任務(wù)時(shí)超時(shí)發(fā)生的記錄,以確保工作線程的回收。

在runWorker()方法的最后

調(diào)用了processWorkerExist()方法來(lái)執(zhí)行工作線程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate(); 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在這一方法中,首先確保已經(jīng)重新更新了線程池中工作線程的數(shù)量,之后從線程池中的工作線程容器移去當(dāng)前工作線程,并且將完成的任務(wù)總數(shù)加到線程池的任務(wù)總數(shù)當(dāng)中。

在最后仍舊要確保線程池中依舊存在大于等于最小線程數(shù)量的工作線程數(shù)量存在,如果沒(méi)有,則重新建立工作線程去等待處理任務(wù)隊(duì)列中任務(wù)。

讀到這里,這篇“Java線程池execute()方法怎么用”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI