SpringBoot @Async异步并行执行任务

淩亂°似流年 2021-10-13 04:19 646阅读 0赞

说@Async注解时,得先说说异步任务的由来,按我的理解,从Java5版本就提供 Future 接口,Future接口可以充分利用多核CPU处理能力,它允许任务在一个新的核上开启一个新的子线程,子线程和原来的任务同时运行,因为Future 的局限性Java8对Future进行重新实现,实现类 CompletableFuture,而Spring对Future接口进行了封装,使用 @Async 注解可以方便的处理异步任务
Future 与 CompletableFuture ,有位网友写的很好,大家可以看看:https://segmentfault.com/a/1190000014479792

异步的好处是,可以提高程序吞吐量,一个任务,让耗时的异步处理,并继续同步处理后面的任务,异步任务可以返回结果,拿到结果后可结合同步处理过程中的变量一起处理计算
在Spring中运用 Async注解 需要注意几点:
  • 1.方法名必须是public进行修饰的,且不能是static方法
  • 2.不能与调用的方法在同一个类中
  • 3.需要把该方法注入到Spring容器中,就是在一个类中添加异步方法,并在此类上使用@Component之类的注解加入到容器

直接上代码,有3个类,并运行测试方法看效果

  • 1.AsyncTest.java,测试类,调用异步任务,同时执行同步方法
  • 2.OrderService.java,异步任务类,提供异步方法
  • 3.AsyncThreadPoolConfig.java,异步任务线程池配置类,配置异步任务运行的线程池大小等

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class AsyncTest {
    @Autowired

    1. OrderService orderService;

    @Test

    1. public void testAsyncFuture() throws InterruptedException,ExecutionException{
    2. LogUtil.info("开始async");
    3. Future<List<String>> future1 = orderService.getList();
    4. Future<List<String>> future2 = orderService.getList2();
    5. //同步执行for循环
    6. for(int i=0; i < 10;i++){
    7. LogUtil.info("i:" + i);
    8. }
    9. //获取异步任务的处理结果,异步任务没有处理完成,会一直阻塞,可以设置超时时间,使用 get 的重载方法
    10. List<String> list1 = future1.get();
    11. LogUtil.info("list size1:" + list1.size());
    12. List<String> list2 = future1.get();
    13. LogUtil.info("list size2:" + list2.size());
    14. }

    }

    @Service
    public class OrderService{

    1. /**
    2. * 异步任务1,返回处理结果
    3. * @return
    4. */
    5. @Async("taskExecutor")
    6. public Future<List<String>> getList(){
    7. List<String> list = new ArrayList<>();
    8. try {
    9. LogUtil.info("开始处理任务1");
    10. for(int i=0; i < 100000;i++){
    11. list.add(i+"");
    12. }
    13. //让线程睡2秒
    14. Thread.sleep(1500);
    15. LogUtil.info("任务1处理完成");
    16. } catch (Exception e) {
    17. }
    18. return new AsyncResult<>(list);
    19. }
    20. /**
    21. * 异步任务2,返回处理结果
    22. * @return
    23. */
    24. @Async("taskExecutor")
    25. public Future<List<String>> getList2(){
    26. List<String> list = new ArrayList<>();
    27. try {
    28. LogUtil.info("开始处理任务2");
    29. for(int i=0; i < 100000;i++){
    30. list.add(i+"");
    31. }
    32. //让线程睡2秒
    33. Thread.sleep(1500);
    34. LogUtil.info("任务2处理完成");
    35. } catch (Exception e) {
    36. }
    37. return new AsyncResult<>(list);
    38. }

    }

    /**

    • 把context传递到线程中
      */
      public class ContextCopyingDecorator implements TaskDecorator {
      @Override
      public Runnable decorate(Runnable runnable) {
      1. RequestAttributes context = RequestContextHolder.currentRequestAttributes();
      2. return () -> {
      3. try {
      4. RequestContextHolder.setRequestAttributes(context);
      5. runnable.run();
      6. } finally {
      7. RequestContextHolder.resetRequestAttributes();
      8. }
      9. };
      }
      }

    @Configuration
    @EnableAsync
    public class AsyncThreadPoolConfig {

    1. private static final int cpu = Runtime.getRuntime().availableProcessors();//获取当前机器CPU数量
    2. private static final int corePoolSize = cpu; // 核心线程数(默认线程数)
    3. private static final int maxPoolSize = cpu * 2; // 最大线程数
    4. private static final int keepAliveTime = 60; // 允许线程空闲时间(单位:默认为秒)
    5. private static final int queueCapacity = 200; // 缓冲队列数
    6. private static final String threadNamePrefix = "taskExecutor-"; // 线程池名前缀
    7. @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
    8. public ThreadPoolTaskExecutor taskExecutor(){
    9. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    10. executor.setTaskDecorator(new ContextCopyingDecorator());
    11. executor.setCorePoolSize(corePoolSize);
    12. executor.setMaxPoolSize(maxPoolSize);
    13. executor.setQueueCapacity(queueCapacity);
    14. executor.setKeepAliveSeconds(keepAliveTime);
    15. executor.setThreadNamePrefix(threadNamePrefix);
    16. // 线程池对拒绝任务的处理策略
    17. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    18. // 初始化
    19. executor.initialize();
    20. return executor;
    21. }

    }

运行测试方法,结果如下,

  • 1.打印开始进入方法
  • 2.打印异步任务1开始处理
  • 3.打印同步任务开始处理,并马上处理异步任务2
  • 4.同步任务处理完后,会一直阻塞等待异步任务处理完,拿到异步任务的结果
  • 5.因配置了异步任务的处理线程池配置,可以看到同步任务是在main线程上完成的,异步任务是在 taskExceutor 上完成的,且2个异步任务分别在不同的线程上处理,通过合理调整该线程池数量的大小可以提供更高的吞吐量
    在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,646人围观)

还没有评论,来说两句吧...

相关阅读