从ThreadPoolExecutor看线程池

矫情吗;* 2022-12-20 06:12 262阅读 0赞

首先复习下创建线程的几种方式

  1. 1、实现runnable接口
  2. new Thread(() -> log.info("方式一:实现runnable接口")).start();
  3. 2、实现callable接口
  4. FutureTask<String> task = new FutureTask<>(() -> "方式二:实现callable接口");
  5. new Thread(task).start();
  6. 3、继承Thread
  7. class MyThread extends Thread{
  8. @Override
  9. public void run() {
  10. log.info("方式三:继承thread类");
  11. }
  12. };
  13. 4、使用线程池 这个只是创建了线程池,但是线程具体需要执行的任务还是需要在方法中去指定
  14. ExecutorService executorService = Executors.newCachedThreadPool();
  15. executorService.submit(() -> log.info("方式四:使用线程池"));

在这里插入图片描述
ok 现在我们开始来熟悉线程池

1、创建线程池

  1. public ThreadPoolExecutor(int corePoolSize, //核心线程数
  2. int maximumPoolSize, //最大线程数
  3. long keepAliveTime, //存活时间
  4. TimeUnit unit, //时间单位
  5. BlockingQueue<Runnable> workQueue, //阻塞队列,用来储存线程
  6. ThreadFactory threadFactory, //创建线程的工厂
  7. RejectedExecutionHandler handler //拒绝策略 ) {
  8. }
  9. //具体代码引发的问题
  10. public class ThreadPoolListener implements BeanPostProcessor , InitializingBean {
  11. //设置阻塞队列大小为2
  12. BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);
  13. //设置核心线程数为4 最大线程数为6
  14. ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 6, 10, TimeUnit.SECONDS, queue, (runnable, executor) -> {
  15. //拒绝策略需要处理的业务逻辑
  16. log.info("do something ~");
  17. log.info(runnable.toString());
  18. log.info(executor.toString());
  19. });
  20. //定时任务处理业务~~~
  21. @Scheduled(cron = "*/5 * * * * *")
  22. @Async
  23. public void logCollection(){
  24. //打印线程池的工作中的线程数以及线程id 队列的大小
  25. pool.execute(() ->{
  26. log.info("测试线程池工作原理 线程数:{},当前线程id:{}",pool.getPoolSize(),Thread.currentThread().getId());
  27. log.info(queue.size()+"----->>队列大小");
  28. if(queue.size()>0){
  29. log.info("队列中的数据:{}",queue.peek());
  30. }
  31. try {
  32. Thread.sleep(100000);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. }
  38. @Override
  39. public void afterPropertiesSet() throws Exception {
  40. log.info("属性设置完毕 队列容量:{}",queue.size());
  41. }
  42. }

在这里插入图片描述
从图中可以看到,在线程数量超过了核心线程之后,队列立马就满了 这是为什么?

从execute入手,看看他都做了什么事情

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. //ctl:原子计数器,来保存当前的正在执行的线程数以及线程状态(高3位存状态,低29位存线程数)
  5. int c = ctl.get();
  6. //执行中的线程数是否小于核心线程数
  7. if (workerCountOf(c) < corePoolSize) {
  8. //添加一个worker来处理任务
  9. if (addWorker(command, true))
  10. return;
  11. //走到这里说明worker添加失败了 再次获取下线程的状态数
  12. c = ctl.get();
  13. }
  14. //如果线程池是运行的状态,将本任务加入到队列中
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. int recheck = ctl.get();
  17. //再判断线程池是不是运行中 不是的话,将本线程移出队列
  18. if (! isRunning(recheck) && remove(command))
  19. // 然后执行拒绝策略
  20. reject(command);
  21. //线程数量为0的话, 开启创建一个新的worker 注意传的参数
  22. else if (workerCountOf(recheck) == 0)
  23. //开启一个新worker,去处理队列中的任务 这里false,在方法里面会作为判断允许线程的数量的条件
  24. //上面一行话看不懂,可以去看看这个方法,就自然懂了
  25. addWorker(null, false);
  26. }
  27. //线程池不是运行中 或者任务加入队列失败 再次尝试添加一个worker去处理任务
  28. else if (!addWorker(command, false))
  29. //添加失败,执行拒绝策略
  30. reject(command);
  31. }

从上面我们可以看到,在判断完线程池的状态以及数量之后,要么执行拒绝策略,要么创建一个worker,要么将任务加入到队列中,所以具体是怎么去处理线程任务?我们来分析addWorker

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. //如果线程已经关闭 并且 任务为null,队列不为空 直接返回
  7. //(因为线程已经关闭,任务队列也为空,当前也没有需要执行的任务。所以直接返回)
  8. //(如果任务队列不为空,或者当前需要执行的任务不为null,并且线程池状态为shutdown,
  9. //线程池会执行完当前的任务,如果状态为stop,则直接返回,所有任务都会被抛弃)
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
  12. return false;
  13. for (;;) {
  14. int wc = workerCountOf(c);
  15. //当前线程数量线程最大允许数量 直接返回
  16. if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
  17. return false;
  18. //尝试增加线程数量
  19. if (compareAndIncrementWorkerCount(c))
  20. //成功的话,跳出外层循环
  21. break retry;
  22. //走到这里说明由于并发导致线程数量增加失败
  23. c = ctl.get();
  24. //再次判断下线程池状态 如果状态被改变了,那么执行下一次的外层循环 在去获取线程池状态、数量等信息
  25. if (runStateOf(c) != rs)
  26. continue retry;
  27. }
  28. }
  29. //到了这里,说明已经成功增加了线程数量,才会跳出上面的循环
  30. boolean workerStarted = false;
  31. boolean workerAdded = false;
  32. Worker w = null;
  33. try {
  34. //增加一个新的worker,将我们传过来的任务传进去,作为线程的第一个任务去执行
  35. //worker中会创建一个线程,一个worker对应一个线程
  36. w = new Worker(firstTask);
  37. //拿到worker中的线程
  38. final Thread t = w.thread;
  39. if (t != null) {
  40. final ReentrantLock mainLock = this.mainLock;
  41. //加独占锁 这一块就与aqs关联起来了 怎么加锁,查看我前面的文章
  42. mainLock.lock();
  43. try {
  44. int rs = runStateOf(ctl.get());
  45. //线程池是运行中的状态 或者是shutdown,并且本任务不为null
  46. if (rs < SHUTDOWN ||
  47. (rs == SHUTDOWN && firstTask == null)) {
  48. //本线程已经在运转,直接抛异常
  49. if (t.isAlive()) // precheck that t is startable
  50. throw new IllegalThreadStateException();
  51. //将这个worker加入到工作组中
  52. workers.add(w);
  53. int s = workers.size();
  54. if (s > largestPoolSize)
  55. largestPoolSize = s;
  56. workerAdded = true;
  57. }
  58. } finally {
  59. //释放锁 看到这里,我们可以知道,
  60. //上面枷锁,只是为了保证在添加woker到工作组中,线程池的状态不被改变
  61. mainLock.unlock();
  62. }
  63. //worker被成功加入到工作组中了。再来启动这个worker去工作
  64. if (workerAdded) {
  65. //重点!!!
  66. t.start();
  67. workerStarted = true;
  68. }
  69. }
  70. } finally {
  71. if (! workerStarted)
  72. //worker没启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
  73. addWorkerFailed(w);
  74. }
  75. //返回worker的工作状态
  76. return workerStarted;
  77. }

我们都知道,在调用了thread.start()方法之后,他会启动线程,然后去执行线程的run方法。同样,我们在将worker启动之后,也会调用他的run方法。我们来看run里面都做了什么

这里做一个简单说明:在创建worker的时候,他们将我们需要执行的任务作为必要的构造器参数传进去,在构造方法里面,会对他做一个包装之后,在赋给worker中的thread字段。

  1. public void run() {
  2. runWorker(this);
  3. }
  4. final void runWorker(Worker w) {
  5. Thread wt = Thread.currentThread();
  6. //获取第一个任务
  7. Runnable task = w.firstTask;
  8. //将第一个任务置空
  9. w.firstTask = null;
  10. w.unlock(); // allow interrupts
  11. boolean completedAbruptly = true;
  12. try {
  13. //第一个任务不为null 或者为null,那么就去取任务
  14. while (task != null || (task = getTask()) != null) {
  15. //加锁
  16. w.lock();
  17. //线程池状态为stop,那么本线程直接中断进行
  18. if ((runStateAtLeast(ctl.get(), STOP) ||
  19. (Thread.interrupted() &&
  20. runStateAtLeast(ctl.get(), STOP))) &&
  21. !wt.isInterrupted())
  22. wt.interrupt();
  23. try {
  24. //执行之前的处理 这一块为空方法 由此可见我们可以继承然后重写这个方法
  25. beforeExecute(wt, task);
  26. Throwable thrown = null;
  27. try {
  28. //执行我们的业务
  29. task.run();
  30. } catch (RuntimeException x) {
  31. thrown = x; throw x;
  32. } catch (Error x) {
  33. thrown = x; throw x;
  34. } catch (Throwable x) {
  35. thrown = x; throw new Error(x);
  36. } finally {
  37. //执行之后的处理
  38. afterExecute(task, thrown);
  39. }
  40. } finally {
  41. //任务置空 那么就可以获取下一个任务
  42. task = null;
  43. //完成的任务数
  44. w.completedTasks++;
  45. //释放锁
  46. w.unlock();
  47. }
  48. }
  49. completedAbruptly = false;
  50. } finally {
  51. //到这里有两种情况
  52. //1 任务为空 那么正常走到这里completedAbruptly 已被改为false
  53. //2 出现了异常 此时completedAbruptly由于异常,所以他的值还是true
  54. processWorkerExit(w, completedAbruptly);
  55. }
  56. }

任务为空 或者出现异常之后,来在这里做处理

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. //如果有异常,那么减少线程的数量
  3. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  4. decrementWorkerCount();
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. completedTaskCount += w.completedTasks;
  9. //将本worker移除工作组
  10. workers.remove(w);
  11. } finally {
  12. mainLock.unlock();
  13. }
  14. //更改线程池状态 略过吧 不是重点 感兴趣的可以自己看 也很简单 反正我没看
  15. tryTerminate();
  16. int c = ctl.get();
  17. //如果线程池的状态<stop
  18. if (runStateLessThan(c, STOP)) {
  19. //没出现异常
  20. if (!completedAbruptly) {
  21. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  22. //队列不为空,则保证最少有一个线程存在,来处理这个队列
  23. if (min == 0 && ! workQueue.isEmpty())
  24. min = 1;
  25. if (workerCountOf(c) >= min)
  26. return; // replacement not needed
  27. }
  28. addWorker(null, false);
  29. }
  30. }

再来看看 线程是如何获取任务的

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. //线程池状态>=stop 或者线程池状态=shutdown,队列为空 也是再次保证,线程shutdown之后,不在接收新的创建线程然后去处理任务的请求 但是已经存在的任务还是可以正常去处理
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. //减少线程的数量
  9. decrementWorkerCount();
  10. return null;
  11. }
  12. int wc = workerCountOf(c);
  13. //正在工作的线程是否超过核心线程数
  14. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  15. //队列为空 或者线程池状态改变 那么也返回null
  16. if ((wc > maximumPoolSize || (timed && timedOut))
  17. && (wc > 1 || workQueue.isEmpty())) {
  18. if (compareAndDecrementWorkerCount(c))
  19. return null;
  20. continue;
  21. }
  22. try {
  23. //获取队列中的任务
  24. Runnable r = timed ?
  25. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  26. workQueue.take();
  27. if (r != null)
  28. return r;
  29. timedOut = true;
  30. } catch (InterruptedException retry) {
  31. // 如果此 worker 发生了中断,采取的方案是重试
  32. // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法。
  33. // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
  34. // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
  35. timedOut = false;
  36. }
  37. }
  38. }

至此,关于任务是怎么加入到队列,以及又是怎么被取出来然后去执行的已经很清晰了

其他一些细节点
corePoolSize

核心线程数,不要抠字眼,反正先记着有这么个属性就可以了。

maximumPoolSize

最大线程数,线程池允许创建的最大线程数。

workQueue

任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

keepAliveTime

空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收。

threadFactory

用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 类似这样。

handler:

当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑,这个之后再说。

参考文章:https://javadoop.com/post/java-thread-pool\#toc\_4

发表评论

表情:
评论列表 (有 0 条评论,262人围观)

还没有评论,来说两句吧...

相关阅读