溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

死磕 java線程系列之線程池深入解析——體系結(jié)構(gòu)

發(fā)布時(shí)間:2020-07-30 12:23:52 來源:網(wǎng)絡(luò) 閱讀:166 作者:彤哥讀源碼 欄目:編程語言

死磕 java線程系列之線程池深入解析——體系結(jié)構(gòu)

(手機(jī)橫屏看源碼更方便)


注:java源碼分析部分如無特殊說明均基于 java8 版本。

簡介

Java的線程池是塊硬骨頭,對線程池的源碼做深入研究不僅能提高對Java整個(gè)并發(fā)編程的理解,也能提高自己在面試中的表現(xiàn),增加被錄取的可能性。

本系列將分成很多個(gè)章節(jié),本章作為線程池的第一章將對整個(gè)線程池體系做一個(gè)總覽。

體系結(jié)構(gòu)

死磕 java線程系列之線程池深入解析——體系結(jié)構(gòu)

上圖列舉了線程池中非常重要的接口和類:

(1)Executor,線程池頂級接口;

(2)ExecutorService,線程池次級接口,對Executor做了一些擴(kuò)展,增加一些功能;

(3)ScheduledExecutorService,對ExecutorService做了一些擴(kuò)展,增加一些定時(shí)任務(wù)相關(guān)的功能;

(4)AbstractExecutorService,抽象類,運(yùn)用模板方法設(shè)計(jì)模式實(shí)現(xiàn)了一部分方法;

(5)ThreadPoolExecutor,普通線程池類,這也是我們通常所說的線程池,包含最基本的一些線程池操作相關(guān)的方法實(shí)現(xiàn);

(6)ScheduledThreadPoolExecutor,定時(shí)任務(wù)線程池類,用于實(shí)現(xiàn)定時(shí)任務(wù)相關(guān)功能;

(7)ForkJoinPool,新型線程池類,java7中新增的線程池類,基于工作竊取理論實(shí)現(xiàn),運(yùn)用于大任務(wù)拆小任務(wù)、任務(wù)無限多的場景;

(8)Executors,線程池工具類,定義了一些快速實(shí)現(xiàn)線程池的方法(謹(jǐn)慎使用);

Executor

線程池頂級接口,只定義了一個(gè)執(zhí)行無返回值任務(wù)的方法。

public interface Executor {
    // 執(zhí)行無返回值任務(wù),本文由公從號“彤哥讀源碼”原創(chuàng)
    void execute(Runnable command);
}

ExecutorService

線程池次級接口,對Executor做了一些擴(kuò)展,主要增加了關(guān)閉線程池、執(zhí)行有返回值任務(wù)、批量執(zhí)行任務(wù)的方法。

public interface ExecutorService extends Executor {
    // 關(guān)閉線程池,不再接受新任務(wù),但已經(jīng)提交的任務(wù)會執(zhí)行完成
    void shutdown();

    // 立即關(guān)閉線程池,嘗試停止正在運(yùn)行的任務(wù),未執(zhí)行的任務(wù)將不再執(zhí)行
    // 被迫停止及未執(zhí)行的任務(wù)將以列表的形式返回
    List<Runnable> shutdownNow();

    // 檢查線程池是否已關(guān)閉
    boolean isShutdown();

    // 檢查線程池是否已終止,只有在shutdown()或shutdownNow()之后調(diào)用才有可能為true
    boolean isTerminated();

    // 在指定時(shí)間內(nèi)線程池達(dá)到終止?fàn)顟B(tài)了才會返回true
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 執(zhí)行有返回值的任務(wù),任務(wù)的返回值為task.call()的結(jié)果
    <T> Future<T> submit(Callable<T> task);

    // 執(zhí)行有返回值的任務(wù),任務(wù)的返回值為這里傳入的result
    // 當(dāng)然只有當(dāng)任務(wù)執(zhí)行完成了調(diào)用get()時(shí)才會返回
    <T> Future<T> submit(Runnable task, T result);

    // 執(zhí)行有返回值的任務(wù),任務(wù)的返回值為null
    // 當(dāng)然只有當(dāng)任務(wù)執(zhí)行完成了調(diào)用get()時(shí)才會返回
    Future<?> submit(Runnable task);

    // 批量執(zhí)行任務(wù),只有當(dāng)這些任務(wù)都完成了這個(gè)方法才會返回
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    // 在指定時(shí)間內(nèi)批量執(zhí)行任務(wù),未執(zhí)行完成的任務(wù)將被取消
    // 這里的timeout是所有任務(wù)的總時(shí)間,不是單個(gè)任務(wù)的時(shí)間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    // 返回任意一個(gè)已完成任務(wù)的執(zhí)行結(jié)果,未執(zhí)行完成的任務(wù)將被取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    // 在指定時(shí)間內(nèi)如果有任務(wù)已完成,則返回任意一個(gè)已完成任務(wù)的執(zhí)行結(jié)果,未執(zhí)行完成的任務(wù)將被取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ScheduledExecutorService

對ExecutorService做了一些擴(kuò)展,增加一些定時(shí)任務(wù)相關(guān)的功能,主要包含兩大類:執(zhí)行一次,重復(fù)多次執(zhí)行。

public interface ScheduledExecutorService extends ExecutorService {

    // 在指定延時(shí)后執(zhí)行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 在指定延時(shí)后執(zhí)行一次
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    // 在指定延時(shí)后開始執(zhí)行,并在之后以指定時(shí)間間隔重復(fù)執(zhí)行(間隔不包含任務(wù)執(zhí)行的時(shí)間)
    // 相當(dāng)于之后的延時(shí)以任務(wù)開始計(jì)算,本文由公從號“彤哥讀源碼”原創(chuàng)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    // 在指定延時(shí)后開始執(zhí)行,并在之后以指定延時(shí)重復(fù)執(zhí)行(間隔包含任務(wù)執(zhí)行的時(shí)間)
    // 相當(dāng)于之后的延時(shí)以任務(wù)結(jié)束計(jì)算
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

AbstractExecutorService

抽象類,運(yùn)用模板方法設(shè)計(jì)模式實(shí)現(xiàn)了一部分方法,主要為執(zhí)行有返回值任務(wù)、批量執(zhí)行任務(wù)的方法。

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        // 略...
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 略...
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        // 略...
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        // 略...
    }

}

可以看到,這里的submit()方法對傳入的任務(wù)都包裝成了FutureTask來進(jìn)行處理,這是什么東西呢?歡迎關(guān)注后面的章節(jié)。

ThreadPoolExecutor

普通線程池類,這也是我們通常所說的線程池,包含最基本的一些線程池操作相關(guān)的方法實(shí)現(xiàn)。

線程池的主要實(shí)現(xiàn)邏輯都在這里面,比如線程的創(chuàng)建、任務(wù)的處理、拒絕策略等,我們后面單獨(dú)分析這個(gè)類。

ScheduledThreadPoolExecutor

定時(shí)任務(wù)線程池類,用于實(shí)現(xiàn)定時(shí)任務(wù)相關(guān)功能,將任務(wù)包裝成定時(shí)任務(wù),并按照定時(shí)策略來執(zhí)行,我們后面單獨(dú)分析這個(gè)類。

問題:你知道定時(shí)任務(wù)線程池類使用的是什么隊(duì)列嗎?

ForkJoinPool

新型線程池類,java7中新增的線程池類,這個(gè)線程池與Go中的線程模型特別類似,都是基于工作竊取理論,特別適合于處理歸并排序這種先分后合的場景。

死磕 java線程系列之線程池深入解析——體系結(jié)構(gòu)

Executors

線程池工具類,定義了一系列快速實(shí)現(xiàn)線程池的方法——newXXX(),不過阿里手冊是不建議使用這個(gè)類來新建線程池的,彤哥我并不這么認(rèn)為,只要能掌握其源碼,知道其利敝偶爾還是可以用的,后面我們再來說這個(gè)事。

彩蛋

無彩蛋不歡,今天的問題是定時(shí)任務(wù)線程池用的是哪種隊(duì)列來實(shí)現(xiàn)的?

答:延時(shí)隊(duì)列。定時(shí)任務(wù)線程池中并沒有直接使用并發(fā)集合中的DelayQueue,而是自己又實(shí)現(xiàn)了一個(gè)DelayedWorkQueue,不過跟DelayQueue的實(shí)現(xiàn)原理是一樣的。

延時(shí)隊(duì)列使用什么數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)的呢?

答:堆(DelayQueue中使用的是優(yōu)先級隊(duì)列,而優(yōu)先級隊(duì)列使用的堆;DelayedWorkQueue直接使用的堆)。

關(guān)于延時(shí)隊(duì)列、優(yōu)先級隊(duì)列和堆的相關(guān)內(nèi)容點(diǎn)擊下面的鏈接直達(dá):

死磕 java集合之DelayQueue源碼分析

死磕 java集合之PriorityQueue源碼分析

拜托,面試別再問我堆(排序)了!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI