Java并发包之CountDownLatch

快来打我* 2022-02-24 06:18 369阅读 0赞

今天主要来看一下JUC中的CountDownLatch:

  1. public class CountDownLatchTest implements Runnable{
  2. static final CountDownLatch countDownLatch = new CountDownLatch(10);
  3. static final CountDownLatchTest t = new CountDownLatchTest();
  4. @Override
  5. public void run(){
  6. try{
  7. Thread.sleep(2000);
  8. System.out.print("complete , " );
  9. countDownLatch.countDown();
  10. }catch (Exception e){
  11. e.printStackTrace();
  12. }
  13. }
  14. public static void main(String[] args) throws InterruptedException{
  15. ExecutorService executorService = Executors.newFixedThreadPool(10);
  16. for (int i = 0; i < 10; i++){
  17. executorService.execute(t);
  18. }
  19. /**
  20. * 与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。
  21. * 这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
  22. * 其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。
  23. * 这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。
  24. * 所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
  25. */
  26. countDownLatch.await();
  27. /**
  28. * 主线程最后执行 等待10个子线程执行完成 然后技术器为0
  29. */
  30. System.out.println("主线程执行");
  31. /**
  32. *
  33. */
  34. executorService.shutdown();
  35. /**
  36. * 这两个方法都可以关闭ExecutorService,
  37. * 这将导致其拒绝新任务.shutdown()方法在终止前允许执行以前提交的任务,
  38. * 而shutdownNow()方法阻止等待任务启动并试图停止当前正在执行的任务。
  39. * 终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。
  40. * 应该关闭未使用的ExecutorService以允许回收其资源。
  41. */
  42. }
  43. }
  44. 输出结果:complete complete complete complete complete complete complete complete complete complete end

然后看一些源码:

  1. // 构造器,必须指定一个大于零的计数
  2. public CountDownLatch(int count) {
  3. if (count < 0) throw new IllegalArgumentException("count < 0");
  4. this.sync = new Sync(count);
  5. }
  6. // 线程阻塞,直到计数为0的时候唤醒;可以响应线程中断退出阻塞
  7. public void await() throws InterruptedException {
  8. sync.acquireSharedInterruptibly(1);
  9. }
  10. // 线程阻塞一段时间,如果计数依然不是0,则返回false;否则返回true
  11. public boolean await(long timeout, TimeUnit unit)
  12. throws InterruptedException {
  13. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  14. }
  15. // 计数-1
  16. public void countDown() {
  17. sync.releaseShared(1);
  18. }
  19. // 获取计数
  20. public long getCount() {
  21. return sync.getCount();
  22. }

再看一个例子:

  1. 线程1实现 10加到100
  2. 线程2实现 100加到200
  3. 线程3实现 线程1和线程2计算结果的和
  4. public class CountDownLatchDemo {
  5. private CountDownLatch countDownLatch;
  6. private int start = 10;
  7. private int mid = 100;
  8. private int end = 200;
  9. private volatile int tmpRes1, tmpRes2;
  10. private int add(int start, int end) {
  11. int sum = 0;
  12. for (int i = start; i <= end; i++) {
  13. sum += i;
  14. }
  15. return sum;
  16. }
  17. private int sum(int a, int b) {
  18. return a + b;
  19. }
  20. public void calculate() {
  21. countDownLatch = new CountDownLatch(2);
  22. Thread thread1 = new Thread(() -> {
  23. try {
  24. // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
  25. Thread.sleep(100);
  26. System.out.println(Thread.currentThread().getName() + " : 开始执行");
  27. tmpRes1 = add(start, mid);
  28. System.out.println(Thread.currentThread().getName() +
  29. " : calculate ans: " + tmpRes1);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. } finally {
  33. countDownLatch.countDown();
  34. }
  35. }, "线程1");
  36. Thread thread2 = new Thread(() -> {
  37. try {
  38. // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
  39. Thread.sleep(100);
  40. System.out.println(Thread.currentThread().getName() + " : 开始执行");
  41. tmpRes2 = add(mid + 1, end);
  42. System.out.println(Thread.currentThread().getName() +
  43. " : calculate ans: " + tmpRes2);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. } finally {
  47. countDownLatch.countDown();
  48. }
  49. }, "线程2");
  50. Thread thread3 = new Thread(()-> {
  51. try {
  52. System.out.println(Thread.currentThread().getName() + " : 开始执行");
  53. countDownLatch.await();
  54. int ans = sum(tmpRes1, tmpRes2);
  55. System.out.println(Thread.currentThread().getName() +
  56. " : calculate ans: " + ans);
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. }, "线程3");
  61. thread3.start();
  62. thread1.start();
  63. thread2.start();
  64. }
  65. public static void main(String[] args) throws InterruptedException {
  66. CountDownLatchDemo demo = new CountDownLatchDemo();
  67. demo.calculate();
  68. Thread.sleep(1000);
  69. }
  70. }

看了上面的定义和Demo之后,使用就会简单一点了,一般流程如

  1. 首先是创建实例 CountDownLatch countDown = new CountDownLatch(10)
  2. 需要同步的线程执行完之后,计数-1; countDown.countDown()
  3. 需要等待其他线程执行完毕之后,再运行的线程,调用 countDown.await()实现阻塞同步

注意

  • 在创建实例是,必须指定初始的计数值,且应大于0
  • 必须有线程中显示的调用了countDown()计数-1方法;必须有线程显示调用了 await()方法(没有这个就没有必要使用CountDownLatch了)
  • 由于await()方法会阻塞到计数为0,如果在代码逻辑中某个线程漏掉了计数-1,导致最终计数一直大于0,直接导致死锁了
  • 鉴于上面一点,更多的推荐 await(long, TimeUnit)来替代直接使用await()方法,至少不会造成阻塞死只能重启的情况

应用场景

前面给了一个demo演示如何用,那这个东西在实际的业务场景中是否会用到呢?

因为确实在一个业务场景中使用到了,不然也就不会单独捞出这一节…

电商的详情页,由众多的数据拼装组成,如可以分成一下几个模块

  • 交易的收发货地址,销量
  • 商品的基本信息(标题,图文详情之类的)
  • 推荐的商品列表
  • 评价的内容
  • ….

上面的几个模块信息,都是从不同的服务获取信息,且彼此没啥关联;所以为了提高响应,完全可以做成并发获取数据,如

  • 线程1获取交易相关数据
  • 线程2获取商品基本信息
  • 线程3获取推荐的信息
  • 线程4获取评价信息
  • ….

但是最终拼装数据并返回给前端,需要等到上面的所有信息都获取完毕之后,才能返回,这个场景就非常的适合 CountDownLatch来做了

  1. 在拼装完整数据的线程中调用 CountDownLatch#await(long, TimeUnit) 等待所有的模块信息返回
  2. 每个模块信息的获取,由一个独立的线程执行;执行完毕之后调用 CountDownLatch#countDown() 进行计数-1

AbstractQueuedSynchronizer (简称AQS)

AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题

AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus

  1. private transient volatile Node head;
  2. private transient volatile Node tail;
  3. private volatile int state;
  4. static final class Node {
  5. static final Node SHARED = new Node();
  6. static final Node EXCLUSIVE = null;
  7. /** waitStatus value to indicate thread has cancelled */
  8. static final int CANCELLED = 1;
  9. /** waitStatus value to indicate successor's thread needs unparking */
  10. static final int SIGNAL = -1;
  11. /** waitStatus value to indicate thread is waiting on condition */
  12. static final int CONDITION = -2;
  13. /**
  14. * waitStatus value to indicate the next acquireShared should
  15. * unconditionally propagate
  16. */
  17. static final int PROPAGATE = -3;
  18. //取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一
  19. volatile int waitStatus;
  20. volatile Node prev;
  21. volatile Node next;
  22. // Link to next node waiting on condition,
  23. // or the special value SHARED
  24. volatile Thread thread;
  25. Node nextWaiter;
  26. }

1620

1. 计数器的初始化

CountDownLatch内部实现了AQS,并覆盖了tryAcquireShared()tryReleaseShared()两个方法,下面说明干嘛用的

通过前面的使用,清楚了计数器的构造必须指定计数值,这个直接初始化了 AQS内部的state变量

  1. Sync(int count) {
  2. setState(count);
  3. }

后续的计数-1/判断是否可用都是基于sate进行的

2. countDown() 计数-1的实现

  1. // 计数-1
  2. public void countDown() {
  3. sync.releaseShared(1);
  4. }
  5. public final boolean releaseShared(int arg) {
  6. if (tryReleaseShared(arg)) { // 首先尝试释放锁
  7. doReleaseShared();
  8. return true;
  9. }
  10. return false;
  11. }
  12. protected boolean tryReleaseShared(int releases) {
  13. // Decrement count; signal when transition to zero
  14. for (;;) {
  15. int c = getState();
  16. if (c == 0) //如果计数已经为0,则返回失败
  17. return false;
  18. int nextc = c-1;
  19. // 原子操作实现计数-1
  20. if (compareAndSetState(c, nextc))
  21. return nextc == 0;
  22. }
  23. }
  24. // 唤醒被阻塞的线程
  25. private void doReleaseShared() {
  26. for (;;) {
  27. Node h = head;
  28. if (h != null && h != tail) { // 队列非空,表示有线程被阻塞
  29. int ws = h.waitStatus;
  30. if (ws == Node.SIGNAL) {
  31. // 头结点如果为SIGNAL,则唤醒头结点下个节点上关联的线程,并出队
  32. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  33. continue; // loop to recheck cases
  34. unparkSuccessor(h);
  35. }
  36. else if (ws == 0 &&
  37. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  38. continue; // loop on failed CAS
  39. }
  40. if (h == head) // 没有线程被阻塞,直接跳出
  41. break;
  42. }
  43. }

上面截出计数减1的完整调用链

  1. 尝试释放锁tryReleaseShared,实现计数-1

    • 若计数已经小于0,则直接返回false
    • 否则执行计数(AQS的state)减一
    • 若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
  2. 释放并唤醒阻塞线程 doReleaseShared

    • 如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
    • 头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队

疑问一: 看到这个实现,是不是只要countDownLatch的计数为0了,所有被阻塞的线程都会被执行?

改下上面的demo,新增线程4,实现线程2的结果-线程1的结果

  1. public class CountDownLatchDemo {
  2. // ...省略重复
  3. private int sub(int a, int b) {
  4. return a - b;
  5. }
  6. public void calculate() {
  7. countDownLatch = new CountDownLatch(2);
  8. Thread thread1 = // ... ;
  9. Thread thread2 = // ...;
  10. Thread thread3 = new Thread(()-> {
  11. try {
  12. System.out.println(Thread.currentThread().getName() + " : 开始执行");
  13. countDownLatch.await();
  14. System.out.println(Thread.currentThread().getName() + " : 唤醒");
  15. Thread.sleep(100); // 确保线程4先执行完相减
  16. int ans = sum(tmpRes1, tmpRes2);
  17. System.out.println(Thread.currentThread().getName() +
  18. " : calculate ans: " + ans);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }, "线程3");
  23. Thread thread4 = new Thread(()-> {
  24. try {
  25. System.out.println(Thread.currentThread().getName() + " : 开始执行");
  26. countDownLatch.await();
  27. System.out.println(Thread.currentThread().getName() + " : 唤醒");
  28. int ans = sub(tmpRes2, tmpRes1);
  29. Thread.sleep(200); // 保证线程3先输出执行结果,以验证线程3和线程4是否并发执行
  30. System.out.println(Thread.currentThread().getName() +
  31. " : calculate ans: " + ans);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }, "线程4");
  36. thread3.start();
  37. thread4.start();
  38. thread1.start();
  39. thread2.start();
  40. }
  41. public static void main(String[] args) throws InterruptedException {
  42. CountDownLatchDemo demo = new CountDownLatchDemo();
  43. demo.calculate();
  44. Thread.sleep(1000);
  45. }
  46. }

输出如下

  1. 线程4 : 开始执行
  2. 线程3 : 开始执行
  3. 线程2 : 开始执行
  4. 线程2 : calculate ans: 15050
  5. 线程1 : 开始执行
  6. 线程1 : calculate ans: 5005
  7. 线程3 : 唤醒
  8. 线程4 : 唤醒
  9. 线程3 : calculate ans: 20055
  10. 线程4 : calculate ans: 10045

上面的实现中,线程3中sleep一段时间,确保线程4的计算会优先执行;线程4计算完成之后的sleep时间,以保证线程3计算完成并输出结果,然后线程4才输出结果;结合输出,这个期望是准确的,也就是说,线程3和线程4被唤醒后是并发执行的,没有先后阻塞顺序

即CountDownLatch计数为0之后,所有被阻塞的线程都会被唤醒,且彼此相对独立,不会出现独占锁阻塞的问题

3. await() 阻塞等待计数为0

  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4. public final void acquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. if (Thread.interrupted()) // 若线程中端,直接抛异常
  7. throw new InterruptedException();
  8. if (tryAcquireShared(arg) < 0)
  9. doAcquireSharedInterruptibly(arg);
  10. }
  11. // 计数为0时,表示获取锁成功
  12. protected int tryAcquireShared(int acquires) {
  13. return (getState() == 0) ? 1 : -1;
  14. }
  15. // 阻塞,并入队
  16. private void doAcquireSharedInterruptibly(int arg)
  17. throws InterruptedException {
  18. final Node node = addWaiter(Node.SHARED); // 入队
  19. boolean failed = true;
  20. try {
  21. for (;;) {
  22. // 获取前驱节点
  23. final Node p = node.predecessor();
  24. if (p == head) {
  25. int r = tryAcquireShared(arg);
  26. if (r >= 0) {
  27. // 获取锁成功,设置队列头为node节点
  28. setHeadAndPropagate(node, r);
  29. p.next = null; // help GC
  30. failed = false;
  31. return;
  32. }
  33. }
  34. if (shouldParkAfterFailedAcquire(p, node) // 线程挂起
  35. && parkAndCheckInterrupt())
  36. throw new InterruptedException();
  37. }
  38. } finally {
  39. if (failed)
  40. cancelAcquire(node);
  41. }
  42. }

阻塞的逻辑相对简单

  1. 判断state计数是否为0,不是,则直接放过执行后面的代码
  2. 大于0,则表示需要阻塞等待计数为0
  3. 当前线程封装Node对象,进入阻塞队列
  4. 然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码

III. 小结

1. 使用注意

  • 在创建实例时,必须指定初始的计数值,且应大于0
  • 必须有线程中显示的调用了countDown()计数-1方法;必须有线程显示调用了await()方法(没有这个就没有必要使用CountDownLatch了)
  • 由于await()方法会阻塞到计数为0,如果在代码逻辑中某个线程漏掉了计数-1,导致最终计数一直大于0,直接导致死锁了;
  • 鉴于上面一点,更多的推荐 await(long, TimeUnit)来替代直接使用await()方法,至少不会造成阻塞死只能重启的情况
  • 允许多个线程调用await方法,当计数为0后,所有被阻塞的线程都会被唤醒

2. 实现原理

await内部实现流程:

  1. 判断state计数是否为0,不是,则直接放过执行后面的代码
  2. 大于0,则表示需要阻塞等待计数为0
  3. 当前线程封装Node对象,进入阻塞队列
  4. 然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码

countDown内部实现流程:

  1. 尝试释放锁tryReleaseShared,实现计数-1

    • 若计数已经小于0,则直接返回false
    • 否则执行计数(AQS的state)减一
    • 若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
  2. 释放并唤醒阻塞线程 doReleaseShared

    • 如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
    • 头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队。

参考:https://cloud.tencent.com/developer/article/1038486

发表评论

表情:
评论列表 (有 0 条评论,369人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java发包

    Java线程:新特征-线程池 Sun在Java5中,对Java线程的类库做了大量的扩展,其中线程池就是Java5的新特征之一,除了线程池之外,还有很多多线程相关的内容,为多线