溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

SpringBoot定時任務(wù)功能怎么實(shí)現(xiàn)

發(fā)布時間:2022-05-18 11:24:08 來源:億速云 閱讀:123 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“SpringBoot定時任務(wù)功能怎么實(shí)現(xiàn)”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

一 背景

項(xiàng)目中需要一個可以動態(tài)新增定時定時任務(wù)的功能,現(xiàn)在項(xiàng)目中使用的是xxl-job定時任務(wù)調(diào)度系統(tǒng),但是經(jīng)過一番對xxl-job功能的了解,發(fā)現(xiàn)xxl-job對項(xiàng)目動態(tài)新增定時任務(wù),動態(tài)刪除定時任務(wù)的支持并不是那么好,所以需要自己手動實(shí)現(xiàn)一個定時任務(wù)的功能

二 動態(tài)定時任務(wù)調(diào)度

1 技術(shù)選擇

Timer or ScheduledExecutorService

這兩個都能實(shí)現(xiàn)定時任務(wù)調(diào)度,先看下Timer的定時任務(wù)調(diào)度

  public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次執(zhí)行,在當(dāng)前時間的1秒以后,之后每隔兩秒鐘執(zhí)行一次
timer.schedule(timerTask,1000L,2000L);

在看下ScheduledThreadPoolExecutor的實(shí)現(xiàn)

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);

兩個都能實(shí)現(xiàn)定時任務(wù),那他們的區(qū)別呢,使用阿里p3c會給出建議和區(qū)別

多線程并行處理定時任務(wù)時,Timer運(yùn)行多個TimeTask時,只要其中之一沒有捕獲拋出的異常,其它任務(wù)便會自動終止運(yùn)行,使用ScheduledExecutorService則沒有這個問題。

從建議上來看,是一定要選擇ScheduledExecutorService了,我們看看源碼看看為什么Timer出現(xiàn)問題會終止執(zhí)行

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

新建對象時,我們看到開啟了一個線程,那么這個線程在做什么呢?一起看看

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * 每一件一個任務(wù)都是一個quene
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // 清除所有任務(wù)信息
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn't yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}

我們看到,執(zhí)行了 mainLoop(),里面是 while (true)方法無限循環(huán),獲取程序中任務(wù)對象中的時間和當(dāng)前時間比對,相同就執(zhí)行,但是一旦報錯,就會進(jìn)入finally中清除掉所有任務(wù)信息。

這時候我們已經(jīng)找到了答案,timer是在被實(shí)例化后,啟動一個線程,不間斷的循環(huán)匹配,來執(zhí)行任務(wù),他是單線程的,一旦報錯,線程就終止了,所以不會執(zhí)行后續(xù)的任務(wù),而ScheduledThreadPoolExecutor是多線程執(zhí)行的,就算其中有一個任務(wù)報錯了,并不影響其他線程的執(zhí)行。

2 使用ScheduledThreadPoolExecutor

從上面看,使用ScheduledThreadPoolExecutor還是比較簡單的,但是我們要實(shí)現(xiàn)的更優(yōu)雅一些,所以選擇 TaskScheduler來實(shí)現(xiàn)

@Component
public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }
    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }
    private void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }
            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }
    public void removeCronTask(Runnable task) {
        Set<Runnable> runnables = this.scheduledTasks.keySet();
        Iterator it1 = runnables.iterator();
        while (it1.hasNext()) {
            SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
            Long taskId = schedulingRunnable.getTaskId();
            SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
            if (taskId.equals(cancelRunnable.getTaskId())) {
                ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
                if (scheduledTask != null){
                    scheduledTask.cancel();
                }
            }
        }
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }
        this.scheduledTasks.clear();
    }
}

TaskScheduler是本次功能實(shí)現(xiàn)的核心類,但是他是一個接口

public interface TaskScheduler {
   /**
    * Schedule the given {@link Runnable}, invoking it whenever the trigger
    * indicates a next execution time.
    * <p>Execution will end once the scheduler shuts down or the returned
    * {@link ScheduledFuture} gets cancelled.
    * @param task the Runnable to execute whenever the trigger fires
    * @param trigger an implementation of the {@link Trigger} interface,
    * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
    * wrapping a cron expression
    * @return a {@link ScheduledFuture} representing pending completion of the task,
    * or {@code null} if the given Trigger object never fires (i.e. returns
    * {@code null} from {@link Trigger#nextExecutionTime})
    * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
    * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
    * @see org.springframework.scheduling.support.CronTrigger
    */
   @Nullable
   ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

前面的代碼可以看到,我們在類中注入了這個類,但是他是接口,我們怎么知道是那個實(shí)現(xiàn)類呢,以往出現(xiàn)這種情況要在類上面加@Primany或者@Quality來執(zhí)行實(shí)現(xiàn)的類,但是我們看到我的注入上并沒有標(biāo)記,因?yàn)槭峭ㄟ^另一種方式實(shí)現(xiàn)的

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定時任務(wù)執(zhí)行線程池核心線程數(shù)
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}

在spring初始化時就注冊了Bean TaskScheduler,而我們可以看到他的實(shí)現(xiàn)是ThreadPoolTaskScheduler,在網(wǎng)上的資料中有人說ThreadPoolTaskScheduler是TaskScheduler的默認(rèn)實(shí)現(xiàn)類,其實(shí)不是,還是需要我們?nèi)ブ付?,而這種方式,當(dāng)我們想替換實(shí)現(xiàn)時,只需要修改配置類就行了,很靈活。

而為什么說他是更優(yōu)雅的實(shí)現(xiàn)方式呢,因?yàn)樗暮诵囊彩峭ㄟ^ScheduledThreadPoolExecutor來實(shí)現(xiàn)的

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}

三 多節(jié)點(diǎn)任務(wù)執(zhí)行問題

這次的實(shí)現(xiàn)過程中,我并沒有選擇xxl-job來進(jìn)行實(shí)現(xiàn),而是采用了TaskScheduler來實(shí)現(xiàn),這也產(chǎn)生了一個問題,xxl-job是分布式的程序調(diào)度系統(tǒng),當(dāng)想要執(zhí)行定時任務(wù)的應(yīng)用使用xxl-job時,無論應(yīng)用程序中部署多少個節(jié)點(diǎn),xxl-job只會選擇其中一個節(jié)點(diǎn)作為定時任務(wù)執(zhí)行的節(jié)點(diǎn),從而不會產(chǎn)生定時任務(wù)在不同節(jié)點(diǎn)上同時執(zhí)行,導(dǎo)致重復(fù)執(zhí)行問題,而使用TaskScheduler來實(shí)現(xiàn),就要考慮多節(jié)點(diǎn)重復(fù)執(zhí)行問題。當(dāng)然既然有問題,就有解決方案

&middot; 方案一 將定時任務(wù)功能拆出來單獨(dú)部署,且只部署一個節(jié)點(diǎn) &middot; 方案二 使用redis setNx的形式,保證同一時間只有一個任務(wù)在執(zhí)行

我選擇的是方案二來執(zhí)行,當(dāng)然還有一些方式也能保證不重復(fù)執(zhí)行,這里就不多說了,一下是我的實(shí)現(xiàn)

public void executeTask(Long taskId) {
    if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
        log.info("已有執(zhí)行中定時發(fā)送短信任務(wù),本次不執(zhí)行!");
        return;
    }

“SpringBoot定時任務(wù)功能怎么實(shí)現(xiàn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI