springboot中解决事务中调用异步Async方法

柔光的暖阳◎ 2022-09-10 13:28 801阅读 0赞

解决思路
在数据添加的时候同时上传文件,采用异步进行文件服务器存储实现,问题是不能保证文件一定可以上传成功。
异步方法其实是多线程执行任务,需要做的是进行阻塞。

  1. 创建线程池并注入

    @Configuration
    @Slf4j
    public class ThreadPoolConfig {

    1. @Value("${asyncThreadPool.corePoolSize:10}")
    2. private int corePoolSize;
    3. @Value("${asyncThreadPool.maxPoolSize:20}")
    4. private int maxPoolSize;
    5. @Value("${asyncThreadPool.queueCapacity:20}")
    6. private int queueCapacity;
    7. @Value("${asyncThreadPool.keepAliveSeconds:3}")
    8. private int keepAliveSeconds;
    9. @Value("${asyncThreadPool.awaitTerminationSeconds:5}")
    10. private int awaitTerminationSeconds;
    11. @Value("${asyncThreadPool.threadNamePrefix:thread}")
    12. private String threadNamePrefix;
    13. /**
    14. * 线程池配置
    15. * @param
    16. * @return java.util.concurrent.Executor
    17. * @author wliduo
    18. * @date 2019/2/15 14:44
    19. */
    20. @Bean(name = "threadPoolTaskExecutor")
    21. public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    22. log.info("---------- 线程池开始加载 ----------");
    23. ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    24. // 核心线程池大小
    25. threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
    26. // 最大线程数
    27. threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
    28. // 队列容量----LinkedBlockingQueue 默认使用无边界的阻塞队列 ArrayBlockingQueue 有边界队列
    29. threadPoolTaskExecutor.setQueueCapacity(keepAliveSeconds);
    30. // 活跃时间---
    31. threadPoolTaskExecutor.setKeepAliveSeconds(queueCapacity);
    32. // 主线程等待子线程执行时间---也有可能是拒绝策略
    33. threadPoolTaskExecutor.setAwaitTerminationSeconds(awaitTerminationSeconds);
    34. // 线程名字前缀
    35. threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
    36. // RejectedExecutionHandler:当pool已经达到max-size的时候,如何处理新任务
    37. // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
    38. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    39. // 初始化
    40. threadPoolTaskExecutor.initialize();
    41. log.info("---------- 线程池加载完成 ----------");
    42. return threadPoolTaskExecutor;
    43. }

    }

  2. 文件存储功能实现(异步注解)

    @Repository
    public class OssMybatisReposity implements OssReposity {

    1. @Autowired(required=false)
    2. private OssHolder ossHolder;
    3. @Value("${oss.bucketName:zjyh-yygc}")
    4. private String bucketName;
    5. @Override
    6. @Async("threadPoolTaskExecutor")
    7. public Future<String> upload(MultipartFile file, String attachmentKey) {
    8. int i = 1/0;
    9. String result = null;
    10. //自动关闭,防止oom情况出现
    11. try(InputStream inputStream = file.getInputStream()) {
    12. result = ossHolder.uploadFile(bucketName, inputStream, attachmentKey);
    13. } catch (Exception e) {
    14. e.printStackTrace();
    15. }
    16. System.out.println("upload-----------------------");
    17. return new AsyncResult<>(result);
    18. }

    }

  3. get阻塞,异常处理。
    如果不采用get,则不会发生阻塞。只需要建立异常库进行记录即可

    @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)

    1. public AttachmentDTO saveAttachment(MultipartFile multipartFiles,AttachmentDTO attachmentDTO) {
    2. try {
    3. String attachmentName = multipartFiles.getOriginalFilename();
    4. String attachmentKey = StringUtils.getFileKey(attachmentName);
    5. String attachmentType = attachmentName.substring(attachmentName.lastIndexOf("."));
    6. InvocationUserInfo userInfo = LocalInvocationContext.getContext();
    7. String uploadStr = ossReposity.upload(multipartFiles,attachmentKey).get();
    8. System.out.println("saveAttachment--------------------");
    9. if (Objects.nonNull(userInfo)) {
    10. attachmentDTO.setCreator(userInfo.getEmployeeName());
    11. }
    12. attachmentDTO.setAttachmentType(attachmentType);
    13. attachmentDTO.setAttachmentKey(attachmentKey);
    14. attachmentDTO.setAttachmentName(attachmentName);
    15. }catch (Exception e) {
    16. throw new BusinessException("新增文件失败");
    17. }
    18. Attachment attachment = new Attachment();
    19. BeanUtils.copyProperties(attachmentDTO, attachment);
    20. attachment = attachmentDomainService.createAttachment(attachment);
    21. AttachmentDTO newAttachmentDTO = new AttachmentDTO();
    22. BeanUtils.copyProperties(attachment, newAttachmentDTO);
    23. return newAttachmentDTO;
    24. }
  4. 异常信息入库

    @Configuration
    @EnableAsync
    @Slf4j
    public class AsyncConfig implements AsyncConfigurer {

  1. /**
  2. * 只能捕获无返回值的异步方法,有返回值的被主线程处理
  3. */
  4. @Override
  5. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
  6. return new CustomAsyncExceptionHandler();
  7. }
  8. /***
  9. * 处理异步方法中未捕获的异常
  10. */
  11. class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
  12. @Override
  13. public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
  14. log.info("Exception message - {}", throwable.getMessage());
  15. log.info("Method name - {}", method.getName());
  16. log.info("Parameter values - {}", Arrays.toString(obj));
  17. if (throwable instanceof Exception) {
  18. 入库;
  19. }
  20. throwable.printStackTrace();
  21. }
  22. }
  23. }

发表评论

表情:
评论列表 (有 0 条评论,801人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot使用@Async异步调用方法

    1、业务场景,在使用阿里大鱼发送短信时,不知因何原因,后端接口发送短信较耗时,前端请求后端接口很快出现请求错误,这跟前端设置的响应时间相关,可以让前端增加时间,但这并不是一个好