溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

淺談Spring @Async異步線程池用法總結(jié)

發(fā)布時(shí)間:2020-09-10 11:35:21 來源:腳本之家 閱讀:157 作者:hry2015 欄目:編程語(yǔ)言

本文介紹了Spring @Async異步線程池用法總結(jié),分享給大家,希望對(duì)大家有幫助

1. TaskExecutor

spring異步線程池的接口類,其實(shí)質(zhì)是Java.util.concurrent.Executor

Spring 已經(jīng)實(shí)現(xiàn)的異常線程池:

1. SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。
2. SyncTaskExecutor:這個(gè)類沒有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地方
3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時(shí)被quartz和非quartz使用,才需要使用此類
5. ThreadPoolTaskExecutor :最常使用,推薦。 其實(shí)質(zhì)是對(duì)java.util.concurrent.ThreadPoolExecutor的包裝

2. @Async

spring對(duì)過@Async定義異步任務(wù)

異步的方法有3種

1. 最簡(jiǎn)單的異步調(diào)用,返回值為void
2. 帶參數(shù)的異步調(diào)用 異步方法可以傳入?yún)?shù)
3. 異常調(diào)用返回Future

詳細(xì)見代碼:

@Component
public class AsyncDemo {
  private static final Logger log = LoggerFactory.getLogger(AsyncDemo.class);

  /**
   * 最簡(jiǎn)單的異步調(diào)用,返回值為void
   */
  @Async
  public void asyncInvokeSimplest() {
    log.info("asyncSimplest");
  }

  /**
   * 帶參數(shù)的異步調(diào)用 異步方法可以傳入?yún)?shù)
   * 
   * @param s
   */
  @Async
  public void asyncInvokeWithParameter(String s) {
    log.info("asyncInvokeWithParameter, parementer={}", s);
  }

  /**
   * 異常調(diào)用返回Future
   * 
   * @param i
   * @return
   */
  @Async
  public Future<String> asyncInvokeReturnFuture(int i) {
    log.info("asyncInvokeReturnFuture, parementer={}", i);
    Future<String> future;
    try {
      Thread.sleep(1000 * 1);
      future = new AsyncResult<String>("success:" + i);
    } catch (InterruptedException e) {
      future = new AsyncResult<String>("error");
    }
    return future;
  }

}

以上的異步方法和普通的方法調(diào)用相同

asyncDemo.asyncInvokeSimplest();
asyncDemo.asyncInvokeWithException("test");
Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
System.out.println(future.get());

3. Spring 開啟異步配置

Spring有兩種方法啟動(dòng)配置

1. 注解
2. XML

3.1 通過注解實(shí)現(xiàn)

要啟動(dòng)異常方法還需要以下配置

1. @EnableAsync 此注解開戶異步調(diào)用功能

2. public AsyncTaskExecutor taskExecutor() 方法自定義自己的線程池,線程池前綴”Anno-Executor”。如果不定義,則使用系統(tǒng)默認(rèn)的線程池。

@SpringBootApplication
@EnableAsync // 啟動(dòng)異步調(diào)用
public class AsyncApplicationWithAnnotation {
  private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAnnotation.class);

  /**
   * 自定義異步線程池
   * @return
   */
  @Bean
  public AsyncTaskExecutor taskExecutor() { 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    executor.setThreadNamePrefix("Anno-Executor");
    executor.setMaxPoolSize(10); 

    // 設(shè)置拒絕策略
    executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // .....
      }
    });
    // 使用預(yù)定義的異常處理類
    // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    return executor; 
  } 

  public static void main(String[] args) {
    log.info("Start AsyncApplication.. ");
    SpringApplication.run(AsyncApplicationWithAnnotation.class, args);
  }
}

以上的異常方法和普通的方法調(diào)用相同

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithAnnotation.class)
public class AsyncApplicationWithAnnotationTests {
  @Autowired
  private AsyncDemo asyncDemo;

  @Test
  public void contextLoads() throws InterruptedException, ExecutionException {
    asyncDemo.asyncInvokeSimplest();
    asyncDemo.asyncInvokeWithParameter("test");
    Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
    System.out.println(future.get());
  }
}

執(zhí)行測(cè)試用例,輸出內(nèi)容如下:

可以看出主線程的名稱為main; 異步方法則使用 Anno-Executor1,可見異常線程池起作用了

2017-03-28 20:00:07.731 INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo : asyncSimplest
2017-03-28 20:00:07.732 INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo : asyncInvokeWithParameter, parementer=test
2017-03-28 20:00:07.751 INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo : asyncInvokeReturnFuture, parementer=100
success:100
2017-03-28 20:00:08.757 INFO 5144 --- [    Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@47af7f3d: startup date [Tue Mar 28 20:00:06 CST 2017]; root of context hierarchy

3.2 通過XML實(shí)現(xiàn)

Bean文件配置: spring_async.xml

1. 線程的前綴為xmlExecutor
2. 啟動(dòng)異步線程池配置

  <!-- 等價(jià)于 @EnableAsync, executor指定線程池 -->
  <task:annotation-driven executor="xmlExecutor"/>
  <!-- id指定線程池產(chǎn)生線程名稱的前綴 -->
  <task:executor
    id="xmlExecutor"
    pool-size="5-25"
    queue-capacity="100"
    keep-alive="120"
    rejection-policy="CALLER_RUNS"/>

線程池參數(shù)說明

1. ‘id' : 線程的名稱的前綴

2. ‘pool-size':線程池的大小。支持范圍”min-max”和固定值(此時(shí)線程池core和max sizes相同)

3. ‘queue-capacity' :排隊(duì)隊(duì)列長(zhǎng)度

○ The main idea is that when a task is submitted, the executor will first try to use a free thread if the number of active threads is currently less than the core size.
○ If the core size has been reached, then the task will be added to the queue as long as its capacity has not yet been reached.
○ Only then, if the queue's capacity has been reached, will the executor create a new thread beyond the core size.
○ If the max size has also been reached, then the executor will reject the task.
○ By default, the queue is unbounded, but this is rarely the desired configuration because it can lead to OutOfMemoryErrors if enough tasks are added to that queue while all pool threads are busy.

4. ‘rejection-policy': 對(duì)拒絕的任務(wù)處理策略

○ In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
○ In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
○ In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
○ In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

5. ‘keep-alive' : 線程?;顣r(shí)間(單位秒)

setting determines the time limit (in seconds) for which threads may remain idle before being terminated. If there are more than the core number of threads currently in the pool, after waiting this amount of time without processing a task, excess threads will get terminated. A time value of zero will cause excess threads to terminate immediately after executing a task without remaining follow-up work in the task queue()

異步線程池

@SpringBootApplication
@ImportResource("classpath:/async/spring_async.xml")
public class AsyncApplicationWithXML {
  private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithXML.class);

  public static void main(String[] args) {
    log.info("Start AsyncApplication.. ");
    SpringApplication.run(AsyncApplicationWithXML.class, args);
  }
}

測(cè)試用例

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithXML.class)
public class AsyncApplicationWithXMLTest {
  @Autowired
  private AsyncDemo asyncDemo;

  @Test
  public void contextLoads() throws InterruptedException, ExecutionException {
    asyncDemo.asyncInvokeSimplest();
    asyncDemo.asyncInvokeWithParameter("test");
    Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
    System.out.println(future.get());
  }
}

運(yùn)行測(cè)試用例,輸出內(nèi)容如下:

可以看出主線程的名稱為main; 異步方法則使用 xmlExecutor-x,可見異常線程池起作用了

2017-03-28 20:12:10.540 INFO 12948 --- [      main] c.h.s.a.xml.AsyncApplicationWithXMLTest : Started AsyncApplicationWithXMLTest in 1.441 seconds (JVM running for 2.201)
2017-03-28 20:12:10.718 INFO 12948 --- [ xmlExecutor-2] com.hry.spring.async.xml.AsyncDemo    : asyncInvokeWithParameter, parementer=test
2017-03-28 20:12:10.721 INFO 12948 --- [ xmlExecutor-1] com.hry.spring.async.xml.AsyncDemo    : asyncSimplest
2017-03-28 20:12:10.722 INFO 12948 --- [ xmlExecutor-3] com.hry.spring.async.xml.AsyncDemo    : asyncInvokeReturnFuture, parementer=100
success:100
2017-03-28 20:12:11.729 INFO 12948 --- [    Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@71809907: startup date [Tue Mar 28 20:12:09 CST 2017]; root of context hierarchy

4. 對(duì)異步方法的異常處理

在調(diào)用方法時(shí),可能出現(xiàn)方法中拋出異常的情況。在異步中主要有有兩種異常處理方法:

1. 對(duì)于方法返回值是Futrue的異步方法: a) 一種是在調(diào)用future的get時(shí)捕獲異常; b) 在異常方法中直接捕獲異常

2. 對(duì)于返回值是void的異步方法:通過AsyncUncaughtExceptionHandler處理異常

AsyncExceptionDemo:

@Component
public class AsyncExceptionDemo {
  private static final Logger log = LoggerFactory.getLogger(AsyncExceptionDemo.class);

  /**
   * 最簡(jiǎn)單的異步調(diào)用,返回值為void
   */
  @Async
  public void asyncInvokeSimplest() {
    log.info("asyncSimplest");
  }

  /**
   * 帶參數(shù)的異步調(diào)用 異步方法可以傳入?yún)?shù)
   * 對(duì)于返回值是void,異常會(huì)被AsyncUncaughtExceptionHandler處理掉
   * @param s
   */
  @Async
  public void asyncInvokeWithException(String s) {
    log.info("asyncInvokeWithParameter, parementer={}", s);
    throw new IllegalArgumentException(s);
  }

  /**
   * 異常調(diào)用返回Future
   * 對(duì)于返回值是Future,不會(huì)被AsyncUncaughtExceptionHandler處理,需要我們?cè)诜椒ㄖ胁东@異常并處理
   * 或者在調(diào)用方在調(diào)用Futrue.get時(shí)捕獲異常進(jìn)行處理
   * 
   * @param i
   * @return
   */
  @Async
  public Future<String> asyncInvokeReturnFuture(int i) {
    log.info("asyncInvokeReturnFuture, parementer={}", i);
    Future<String> future;
    try {
      Thread.sleep(1000 * 1);
      future = new AsyncResult<String>("success:" + i);
      throw new IllegalArgumentException("a");
    } catch (InterruptedException e) {
      future = new AsyncResult<String>("error");
    } catch(IllegalArgumentException e){
      future = new AsyncResult<String>("error-IllegalArgumentException");
    }
    return future;
  }

}

實(shí)現(xiàn)AsyncConfigurer接口對(duì)異常線程池更加細(xì)粒度的控制

a) 創(chuàng)建線程自己的線程池

b) 對(duì)void方法拋出的異常處理的類AsyncUncaughtExceptionHandler

/**
 * 通過實(shí)現(xiàn)AsyncConfigurer自定義異常線程池,包含異常處理
 * 
 * @author hry
 *
 */
@Service
public class MyAsyncConfigurer implements AsyncConfigurer{
  private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);

  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); 
    threadPool.setCorePoolSize(1); 
    threadPool.setMaxPoolSize(1); 
    threadPool.setWaitForTasksToCompleteOnShutdown(true); 
    threadPool.setAwaitTerminationSeconds(60 * 15); 
    threadPool.setThreadNamePrefix("MyAsync-");
    threadPool.initialize();
    return threadPool; 
  }

  @Override
  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
     return new MyAsyncExceptionHandler(); 
  }

  /**
   * 自定義異常處理類
   * @author hry
   *
   */
  class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { 

    @Override 
    public void handleUncaughtException(Throwable throwable, Method method, Object... obj) { 
      log.info("Exception message - " + throwable.getMessage()); 
      log.info("Method name - " + method.getName()); 
      for (Object param : obj) { 
        log.info("Parameter value - " + param); 
      } 
    } 

  } 

}

@SpringBootApplication
@EnableAsync // 啟動(dòng)異步調(diào)用
public class AsyncApplicationWithAsyncConfigurer {
  private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAsyncConfigurer.class);

  public static void main(String[] args) {
    log.info("Start AsyncApplication.. ");
    SpringApplication.run(AsyncApplicationWithAsyncConfigurer.class, args);
  }


}

測(cè)試代碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithAsyncConfigurer.class)
public class AsyncApplicationWithAsyncConfigurerTests {
  @Autowired
  private AsyncExceptionDemo asyncDemo;

  @Test
  public void contextLoads() throws InterruptedException, ExecutionException {
    asyncDemo.asyncInvokeSimplest();
    asyncDemo.asyncInvokeWithException("test");
    Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
    System.out.println(future.get());
  }
}

運(yùn)行測(cè)試用例

MyAsyncConfigurer 捕獲AsyncExceptionDemo 對(duì)象在調(diào)用asyncInvokeWithException的異常

2017-04-02 16:01:45.591 INFO 11152 --- [   MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo   : asyncSimplest
2017-04-02 16:01:45.605 INFO 11152 --- [   MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo   : asyncInvokeWithParameter, parementer=test
2017-04-02 16:01:45.608 INFO 11152 --- [   MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer : Exception message - test
2017-04-02 16:01:45.608 INFO 11152 --- [   MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer : Method name - asyncInvokeWithException
2017-04-02 16:01:45.608 INFO 11152 --- [   MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer : Parameter value - test
2017-04-02 16:01:45.608 INFO 11152 --- [   MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo   : asyncInvokeReturnFuture, parementer=100
error-IllegalArgumentException
2017-04-02 16:01:46.656 INFO 11152 --- [    Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@47af7f3d: startup date [Sun Apr 02 16:01:44 CST 2017]; root of context hierarchy

5. 源碼地址

代碼的GITHUB地址

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI