Java并发工具之线程池

灰太狼 2023-07-12 06:26 174阅读 0赞

1. 什么是线程池?

  1. 线程的创建和销毁是有一定的开销的,为了减少开销(内存和垃圾回收),我们通过创建线程池来执行大量的任务,避免反复创建并销毁线程所带来的问题,如果不用线程池,一旦任务数量过多,新建线程过多可能会OOM异常,即内存溢出
  2. 好处:

    • 限制线程资源的总量,复用每一个线程,同一管理资源
    • 加快响应速度,合理利用CPU和内存
  3. 适用场景:

    1. 服务器接收大量请求(Tomcat本身就是用线程池来实现的)
    2. 开发中,如果需要创建5个以上的线程,就可以用线程池

2. 创建和停止线程池?

2.1. 线程池构造函数的参数


































参数 含义
corePoolSize 核心线程数
maxPoolSize 最大线程数
keepAliveTime 保持存活时间
workQueue 任务存储队列
threadFactory 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程
Handler 由于线程池无法接受你所提交的任务的拒绝策略
  1. corePoolSize :线程完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务;
  2. maxPoolSize:线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上线,就是最大线程数;

    什么情况下扩展到maxPoolSize?

    1. 1. 如果线程数小于corePoolSize ,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务;
    2. 2. 如果线程数等于或大于corePoolSize 但是少于maxPoolSize,则将任务放入workQueue队列(指定队列的容量);
    3. 3. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务;
    4. 4. 如果队列已满,并在线程数大于maxPoolSize,则执行Handler拒绝策略;

    总之,是否需要增加线程的判断顺序是:

    1. corePoolSize -》workQueue-》maxPoolSize

    增减线程的特点:

    1. 1. 通过corePoolSize maxPoolSize相同,就可以创建固定大小的线程池;
    2. 2. 线程池希望保持较少的线程数,并且只有在负载变得很大的时候才增加它;
    3. 3. 通过设置maxPoolSize为很高的值,可以允许线程池容纳任意数量的并发任务;
    4. 4. 只有在队列填满时才多于corePoolSize 的线程,所以如果你使用的是无界队列,那么线程数就不会超过corePoolSize
  3. keepAliveTime:如果线程池当前的线程数多于corePoolSize ,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止,比如:corePoolSize 为5,当前线程数为10,如果线程空闲了,就会恢复到5;
  4. ThreadFactory:用来创建线程,默认是非守护线程,优先级是5,一共有10个等级;
  5. workQueue:有3种最常见的队列类型:

    1. 1. 直接交接:SynchronousQueue,本身没有容量,maxpoolSize要设置大一些;
    2. 2. 无界队列:LinkedBlockingQueuemaxpoolSize设置多大都没有用,会一直缓存新增的任务,有可能造成OOM
    3. 3. 有界队列:ArrayBlockingQueue,队列容量指定,满了之后,就创建新的线程;

2.2 线程池应该手动创建还是自动创建

Excutors类创建线程池的方法如下:

  1. 1. newFixedThreadPool:由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成OOM
  2. 2. newSingleThreadExecutor:和newFixedThreadPool的原理基本一样,都采用LinkedBlockingQueue无界队列,只不过把corePoolSize maxPoolSize都改成了1,请求堆积的时候也会占用大量的内存。
  3. 3. newCachedThreadPool:可缓存线程池,使用SynchronousQueue直接队列,corePoolSize 0maxPoolSize设置为Integer.Max_Value,线程可以无限创建,但是请求过多且无法及时处理完毕,也会导致OOM
  4. 4. newScheduledThreadPool:支持定时以及周期性任务执行的线程池,采用DelayedWorkQueue延迟队列,可以指定延迟时间执行任务,还可以以一定时间间隔来运行。

正确创建线程池的方法是手动创建:根据不同的业务场景,自己设置线程池参数,比如内存大小,线程名字等等。

2.3 线程池里的线程数量设定为多少比较合适?

  • CPU密集型:比如八核处理器,那可以把corePoolSize 设置为核心数的1-2倍;
  • 耗时IO型(读写文件):这种类型的CPU是不工作的,可以设置为核心数的很多倍
  • 线程数=CPU核心数*(1+平均等待时间/平均工作时间)

2.4 停止线程池的正确方法

  • shutdown,初始化整个关闭过程,不是直接关闭,把正在执行以及队列中等待的任务执行完毕再关闭,然后拒绝新的任务;

    public class StopThreadPool {

    1. public static void main(String[] args) throws InterruptedException {
    2. ExecutorService executorService = Executors.newFixedThreadPool(10);
    3. for (int i = 0; i < 50; i++) {
    4. executorService.execute(new Runnable() {
    5. @Override
    6. public void run() {
    7. try {
    8. Thread.sleep(500);
    9. System.out.println("执行已经提交的线程:" +Thread.currentThread().getName());
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. });
    15. }
    16. Thread.sleep(1000);
    17. executorService.shutdown();
    18. executorService.execute(new Runnable() {
    19. @Override
    20. public void run() {
    21. System.out.println("继续提交线程池");
    22. }
    23. });
    24. }

    }

    执行已经提交的线程:pool-1-thread-1
    执行已经提交的线程:pool-1-thread-2
    执行已经提交的线程:pool-1-thread-7
    执行已经提交的线程:pool-1-thread-8
    执行已经提交的线程:pool-1-thread-3
    执行已经提交的线程:pool-1-thread-4
    执行已经提交的线程:pool-1-thread-5
    执行已经提交的线程:pool-1-thread-6
    Exception in thread “main” 执行已经提交的线程:pool-1-thread-1
    执行已经提交的线程:pool-1-thread-9
    执行已经提交的线程:pool-1-thread-10
    执行已经提交的线程:pool-1-thread-2
    执行已经提交的线程:pool-1-thread-7
    执行已经提交的线程:pool-1-thread-8
    java.util.concurrent.RejectedExecutionException: Task tools.threadpool.StopThreadPool$2@4b67cf4d rejected from java.util.concurrent.ThreadPoolExecutor@7ea987ac[Shutting down, pool size = 10, active threads = 10, queued tasks = 26, completed tasks = 14]

    1. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    2. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    3. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    4. at tools.threadpool.StopThreadPool.main(StopThreadPool.java:24)

    执行已经提交的线程:pool-1-thread-4
    执行已经提交的线程:pool-1-thread-3
    执行已经提交的线程:pool-1-thread-6
    执行已经提交的线程:pool-1-thread-5

可以看出,shutdown后,再次提交是被拒绝的,但是已经提交的任务得先执行完毕。

  • awaitTermination,检测一段时间后线程池是否完全终止,是用来检测的;
  • isShutdown,判断是否被shutdown了;
  • isTerminated,判断线程池是否完全终止;
  • shutdownNow,立刻停止线程池,如何立刻停止:利用线程InterruptedException中断抛出异常停止线程,同时该方法会返回一个集合,包含已经放进队列中还没来得急执行的线程对象;

    public class StopThreadPool {

    1. public static void main(String[] args) throws InterruptedException {
    2. ExecutorService executorService = Executors.newFixedThreadPool(10);
    3. for (int i = 0; i < 50; i++) {
    4. executorService.execute(new Runnable() {
    5. @Override
    6. public void run() {
    7. try {
    8. Thread.sleep(500);
    9. System.out.println("执行已经提交的线程:" +Thread.currentThread().getName());
    10. } catch (InterruptedException e) {
    11. System.out.println(Thread.currentThread().getName() + "被中断了");
    12. }
    13. }
    14. });
    15. }
    16. Thread.sleep(1000);
    17. executorService.shutdownNow();

    // System.out.println(executorService.shutdownNow());

    1. executorService.execute(new Runnable() {
    2. @Override
    3. public void run() {
    4. System.out.println("继续提交线程池");
    5. }
    6. });
    7. }

    }

    执行已经提交的线程:pool-1-thread-3
    执行已经提交的线程:pool-1-thread-8
    pool-1-thread-7被中断了
    pool-1-thread-6被中断了
    pool-1-thread-1被中断了
    pool-1-thread-4被中断了
    pool-1-thread-10被中断了
    pool-1-thread-9被中断了
    pool-1-thread-2被中断了
    pool-1-thread-5被中断了
    Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task tools.threadpool.StopThreadPool$2@4b67cf4d rejected from java.util.concurrent.ThreadPoolExecutor@7ea987ac[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20]

    1. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    2. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    3. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    4. at tools.threadpool.StopThreadPool.main(StopThreadPool.java:25)

3. 常见线程池的特点和用法?









































线程池创建方法 corePoolSize maxPoolSize keepAliveTime workQueue
newFixedThreadPool 指定 等于maxPoolSize 0s LinkedBlockingQueue
newSingleThreadExecutor 1 1 0s LinkedBlockingQueue
newCachedThreadPool 0 Integer.MAX_VALUE 60s SynchronousQueue
newScheduledThreadPool 指定 Integer.MAX_VALUE 0s DelayedWorkQueue

4. 任务太多,怎么拒绝?

  • 一旦线程池shutDown,后续的任务就被拒绝了;
  • 当Executor对最大线程maxPoolSize 和工作队列容量使用ArrayBlockingQueue有限边界并饱和时候,就直接拒绝了;
  • 4种拒绝策略:

    1. 1. AbortPolicy,抛出异常;
    2. 2. DiscardPolicy,默默抛弃,不抛出异常;
    3. 3. DiscardOldestPolicy,丢弃最老的,存在时间最久的任务给丢掉;
    4. 4. CallerRunsPolicy,如果线程池没办法处理,谁提交的任务谁去跑,比如主线程提交的任务,那么线程池饱和的情况下,就交还给主线程执行这个任务;

5. 钩子方法

  • 每个任务执行前后,做一些事情,比如日志、统计等;

    public class PauseableThreadPool extends ThreadPoolExecutor {

    1. private final Lock lock = new ReentrantLock();
    2. private Condition unpaused = lock.newCondition();
    3. private boolean isPaused;
    4. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    5. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    6. }
    7. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    8. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    9. }
    10. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    11. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    12. }
    13. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    14. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    15. }
    16. @Override
    17. protected void beforeExecute(Thread t, Runnable r) {
    18. super.beforeExecute(t, r);
    19. lock.lock();
    20. try {
    21. while (isPaused) {
    22. unpaused.await();
    23. }
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. } finally {
    27. lock.unlock();
    28. }
    29. }
    30. private void pause() {
    31. lock.lock();
    32. try {
    33. isPaused = true;
    34. } finally {
    35. lock.unlock();
    36. }
    37. }
    38. private void resume() {
    39. lock.lock();
    40. try {
    41. isPaused = false;
    42. unpaused.signalAll();
    43. } finally {
    44. lock.unlock();
    45. }
    46. }
    47. public static void main(String[] args) throws InterruptedException {
    48. PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    49. Runnable runnable = new Runnable() {
    50. @Override
    51. public void run() {
    52. System.out.println("执行中...;");
    53. try {
    54. Thread.sleep(10);
    55. } catch (InterruptedException e) {
    56. e.printStackTrace();
    57. }
    58. }
    59. };
    60. for (int i = 0; i < 10000; i++) {
    61. pauseableThreadPool.execute(runnable);
    62. }
    63. Thread.sleep(1500);
    64. //暂停
    65. pauseableThreadPool.pause();
    66. System.out.println("线程池被暂停了");
    67. Thread.sleep(1500);
    68. //恢复
    69. System.out.println("线程池被恢复了");
    70. pauseableThreadPool.resume();
    71. }

    }

6. Executor家族辨析

  1. Executor:接口,ExecutorService继承于Executor ;
  2. Executors:工具类,帮我们自动创建线程池,类似于Collections;
  3. ThreadPoolExecutor:线程池类,ThreadPoolExecutor继承于ExecutorService;
  4. 所以,Executor、ExecutorService、ThreadPoolExecutor都是线程池类型,只不过是继承关系;

7. 线程池实现任务复用的原理

  • 相同线程执行不同任务,调用新的任务的run方法;

    final void runWorker(Worker w) {

    1. Runnable task = w.firstTask;
    2. while (task != null || (task = getTask()) != null) {
    3. task.run();
    4. }
    5. }

在runWork中,拿到一个又一个的task,while循环中判断如果不为空,则执行完毕task的run方法,如果task为空,即执行完毕了一个任务,则用getTask方法去取下一个任务,直到没有任务了。

8. 线程池的5种状态

  • RUNNING:接收新任务并处理排队任务,即执行execute后的状态;
  • SHUTDOWN:不接受新任务,但是处理排队任务,即执行shutdown方法后的状态;
  • STOP:不接受新任务,也不处理派对任务,并中断正在进行的任务,即执行shutdownNow方法后的状态;
  • TIDYING:整洁,所有任务都已经终止,线程会转到TIDYING状态,并将运行terminate钩子方法;
  • TERMINATED:terminate方法完成;

9. 使用线程池的注意点

  • 避免任务堆积
  • 避免线程数过度增加
  • 排查线程泄漏

发表评论

表情:
评论列表 (有 0 条评论,174人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java并发工具线应用误区

    在使用Java的并发工具,如线程池,时可能会遇到一些误区。以下是一些常见的误区: 1. **过度设计线程池大小**:如果线程池规模设置得过大,可能导致资源浪费;反之过小则可能

    相关 Java并发线

    线程池 Java中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池.在开发过程中, 合理使用线程池能够带来三个好处 : * ...

    相关 Java并发工具线

    1. 什么是线程池? 1. 线程的创建和销毁是有一定的开销的,为了减少开销(内存和垃圾回收),我们通过创建线程池来执行大量的任务,避免反复创建并销毁线程所带来的问题,如

    相关 Java并发编程】线

    为什么要用线程池? 池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对

    相关 Java并发 - 线

    Java并发 - 线程池 务必牢记:三大方法、七个参数、四个策略。 池化技术:简单点来说,\\就是提前保存大量的资源,以备不时之需。\\池化技术能够减少资源对象的创建次