Java多线程——深入理解线程池

水深无声 2021-12-03 05:19 713阅读 0赞

Java多线程——深入理解线程池

一、线程池简介

  1、线程池的应用

   几乎所有需要异步或者并发执行任务的程序都可以使⽤线程池

  为什么需要引入线程池?线程池的好处到底体现在哪?

  2、线程池的优点

   ①降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的损耗
   ②提高响应速度:当新任务到达时,任务不需要等待线程创建就可以立即执行
   ③提高线程的可管理性:使用线程池可以统一进行线程的分配、调度与监控

  线程池的分类有哪些?相互之间有什么关系?

  3、线程池的分类

在这里插入图片描述


  线程池内部到底是如何工作的?线程池有怎样的执行流程?

二、线程池执行任务的流程

  当一个Runnable或Callable对象到达线程池时,执行策略如下:

在这里插入图片描述

  1、首先判断核心线程池中的线程是否都在执行任务,如果不是(有空闲线程或还有核心线程没有被创建),则创建一个新的核心工作线程(即便还有空闲线程)来执行任务。否则执行第2步(核心线程池中数量达到最大值且都在跑任务)

  2、判断工作队列(BlockingQueue)是否已满,如果工作队列没有满,将提交任务存储到工作队列中等待核心池的调度,否则,若工作队列已满,进入步骤3。

  3、判断当前线程池中的线程数是否达到最大值maxiumSize,若未达到最大值,则创建新的工作线程执行任务,否则,若已达到最大值且所有线程均在跑,将任务交给饱和策略处理。

  ThreadPoolExecutor执行execute()的工作流程?

  上面所述都是逻辑性的东西,那么在实际开发中,ThreadPoolExecutor执行execute()后,到底会是怎样的过程。

在这里插入图片描述

  简单分析一下execute()的流程:

  1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

  2、如果运⾏的线程等于或多于corePoolSize,则将任务加⼊BlockingQueue。

  3、如果无法将任务加⼊BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这⼀步骤需要获取全局锁)

  4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

  为什么ThreadPoolExecutor要设计成这个样子

  为了在执⾏execute()方法时,尽可能地避免获取全局锁(那将会是⼀个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调⽤都是执行步骤2,而步骤2不需要获取全局锁。

  什么是工作线程?

  工作线程:线程池在创建线程时,会将线程封装成工作线程Worker。工作线程有两个特点

   ①在execute()方法中创建一个线程时,会让这个工作线程执行当前任务
   ②这个工作线程执行完当前任务后,会反复从BlockingQueue获取任务来执行


  如何自定义创建一个线程池并使用?

三、自定义线程池的使用

  纯手工创建线程池

  如何创建线程池?

   1、通过ThreadPoolExecutor的构造函数来创建线程池

  1. ThreadPoolExecutor构造函数:
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. (ThreadFactory threadFactory,)
  8. RejectedExecutionHandler handler)

  创建线程池时的参数都代表什么含义?

    ①corePoolSize(核心池的大小):当提交任务到线程池时,核心线程池会创建一个新的线程来执行任务,即使核心池中有其他空闲线程,也会创建新线程,一直到线程数达到核心池的大小为止。此时才会复用已有线程(使用空闲线程)

   PS:调用prestartAllCoreThreads():线程池会提前创建并启动所有核心线程

    ②workQueue(任务队列):用于保存等待执行任务的阻塞队列。可以选择以下几个阻塞队列(JDK内置)
      a)ArrayBlockingQueue:基于数组结构的有界阻塞队列,此队列按照FIFO(先入先出)原则对元素进行排序。
      b)LinkedBlockingQueue:基于链表结构的无界阻塞队列,按照FIFO排序元素,吞吐量高于ArrayBlockingQueue。定义上的无界,实际上队列的容量为Integer.MAX_VALUE。(Executors.newFixedThreadPool()采用此队列)
      c)SynchronousQueue:一个不存储元素的阻塞队列(没有容量),每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,通常吞吐量比LinkedBlockingQueue还要高。该队列的使用流程:将任务与空闲线程进行配对,本身没有容量,当任务到来时,判断线程池(除核心线程池外)中是否有空闲线程,有则配对,将任务分配给空闲线程,没有则阻塞任务。(Executors.newCachedThreadPool()采用此队列)

  不存储元素的阻塞队列SynchronousQueue是怎样执行的?

    概括的说一下SynchronousQueue的执行流程(及CachedThreadPool的执行流程),与普通的队列还是有所不同。
    由于SynchronousQueue(以下简称S)是一个没有容量的队列,因此不能保存任务,主线程执行execute()后,通过调用SynchronousQueue.offer()将任务给S队列,而S队列此时需要判断,是否有空闲线程通过SynchronousQueue.poll()执行并通知S队列
    如果有,则任务与空闲线程匹配成功,如果没有的话有两种情况:1、没任务了2、没空闲线程了,如果是没任务了,则execute()直接结束,如果没空闲线程了,CachedThreadPool再开一个新的线程执行任务。
    当空闲线程执行完当前任务是,就会执行SynchronousQueue.poll()通知S队列当前已存在一个空闲线程了,可以分配任务给它。
      d)PriorityBlockingQueue:具有优先级的无界阻塞队列。
    ③maximumPoolSize(线程池最大线程数量):线程池允许创建的最大线程数,如果队列已满并且已创建的线程数小于此参数,则线程池会再创建新的线程执行任务。否则,调用饱和策略处理。如果工作队列采用无界队列(队列不可能满),此参数无意义
    ④keepAliveTime(线程保持活动时间):线程池(不是核心线程池中的)的工作线程空闲后,保持存活的时间。若任务很多,并且每个任务执行的时间较短,可以调大此参数来提高线程的利用率
    ⑤unit:keepAliveTime的单位。如 天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)等。使用TimeUnit.xxx调用(eg:TimeUnit.HOURS)
    ⑥ThreadFactory(自定义的在线程池中创建线程):将Runnable转换为Thraed,以此对线程池的属性(名字、优先级。。)进行自定义。(利用Thread newThread(Runnable r))
    ⑦RejectedExecutionHandler (饱和策略):当队列和线程池都满了,说明线程池处于饱和状态。此时采用饱和策略来处理任务,默认采用AbortPolicy,JDK一共内置4种饱和策略
     a)AbortPolicy:表示无法处理新任务,抛出异常,JDK默认采用此策略
     b)CallerRunsPolicy:等待调用者线程(创建/调用线程池的那个线程)空闲后运行任务
     c)DiscardOldestPolicy:丢弃阻塞队列中最近的一个任务,并执行当前任务
     d)DiscardPolity:不处理,直接将新任务丢弃,也不报错

  如何使用线程池?

   2、执行线程池

    向线程池提交任务,execute()和submit()方法
    execute():用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功。
    submit():用于提交需要返回值的任务。线程池会返回⼀个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过Future的get()方法来获取返回值。
      a)FutureTask类执行任务只执行一次,并且会阻塞其他线程
      b)Future.get()会阻塞其他线程,一直等到当前Callable线程执行完毕拿到返回值为止

  如何关闭线程池?

   3、关闭线程池

     ①原理:遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止

     shutdown():将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执⾏任务的线程。让线程池把线程池里面正在执行的,暂停的,等待执行的任务执行完,才将线程池停下来。(比较安全)
     shutdownNow():将线程池的状态设置成STOP,然后尝试停⽌所有的正在执⾏暂停任务(wait/sleep...)的线程,并返回等待执⾏任务(等待在工作队列里的那些任务集合)的列表。

    ②isShutdown()和isTerminaed()
      a)isShutdown():无论是shutdown()还是shutdownNow(),isShutdown⽅法都会返回true,不应该再往里面加任务了,所以,每次加任务之前先判断一下是否停止,停止了就不能往里加任务了
      b)isTerminaed():当所有的任务(shutdown()还得执行一会)都已关闭后,才表示线程池关闭成功,这时调⽤isTerminaed⽅法(真正的终止)会返回true,线程池真正的停止了。
      PS:通常都调用shutdown()(让正在执行和等待的任务跑完),如果任务不⼀定要执行完,可以调用shutdownNow()。

      举两个栗子

  1. import java.time.LocalDateTime;
  2. import java.util.concurrent.*;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. /**
  5. * Created by xiaoaxiao on 2019/7/19
  6. * Description: 线程池创建测试 execute(包含ThreadFactory的使用)
  7. */
  8. public class ThreadPoolTest1 {
  9. public static void main(String[] args) {
  10. ExecutorService executorService = new ThreadPoolExecutor(
  11. 5,
  12. 8,
  13. 1,
  14. TimeUnit.MINUTES,
  15. new ArrayBlockingQueue<>(10),
  16. // 自定义线程池中线程的属性
  17. new ThreadFactory() {
  18. // 线程安全的Integer
  19. private final AtomicInteger id = new AtomicInteger(0);
  20. @Override
  21. public Thread newThread(Runnable r) {
  22. Thread thread = new Thread(r);
  23. thread.setName("Thread-Execute-Task-"
  24. +id.getAndIncrement());
  25. return thread;
  26. }
  27. }
  28. );
  29. for (int i=0;i<10;i++){
  30. // 线程池启动线程
  31. executorService.execute(new Runnable() {
  32. @Override
  33. public void run() {
  34. System.out.println(Thread.currentThread().getName()
  35. +" "+ LocalDateTime.now());
  36. }
  37. });
  38. }
  39. }
  40. }
  41. package com.xiaoaxiao.test.thread_test.thread_pool_test;
  42. import java.time.LocalDateTime;
  43. import java.util.ArrayList;
  44. import java.util.List;
  45. import java.util.concurrent.*;
  46. /**
  47. * Created by xiaoaxiao on 2019/7/19
  48. * Description: 线程池创建测试 submit
  49. */
  50. public class ThreadPoolTest2 {
  51. public static void main(String[] args) {
  52. // 创建一个线程池
  53. ExecutorService executorService = new ThreadPoolExecutor(
  54. 5,
  55. 8,
  56. 10,
  57. TimeUnit.MINUTES,
  58. new ArrayBlockingQueue<>(10)
  59. );
  60. final List<Future<String>> futureList = new ArrayList<>();
  61. for (int i=0;i<10;i++){
  62. Future<String> future = executorService.submit(new Callable<String>() {
  63. @Override
  64. public String call() throws Exception {
  65. return (Thread.currentThread().getName()+" "+ LocalDateTime.now());
  66. }
  67. });
  68. futureList.add(future);
  69. }
  70. for (Future<String> future : futureList){
  71. try {
  72. System.out.println(future.get());
  73. } catch (InterruptedException e) {
  74. e.printStackTrace();
  75. } catch (ExecutionException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. // 停止线程池
  80. executorService.shutdownNow();
  81. System.out.println("isShutdown?"+executorService.isShutdown());
  82. System.out.println("isTerminated?"+executorService.isTerminated());
  83. }
  84. }

   4、合理配置线程池

   ① CPU密集型应配置尽可能小的线程,N*CPU(核数)+1
   ② IO密集型应配置尽可能多的线程,2NCPU(核数)
   ③ Runtime.getRuntime().availableProcessors():获取当前设备的CPU个数
   ④采用优先级队列PriorityBlockingQueue处理,让优先级高的任务先执行,可能会导致优先级低的任务始终不能执行。(有时可以将执行时间短的线程当做优先级高的线程先执行)
   ⑤尽量使用有界队列,队列满就抛出异常,增加系统的稳定性和预警能力。不要使用无界队列,如果队列越来越大,可能会撑爆内存。

发表评论

表情:
评论列表 (有 0 条评论,713人围观)

还没有评论,来说两句吧...

相关阅读

    相关 深入理解java线

    线程池的优势 1. 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗; 2. 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线

    相关 深入理解Java线

    [Java并发编程:线程池的使用][Java] Java并发编程:线程池的使用   在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有

    相关 深入理解Java线

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频

    相关 深入理解Java线

    原文出处: 海 子 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短

    相关 深入理解java 线

     在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:   如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,