《java并发编程实战》 第七章 任务、线程的取消与关闭 男娘i 2022-03-22 14:44 197阅读 0赞 ### 《java并发编程实战笔记》 ### * 第七章 任务、线程的取消与关闭 * * 任务如何取消 * * 通过判断volatile类型的状态量----不一定可靠 * 通过中断-----实现取消最合理方法 * 可中断的阻塞函数如何响应中断 * 通过Future实现取消 * 当阻塞方法时是不可中断如何中断 * 线程服务如何停止 * * 同时关闭生产消费者:关闭生产者---消费者日志服务 * 事先约定好当生产消费某个数据:关闭生产者---消费者日志服务 * 局部变量Executor创建一批一次性任务,任务生命周期由方法私有 * 跟踪执行到一半被shutdownNow强行尝试取消的任务 * Runtime类注册关闭钩子 # 第七章 任务、线程的取消与关闭 # 要使任务和线程安全、快速、可靠的停止下来,并不是一件容易的事情。**java没有提供任何机制来安全地终止线程,但提供了中断协作机制,中断能使一个线程终止另一个线程的当前工作。** ## 任务如何取消 ## 任务取消的原因有很多,根据需求取消。在java中没有一种安全的抢占式方法来停止线程。 ### 通过判断volatile类型的状态量----不一定可靠 ### 根据书提供部分代码改写测试: public class PrimeGenerator implements Runnable{ private final ArrayList<BigInteger> primes = new ArrayList<BigInteger>(); private volatile boolean cancelled; public void run() { BigInteger p = BigInteger.ONE; while(! cancelled) { p = p.nextProbablePrime(); synchronized(this){ primes.add(p); } } System.out.println("result is : "+p); } public void cancel() { cancelled = true;} public synchronized List<BigInteger> get(){ return new ArrayList<BigInteger>(primes); } } 通过在主线程中修改volatile类型的状态变量,PrimeGenerator线程根据cancelled进行判断,是否结束线程run方法运行。run方法跑完,并不意味着PrimeGenerator类不可用了,仍可以通过get方法获得PrimeGenerator中的素数搜索结果。 public class TestaSencondPrimes { public static void main(String[] args) { TestaSencondPrimes testaSencondPrimes = new TestaSencondPrimes(); try { List<BigInteger> result = testaSencondPrimes.aSecondOfPrimes(); System.out.println("reseach primes result:"); for(BigInteger i:result) System.out.println(i); } catch (InterruptedException e) { e.printStackTrace(); } } public List<BigInteger> aSecondOfPrimes() throws InterruptedException{ PrimeGenerator generator = new PrimeGenerator(); new Thread(generator).start(); try { Thread.sleep(10); //主线程休眠10毫秒 } finally { generator.cancel(); } return generator.get(); } } ![在这里插入图片描述][20190128232047245.png] ### 通过中断-----实现取消最合理方法 ### 通过不断查看volatile类型的状态变量是一种简单的取消策略,然而,如果任务除了检查状态变量外执行了阻塞方法,任务可能永远不会检查取消状态标志,此时永远不会结束任务。 例如当上诉例子改成BlockQueue后,producer会发生阻塞,此时只有用中断才能实现取消。例如: public class BrokenPrimeProducer extends Thread{ private final BlockingQueue<BigInteger> queue; private volatile boolean cancelled = false; public BrokenPrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger pBigInteger = BigInteger.ONE; while(! cancelled) { queue.put(pBigInteger = pBigInteger.nextProbablePrime()); System.out.println("成功往队列中插入一个素数"); } } catch (InterruptedException consumed) { } System.out.println("producer执行结束"); } public void cancel() { cancelled = true; } } 消费者端: public class TestBrokenPrimeProducer { public static void main(String[] args) throws InterruptedException { BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(2);//FIFO 队列大小设置为2 BrokenPrimeProducer producer = new BrokenPrimeProducer(primes); producer.start(); try { System.out.println(primes.take()); System.out.println(primes.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { System.out.println("主线程休眠1s producer进入阻塞状态"); Thread.sleep(1000); System.out.println("producer阻塞1s后 中断produer"); producer.cancel(); System.out.println("中断producer线程后"); for(BigInteger integer:primes) { System.out.println("队列中仍有:"+integer); } } } } 运行结果: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70] 对于中断正确理解:并不会真正中断一个正在运行的线程,而只是发出了中断请求,然后由线程在下一个合适的时刻中断自己。 public class PrimeGenerator implements Runnable{ private final ArrayList<BigInteger> primes = new ArrayList<BigInteger>(); private volatile boolean cancelled; public void run() { BigInteger p = BigInteger.ONE; while(! cancelled) { p = p.nextProbablePrime(); synchronized(this){ primes.add(p); } } System.out.println("result is : "+p); } public void cancel() { cancelled = true;} public synchronized List<BigInteger> get(){ return new ArrayList<BigInteger>(primes); } } 通过在主线程中修改volatile类型的状态变量,PrimeGenerator线程根据cancelled进行判断,是否结束线程run方法运行。run方法跑完,并不意味着PrimeGenerator类不可用了,仍可以通过get方法获得PrimeGenerator中的素数搜索结果。Thread类重要方法有如下几个。 public class Thread{ public void interrupt(){ .. } //中断线程 public boolean isInterrupted() { ....} //返回线程中断状态 public static boolean interrupted(){ ....} //若返回true原有的中断状态会被清除,必须对其作出处理 } 无论任务把中断视为取消还是其他某个中断响应操作,都应该小心保存执行线程的中断状态。如果除了将InterruptedException传递给调用者外还需要执行其他操作,那么应该在捕获InterruptedException之后恢复中断状态。**由于每个线程拥有各自的中断策略,除非你清楚中断对该线程的含义,否则就不该中断这个线程,捕获InterruptedException之后也要恢复中断状态。** 调用 Thread.currentThread().interrupt(); ### 可中断的阻塞函数如何响应中断 ### 当调用可中断的阻塞函数,例如Thread.sleep或者BlockingQueue.put等,有两种实用策略可用于处理InterruptedException: 1、传递异常,throws InterruptedException,从而使你的方法也成为可中断的阻塞方法。如果你不传递,在catch块中捕获了异常缺不做任何处理,你一定要清楚线程的中断策略,在调用栈中已经没有上层代码需要指定中断信息,否则都应该保存中断状态。 2、恢复中断状态,从而使调用栈的上层代码能对其进行处理 ### 通过Future实现取消 ### ExecutorService.submit将返回一个Future来描述任务,Future拥有cancel方法,该方法带有一个boolean类型参数mayInterruptIfRunning。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。 Future.cancel(boolean mayInterruptIfRunning) true:如果应该中断执行此任务的线程,则为 true Future.cancel(boolean mayInterruptIfRunning) false:允许正在运行的任务运行完成, 当executorService.submit一个Callable任务,且call方法有返回值,future的get方法才能返回任务的计算结果,若submit一个Runnable任务,get没有计算结果。 public class PrimeProducerRunnable implements Runnable { private final BlockingQueue<BigInteger> queue; public PrimeProducerRunnable(BlockingQueue<BigInteger> queue) { this.queue = queue; } @Override public void run() { // TODO Auto-generated method stub BigInteger pBigInteger = BigInteger.ONE; while(true) { try { queue.put(pBigInteger = pBigInteger.nextProbablePrime()); System.out.println("成功往队列中插入一个素数"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("在阻塞过程中发生了中断"); } } } } 通过ExecutorService.submit后,得到Future来描述任务,再通过cancel方法取消任务。 public class TestPrimerProducerFuture { public static void main(String[] args) throws InterruptedException { BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<>(3); ExecutorService executorService = new ScheduledThreadPoolExecutor(5); PrimeProducerRunnable primeProducerRunnable = new PrimeProducerRunnable(queue); Future<?> primeTask = executorService.submit(primeProducerRunnable); try { System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("阻塞1s,主线程休眠1s"); Thread.sleep(1000); primeTask.cancel(true); System.out.println("中断producer线程后"); for(BigInteger integer:queue) { System.out.println("队列中仍有:"+integer); } } } 运行结果: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70 1] ### 当阻塞方法时是不可中断如何中断 ### 许多可阻塞方法都是提前返回或者抛出InterruptedException来响应中断请求,然而并非所有的可阻塞方法或者机制都是能响应中断,**对于特殊的不可中断操作而被阻塞的线程,具体阻塞原因具体分析,可类似中断的手段来停止这些线程。** 常见的不可中断而阻塞的情行: **java.io包中同步Socket I/O**:在最常见的阻塞I/O形式就是对套接字的读写,虽然InputStream和OutputStream的read和write方法都不会响应中断,但是可以通过关闭底层的套接字,可以使执行read或者write方法而阻塞的线程抛出一个SocketException。 **java.io包中同步 I/O**:当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这回使其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在此链路上阻塞的线程都抛出AsynchronousCloseException,大多数标准的Channel都实现了InterruptibleChannel。 **Selector的异步I/O**:如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或者wakeup方法会使线程抛出ClosedSelectorException并提前返回。 **等待内置锁**:若一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以它不理会中断请求。 对于同步Socket阻塞情形,重新重写Thread的interrupt方法,即可关闭套接字从而中断socket阻塞,也可中断线程。 public class ReaderThread extends Thread{ private final Socket socket; private final InputStream inputStream; public ReaderThread(Socket socket) throws IOException{ this.socket = socket; this.inputStream = socket.getInputStream(); } public void interrupt() { try { socket.close(); } catch (Exception ignored) { } finally { super.interrupt(); } } public void run() { try { byte[] buf = new byte[BUFSZ]; while(true) { int count = inputStream.read(buf); if(count < 0) break; else if(count > 0) processBuffer(buf,count); } } catch (Exception e) { // 运行线程退出 } } } ## 线程服务如何停止 ## 对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。 ### 同时关闭生产消费者:关闭生产者—消费者日志服务 ### 日志logger相当于消费者: public class LoggerThread extends Thread{ private final BlockingQueue<String> queue; public LoggerThread(BlockingQueue<String> queue) { super(); this.queue = queue; } public void run() { try { while(true) System.out.println("消费者 logger output : "+queue.take()); } catch (Exception ignored) { } } } 日志生产者:每1秒生产一条数据 public class ProducerThread extends Thread{ private final BlockingQueue<String> queue; Integer integer = 0; public ProducerThread(BlockingQueue<String> queue) { this.queue = queue; } public void run() { while(true) { try { queue.put(String.valueOf(integer)); System.out.println("往队列中生产一条数据 "+integer); integer ++; Thread.sleep(1000);//生产者每一秒生产一条数据 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 主函数测试程序,3秒后停止消费者线程,生产者并不知道消费者已停止,所以仍然往队列中写数据。 public class LogWriter { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<>(5); LoggerThread loggerThread = new LoggerThread(queue); ProducerThread producerThread = new ProducerThread(queue); loggerThread.start(); producerThread.start(); try { Thread.sleep(3000);//主线程休眠3s后关闭logger,发生队列阻塞报错 loggerThread.stop(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 运行结果:当队列满了后阻塞。因此要取消类似生产者–消费者线程时,需要同时取消两者。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70 2] ### 事先约定好当生产消费某个数据:关闭生产者—消费者日志服务 ### 使用毒丸对象:关闭生产者—消费者日志服务。毒丸的含义:书中原话,当得到这个对象时,线程立即停止。在FIFO队列中,毒丸对象将确保消费者在关闭之前首先完成队列中所有的工作,在提交毒丸对象之前提交的消费者所有工作都会被清理,而生产者在提交毒丸对象后,将不会再提交。晦涩难懂,**即在生产者消费者实现相约好,生产者生产某个对象、数据时,消费者消费到指定的数据、对象时,两者都停止工作。** ### 局部变量Executor创建一批一次性任务,任务生命周期由方法私有 ### 若某个方法要处理一批任务,并且所有的任务都处理完后才返回,那么可以通过一个私有的Executor来简化服务的生命周期管理,其中该executor的生命周期是由该方法控制的。 例如:checkMail方法能在多台主机上并行检查新邮件,通过创建一个私有的Executor,并向每台主机提交一个任务,当所有邮件检查任务都执行完后,关闭executor等待结束。 boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasNewMail = new AtomicBoolean(false); try { for (final String host : hosts) { exec.execute(new Runnable() { @Override public void run() { if (check(host)) hasNewMail.set(true); } }); } } finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); } ### 跟踪执行到一半被shutdownNow强行尝试取消的任务 ### 当通过shutdownNow来强行关闭ExecutorService,会尝试取消正在执行的任务,并返回所有已经提交但是尚未开始任务,但是如何知道哪些任务已经开始但是尚未结束被强行取消了? 解决办法:重写了一个TrackingExecutor(实现了ExecutorService接口),在execute(Runnable runnable)中,尝试执行完runnable的run()方法后finally进行判断该runnable所在的线程是否发生过中断,若发生过中断视为被执行过程中shutdown的任务。 方法参考书上,书上给了个爬虫半成品用于保存没有开始爬,以及开始爬被取消的网站,由于效果不好,自己重写了一个。TrackingExecutor 实现AbstractExecutorService接口,只重写了execute方法。 public class TrackingExecutor extends AbstractExecutorService { private final Executor exec; private final Set<Runnable> taskCancelledAtShutDown = Collections.synchronizedSet(new HashSet<Runnable>()); public TrackingExecutor(Executor exec) { super(); this.exec = exec; } @Override public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public boolean isTerminated() { // TODO Auto-generated method stub return false; } @Override public List<Runnable> shutdownNow() { // TODO Auto-generated method stub return null; } @Override public void execute(Runnable runnable) { exec.execute(new Runnable() { @Override public void run() { try { runnable.run(); } finally { if(Thread.currentThread().isInterrupted()) { taskCancelledAtShutDown.add(runnable); System.out.println(Thread.currentThread().getName()+" 被尝试shutdown,但其实还是执行完了"); } } } }); } public List<Runnable> getCancelledTasks() { return new ArrayList<Runnable>(taskCancelledAtShutDown); } @Override public void shutdown() { } @Override public boolean isShutdown() { // TODO Auto-generated method stub return false; } } 多个TrackingTestRunnable 任务: public class TrackingTestRunnable implements Runnable { @Override public void run() { System.out.println("开始执行"+Thread.currentThread().getName()); for(Integer i = 0; i<60000000;i++) ; System.out.println("执行"+Thread.currentThread().getName()+"结束"); } } 主函数测试: public class TestTrackingExecutor { public static void main(String[] args) throws InterruptedException { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);//线程池大小为10 TrackingExecutor trackingExecutor = new TrackingExecutor(executor); TrackingTestRunnable runnable = new TrackingTestRunnable(); TrackingTestRunnable runnable1 = new TrackingTestRunnable(); TrackingTestRunnable runnable2 = new TrackingTestRunnable(); TrackingTestRunnable runnable3 = new TrackingTestRunnable(); TrackingTestRunnable runnable4 = new TrackingTestRunnable(); TrackingTestRunnable runnable6 = new TrackingTestRunnable(); TrackingTestRunnable runnable7 = new TrackingTestRunnable(); trackingExecutor.execute(runnable); trackingExecutor.execute(runnable1); trackingExecutor.execute(runnable2); trackingExecutor.execute(runnable3); trackingExecutor.execute(runnable4); trackingExecutor.execute(runnable6); trackingExecutor.execute(runnable7); List<Runnable> havenotCompleted = executor.shutdownNow();//虽然尽最大努力,但并不保证可以停止处理正在执行的任务 Thread.sleep(5000);//主线程如果不休眠等等各个任务尝试执行完,不会执行taskCancelledAtShutDown.add(runnable);getCancelledTasks不到被中断的任务 List<Runnable> cancelledTasks = trackingExecutor.getCancelledTasks(); System.out.println("开始运行到一般 被取消的任务有:"); for(Runnable r:cancelledTasks) System.out.println(new Thread(r).getName()); for(Runnable r:havenotCompleted) System.out.println("还没开始执行的任务是:"+new Thread(r).getName()); } } 运行结果:注意shutdownNow虽然说暴力尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表,但只是尽最大努力,但并不保证可以停止处理正在执行的任务。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70 3] ### Runtime类注册关闭钩子 ### 关闭钩子(shutdown hook)指通过Runtime.addShutdownHook注册的但尚未开始的线程。JVM不能保证关闭钩子的调用顺序,当所有的关闭钩子执行结束,那么JVM运行终结器。 关闭钩子用于实现服务或者应用程序的清理工作,例如删除临时文件夹、清除无法由操作系统自动清除的资源。关闭钩子(即尚未开始执行的线程)要为线程安全。例如通过注册一个关闭钩子来停止日志服务。 public void start(){ Runtime.getRuntime().addShutdownHook(new Thread(){ //注册一个关闭钩子 public void run() { try{ LogService.this.stop();} cath(InterruptedException ignored){ } } }) } [20190128232047245.png]: /images/20220322/b6ddda2e7df24be38bdc3993f1d320e7.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70]: /images/20220322/f758d8c3b2604e1a867ea0ac3104b95b.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70 1]: /images/20220322/ccaa16e2a6b4444db238e66456dee6d1.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70 2]: /images/20220322/0f23059dfa0342729db156e667851d0d.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTI2MjQ1Mw_size_16_color_FFFFFF_t_70 3]: /images/20220322/de277834e4dc42ffa7c0f2d20cbb8d66.png
相关 Java并发编程:线程池与定时任务实战 在Java并发编程中,线程池和定时任务是两种常用的多线程资源管理和调度方式。 1. 线程池: 线程池是一种管理多个线程的工具。创建线程池有以下优点: - 提高资源利用率:一 妖狐艹你老母/ 2024年09月15日 18:57/ 0 赞/ 16 阅读
相关 Java并发编程:线程池与任务调度实战示例 Java并发编程中的线程池和任务调度是两种常用的多线程管理方式。下面将通过实例详细解释这两种概念。 1. **线程池(ThreadPool)**: 线程池是一种预先创建固定 快来打我*/ 2024年09月11日 01:48/ 0 赞/ 22 阅读
相关 并发编程实战学习笔记(五)——取消与关闭 题记 在Java中没有一种安全的抢占方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。 响应 布满荆棘的人生/ 2022年09月28日 11:44/ 0 赞/ 122 阅读
相关 java线程池介绍(java并发编程实战第6章) 为什么需要使用线程池? one-thread-per-request可能带来的问题: 1. 线程的创建和销毁会占用一定的资源. 如果请求频繁而对请求的处理是轻量级的(大多的w 我不是女神ヾ/ 2022年07月12日 04:42/ 0 赞/ 188 阅读
相关 《Java并发编程实战》读书笔记-第2章 线程安全 1,什么是线程安全的? 当多个线程访问某个类时,不管运行时环境彩何种调试方式或者这些线程如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为 向右看齐/ 2022年07月11日 04:58/ 0 赞/ 174 阅读
相关 java多线程并发之旅-31-任务的关闭与取消 java 对于终止线程的考虑 Java没有提供任何机制来安全地(抢占式方法)终止线程, 虽然Thread.stop和suspend等方法提供了这样的机制,但是由于存在着 男娘i/ 2022年04月23日 15:38/ 0 赞/ 392 阅读
相关 《java并发编程实战》 第六章 任务执行框架 《java并发编程实战笔记》 第六章 结构化并发应用程序-----任务执行 介绍Executor框架前的背景知识 Executor框架 梦里梦外;/ 2022年04月23日 07:16/ 0 赞/ 168 阅读
相关 《java并发编程实战》 第七章 任务、线程的取消与关闭 《java并发编程实战笔记》 第七章 任务、线程的取消与关闭 任务如何取消 通过判断volatile类型的状态量----不一定可靠 男娘i/ 2022年03月22日 14:44/ 0 赞/ 198 阅读
相关 《java并发编程实战》 第八章 线程池的使用 《java并发编程实战笔记》 第八章 线程池的使用 与执行策略之间存在隐形耦合的任务(不可轻易更改线程池) 设置线程池的大 阳光穿透心脏的1/2处/ 2022年03月21日 04:59/ 0 赞/ 208 阅读
还没有评论,来说两句吧...