溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

數據庫周期性線程池與主要源碼分析

發(fā)布時間:2021-11-16 16:36:56 來源:億速云 閱讀:147 作者:iii 欄目:大數據

本篇內容主要講解“數據庫周期性線程池與主要源碼分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“數據庫周期性線程池與主要源碼分析”吧!

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor:用來處理延時任務或定時任務 定時線程池類的類結構圖 數據庫周期性線程池與主要源碼分析

ScheduledThreadPoolExecutor接收ScheduleFutureTask類型的任務,是線程池調度任務的最小單位。 它采用DelayQueue存儲等待的任務: 1、DelayQueue內部封裝成一個PriorityQueue,它會根據time的先后時間順序,如果time相同則根絕sequenceNumber排序; 2、DelayQueue是無界隊列;

數據庫周期性線程池與主要源碼分析

ScheduleFutureTask

接收的參數:

private final long sequenceNumber;//任務的序號
private long time;//任務開始的時間
private final long period;//任務執(zhí)行的時間間隔

工作線程的的執(zhí)行過程: 工作線程會從DelayQueue取出已經到期的任務去執(zhí)行; 執(zhí)行結束后重新設置任務的到期時間,再次放回DelayQueue;

ScheduledThreadPoolExecutor會把待執(zhí)行的任務放到工作隊列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對隊列中的ScheduledFutureTask進行排序,具體的排序算法實現如下:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        //首先按照time排序,time小的排到前面,time大的排到后面
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        //time相同,按照sequenceNumber排序;
        //sequenceNumber小的排在前面,sequenceNumber大的排在后面 
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

接下來看看ScheduledFutureTask的run方法實現, run方法是調度task的核心,task的執(zhí)行實際上是run方法的執(zhí)行。

public void run() {
    //是否是周期性的
    boolean periodic = isPeriodic();
    //線程池是shundown狀態(tài)不支持處理新任務,直接取消任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //如果不需要執(zhí)行執(zhí)行周期性任務,直接執(zhí)行run方法結束
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //如果需要周期性執(zhí)行,則在執(zhí)行任務完成后,設置下一次執(zhí)行時間
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設置下一次執(zhí)行該任務的時間
        setNextRunTime();
        //重復執(zhí)行該任務
        reExecutePeriodic(outerTask);
    }
}

run方法的執(zhí)行步驟:

  • 1、如果線程池是shundown狀態(tài)不支持處理新任務,直接取消任務,否則步驟2;

  • 2、如果不是周期性任務,直接調用ScheduledFutureTask的run方法執(zhí)行,會設置執(zhí)行結果,然后直接返回,否則步驟3;

  • 3、如果是周期性任務,調用ScheduledFutureTask的runAndset方法執(zhí)行,不會設置執(zhí)行結果,然后直接返回,否則執(zhí)行步驟4和步驟5;

  • 4、計算下一次執(zhí)行該任務的時間;

  • 5、重復執(zhí)行該任務;

接下來看下reExecutePeriodic方法的執(zhí)行步驟:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

由于已經執(zhí)行過一次周期性任務,所以不會reject當前任務,同時傳入的任務一定是周期性任務。

周期性線程池任務的提交方式

周期性有三種提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面從使用和源碼兩個方面進行說明,首先是如果提交任務:

pool.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("延遲執(zhí)行");
    }
},1, TimeUnit.SECONDS);

/**
 * 這個執(zhí)行周期是固定,不管任務執(zhí)行多長時間,每過3秒中就會產生一個新的任務
 */
pool.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //這個業(yè)務邏輯需要很長的時間,超過了3秒
        System.out.println("重復執(zhí)行");
    }
},1,3,TimeUnit.SECONDS);

pool.shutdown();

/**
 * 假如run方法30min后執(zhí)行完成,然后間隔3秒,再周期性執(zhí)行下一個任務
 */
pool.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        //30min
        System.out.println("重復執(zhí)行");
    }
},1,3,TimeUnit.SECONDS);

知道了如何提交周期性任務,接下來源碼是如何執(zhí)行的,首先是schedule方法,該方法是指任務在指定延遲時間到達后觸發(fā),只會執(zhí)行一次。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //把任務封裝成ScheduledFutureTask,之后調用decorateTask進行包裝;
    //decorateTask方法是空方法,留給用戶去實現的;
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //包裝好任務之后,進行任務的提交                                  
    delayedExecute(t);
    return t;
}

任務提交方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果線程池不是RUNNING狀態(tài),則使用拒絕策略把提交任務拒絕掉
    if (isShutdown())
        reject(task);
    else {
        //與ThreadPoolExecutor不同,這里直接把任務加入延遲隊列
        super.getQueue().add(task);
        //如果當前狀態(tài)無法執(zhí)行任務,則取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
        //和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker;
        //增加Worker,確保提交的任務能夠被執(zhí)行
            ensurePrestart();
    }
}

到此,相信大家對“數據庫周期性線程池與主要源碼分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI