溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

FutureTask怎么實(shí)現(xiàn)最大等待時(shí)間

發(fā)布時(shí)間:2023-03-28 15:47:14 來源:億速云 閱讀:84 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了FutureTask怎么實(shí)現(xiàn)最大等待時(shí)間的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇FutureTask怎么實(shí)現(xiàn)最大等待時(shí)間文章都會(huì)有所收獲,下面我們一起來看看吧。

預(yù)備知識(shí)

Java 線程掛起的常用方式有以下幾種

Thread.sleep(long millis):這個(gè)方法可以讓線程掛起一段時(shí)間,并釋放 CPU 時(shí)間片,等待一段時(shí)間后自動(dòng)恢復(fù)執(zhí)行。這種方式可以用來實(shí)現(xiàn)簡(jiǎn)單的定時(shí)器功能,但如果不恰當(dāng)使用會(huì)影響系統(tǒng)性能。

Object.wait() 和 Object.notify() 或 Object.notifyAll():這是一種通過等待某個(gè)條件的發(fā)生來掛起線程的方式。wait() 方法會(huì)讓線程等待,直到其他線程調(diào)用了 notify() 或 notifyAll() 方法來通知它。這種方式需要使用 synchronized 或者 ReentrantLock 等同步機(jī)制來保證線程之間的協(xié)作和通信。

LockSupport.park() 和 LockSupport.unpark(Thread thread):這兩個(gè)方法可以讓線程掛起和恢復(fù)。park() 方法會(huì)使當(dāng)前線程掛起,直到其他線程調(diào)用了 unpark(Thread thread) 方法來喚醒它。這種方式比較靈活,可以根據(jù)需要控制線程的掛起和恢復(fù)。

先上結(jié)論

1.futureTask.get時(shí)通過LockSupport.park()掛起線程

2.在Thread.run() 方法中 調(diào)用 setException(ex)或set(result),然后調(diào)用LockSupport.unpark(t)喚醒線程。

示例-引入主題

public class FutureTaskDemo {
    public static void main(String[] args) {
        FutureTask<String> futureTask = new FutureTask<>(new Callable() {
            @Override
            public Object call() throws Exception {
                System.out.println("異步線程執(zhí)行");
                Thread.sleep(3000);//模擬線程執(zhí)行任務(wù)需要3秒
                return "ok";
            }
        });
        Thread t1 = new Thread(futureTask, "線程一");
        t1.start();

        try {
            //關(guān)鍵代碼
            String s = futureTask.get(2, TimeUnit.SECONDS); //最大等待線程2秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

進(jìn)入futureTask.get(2, TimeUnit.SECONDS)

  public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //重點(diǎn)awaitDone,即完成了最大等待,依然沒有結(jié)果就拋出異常邏輯
            throw new TimeoutException();
        return report(s);
    }

awaitDone返回線程任務(wù)執(zhí)行狀態(tài),即小于等于COMPLETING(任務(wù)正在運(yùn)行,等待完成)拋出異常TimeoutException

進(jìn)入(awaitDone(true, unit.toNanos(timeout)))原理分析

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

總體解讀awaitDone

利用自旋(for (;????)的方式 ,檢查state(任務(wù)狀態(tài))與waitNode(維護(hù)等待的線程),

第一步:首先檢查if (Thread.interrupted()) 線程是否被打斷(LockSupport.parkNanos掛起的線程被打斷不拋出異常),

第二步:判斷任務(wù)狀態(tài)與waitNode是否入隊(duì)+確定最大等待時(shí)間

若已完成(if (s > COMPLETING))返回任務(wù)狀態(tài)

若已完成(if (s == COMPLETING))-->表示正在完成,但尚未完成。則讓出 CPU,進(jìn)入就緒狀態(tài),等待其他線程的執(zhí)行

若if (q == null)==>創(chuàng)建等待等待節(jié)點(diǎn)

若if (!queued)==>表示上一步創(chuàng)建的節(jié)點(diǎn)沒有和當(dāng)前線程綁定,故綁定

最后else if (timed)與else,判斷最大等待時(shí)間

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
state可能轉(zhuǎn)換的過程 
    1.NEW -> COMPLETING -> NORMAL (成功完成)
    2.NEW -> COMPLETING -> EXCEPTIONAL (異常)
    3.NEW -> CANCELLED (任務(wù)被取消)
    4.NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷)

關(guān)鍵代碼

LockSupport.park(this, nanos) ==內(nèi)部實(shí)現(xiàn)==> UNSAFE.park(false, nanos)();

即讓當(dāng)前線程堵塞直至指定的時(shí)間(nanos),該方法同Thread.sleep()一樣不會(huì)釋放持有的對(duì)象鎖,但不同的是Thread.sleep會(huì)被打斷(interrupted)并拋出異常,而LockSupport.park被打斷不會(huì)拋出異常,故在自旋時(shí)(for (;????)需判斷if (Thread.interrupted())線程是否被打斷(手動(dòng)拋出異常)。

線程運(yùn)行時(shí)state的變化軌跡

新建時(shí)利用構(gòu)造器設(shè)置state=NEW

 public FutureTask(Runnable runnable, V result) {
     this.callable = Executors.callable(runnable, result);
     this.state = NEW;   // 賦值狀態(tài)
 }

線程運(yùn)行時(shí)state可能變化軌跡

public void run() {
        ..........防止多次運(yùn)行stat()方法..............
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); //異常軌跡---> 見下分析
                }
                if (ran)
                    set(result); // 正常軌跡--->見下分析
            }
        } finally {
            runner = null;
    		//----最后結(jié)束---防止線程被打斷
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

異常軌跡setException(ex)

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
        //軌跡變化 2.NEW -> COMPLETING -> EXCEPTIONAL (異常)
    }
    //否則1: 3.NEW -> CANCELLED (任務(wù)被取消)
    //否則2: 4.NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷)
}

正常軌跡 set(result);

1.NEW -> COMPLETING -> NORMAL (成功完成)

關(guān)于“FutureTask怎么實(shí)現(xiàn)最大等待時(shí)間”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“FutureTask怎么實(shí)現(xiàn)最大等待時(shí)間”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI