JAVA并发工具包-线程控制 £神魔★判官ぃ 2022-03-20 14:45 219阅读 0赞 ## 闭锁CountDownLatch ## java.util.concurrent.CountDownLatch 是一个并发控制器,协调一个或多个线程合作执行,使用CountDownLatch构造器实例对象时指定countDown的次数,如果线程A调用CountDownLatch.await方法,那么当countDown次数递减到零时,线程A方可继续向下执行。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70] package com.hk; import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { public static void main(String[] args) throws Exception { CountDownLatch latch = new CountDownLatch(3); Thread1 th1 = new Thread1(latch); Thread2 th2 = new Thread2(latch); new Thread(th1).start(); new Thread(th2).start(); Thread.sleep(4000); } } class Thread1 implements Runnable{ CountDownLatch latch = null; public Thread1(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程一释放..."); } } class Thread2 implements Runnable { CountDownLatch latch = null; public Thread2(CountDownLatch latch) { this.latch = latch; } public void run() { try { Thread.sleep(500); this.latch.countDown(); Thread.sleep(500); this.latch.countDown(); Thread.sleep(500); this.latch.countDown(); System.out.println("线程二释放..."); } catch (InterruptedException e) { e.printStackTrace(); } } } ## 栅栏CyclicBarrier ## java.util.concurrent.CyclicBarrier 类协调多线程同步执行,所有线程必须等待的一个栅栏对象,直到所有线程都到达此处,然后所有线程才可以继续做其他事情。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70 1] 定义栅栏对象时需要指定多少个线程同时到达栅栏对象时释放线程,需要指定一个数量,栅栏释放线程时可以执行一个操作(此操作是一个线程对象)。如下代码: package com.hk; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class TestCyclicBarrier { public static void main(String[] args) { Runnable action1 = new Runnable() { public void run() { System.out.println("栅栏动作一执行..."); } }; Runnable action2 = new Runnable() { public void run() { System.out.println("栅栏动作啊执行..."); } }; CyclicBarrier barrier1 = new CyclicBarrier(2, action1); CyclicBarrier barrier2 = new CyclicBarrier(2, action2); MyThread barrierRunnable1 = new MyThread(barrier1, barrier2); MyThread barrierRunnable2 = new MyThread(barrier1, barrier2); new Thread(barrierRunnable1).start(); new Thread(barrierRunnable2).start(); } } class MyThread implements Runnable{ CyclicBarrier barrier1 = null; CyclicBarrier barrier2 = null; public MyThread( CyclicBarrier barrier1, CyclicBarrier barrier2) { this.barrier1 = barrier1; this.barrier2 = barrier2; } public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 栅栏一 等待所有线程完成"); this.barrier1.await(); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 栅栏一 等待所有线程完成"); this.barrier2.await(); System.out.println(Thread.currentThread().getName() + " 完成..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } ## 交换机Exchanger ## java.util.concurrent.Exchanger 类表示一种两个线程可以进行互相交换对象的会和点。两个线程可以通过Exchanger交换必须拥有的对象。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70 2] package com.hk; import java.util.concurrent.Exchanger; public class TestExchanger { public static void main(String[] args) { Exchanger exchanger = new Exchanger(); ExgThread exchangerRunnable1 = new ExgThread(exchanger, "A"); ExgThread exchangerRunnable2 = new ExgThread(exchanger, "B"); new Thread(exchangerRunnable1).start(); new Thread(exchangerRunnable2).start(); } } class ExgThread implements Runnable{ Exchanger exchanger = null; Object object = null; public ExgThread(Exchanger exchanger, Object object) { this.exchanger = exchanger; this.object = object; } public void run() { try { Object previous = this.object; this.object = this.exchanger.exchange(this.object); System.out.println( Thread.currentThread().getName() + " 交换对象 " + previous + " 为 " + this.object ); } catch (InterruptedException e) { e.printStackTrace(); } } } ## 信号量 Semaphore ## java.util.concurrent.Semaphore 类是一个计数信号量。这就意味着它具备两个主要方法: acquire() release() 计数信号量由一个指定数量的 “许可” 初始化。每调用一次 acquire(),一个许可会被调用线程取走。每调用一次 release(),一个许可会被返还给信号量。因此,在没有任何 release() 调用时,最多有 N 个线程能够通过 acquire() 方法,N 是该信号量初始化时的许可的指定数量。 Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。Semaphore有两个构造函数,参数permits表示许可数,它最后传递给了AQS的state值。线程在运行时首先获取许可,如果成功,许可数就减1,线程运行,当线程运行结束就释放许可,许可数就加1。如果许可数为0,则获取失败,线程位于AQS的等待队列中,它会被其它释放许可的线程唤醒。在创建Semaphore对象的时候还可以指定它的公平性。 package com.hk; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { private Semaphore smp = new Semaphore(3); private Random rnd = new Random(); //成员内部类 class TaskThread implements Runnable{ private String id; TaskThread(String id){ this.id = id; } public void run(){ try { smp.acquire(); System.out.println("线程 " + id + " 运行..."); Thread.sleep(rnd.nextInt(1000)); smp.release(); System.out.println("线程 " + id + " 终止..."); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args){ SemaphoreTest semaphore = new SemaphoreTest(); //注意我创建的线程池类型, ExecutorService se = Executors.newCachedThreadPool(); se.submit(semaphore.new TaskThread("a")); se.submit(semaphore.new TaskThread("b")); se.submit(semaphore.new TaskThread("c")); se.submit(semaphore.new TaskThread("d")); se.submit(semaphore.new TaskThread("e")); se.submit(semaphore.new TaskThread("f")); se.shutdown(); } } ## 执行器服务 ExecutorService ## java.util.concurrent.ExecutorService 接口表示一个异步执行机制,其实现类可以使用多钟方式在后台执行任务。两个实现类有ThreadPoolExecutor和ScheduledThreadPoolExecutor。ExecutorService 接口提供的方法有: execute(Runnable) 异步执行java.lang.Runnable 对象; package com.hk; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorServiceTest { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("任务异步执行..."); } }); executorService.shutdown(); } } submit(Runnable) 异步执行java.lang.Runnable, 它返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕 package com.hk; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceTest { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Runnable() { public void run() { System.out.println("异步任务执行..."); } }); Object obj = future.get();//返回null,异步任务执行成功 System.out.println("Obj = "+obj); executorService.shutdown(); } } submit(Callable) 异步执行java.lang.Runnable,Callable 实例除了它的 call() 方法能够返回一个结果。 package com.hk; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceTest { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("异步任务执行..."); return "Callable Success"; } }); System.out.println("future.get() = " + future.get()); executorService.shutdown(); } } invokeAny(…) 方法要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。无法保证返回的是哪个 Callable 的结果 - 只能表明其中一个已执行结束。 package com.hk; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceTest { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "任务一"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "任务二"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "任务三"; } }); String result = executorService.invokeAny(callables); System.out.println("执行的异步任务 = " + result); executorService.shutdown(); } } invokeAll(…) 方法将调用你在集合中传给 ExecutorService 的所有 Callable 对象。invokeAll() 返回一系列的 Future 对象,通过它们你可以获取每个 Callable 的执行结果。 package com.hk; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceTest { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "任务一"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "任务二"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "任务三"; } }); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("执行结果 = " + future.get()); } executorService.shutdown(); } } ## 线程池执行者 ThreadPoolExecutor ## ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务; ThreadPoolExecutor构造方法参数 corePoolSize: 核心线程池大小 maximumPoolSize: 最大线程池大小 keepAliveTime: 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以 allowCoreThreadTimeOut(true)使得核心线程有效时间 TimeUnit: keepAliveTime时间单位 workQueue: 阻塞任务队列 threadFactory: 新建线程工厂 RejectedExecutionHandler: 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70 3] int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 5000; ExecutorService threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() ); ## 定时执行者服务 ScheduledExecutorService ## java.util.concurrent.ScheduledExecutorService 是一个 ExecutorService, 它能够将任务延后执行,或者间隔固定时间多次执行。 任务由一个工作者线程异步执行。ScheduledExecutorService,是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行。 package com.hk; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ExecutorServiceTest { public static void main(String[] args) throws Exception { ScheduledExecutorService service = Executors.newScheduledThreadPool(10); long initialDelay = 1; long period = 1; long period2 = 2; // 从1秒钟之后,每隔1秒钟执行一次job1 service.scheduleAtFixedRate(new MyJob("job1"), initialDelay, period, TimeUnit.SECONDS); // 从1秒钟之后,每隔2秒钟执行一次job2 service.scheduleWithFixedDelay(new MyJob("job2"), initialDelay, period2, TimeUnit.SECONDS); //service.shutdown(); } } class MyJob implements Runnable { private String jobName; MyJob() { } MyJob(String jobName) { this.jobName = jobName; } public void run() { System.out.println(jobName + " is running"); } } ScheduledExecutorService的方法 schedule (Callable task, long delay, TimeUnit timeunit) 这个方法计划指定的 Callable 在给定的延迟之后执行。 这个方法返回一个 ScheduledFuture,通过它你可以在它被执行之前对它进行取消,或者在它执行之后获取结果。 package com.hk; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class ExecutorServiceTest { public static void main(String[] args) throws Exception { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); //延时5秒后执行,使用scheduledFuture取得返回结果 ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() { public Object call() throws Exception { System.out.println("Executed!"); return "Called!"; } }, 5, TimeUnit.SECONDS); //取消任务执行 //scheduledFuture.cancel(true); System.out.println("返回结果 = " + scheduledFuture.get()); scheduledExecutorService.shutdown(); } } schedule (Runnable task, long delay, TimeUnit timeunit) 延时执行任务,无法获取任务执行结果。 scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit) 定时执行某一个任务,如果给定任务的执行抛出了异常,该任务将不再执行。如果没有任何异常的话,这个任务将会持续循环执行到 ScheduledExecutorService 被关闭。 如果一个任务占用了比计划的时间间隔更长的时候,下一次执行将在当前执行结束执行才开始。计划任务在同一时间不会有多个线程同时执行。 scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit) 除了 period 有不同的解释之外这个方法和 scheduleAtFixedRate() 非常像。 scheduleAtFixedRate() 方法中,period 被解释为前一个执行的开始和下一个执行的开始之间的间隔时间。 而在本方法中,period 则被解释为前一个执行的结束和下一个执行的结束之间的间隔。因此这个延迟是执行结束之间的间隔,而不是执行开始之间的间隔。 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70]: /images/20220320/fa9c0b1618b346e0812e6337dfe4a93a.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70 1]: /images/20220320/bd9a66e9239d461faa9e9877c5a26b05.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70 2]: /images/20220320/6c8f5f29c74c412bb3173e8c29191cb5.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FpeGlhbmdfY2hlbg_size_16_color_FFFFFF_t_70 3]: /images/20220320/151c05efc1634b2294e79c1b274d1cab.png
相关 Java多线程并发控制不当的实例 在Java多线程编程中,如果不正确地进行并发控制,可能会导致数据不一致、死锁等问题。以下是一个简单的实例: **场景:银行存款系统** 1. **问题:未同步修改账户余额* 深藏阁楼爱情的钟/ 2024年09月15日 15:51/ 0 赞/ 17 阅读
相关 Java多线程并发控制不当示例 在Java多线程编程中,如果并发控制不当,可能会导致数据竞争、死锁等问题。以下是一个简单的示例: ```java // 假设我们有一个共享资源——一个整数变量count cl 野性酷女/ 2024年09月12日 21:09/ 0 赞/ 25 阅读
相关 Java多线程并发控制问题实例 在Java多线程编程中,经常会遇到并发控制问题。以下是一个常见的实例: 案例:生产者-消费者模型 1. 问题描述: - 生产者(Producer)负责生成产品,并将其 亦凉/ 2024年09月12日 08:12/ 0 赞/ 20 阅读
相关 Java并发控制:如何避免线程饥饿问题 线程饥饿(Thread Starvation)是多线程编程中常见的问题,主要发生在资源有限(如CPU时间、内存空间等)的场景。 以下是一些避免线程饥饿的方法: 1. **公 客官°小女子只卖身不卖艺/ 2024年09月11日 06:09/ 0 赞/ 31 阅读
相关 并发控制难题:Java多线程安全问题 Java多线程安全问题是由于并发编程中的一些特性导致的,主要包括以下几点: 1. 数据共享:多个线程同时访问同一个对象,如果没有正确地同步,就可能出现数据不一致的问题。 2 朴灿烈づ我的快乐病毒、/ 2024年09月10日 09:33/ 0 赞/ 24 阅读
相关 Java并发工具包-countDownLatch,同步屏障CyclicBarrier,控制并发线程数的Semaphore (信号量) 1 countDownLatch countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。是通过一个计数器来实现的,计数器的初始值是线程的数量 偏执的太偏执、/ 2022年11月27日 15:48/ 0 赞/ 149 阅读
相关 理解Java并发工具包线程池的设计 为什么需要线程池? 答:主要原因是因为创建一个线程开销太大,尤其是对大量的小任务需要执行这种场景。 在Java里面创建一个线程,需要包含的东西: (1)它为一个线程堆 一时失言乱红尘/ 2022年05月13日 12:28/ 0 赞/ 128 阅读
相关 JAVA并发工具包-线程控制 闭锁CountDownLatch java.util.concurrent.CountDownLatch 是一个并发控制器,协调一个或多个线程合作执行,使用CountDo £神魔★判官ぃ/ 2022年03月20日 14:45/ 0 赞/ 220 阅读
相关 实现 Java 多线程并发控制框架 2006 年 8 月 14 日 > Java 提供了语言级别的线程支持,所以在 Java 中使用多线程相对于 C,C++ 来说更简单便捷,但本文并不是介绍如何在 Java 中 秒速五厘米/ 2021年12月24日 12:05/ 0 赞/ 203 阅读
还没有评论,来说两句吧...