溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Springboot中怎么實(shí)現(xiàn)多線程并發(fā)定時(shí)任務(wù)

發(fā)布時(shí)間:2022-02-25 15:04:40 來(lái)源:億速云 閱讀:417 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下Springboot中怎么實(shí)現(xiàn)多線程并發(fā)定時(shí)任務(wù),相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

一、實(shí)現(xiàn)

1、啟動(dòng)類

在啟動(dòng)類添加注解@EnableScheduling開啟,不然不起用做。

2、新建任務(wù)類

添加注解@Component注冊(cè)到spring的容器中。

package com.example.demo.task;

import com.example.demo.entity.MyTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;

/**
 * @path:com.example.demo.task.ScheduledTask.java
 * @className:ScheduledTask.java
 * @description:定時(shí)任務(wù)
 * @author:tanyp
 * @dateTime:2020/7/23 21:37 
 * @editNote:
 */
@Slf4j
@Component
public class ScheduledTask implements SchedulingConfigurer {

    private volatile ScheduledTaskRegistrar registrar;

    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    /**
     * 默認(rèn)啟動(dòng)10個(gè)線程
     */
    private static final Integer DEFAULT_THREAD_POOL = 10;

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        registrar.setScheduler(Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL));
        this.registrar = registrar;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

    /**
     * @methodName:refreshTask
     * @description:初始化任務(wù)
     * 1、從數(shù)據(jù)庫(kù)獲取執(zhí)行任務(wù)的集合【TxTask】
     * 2、通過(guò)調(diào)用 【refresh】 方法刷新任務(wù)列表
     * 3、每次數(shù)據(jù)庫(kù)中的任務(wù)發(fā)生變化后重新執(zhí)行【1、2】
     * @author:tanyp
     * @dateTime:2020/7/23 21:37
     * @Params: [tasks]
     * @Return: void
     * @editNote:
     */
    public void refreshTask(List<MyTask> tasks) {
        // 刪除已經(jīng)取消任務(wù)
        scheduledFutures.keySet().forEach(key -> {
            if (Objects.isNull(tasks) || tasks.size() == 0) {
                scheduledFutures.get(key).cancel(false);
                scheduledFutures.remove(key);
                cronTasks.remove(key);
                return;
            }
            tasks.forEach(task -> {
                if (!Objects.equals(key, task.getTaskId())) {
                    scheduledFutures.get(key).cancel(false);
                    scheduledFutures.remove(key);
                    cronTasks.remove(key);
                    return;
                }
            });
        });

        // 添加新任務(wù)、更改執(zhí)行規(guī)則任務(wù)
        tasks.forEach(txTask -> {
            String expression = txTask.getExpression();
            // 任務(wù)表達(dá)式為空則跳過(guò)
            if (StringUtils.isEmpty(expression)) {
                return;
            }

            // 任務(wù)已存在并且表達(dá)式未發(fā)生變化則跳過(guò)
            if (scheduledFutures.containsKey(txTask.getTaskId()) && cronTasks.get(txTask.getTaskId()).getExpression().equals(expression)) {
                return;
            }

            // 任務(wù)執(zhí)行時(shí)間發(fā)生了變化,則刪除該任務(wù)
            if (scheduledFutures.containsKey(txTask.getTaskId())) {
                scheduledFutures.get(txTask.getTaskId()).cancel(false);
                scheduledFutures.remove(txTask.getTaskId());
                cronTasks.remove(txTask.getTaskId());
            }
            CronTask task = new CronTask(new Runnable() {
                @Override
                public void run() {
                    // 執(zhí)行業(yè)務(wù)邏輯
                    try {
                        log.info("執(zhí)行單個(gè)任務(wù),任務(wù)ID【{}】執(zhí)行規(guī)則【{}】", txTask.getTaskId(), txTask.getExpression());
                        System.out.println("==========================執(zhí)行任務(wù)=============================");
                    } catch (Exception e) {
                        log.error("執(zhí)行發(fā)送消息任務(wù)異常,異常信息:{}", e);
                    }
                }
            }, expression);
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(txTask.getTaskId(), task);
            scheduledFutures.put(txTask.getTaskId(), future);
        });
    }

}

3、創(chuàng)建自啟動(dòng)任務(wù)類

package com.example.demo.task;

import com.example.demo.task.ScheduledTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

/**
 * @path:com.example.demo.task.MyApplicationRunner.java
 * @className:ScheduledTask.java
 * @description:自啟動(dòng)
 * @author:tanyp
 * @dateTime:2020/7/23 21:37 
 * @editNote:
 */
@Slf4j
@Component
public class MyApplicationRunner implements ApplicationRunner {

    @Autowired
    private ScheduledTask scheduledTask;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("================項(xiàng)目啟動(dòng)初始化定時(shí)任務(wù)====開始===========");
        /**
         * 初始化三個(gè)任務(wù):
         * 1、10秒執(zhí)行一次
         * 2、15秒執(zhí)行一次
         * 3、20秒執(zhí)行一次
         */
        List<MyTask> tasks = Arrays.asList(
                MyTask.builder().taskId("10001").expression("*/10 * * * * ?").build(),
                MyTask.builder().taskId("10002").expression("*/15 * * * * ?").build(),
                MyTask.builder().taskId("10003").expression("*/20 * * * * ?").build()
        );
        scheduledTask.refreshTask(tasks);
        log.info("================項(xiàng)目啟動(dòng)初始化定時(shí)任務(wù)====完成==========");
    }
}

4、實(shí)體

package com.example.demo.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @path:com.example.demo.entity.MyTask.java
 * @className:MyTask.java
 * @description:任務(wù)實(shí)體
 * @author:tanyp
 * @dateTime:2020/7/23 21:41 
 * @editNote:
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MyTask {

    /**
     * 任務(wù)id
     */
    private String taskId;

    /**
     * 任務(wù)執(zhí)行規(guī)則時(shí)間
     */
    private String expression;
}

二、測(cè)試

初始化三個(gè)任務(wù),分別為:

10秒執(zhí)行一次(*/10 * * * * ?)
15秒執(zhí)行一次(*/15 * * * * ?)
20秒執(zhí)行一次(*/20 * * * * ?)

三、動(dòng)態(tài)使用方式

1、啟動(dòng)方式有兩種:

  • 啟動(dòng)項(xiàng)目后,手動(dòng)調(diào)用ScheduledTask.refreshTask(List tasks),并初始化任務(wù)列表;

  • 使用我測(cè)試中的方式,配置項(xiàng)目啟動(dòng)完成后自動(dòng)調(diào)用初始任務(wù)的方法,并初始化任務(wù)列表。

2、數(shù)據(jù)初始化

只需要給 List集合賦值并調(diào)用refreshTask()方法即可:

  • 根據(jù)業(yè)務(wù)需求修改MyTask實(shí)體類;

  • 這里的初始化數(shù)據(jù)可以從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)賦值給集合;

例如:從mysql讀取任務(wù)配置表的數(shù)據(jù),調(diào)用refreshTask()方法。

3、如何動(dòng)態(tài)?

  • 修改:修改某一項(xiàng)正在執(zhí)行的任務(wù)規(guī)則;

  • 添加:添加一項(xiàng)新的任務(wù);

  • 刪除:停止某一項(xiàng)正在執(zhí)行的任務(wù)。

例如:我們有一張任務(wù)配置表,此時(shí)進(jìn)行分別新增一條或多條數(shù)據(jù)、刪除一條或多條數(shù)據(jù)、改一條數(shù)據(jù),只需要完成以上任何一項(xiàng)操作后,重新調(diào)用一下refreshTask()方法即可。

怎么重新調(diào)用 refreshTask()方法:可以另外啟一個(gè)任務(wù)實(shí)時(shí)監(jiān)控任務(wù)表的數(shù)據(jù)變化。

以上是“Springboot中怎么實(shí)現(xiàn)多線程并發(fā)定時(shí)任務(wù)”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI