多线程事务如何保证效率和原子性

ゞ 浴缸里的玫瑰 2023-09-23 23:42 134阅读 0赞

多线程事务

在Spring开发时,遇到一个从Excel表导入数据到数据库的需求,当然最简单的方法就是先使用EasyExcel把数据读出到集合中,然后依次插入到数据库中。

但如何保证效率,原子性呢?我们一步步优化方案。这里不会引入不必要的组件,而是自己模拟类似的思想。

方法1:依次顺序插入

  1. void test() {
  2. List<User> users = getAllUsers();
  3. users.forEach(user -> userService.save(user));
  4. }
  5. 复制代码

方法2:使用批处理,一次操作中执行多条SQL

  1. void test() {
  2. List<User> users = getAllUsers();
  3. userService.saveBatch(users);
  4. }
  5. 复制代码

方法3:使用多线程+批处理,每个线程插入多条数据

需要注意的一点,Spring容器不允许线程注入,也就是没办法在多线程直接使用Bean操作,例如:

  1. void testThread() {
  2. // 下面两种方式是无效的,不会执行任何东西
  3. Runnable runnable = () -> {
  4. userService.save(new User());
  5. };
  6. // 方法1
  7. new Thread(runnable).start();
  8. // 方法2
  9. Executors.newFixedThreadPool(1).submit(runnable);
  10. }
  11. 复制代码

我们需要下面的方式进行执行

  1. void testThread() {
  2. Runnable runnable = () -> {
  3. userService.save(new User());
  4. };
  5. ExecutorService executorService = Executors.newFixedThreadPool(1);
  6. CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executorService);
  7. future.join();
  8. }
  9. 复制代码
  1. void testThread() {
  2. int threadSize = 5;
  3. ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
  4. List<List<User>> list = new ArrayList<>();
  5. for (int i = 0; i < threadSize; i++) {
  6. // 我们假设数据拆分为五分
  7. list.add(getAllUsers());
  8. }
  9. for (List<User> users : list) {
  10. CompletableFuture.runAsync(()->{
  11. userService.saveBatch(users);
  12. },executorService).join();
  13. }
  14. System.out.println("插入成功");
  15. }
  16. 复制代码

方法4:这时候速度已经很快了,但是如果其中一个线程插入数据时发生错误进行回滚,其他线程是无法得知的,因为事务是针对线程的,所以这里我们需要用一些方式保证每个线程之间的状态是被共享的。

  1. // UserService#saveUserSyn()
  2. @Override
  3. public boolean saveUserSyn(List<User> users, CountDownLatch threadLatch, CountDownLatch mainLatch, UserError hasError) {
  4. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  5. System.out.println("子线程:" + Thread.currentThread().getName());
  6. try {
  7. users.forEach(this::save);
  8. } catch (Throwable e) {
  9. hasError.setHasError(true);
  10. } finally {
  11. threadLatch.countDown(); // 切换到主线程执行
  12. }
  13. try {
  14. mainLatch.await(); //等待主线程执行
  15. } catch (Throwable e) {
  16. hasError.setHasError(true);
  17. }
  18. // 判断是否有错误,如有错误 就回滚事务
  19. if (hasError.isHasError()) {
  20. dataSourceTransactionManager.rollback(transactionStatus);
  21. } else {
  22. dataSourceTransactionManager.commit(transactionStatus);
  23. }
  24. return true;
  25. }
  26. 复制代码
  27. // 测试方法
  28. @Test
  29. void userSaveSyn() {
  30. List<User> userList = getAllUsers();
  31. // 添加一个错误数据
  32. User user = new User();
  33. user.setUserAccount(null);
  34. user.setUserPassword("123456");
  35. userList.add(user);
  36. // 线程数量
  37. final Integer threadCount = 4;
  38. //每个线程处理的数据量
  39. final Integer dataPartionLength = (userList.size() + threadCount - 1) / threadCount;
  40. // 创建多线程处理任务
  41. ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  42. CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
  43. CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
  44. for (int i = 0; i < threadCount; i++) {
  45. // 每个线程处理的数据
  46. List<User> threadDatas = userList.stream()
  47. .skip(i * dataPartionLength).limit(dataPartionLength)
  48. .collect(Collectors.toList());
  49. studentThreadPool.execute(() -> {
  50. userService.saveUserSyn(threadDatas, threadLatchs, mainLatch, hasError);
  51. });
  52. }
  53. try {
  54. // 倒计时锁设置超时时间 30s
  55. boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
  56. if (!await) { // 等待超时,事务回滚
  57. hasError.setHasError(true);
  58. }
  59. } catch (Throwable e) {
  60. e.printStackTrace();
  61. hasError.setHasError(true);
  62. }
  63. mainLatch.countDown(); // 切换到子线程执行
  64. studentThreadPool.shutdown(); //关闭线程池
  65. System.out.println("主线程完成");
  66. }
  67. 复制代码

这里我们使用CountDownLatchVolatile来解决这个问题。

Volatile保证线程间数据的可见性

2PC(两阶段提交),这个属于分布式事务的一个理论,这里模拟了这样的业务场景,大致流程为:

  • 每个线程开启事务,插入数据,但不提交,向主线程通知说,我这里已经好了
  • 主线程等待一段时间,看是否所有的子线程都没问题了。如果超时也算是异常
  • 如果没有异常,主线程向所有子线程通知,可以提交事务
  • 如果有异常,主线程向所有子线程通知,进行回滚操作
  • 而中间使用Volatile修饰的hasError对象进行传达,是否出现异常。需要注意如果只是传递普通的boolean对象,可能会发生不一致的情况,我测试时没法通过。
  • CountDownLatch则保证子线程在主线程没有通知前,是不能提交事务的。

这里细心些就会发现,即便是主线程通知子线程可以提交了,子线程依然有可能出现提交失败的可能,那其他线程提交事务是无法得知这边的失败的消息的。这就是我们其实无法在一个Java进程中保证多线程的原子性。

发表评论

表情:
评论列表 (有 0 条评论,134人围观)

还没有评论,来说两句吧...

相关阅读