溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spring中異步和計劃任務(wù)如何實現(xiàn)

發(fā)布時間:2021-09-14 09:29:01 來源:億速云 閱讀:125 作者:小新 欄目:編程語言

這篇文章將為大家詳細(xì)講解有關(guān)Spring中異步和計劃任務(wù)如何實現(xiàn),小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

什么是Spring中的異步任務(wù)?

在我們正式的進(jìn)入話題之前,對于Spring,我們需要理解下它實現(xiàn)的兩個不同的概念:異步任務(wù)和調(diào)度任務(wù)。顯然,兩者有一個很大的共同點:都在后臺工作。但是,它們之間存在了很大差異。調(diào)度任務(wù)與異步不同,其作用與Linux中的CRON job完全相同(windows里面也有計劃任務(wù))。舉個栗子,有一個任務(wù)必須每40分鐘執(zhí)行一次,那么,可以通過XML文件或者注解來進(jìn)行此配置。簡單的異步任務(wù)在后臺執(zhí)行就好,無需配置執(zhí)行頻率。

因為它們是兩種不同的任務(wù)類型,它們兩個的執(zhí)行者自然也就不同。第一個看起來有點像Java的并發(fā)執(zhí)行器(concurrency executor),這里會有專門去寫一篇關(guān)于Java中的執(zhí)行器來具體了解。根據(jù)Spring文檔TaskExecutor所述,它提供了基于Spring的抽象來處理線程池,這點,也可以通過其類的注釋去了解。另一個抽象接口是TaskScheduler,它用于在將來給定的時間點來安排任務(wù),并執(zhí)行一次或定期執(zhí)行。

在分析源碼的過程中,發(fā)現(xiàn)另一個比較有趣的點是觸發(fā)器。它存在兩種類型:CronTrigger或PeriodTrigger。第一個模擬CRON任務(wù)的行為。所以我們可以在將來確切時間點提交一個任務(wù)的執(zhí)行。另一個觸發(fā)器可用于定期執(zhí)行任務(wù)。

Spring的異步任務(wù)類

讓我們從org.springframework.core.task.TaskExecutor類的分析開始。你會發(fā)現(xiàn),其簡單的不行,它是一個擴(kuò)展Java的Executor接口的接口。它的唯一方法也就是是執(zhí)行,在參數(shù)中使用Runnable類型的任務(wù)。

package org.springframework.core.task;
import java.util.concurrent.Executor;
/**
 * Simple task executor interface that abstracts the execution
 * of a {@link Runnable}.
 *
 * <p>Implementations can use all sorts of different execution strategies,
 * such as: synchronous, asynchronous, using a thread pool, and more.
 *
 * <p>Equivalent to JDK 1.5's {@link java.util.concurrent.Executor}
 * interface; extending it now in Spring 3.0, so that clients may declare
 * a dependency on an Executor and receive any TaskExecutor implementation.
 * This interface remains separate from the standard Executor interface
 * mainly for backwards compatibility with JDK 1.4 in Spring 2.x.
 *
 * @author Juergen Hoeller
 * @since 2.0
 * @see java.util.concurrent.Executor
 */
@FunctionalInterface
public interface TaskExecutor extends Executor {
 /**
 * Execute the given {@code task}.
 * <p>The call might return immediately if the implementation uses
 * an asynchronous execution strategy, or might block in the case
 * of synchronous execution.
 * @param task the {@code Runnable} to execute (never {@code null})
 * @throws TaskRejectedException if the given task was not accepted
 */
 @Override
 void execute(Runnable task);
}

相對來說,org.springframework.scheduling.TaskScheduler接口就有點復(fù)雜了。它定義了一組以schedule開頭的名稱的方法允許我們定義將來要執(zhí)行的任務(wù)。所有 schedule* 方法返回java.util.concurrent.ScheduledFuture實例。Spring5中對scheduleAtFixedRate方法做了進(jìn)一步的充實,其實最終調(diào)用的還是ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);

public interface TaskScheduler {
 /**
 * Schedule the given {@link Runnable}, invoking it whenever the trigger
 * indicates a next execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param trigger an implementation of the {@link Trigger} interface,
 * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
 * wrapping a cron expression
 * @return a {@link ScheduledFuture} representing pending completion of the task,
 * or {@code null} if the given Trigger object never fires (i.e. returns
 * {@code null} from {@link Trigger#nextExecutionTime})
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @see org.springframework.scheduling.support.CronTrigger
 */
 @Nullable
 ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
 
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * 使用了默認(rèn)實現(xiàn),值得我們學(xué)習(xí)使用的,Java9中同樣可以有私有實現(xiàn)的,從這里我們可以做到我只通過  * 一個接口你來實現(xiàn),我把其他相應(yīng)的功能默認(rèn)實現(xiàn)下,最后調(diào)用你自定義實現(xiàn)的接口即可,使接口功能更  * 加一目了然
 * @since 5.0
 * @see #schedule(Runnable, Date)
 */
 default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
 return schedule(task, Date.from(startTime));
 }
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 */
 ScheduledFuture<?> schedule(Runnable task, Date startTime);
...
/**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time
 * and subsequently with the given period.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired first execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @param period the interval between successive executions of the task
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @since 5.0
 * @see #scheduleAtFixedRate(Runnable, Date, long)
 */
 default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
 return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());
 }
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time
 * and subsequently with the given period.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired first execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @param period the interval between successive executions of the task (in milliseconds)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 */
 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
...
}

之前提到兩個觸發(fā)器組件,都實現(xiàn)了org.springframework.scheduling.Trigger接口。這里,我們只需關(guān)注一個的方法nextExecutionTime ,其定義下一個觸發(fā)任務(wù)的執(zhí)行時間。它的兩個實現(xiàn),CronTrigger和PeriodicTrigger,由org.springframework.scheduling.TriggerContext來實現(xiàn)信息的存儲,由此,我們可以很輕松獲得一個任務(wù)的最后一個執(zhí)行時間(lastScheduledExecutionTime),給定任務(wù)的最后完成時間(lastCompletionTime)或最后一個實際執(zhí)行時間(lastActualExecutionTime)。接下來,我們通過閱讀源代碼來簡單的了解下這些東西。org.springframework.scheduling.concurrent.ConcurrentTaskScheduler包含一個私有類EnterpriseConcurrentTriggerScheduler。在這個class里面,我們可以找到schedule方法:

public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
 ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
 return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
  @Override
  public Date getNextRunTime(LastExecution le, Date taskScheduledTime) {
   return trigger.nextExecutionTime(le != null ?
    new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
    new SimpleTriggerContext());
  }
  @Override
  public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
   return false;
  }
 });
}

SimpleTriggerContext從其名字就可以看到很多東西了,因為它實現(xiàn)了TriggerContext接口。

/**
 * Simple data holder implementation of the {@link TriggerContext} interface.
 *
 * @author Juergen Hoeller
 * @since 3.0
 */
public class SimpleTriggerContext implements TriggerContext {
 @Nullable
 private volatile Date lastScheduledExecutionTime;
 @Nullable
 private volatile Date lastActualExecutionTime;
 @Nullable
 private volatile Date lastCompletionTime;
...
 /**
 * Create a SimpleTriggerContext with the given time values.
 * @param lastScheduledExecutionTime last <i>scheduled</i> execution time
 * @param lastActualExecutionTime last <i>actual</i> execution time
 * @param lastCompletionTime last completion time
 */
 public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) {
 this.lastScheduledExecutionTime = lastScheduledExecutionTime;
 this.lastActualExecutionTime = lastActualExecutionTime;
 this.lastCompletionTime = lastCompletionTime;
 }
 ...
}

也正如你看到的,在構(gòu)造函數(shù)中設(shè)置的時間值來自javax.enterprise.concurrent.LastExecution的實現(xiàn),其中:

  • getScheduledStart:返回上次開始執(zhí)行任務(wù)的時間。它對應(yīng)于TriggerContext的lastScheduledExecutionTime屬性。

  • getRunStart:返回給定任務(wù)開始運行的時間。在TriggerContext中,它對應(yīng)于lastActualExecutionTime。

  • getRunEnd:任務(wù)終止時返回。它用于在TriggerContext中設(shè)置lastCompletionTime。

Spring調(diào)度和異步執(zhí)行中的另一個重要類是org.springframework.core.task.support.TaskExecutorAdapter。它是一個將java.util.concurrent.Executor作為Spring基本的執(zhí)行器的適配器(描述的有點拗口,看下面代碼就明了了),之前已經(jīng)描述了TaskExecutor。實際上,它引用了Java的ExecutorService,它也是繼承了Executor接口。此引用用于完成所有提交的任務(wù)。

/**
 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and
 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
 * Also detects an extended {@code java.util.concurrent.ExecutorService 從此解釋上面的說明}, adapting
 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
 *
 * @author Juergen Hoeller
 * @since 3.0
 * @see java.util.concurrent.Executor
 * @see java.util.concurrent.ExecutorService 
 * @see java.util.concurrent.Executors
 */
public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
 private final Executor concurrentExecutor;
 @Nullable
 private TaskDecorator taskDecorator;
 ...
  /**
 * Create a new TaskExecutorAdapter,
 * using the given JDK concurrent executor.
 * @param concurrentExecutor the JDK concurrent executor to delegate to
 */
 public TaskExecutorAdapter(Executor concurrentExecutor) {
 Assert.notNull(concurrentExecutor, "Executor must not be null");
 this.concurrentExecutor = concurrentExecutor;
 }
  /**
 * Delegates to the specified JDK concurrent executor.
 * @see java.util.concurrent.Executor#execute(Runnable)
 */
 @Override
 public void execute(Runnable task) {
 try {
  doExecute(this.concurrentExecutor, this.taskDecorator, task);
 }
 catch (RejectedExecutionException ex) {
  throw new TaskRejectedException(
   "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
 }
 }
 @Override
 public void execute(Runnable task, long startTimeout) {
 execute(task);
 }
 @Override
 public Future<?> submit(Runnable task) {
 try {
  if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
  return ((ExecutorService) this.concurrentExecutor).submit(task);
  }
  else {
  FutureTask<Object> future = new FutureTask<>(task, null);
  doExecute(this.concurrentExecutor, this.taskDecorator, future);
  return future;
  }
 }
 catch (RejectedExecutionException ex) {
  throw new TaskRejectedException(
   "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
 }
 }
 ...
}

在Spring中配置異步和計劃任務(wù)

下面我們通過代碼的方式來實現(xiàn)異步任務(wù)。首先,我們需要通過注解來啟用配置。它的XML配置如下:

<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="2" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
<context:component-scan base-package="com.migo.async"/>

可以通過將@EnableScheduling和@EnableAsync注解添加到配置類(用@Configuration注解)來激活兩者。完事,我們就可以開始著手實現(xiàn)調(diào)度和異步任務(wù)。為了實現(xiàn)調(diào)度任務(wù),我們可以使用@Scheduled注解。我們可以從org.springframework.scheduling.annotation包中找到這個注解。它包含了以下幾個屬性:

  • cron:使用CRON風(fēng)格(Linux配置定時任務(wù)的風(fēng)格)的配置來配置需要啟動的帶注解的任務(wù)。

  • zone:要解析CRON表達(dá)式的時區(qū)。

  • fixedDelay或fixedDelayString:在固定延遲時間后執(zhí)行任務(wù)。即任務(wù)將在最后一次調(diào)用結(jié)束和下一次調(diào)用的開始之間的這個固定時間段后執(zhí)行。

  • fixedRate或fixedRateString:使用fixedRate注解的方法的調(diào)用將以固定的時間段(例如:每10秒鐘)進(jìn)行,與執(zhí)行生命周期(開始,結(jié)束)無關(guān)。

  • initialDelay或initialDelayString:延遲首次執(zhí)行調(diào)度方法的時間。請注意,所有值(fixedDelay ,fixedRate ,initialDelay )必須以毫秒表示。 需要特別記住的是 ,用@Scheduled注解的方法不能接受任何參數(shù),并且不返回任何內(nèi)容(void),如果有返回值,返回值也會被忽略掉的,沒什么卵用。定時任務(wù)方法由容器管理,而不是由調(diào)用者在運行時調(diào)用。它們由 org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor來解析,其中包含以下方法來拒絕執(zhí)行所有不正確定義的函數(shù):

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
 try {
  Assert.isTrue(method.getParameterCount() == 0,
   "Only no-arg methods may be annotated with @Scheduled");
 /**
 *  之前的版本中直接把返回值非空的給拒掉了,在Spring 4.3 Spring5 的版本中就沒那么嚴(yán)格了
   * Assert.isTrue(void.class.equals(method.getReturnType()),
   *        "Only void-returning methods may be annotated with @Scheduled");
   **/        
// ...
/**
 * 注釋很重要
 * An annotation that marks a method to be scheduled. Exactly one of
 * the {@link #cron()}, {@link #fixedDelay()}, or {@link #fixedRate()}
 * attributes must be specified.
 *
 * <p>The annotated method must expect no arguments. It will typically have
 * a {@code void} return type; if not, the returned value will be ignored
 * when called through the scheduler.
 *
 * <p>Processing of {@code @Scheduled} annotations is performed by
 * registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
 * done manually or, more conveniently, through the {@code <task:annotation-driven/>}
 * element or @{@link EnableScheduling} annotation.
 *
 * <p>This annotation may be used as a <em>meta-annotation</em> to create custom
 * <em>composed annotations</em> with attribute overrides.
 *
 * @author Mark Fisher
 * @author Dave Syer
 * @author Chris Beams
 * @since 3.0
 * @see EnableScheduling
 * @see ScheduledAnnotationBeanPostProcessor
 * @see Schedules
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
...
}

使用@Async注解標(biāo)記一個方法或一個類(通過標(biāo)記一個類,我們自動將其所有方法標(biāo)記為異步)。與@Scheduled不同,異步任務(wù)可以接受參數(shù),并可能返回某些東西。

寫一個在Spring中執(zhí)行異步任務(wù)的Demo

有了上面這些知識,我們可以來編寫異步和計劃任務(wù)。我們將通過測試用例來展示。我們從不同的任務(wù)執(zhí)行器(task executors)的測試開始:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class TaskExecutorsTest {
 
 @Test
 public void simpeAsync() throws InterruptedException {
  /**
   * SimpleAsyncTaskExecutor creates new Thread for every task and executes it asynchronously. The threads aren't reused as in 
   * native Java's thread pools.
   * 
   * The number of concurrently executed threads can be specified through concurrencyLimit bean property 
   * (concurrencyLimit XML attribute). Here it's more simple to invoke setConcurrencyLimit method. 
   * Here the tasks will be executed by 2 simultaneous threads. Without specifying this value,
   * the number of executed threads will be indefinite.
   * 
   * You can observe that only 2 tasks are executed at a given time - even if 3 are submitted to execution (lines 40-42).
   **/
  SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("thread_name_prefix_____");
  executor.setConcurrencyLimit(2);
  executor.execute(new SimpleTask("SimpleAsyncTask-1", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-2", Counters.simpleAsyncTask, 1000));
 
  Thread.sleep(1050);
  assertTrue("2 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 2);
 
  executor.execute(new SimpleTask("SimpleAsyncTask-3", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-4", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-5", Counters.simpleAsyncTask, 2000));
   
  Thread.sleep(1050);
  assertTrue("4 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 4);
  executor.execute(new SimpleTask("SimpleAsyncTask-6", Counters.simpleAsyncTask, 1000));
 
  Thread.sleep(1050);
  assertTrue("6 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", 
   Counters.simpleAsyncTask.getNb() == 6);
 }
  
 @Test
 public void syncTaskTest() {
  /**
   * SyncTask works almost as Java's CountDownLatch. In fact, this executor is synchronous with the calling thread. In our case,
   * SyncTaskExecutor tasks will be synchronous with JUnit thread. It means that the testing thread will sleep 5 
   * seconds after executing the third task ('SyncTask-3'). To prove that, we check if the total execution time is ~5 seconds.
   **/
  long start = System.currentTimeMillis();
  SyncTaskExecutor executor = new SyncTaskExecutor();
  executor.execute(new SimpleTask("SyncTask-1", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-2", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-3", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-4", Counters.syncTask, 5000));
  executor.execute(new SimpleTask("SyncTask-5", Counters.syncTask, 0));
  long end = System.currentTimeMillis();
  int execTime = Math.round((end-start)/1000);
  assertTrue("Execution time should be 5 seconds but was "+execTime+" seconds", execTime == 5); 
 }
  
 @Test
 public void threadPoolTest() throws InterruptedException {
  /**
   * This executor can be used to expose Java's native ThreadPoolExecutor as Spring bean, with the 
   * possibility to set core pool size, max pool size and queue capacity through bean properties.
   * 
   * It works exactly as ThreadPoolExecutor from java.util.concurrent package. It means that our pool starts 
   * with 2 threads (core pool size) and can be growth until 3 (max pool size).
   * In additionally, 1 task can be stored in the queue. This task will be treated 
   * as soon as one from 3 threads ends to execute provided task. In our case, we try to execute 5 tasks
   * in 3 places pool and 1 place queue. So the 5th task should be rejected and TaskRejectedException should be thrown.
   **/
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(2);
  executor.setMaxPoolSize(3);
  executor.setQueueCapacity(1);
  executor.initialize();
 
  executor.execute(new SimpleTask("ThreadPoolTask-1", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-2", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-3", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-4", Counters.threadPool, 1000));
  boolean wasTre = false;
  try {
   executor.execute(new SimpleTask("ThreadPoolTask-5", Counters.threadPool, 1000));
  } catch (TaskRejectedException tre) {
   wasTre = true;
  }
  assertTrue("The last task should throw a TaskRejectedException but it wasn't", wasTre);
 
  Thread.sleep(3000);
 
  assertTrue("4 tasks should be terminated, but "+Counters.threadPool.getNb()+" were instead", 
   Counters.threadPool.getNb()==4);
 }
 
}
 
class SimpleTask implements Runnable {
 private String name;
 private Counters counter;
 private int sleepTime;
  
 public SimpleTask(String name, Counters counter, int sleepTime) {
  this.name = name;
  this.counter = counter;
  this.sleepTime = sleepTime;
 }
  
 @Override
 public void run() {
  try {
   Thread.sleep(this.sleepTime);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  this.counter.increment();
  System.out.println("Running task '"+this.name+"' in Thread "+Thread.currentThread().getName());
 }
  
 @Override
 public String toString() {
     return "Task {"+this.name+"}";
 }
}
 
enum Counters {
     
 simpleAsyncTask(0),
 syncTask(0),
 threadPool(0);
  
 private int nb;
  
 public int getNb() {
  return this.nb;
 }
  
 public synchronized void increment() {
  this.nb++;
 }
 
 private Counters(int n) {
  this.nb = n;
 }
}

在過去,我們可以有更多的執(zhí)行器可以使用(SimpleThreadPoolTaskExecutor,TimerTaskExecutor 這些都2.x 3.x的老古董了)。但都被棄用并由本地Java的執(zhí)行器取代成為Spring的首選??纯摧敵龅慕Y(jié)果:

Running task 'SimpleAsyncTask-1' in Thread thread_name_prefix_____1
Running task 'SimpleAsyncTask-2' in Thread thread_name_prefix_____2
Running task 'SimpleAsyncTask-3' in Thread thread_name_prefix_____3
Running task 'SimpleAsyncTask-4' in Thread thread_name_prefix_____4
Running task 'SimpleAsyncTask-5' in Thread thread_name_prefix_____5
Running task 'SimpleAsyncTask-6' in Thread thread_name_prefix_____6
Running task 'SyncTask-1' in Thread main
Running task 'SyncTask-2' in Thread main
Running task 'SyncTask-3' in Thread main
Running task 'SyncTask-4' in Thread main
Running task 'SyncTask-5' in Thread main
Running task 'ThreadPoolTask-2' in Thread ThreadPoolTaskExecutor-2
Running task 'ThreadPoolTask-1' in Thread ThreadPoolTaskExecutor-1
Running task 'ThreadPoolTask-4' in Thread ThreadPoolTaskExecutor-3
Running task 'ThreadPoolTask-3' in Thread ThreadPoolTaskExecutor-2

以此我們可以推斷出,第一個測試為每個任務(wù)創(chuàng)建新的線程。通過使用不同的線程名稱,我們可以看到相應(yīng)區(qū)別。第二個,同步執(zhí)行器,應(yīng)該執(zhí)行所調(diào)用線程中的任務(wù)。這里可以看到'main'是主線程的名稱,它的主線程調(diào)用執(zhí)行同步所有任務(wù)。最后一種例子涉及最大可創(chuàng)建3個線程的線程池。從結(jié)果可以看到,他們也確實只有3個創(chuàng)建線程。

現(xiàn)在,我們將編寫一些單元測試來看看@Async和@Scheduled實現(xiàn)。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class AnnotationTest {
 
 @Autowired
 private GenericApplicationContext context;
     
 @Test
 public void testScheduled() throws InterruptedException {
 
   System.out.println("Start sleeping");
   Thread.sleep(6000);
   System.out.println("Wake up !");
 
   TestScheduledTask scheduledTask = (TestScheduledTask) context.getBean("testScheduledTask");
    /**
    * Test fixed delay. It's executed every 6 seconds. The first execution is registered after application's context start. 
    **/
   assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedDelayCounter(), 
    scheduledTask.getFixedDelayCounter() == 2);
    
    /**
    * Test fixed rate. It's executed every 6 seconds. The first execution is registered after application's context start. 
    * Unlike fixed delay, a fixed rate configuration executes one task with specified time. For example, it will execute on 
    * 6 seconds delayed task at 10:30:30, 10:30:36, 10:30:42 and so on - even if the task 10:30:30 taken 30 seconds to 
    * be terminated. In teh case of fixed delay, if the first task takes 30 seconds, the next will be executed 6 seconds 
    * after the first one, so the execution flow will be: 10:30:30, 10:31:06, 10:31:12.
    **/
   assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedRateCounter(), 
    scheduledTask.getFixedRateCounter() == 2);
    /**
    * Test fixed rate with initial delay attribute. The initialDelay attribute is set to 6 seconds. It causes that 
    * scheduled method is executed 6 seconds after application's context start. In our case, it should be executed 
    * only once because of previous Thread.sleep(6000) invocation.
    **/
   assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getInitialDelayCounter(), scheduledTask.getInitialDelayCounter() == 1);
    /**
    * Test cron scheduled task. Cron is scheduled to be executed every 6 seconds. It's executed only once, 
    * so we can deduce that it's not invoked directly before applications 
    * context start, but only after configured time (6 seconds in our case).
    **/
   assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getCronCounter(), scheduledTask.getCronCounter() == 1);
 }
     
 @Test
 public void testAsyc() throws InterruptedException {
    /**
    * To test @Async annotation, we can create a bean in-the-fly. AsyncCounter bean is a
    * simple counter which value should be equals to 2 at the end of the test. A supplementary test
    * concerns threads which execute both of AsyncCounter methods: one which 
    * isn't annotated with @Async and another one which is annotated with it. For the first one, invoking
    * thread should have the same name as the main thread. For annotated method, it can't be executed in 
    * the main thread. It must be executed asynchrounously in a new thread.
    **/
   context.registerBeanDefinition("asyncCounter", new RootBeanDefinition(AsyncCounter.class));
    
   String currentName = Thread.currentThread().getName();
   AsyncCounter asyncCounter = context.getBean("asyncCounter", AsyncCounter.class);
   asyncCounter.incrementNormal();
   assertTrue("Thread executing normal increment should be the same as JUnit thread but it wasn't (expected '"+currentName+"', got '"+asyncCounter.getNormalThreadName()+"')",
           asyncCounter.getNormalThreadName().equals(currentName));
   asyncCounter.incrementAsync();
   // sleep 50ms and give some time to AsyncCounter to update asyncThreadName value
   Thread.sleep(50);
 
   assertFalse("Thread executing @Async increment shouldn't be the same as JUnit thread but it wasn (JUnit thread '"+currentName+"', @Async thread '"+asyncCounter.getAsyncThreadName()+"')",
           asyncCounter.getAsyncThreadName().equals(currentName));
   System.out.println("Main thread execution's name: "+currentName);
   System.out.println("AsyncCounter normal increment thread execution's name: "+asyncCounter.getNormalThreadName());
   System.out.println("AsyncCounter @Async increment thread execution's name: "+asyncCounter.getAsyncThreadName());
   assertTrue("Counter should be 2, but was "+asyncCounter.getCounter(), asyncCounter.getCounter()==2);
 }
 
}
 
class AsyncCounter {
     
 private int counter = 0;
 private String normalThreadName;
 private String asyncThreadName;
  
 public void incrementNormal() {
  normalThreadName = Thread.currentThread().getName();
  this.counter++;
 }
  
 @Async
 public void incrementAsync() {
  asyncThreadName = Thread.currentThread().getName();
  this.counter++;
 }
  
 public String getAsyncThreadName() {
  return asyncThreadName;
 }
  
 public String getNormalThreadName() {
  return normalThreadName;
 }
  
 public int getCounter() {
  return this.counter;
 }
     
}

另外,我們需要創(chuàng)建新的配置文件和一個包含定時任務(wù)方法的類:

<!-- imported configuration file first -->
<!-- Activates various annotations to be detected in bean classes -->
<context:annotation-config />
 
<!-- Scans the classpath for annotated components that will be auto-registered as Spring beans.
 For example @Controller and @Service. Make sure to set the correct base-package-->
<context:component-scan base-package="com.migo.test.schedulers" />
 
<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="40" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
// scheduled methods after, all are executed every 6 seconds (scheduledAtFixedRate and scheduledAtFixedDelay start to execute at
// application context start, two other methods begin 6 seconds after application's context start)
@Component
public class TestScheduledTask {
 
 private int fixedRateCounter = 0;
 private int fixedDelayCounter = 0;
 private int initialDelayCounter = 0;
 private int cronCounter = 0;
 
 @Scheduled(fixedRate = 6000)
 public void scheduledAtFixedRate() {
  System.out.println("<R> Increment at fixed rate");
  fixedRateCounter++;
 }
  
 @Scheduled(fixedDelay = 6000)
 public void scheduledAtFixedDelay() {
  System.out.println("<D> Incrementing at fixed delay");
  fixedDelayCounter++;
 }
  
 @Scheduled(fixedDelay = 6000, initialDelay = 6000)
 public void scheduledWithInitialDelay() {
  System.out.println("<DI> Incrementing with initial delay");
  initialDelayCounter++;
 }
  
 @Scheduled(cron = "**/6 ** ** ** ** **")
 public void scheduledWithCron() {
  System.out.println("<C> Incrementing with cron");
  cronCounter++;
      
 }
  
 public int getFixedRateCounter() {
  return this.fixedRateCounter;
 }
  
 public int getFixedDelayCounter() {
  return this.fixedDelayCounter;
 }
  
 public int getInitialDelayCounter() {
  return this.initialDelayCounter;
 }
  
 public int getCronCounter() {
  return this.cronCounter;
 }
     
}

該測試的輸出:

<R> Increment at fixed rate
<D> Incrementing at fixed delay
Start sleeping
<C> Incrementing with cron
<DI> Incrementing with initial delay
<R> Increment at fixed rate
<D> Incrementing at fixed delay
Wake up !
Main thread execution's name: main
AsyncCounter normal increment thread execution's name: main
AsyncCounter @Async increment thread execution's name: taskExecutor-1

關(guān)于“Spring中異步和計劃任務(wù)如何實現(xiàn)”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI