溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

死磕 java線程系列之自己動(dòng)手寫(xiě)一個(gè)線程池(續(xù))

發(fā)布時(shí)間:2020-07-19 19:37:09 來(lái)源:網(wǎng)絡(luò) 閱讀:136 作者:彤哥讀源碼 欄目:編程語(yǔ)言

死磕 java線程系列之自己動(dòng)手寫(xiě)一個(gè)線程池(續(xù))

(手機(jī)橫屏看源碼更方便)


問(wèn)題

(1)自己動(dòng)手寫(xiě)的線程池如何支持帶返回值的任務(wù)呢?

(2)如果任務(wù)執(zhí)行的過(guò)程中拋出異常了該怎么處理呢?

簡(jiǎn)介

上一章我們自己動(dòng)手寫(xiě)了一個(gè)線程池,但是它是不支持帶返回值的任務(wù)的,那么,我們自己能否實(shí)現(xiàn)呢?必須可以,今天我們就一起來(lái)實(shí)現(xiàn)帶返回值任務(wù)的線程池。

前情回顧

首先,讓我們先回顧一下上一章寫(xiě)的線程池:

(1)它包含四個(gè)要素:核心線程數(shù)、最大線程數(shù)、任務(wù)隊(duì)列、拒絕策略;

(2)它具有執(zhí)行無(wú)返回值任務(wù)的能力;

(3)它無(wú)法處理有返回值的任務(wù);

(4)它無(wú)法處理任務(wù)執(zhí)行的異常(線程中的異常不會(huì)拋出到線程外);

那么,我們能不能在現(xiàn)有的基礎(chǔ)上實(shí)現(xiàn)其下面兩項(xiàng)能力呢?讓我們一起來(lái)試一試吧!

有返回值和無(wú)返回值的任務(wù)到底有何不同?

答案很明顯,就是一個(gè)有返回值,一個(gè)無(wú)返回值,用偽代碼來(lái)表示就是下面這樣:

    // 無(wú)返回值
    threadPool.execute(()->{
        System.out.println(1);
    });
    // 有返回值,分兩步走
    // 1. 提交任務(wù)到線程池中
    SomeClass result = threadPool.execute(()->{
        System.out.println(1);
        return 1;
    });
    // 2. 等待任務(wù)的結(jié)果返回
    Object value = result.get();

無(wú)返回值的任務(wù)提交了就完事,主線程并不Care它到底有沒(méi)有執(zhí)行完,并不關(guān)心它是不是拋出異常,主線程Just提交線程到線程池中,其余什么都不管。

有返回值的任務(wù)就不一樣了,主線程首先要提交任務(wù)到線程池中,它需要使用到任務(wù)執(zhí)行的結(jié)果,所以它必須等待任務(wù)執(zhí)行完畢才能拿到任務(wù)執(zhí)行的結(jié)果。

那么,為什么不直接在execute的時(shí)候就等待任務(wù)執(zhí)行完畢呢?這樣的話那不就跟串行沒(méi)啥區(qū)別了,還不如直接在主線程執(zhí)行任務(wù)呢,還少了線程切換的資源消耗。

之所以要分成兩步,是因?yàn)橹骶€程并不一定需要立即獲取返回值,在需要用到返回值的時(shí)候才去get,這樣就可以在提交任務(wù)和獲取返回值之間干些其它的事情,提高效率。

所以,提交任務(wù)的時(shí)候不需要阻塞,get返回值的時(shí)候才可能需要阻塞,如果get的時(shí)候任務(wù)已經(jīng)執(zhí)行完畢了,這時(shí)候也不需要阻塞,如果get的時(shí)候任務(wù)還未執(zhí)行完畢,那就要阻塞等待任務(wù)執(zhí)行完畢才能獲取到返回值。

實(shí)現(xiàn)分析

首先,無(wú)返回值的任務(wù)我們直接使用的Runnable函數(shù)式接口,有返回值的任務(wù)有沒(méi)有現(xiàn)成的接口呢?還真有,那就是Callable接口,它有個(gè)返回值。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

其次,提交任務(wù)的時(shí)候需要有個(gè)返回值,它是在將來(lái)用來(lái)獲取任務(wù)執(zhí)行結(jié)果的,實(shí)際上它也是新任務(wù)的一種能力,可以使用它對(duì)任務(wù)進(jìn)行包裝,使其具有返回值的能力。

public interface Future<T> {
    T get();
}

再次,我們需要給現(xiàn)有的線程池增加一種新的能力,根據(jù)單一職責(zé)原則,我們定義一個(gè)新的接口來(lái)承載這種能力。

public interface FutureExecutor extends Executor {
    <T> Future<T> submit(Callable<T> command);
}

然后,我們需要一種新的任務(wù),它既具有舊任務(wù)的執(zhí)行能力(run()方法),又具有新任務(wù)的返回值能力(get()方法),所以我們?cè)煲粋€(gè)“將來(lái)的任務(wù)”對(duì)提交的任務(wù)進(jìn)行包裝,使其具有返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {

    /**
     * 真正的任務(wù)
     */
    private Callable<T> task;

    public FutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        // 具體實(shí)現(xiàn)...
    }

    @Override
    public T get() {
        // 具體實(shí)現(xiàn)...
    }
}

最后,我們只要對(duì)原有的線程池進(jìn)行擴(kuò)展,將提交的任務(wù)包裝成“將來(lái)獲取返回值的任務(wù)”,還是使用原來(lái)的方法去執(zhí)行,然后返回這個(gè)將來(lái)的任務(wù)即可。

根據(jù)開(kāi)閉原則,,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)原來(lái)的代碼我們不做任何修改,擴(kuò)展新的子類(lèi)來(lái)實(shí)現(xiàn)新的能力。

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {

    public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // 包裝成將來(lái)獲取返回值的任務(wù)
        FutureTask<T> futureTask = new FutureTask<>(task);
        // 還是使用原來(lái)的執(zhí)行能力
        execute(futureTask);
        // 返回將來(lái)的任務(wù),只需要返回其get返回值的能力即可
        // 所以這里返回的是Future而不是FutureTask類(lèi)型
        return futureTask;
    }
}

好了,到這里整體的邏輯我們就已經(jīng)比較清晰地實(shí)現(xiàn)完了,還剩下最關(guān)鍵的部分,這個(gè)“將來(lái)的任務(wù)”的兩個(gè)能力要如何實(shí)現(xiàn)。

將來(lái)的任務(wù)

將來(lái)的任務(wù),具有兩個(gè)能力:一是執(zhí)行真正任務(wù)的能力,二是將來(lái)獲取返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {
    @Override
    public void run() {
        // 具體實(shí)現(xiàn)...
    }

    @Override
    public T get() {
        // 具體實(shí)現(xiàn)...
    }
}

首先,我們要明確一件事,任務(wù)的執(zhí)行是線程池中,獲取返回值是在主線程中,它們是在兩個(gè)線程中執(zhí)行的,而且誰(shuí)先誰(shuí)后我們無(wú)法確定。

其次,如果run()在get()之前執(zhí)行,我們需要告訴get()任務(wù)已經(jīng)執(zhí)行完畢了,所以需要一個(gè)狀態(tài)來(lái)通知這個(gè)事,還需要一個(gè)變量來(lái)承載任務(wù)執(zhí)行的返回值。

    /**
     * 任務(wù)執(zhí)行的狀態(tài),0未開(kāi)始,1正常完成,2異常完成
     * 也可以使用volatile+Unsafe實(shí)現(xiàn)CAS操作
     */
    private AtomicInteger state = new AtomicInteger(NEW);
    private static final int NEW = 0;
    private static final int FINISHED = 1;
    private static final int EXCEPTION = 2;
    /**
     * 任務(wù)執(zhí)行的結(jié)果,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
     * 如果執(zhí)行正常,返回結(jié)果為T(mén)
     * 如果執(zhí)行異常,返回結(jié)果為Exception
     */
    private Object result;

再次,如果get()在run()之前執(zhí)行,那就需要阻塞等待run()執(zhí)行完畢才能拿到返回值,所以需要保存調(diào)用者(主線程),get()的時(shí)候park阻塞住,run()完成了unpark喚醒它來(lái)拿返回值。

    /**
     * 調(diào)用者線程
     * 也可以使用volatile+Unsafe實(shí)現(xiàn)CAS操作
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();

然后,我們先來(lái)看看run()方法的邏輯,它其實(shí)就是先執(zhí)行真正的任務(wù),然后修改狀態(tài)為完成,并保存任務(wù)的返回值,如果保存了主線程,還要喚醒它。

    @Override
    public void run() {
        // 如果狀態(tài)不是NEW,說(shuō)明執(zhí)行過(guò)了,直接返回
        if (state.get() != NEW) {
            return;
        }
        try {
            // 執(zhí)行任務(wù),本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
            T r = task.call();
            // CAS更新state的值為FINISHED
            // 如果更新成功,就把r賦值給result
            // 如果更新失敗,說(shuō)明state的值不為NEW了,也就是任務(wù)已經(jīng)執(zhí)行過(guò)了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必須放在修改state里面,見(jiàn)下面的分析
                finish();
            }
        } catch (Exception e) {
            // 如果CAS更新state的值為EXCEPTION成功,就把e賦值給result
            // 如果CAS更新失敗,說(shuō)明state的值不為NEW了,也就是任務(wù)已經(jīng)執(zhí)行過(guò)了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必須放在修改state里面,見(jiàn)下面的分析
                finish();
            }
        }
    }

    private void finish() {
        // 檢查調(diào)用者是否為空,如果不為空,喚醒它
        // 調(diào)用者在調(diào)用get()方法的進(jìn)入阻塞狀態(tài)
        for (Thread c; (c = caller.get()) != null;) {
            if (caller.compareAndSet(c, null)) {
                LockSupport.unpark(c);
            }
        }
    }

最后,我們?cè)倏纯磄et()方法,如果任務(wù)還未執(zhí)行,就阻塞等待任務(wù)的執(zhí)行;如果任務(wù)已經(jīng)執(zhí)行完畢了,直接拿返回值即可;但是,還有一種情況,get()方法執(zhí)行的過(guò)程中run()方法也在執(zhí)行,所以get()方法中的每一步都要檢查狀態(tài)的值有沒(méi)有變化。

@Override
    public T get() {
        int s = state.get();
        // 如果任務(wù)還未執(zhí)行完成,判斷當(dāng)前線程是否要進(jìn)入阻塞狀態(tài)
        if (s == NEW) {
            // 標(biāo)識(shí)調(diào)用者線程是否被標(biāo)記過(guò)
            boolean marked = false;
            for (;;) {
                // 重新獲取state的值
                s = state.get();
                // 如果state大于NEW說(shuō)明完成了,跳出循環(huán)
                if (s > NEW) {
                    break;
                    // 此處必須把caller的CAS更新和park()方法分成兩步處理,不能把park()放在CAS里面
                } else if (!marked) {
                    // 嘗試更新調(diào)用者線程
                    // 試想斷點(diǎn)停在此處,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
                    // 此時(shí)state為NEW,讓run()方法執(zhí)行到底,它不會(huì)執(zhí)行finish()中的unpark()方法
                    // 這時(shí)打開(kāi)斷點(diǎn),這里會(huì)更新caller成功,但是循環(huán)從頭再執(zhí)行一遍發(fā)現(xiàn)state已經(jīng)變了,
                    // 直接在上面的if(s>NEW)處跳出循環(huán)了,因?yàn)閒inish()在修改state內(nèi)部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 調(diào)用者線程更新之后park當(dāng)前線程
                    // 試想斷點(diǎn)停在此處
                    // 此時(shí)state為NEW,讓run()方法執(zhí)行到底,因?yàn)樯厦娴腸aller已經(jīng)設(shè)置值了,
                    // 所以會(huì)執(zhí)行finish()方法中的unpark()方法,
                    // 這時(shí)再打開(kāi)斷點(diǎn),這里不會(huì)park住
                    // 見(jiàn)unpark()方法的注釋?zhuān)厦鎸?xiě)得很清楚:
                    // 如果線程執(zhí)行了park()方法,那么執(zhí)行unpark()方法會(huì)喚醒那個(gè)線程
                    // 如果先執(zhí)行了unpark()方法,那么線程下一次執(zhí)行park()方法將不會(huì)阻塞
                    LockSupport.park();
                }
            }
        }

        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

在我們的實(shí)現(xiàn)中,如果任務(wù)執(zhí)行的過(guò)程拋出異常了,也是通過(guò)result返回給主線程,這樣主線程就拿到了這個(gè)異常,它就可以做相應(yīng)的處理了。

好了,完整的實(shí)現(xiàn)到此結(jié)束,不知道你領(lǐng)悟了沒(méi)有。

測(cè)試用例

最后奉上測(cè)試代碼:

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}

運(yùn)行結(jié)果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒絕的任務(wù)
,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9

總結(jié)

(1)有返回值的任務(wù)是通過(guò)包裝成將來(lái)的任務(wù)來(lái)實(shí)現(xiàn)的,這個(gè)任務(wù)既具有基本的執(zhí)行能力,又具有將來(lái)獲取返回值的能力;

(2)任務(wù)執(zhí)行的異常跟任務(wù)正常的返回值是通過(guò)同一個(gè)返回值返回到主線程的,主線程根據(jù)狀態(tài)判斷是異常還是正常值;

(3)我們的實(shí)現(xiàn)中運(yùn)用了單一職責(zé)原則、開(kāi)閉原則等設(shè)計(jì)原則,對(duì)原有代碼沒(méi)有造成任何的侵入;

彩蛋

手寫(xiě)線程池目前只打算寫(xiě)這兩章,后面開(kāi)始進(jìn)入jdk原生線程池的源碼分析,敬請(qǐng)期待。

另外,需要手寫(xiě)線程池完整源碼的同學(xué)請(qǐng)點(diǎn)擊下面的鏈接獲取。

https://gitee.com/alan-tang-tt/yuan/tree/master/%E6%AD%BB%E7%A3%95%20java%E7%BA%BF%E7%A8%8B%E7%B3%BB%E5%88%97/MyThreadPool

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI