线程的并发工具类之CountDownLatch和CyclicBarrier原理和使用(七)

Dear 丶 2022-12-19 13:09 214阅读 0赞

今天分析 CountDownLatch和CyclicBarrier原理和使用:

1、CountDownLatch 闭锁

CountDownLatch 这个类能够使一个线程等待其他线程完成各自的工 作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动 所有的框架服务之后再执行。 CountDownLatch 是通过一个计数器来实现的,计数器的初始值为初始任务 的数量。每当完成了一个任务后,计数器的值就会减 1 (CountDownLatch.countDown() 方法)。当计数器值到达 0 时,它表示所有的已 经完成了任务,然后在闭锁上等待 CountDownLatch.await() 方法的线程就可以恢 复执行任务。

应用场景:

实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。 例如,我们想测试一个单例类。如果我们创建一个初始计数为 1 的 CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成 测试。我们只需调用 一次 countDown() 方法就可以让所有的等待线程同时恢复执 行。开始执行前等待 n 个线程完成各自任务:例如应用程序启动类要确保在处理 用户请求前,所有 N 个外部系统已经启动和运行了,例如处理 excel 中多个表单。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70

参见代码如 下

  1. import java.util.concurrent.CountDownLatch;
  2. /**
  3. *类说明:演示CountDownLatch用法,
  4. * 共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
  5. */
  6. public class UseCountDownLatch {
  7. static CountDownLatch latch = new CountDownLatch(6);
  8. /*初始化线程*/
  9. private static class InitThread implements Runnable{
  10. public void run() {
  11. System.out.println("Thread_"+Thread.currentThread().getId()
  12. +" ready init work......");
  13. latch.countDown();
  14. for(int i =0;i<2;i++) {
  15. System.out.println("Thread_"+Thread.currentThread().getId()
  16. +" ........continue do its work");
  17. }
  18. }
  19. }
  20. /*业务线程等待latch的计数器为0完成*/
  21. private static class BusiThread implements Runnable{
  22. public void run() {
  23. try {
  24. latch.await();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. for(int i =0;i<3;i++) {
  29. System.out.println("BusiThread_"+Thread.currentThread().getId()
  30. +" do business-----");
  31. }
  32. }
  33. }
  34. public static void main(String[] args) throws InterruptedException {
  35. new Thread(new Runnable() {
  36. public void run() {
  37. SleepTools.ms(1);
  38. System.out.println("Thread_"+Thread.currentThread().getId()
  39. +" ready init work step 1st......");
  40. latch.countDown();
  41. System.out.println("begin step 2nd.......");
  42. SleepTools.ms(1);
  43. System.out.println("Thread_"+Thread.currentThread().getId()
  44. +" ready init work step 2nd......");
  45. latch.countDown();
  46. }
  47. }).start();
  48. new Thread(new BusiThread()).start();
  49. for(int i=0;i<=3;i++){
  50. Thread thread = new Thread(new InitThread());
  51. thread.start();
  52. }
  53. latch.await();
  54. System.out.println("Main do ites work........");
  55. }
  56. }

执行结果:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 1

2、CyclicBarrier

CyclicBarrier 的字面意思是可循环使用( Cyclic )的屏障( Barrier )。它要做 的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一 个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。 CyclicBarrier 默认的构造方法是 CyclicBarrier ( int parties ),其参数表示屏障拦截 的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然 后当前线程被阻塞。 CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrie (r int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction ,方便处理更复 杂的业务场景。CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 2

参见代码如 下

  1. import java.util.Map;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. import java.util.concurrent.CyclicBarrier;
  4. /**
  5. *类说明:演示CyclicBarrier用法,共4个子线程,他们全部完成工作后,交出自己结果,
  6. *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
  7. */
  8. public class UseCyclicBarrier {
  9. private static CyclicBarrier barrier
  10. = new CyclicBarrier(4,new CollectThread());
  11. //存放子线程工作结果的容器
  12. private static ConcurrentHashMap<String,Long> resultMap
  13. = new ConcurrentHashMap<>();
  14. public static void main(String[] args) {
  15. for(int i=0;i<4;i++){
  16. Thread thread = new Thread(new SubThread());
  17. thread.start();
  18. }
  19. }
  20. /*汇总的任务*/
  21. private static class CollectThread implements Runnable{
  22. @Override
  23. public void run() {
  24. StringBuilder result = new StringBuilder();
  25. for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
  26. result.append("["+workResult.getValue()+"]");
  27. }
  28. System.out.println(" the result = "+ result);
  29. System.out.println("do other business........");
  30. }
  31. }
  32. /*相互等待的子线程*/
  33. private static class SubThread implements Runnable{
  34. @Override
  35. public void run() {
  36. long id = Thread.currentThread().getId();
  37. resultMap.put(Thread.currentThread().getId()+"",id);
  38. try {
  39. Thread.sleep(1000+id);
  40. System.out.println("Thread_"+id+" ....do something ");
  41. barrier.await();
  42. Thread.sleep(1000+id);
  43. System.out.println("Thread_"+id+" ....do its business ");
  44. barrier.await();
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. }

结果:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 3

3、CountDownLatch CyclicBarrier 辨析

CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以反复 使用。CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工 作线程达到指定屏障,再大家一起往下走。 在控制多个线程同时运行上,CountDownLatch 可以不限线程数量,而 CyclicBarrier 是固定线程数。 同时,CyclicBarrier 还可以提供一个 barrierAction ,合并多线程计算结果。

到此两种工具类分析完成。

发表评论

表情:
评论列表 (有 0 条评论,214人围观)

还没有评论,来说两句吧...

相关阅读