并发编程(十六):线程池

小鱼儿 2023-06-04 11:50 178阅读 0赞

一,ThreadPoolExecutor 概述

1,线程池优势

  1. Java中,如果每个请求到达就创建一个线程,创建线程和销毁线程对系统资源的消耗都非常大,甚至可能比实际业务处理消耗的资源都大。同时,如果在JVM中创建太多的线程,也可能由于过度消耗内存或调度切换从而导致系统资源不足。
  2. 为了解决上面提出的问题,就有了线程池的概念。线程池,就是在一个线程容器中提前放置一定量的初始化线程,如果业务需要创建线程进行业务处理,则直接从线程池中获取一个线程进行执行,执行完成后归还线程到线程池,同时线程池也会对创建线程进行限制,不会无休止的创建下去。

使用线程池后,可以后下面几点优势:

* 减低创建线程和销毁线程的开销

* 提高响应速度,当有新任务需要执行时不需要等待线程创建时就可以直接执行(如果有空闲线程)

* 合理的设置线程池的大小可以避免因为硬件瓶颈带来的性能问题

2,类图

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTE5NzYzODg_size_16_color_FFFFFF_t_70

3,线程池常用API

  1. // 线程池初始化
  2. public ThreadPoolExecutor(
  3. // 核心线程数
  4. int corePoolSize,
  5. // 最大线程数
  6. int maximumPoolSize,
  7. // 线程空闲保留时间
  8. long keepAliveTime,
  9. // 线程保留单位
  10. TimeUnit unit,
  11. // 线程任务阻塞容器
  12. BlockingQueue<Runnable> workQueue,
  13. // 线程工厂,一般取默认
  14. ThreadFactory threadFactory,
  15. // 拒绝策略
  16. RejectedExecutionHandler handler);
  17. // 线程执行,不带返回值
  18. public void execute(Runnable command);
  19. // 线程执行,带返回值
  20. public <T> Future<T> submit(Callable<T> task);
  21. public <T> Future<T> submit(Runnable task, T result);
  22. public Future<?> submit(Runnable task);
  23. // 关闭线程池
  24. public void shutdown();

4,线程池常量解析,分为线程池常量和Future常量两部分

  1. /********************* 线程池常量 **********************/
  2. // 数量为,该值是29
  3. // 线程池把 Integer 的32位拆分为高3位和低29位,
  4. // 通过高三位表示状态,低29位表示正在执行的线程数量,
  5. private static final int COUNT_BITS = Integer.SIZE - 3;
  6. // 最大允许执行的线程数量,因为高三位表示状态,所以天然限制为29位
  7. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  8. // 线程池运行状态
  9. // -1 的二进制是 11111111 11111111 11111111 11111111
  10. // 右移29位就是 11100000 00000000 00000000 00000000
  11. // 高三位表示状态,所以 RUNNING 的状态就是 111
  12. private static final int RUNNING = -1 << COUNT_BITS;
  13. // 线程池关闭状态,不接收新任务,但是执行队列中的人物
  14. // 状态为 0
  15. private static final int SHUTDOWN = 0 << COUNT_BITS;
  16. // 线程池停止状态,不接受新任务,不执行队列人物,终止执行任务
  17. // 状态为 1
  18. private static final int STOP = 1 << COUNT_BITS;
  19. // 所有任务都已经结束,线程数量为0,处于该状态的线程池即将调用 terminated()方法
  20. // 状态为 10
  21. private static final int TIDYING = 2 << COUNT_BITS;
  22. // terminated()方法执行完成
  23. // 状态为 11
  24. private static final int TERMINATED = 3 << COUNT_BITS;
  25. /********************* Future常量 **********************/
  26. // Future 常量主要是对 state 状态的几种情况分析
  27. // NEW 新建状态,表示这个 FutureTask还没有开始运行
  28. private static final int NEW = 0;
  29. // COMPLETING 完成状态,表示 FutureTask 任务已经计算完毕了
  30. // 但是还有一些后续操作,例如唤醒等待线程操作,还没有完成。
  31. private static final int COMPLETING = 1;
  32. // FutureTask 任务完结,正常完成,没有发生异常
  33. private static final int NORMAL = 2;
  34. // FutureTask 任务完结,因为发生异常。
  35. private static final int EXCEPTIONAL = 3;
  36. // FutureTask 任务完结,因为取消任务
  37. private static final int CANCELLED = 4;
  38. // FutureTask 任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求
  39. private static final int INTERRUPTING = 5;
  40. // FutureTask 任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求
  41. private static final int INTERRUPTED = 6;

5,JDK提供的几种常用线程池,阿里开发手册不提倡用内置的线程池,建议通过构造器自行初始化,后续自行构造的执行流程会分析,该部分参考即可;跟源码可以发下,各种初始化方式最终也是通过构造器初始化!

  1. // 初始化定长线程池
  2. // 内置指定的核心线程数和最大线程数,并按照线程池流程执行
  3. public static ExecutorService newFixedThreadPool(int nThreads);
  4. // 初始化缓存的线程池
  5. // 没有核心线程数,默认最大线程数为 Integer.MAX_VALUE,阻塞队列为 SynchronousQueue,不保存数据
  6. // 所有对于缓存的线程池来说,接收一个任务的同时就需要执行一个任务
  7. public static ExecutorService newCachedThreadPool();
  8. // 初始化单例的线程池
  9. // 一次最多执行一个线程任务,多线程竞争时,添加到阻塞队列等候
  10. public static ExecutorService newSingleThreadExecutor();
  11. // 初始化定时的线程池
  12. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);

6,功能DEMO

  1. package com.gupao.concurrent;
  2. import java.util.concurrent.*;
  3. /**
  4. * @author pj_zhang
  5. * @create 2019-10-31 21:56
  6. **/
  7. public class ThreadPoolTest {
  8. private static ThreadPoolExecutor executor =
  9. new ThreadPoolExecutor(20, 20,
  10. 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
  11. public static void main(String[] args) throws ExecutionException, InterruptedException {
  12. Thread thread = new Thread(() -> {
  13. System.out.println("THREAD 线程执行");
  14. });
  15. Runnable runnable = () -> {
  16. System.out.println("RUNNABLE 线程执行");
  17. };
  18. Callable<String> callable = () -> {
  19. Thread.sleep(2000);
  20. return "Callable 线程执行";
  21. };
  22. executor.execute(thread);
  23. executor.execute(runnable);
  24. // 此处打印时间,是为了演示 Future 的阻塞获取结果
  25. System.out.println("callable执行前:" + System.currentTimeMillis());
  26. Future<String> future = executor.submit(callable);
  27. String callableResult = future.get();
  28. System.out.println("callable执行后:" + System.currentTimeMillis() + ", " + callableResult);
  29. }
  30. }

2019103122063457.png

二,源码分析

1,底层方法分析

1.1,ctlOf(int rs, int wc):获取线程池状态+数量的 Integer 值,

  1. private static int ctlOf(int rs, int wc) {
  2. return rs | wc;
  3. }

1.2,workerCountOf(int c):获取工作线程数量

  1. private static int workerCountOf(int c) {
  2. // 根据 ctlOf 获取到的值,用低29位进行与运算,获取到线程数量
  3. return c & CAPACITY;
  4. }

1.3,runStateOf(int c):获取线程状态

  1. private static int runStateOf(int c) {
  2. return c & ~CAPACITY;
  3. }

2,ThreadPoolExecutor 初始化及执行流程

2.1,ThreadPoolExecutor()

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. // 核心线程数
  19. this.corePoolSize = corePoolSize;
  20. // 最大线程数
  21. this.maximumPoolSize = maximumPoolSize;
  22. // 阻塞队列
  23. this.workQueue = workQueue;
  24. // 保持存活时间
  25. this.keepAliveTime = unit.toNanos(keepAliveTime);
  26. // 线程工厂,这个直接取默认
  27. this.threadFactory = threadFactory;
  28. // 拒绝策略
  29. this.handler = handler;
  30. }

2.2,线程池执行流程

  1. \* 接收到线程任务后,首先判断核心线程有没有被全部占用;没有被全部占用,随机构建一个线程执行任务
  2. \* 如果核心线程全部被占用,查看阻塞队列是否已满;如果没有满,添加到阻塞队列
  3. \* 如果阻塞队列已满,继续看最大线程数有没有被全部占用;如果存在空闲,构建线程执行任务
  4. \* 如果最大线程数已经全部占用,根据定义的拒绝策略进行拒绝操作

2.3,线程池拒绝策略

  1. \* 线程池提供了四种拒绝策略,分别是 *RejectedExecutionHandler* 接口的四种实现
  2. // java.util.concurrent.ThreadPoolExecutor.AbortPolicy
  3. // 异常处理
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. throw new RejectedExecutionException("Task " + r.toString() +
  6. " rejected from " +
  7. e.toString());
  8. }
  9. // java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
  10. // 丢弃最前面的任务,重新添加执行
  11. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  12. if (!e.isShutdown()) {
  13. e.getQueue().poll();
  14. e.execute(r);
  15. }
  16. }
  17. // java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
  18. // 什么都不做,即丢弃当前任务
  19. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  20. }
  21. // java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
  22. // 只要线程池没有关闭,则直接开线程运行,该策略建议慎用,不收控制
  23. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  24. if (!e.isShutdown()) {
  25. r.run();
  26. }
  27. }
  28. // 最后,业务可以自定义拒绝方式,只需要实现 RejectedExecutionHandler 接口,然后重写其接口方法

3,execute()

  1. \* execute(Runnable command):刚才对线程池的大概执行流程进行了分析,该方法内可以看出代码实现
  2. public void execute(Runnable command) {
  3. if (command == null)
  4. throw new NullPointerException();
  5. // 获取状态位 + 数量位的 int 对象
  6. int c = ctl.get();
  7. // 获取工作线程数量,首先判断核心线程数
  8. if (workerCountOf(c) < corePoolSize) {
  9. // 存在空闲的核心线程,进行线程执行
  10. if (addWorker(command, true))
  11. return;
  12. // 如果执行失败,重新对 c 赋值,说明存在线程竞争
  13. c = ctl.get();
  14. }
  15. // isRunning(c):线程池依旧运行状态
  16. // workQueue.offer(command):添加到队列成功
  17. // 次数是判断加入到阻塞队列是否成功
  18. if (isRunning(c) && workQueue.offer(command)) {
  19. int recheck = ctl.get();
  20. // 如果线程池不处于运行状态,则从队列中移除当前任务
  21. // 并执行拒绝策略
  22. if (! isRunning(recheck) && remove(command))
  23. reject(command);
  24. // 如果之前的线程已经被销毁完,新建一个线程
  25. else if (workerCountOf(recheck) == 0)
  26. addWorker(null, false);
  27. }
  28. // 阻塞队列添加失败,唤醒最大线程执行,
  29. else if (!addWorker(command, false))
  30. // 最大线程执行失败,进行拒绝策略处理
  31. reject(command);
  32. }
  33. \* addWorker(Runnable firstTask, boolean core)
  34. // Runnable firstTask:当前线程任务
  35. // boolean core:是否核心线程
  36. private boolean addWorker(Runnable firstTask, boolean core) {
  37. retry:
  38. for (;;) {
  39. // 获取线程执行状态
  40. int c = ctl.get();
  41. // 这部分计算后续搞明白再填充 TODO
  42. int rs = runStateOf(c);
  43. // 线程池已经关闭,不再接受新任务
  44. // SHUTDOWN 状态不接受新任务,但仍然会执行已经加入任务队列的任务
  45. // 所以当进入 SHUTDOWN 状态,而传进来的任务为空,并且任务队列不为空的时候,是允许添加新线程的,
  46. // 如果把这个条件取反,就表示不允许添加 worker
  47. if (rs >= SHUTDOWN &&
  48. ! (rs == SHUTDOWN &&
  49. firstTask == null &&
  50. ! workQueue.isEmpty()))
  51. return false;
  52. // 自旋添加任务
  53. for (;;) {
  54. // 获取工作线程
  55. int wc = workerCountOf(c);
  56. // 首先判断是否大于最大允许数量
  57. // 然后根据是否核心线程判断是否大于核心线程数或者最大线程数
  58. if (wc >= CAPACITY ||
  59. wc >= (core ? corePoolSize : maximumPoolSize))
  60. return false;
  61. // 对c递增,也就是工作线程数递增
  62. if (compareAndIncrementWorkerCount(c))
  63. break retry;
  64. // 递增失败,说明存在线程竞争或者状态变更,继续自旋处理
  65. c = ctl.get();
  66. // 此处不等于,说明存在线程池状态变更
  67. // 等于,说明只是存在线程竞争造成的CAS失败
  68. if (runStateOf(c) != rs)
  69. continue retry;
  70. }
  71. }
  72. // 上半部分基本是对线程池状态及工作线程数进行判断,并最终对工作线程+1,表示当前线程已经抢占到一个线程位置
  73. boolean workerStarted = false;
  74. boolean workerAdded = false;
  75. Worker w = null;
  76. try {
  77. // 包装线程对象为 Worker 对象
  78. w = new Worker(firstTask);
  79. // 通过 ThreadFactory 构建一个新的线程
  80. final Thread t = w.thread;
  81. if (t != null) {
  82. // 此处加重入锁
  83. final ReentrantLock mainLock = this.mainLock;
  84. mainLock.lock();
  85. try {
  86. // 继续获取线程池状态进行判断
  87. int rs = runStateOf(ctl.get());
  88. if (rs < SHUTDOWN ||
  89. (rs == SHUTDOWN && firstTask == null)) {
  90. // 如果线程已经运行中,则说明存在问题,此处线程还没有启用
  91. if (t.isAlive())
  92. throw new IllegalThreadStateException();
  93. // 添加线程包装后的Worker对象到列表中
  94. workers.add(w);
  95. int s = workers.size();
  96. if (s > largestPoolSize)
  97. largestPoolSize = s;
  98. // 表示工作线程创建成功
  99. workerAdded = true;
  100. }
  101. } finally {
  102. mainLock.unlock();
  103. }
  104. if (workerAdded) {
  105. // 线程启动,
  106. // 此处注意,Worker 实现了 Runnable接口,则此处是调用 Worker.run()
  107. t.start();
  108. workerStarted = true;
  109. }
  110. }
  111. } finally {
  112. // 如果添加失败,则递减工作线程数
  113. if (! workerStarted)
  114. addWorkerFailed(w);
  115. }
  116. return workerStarted;
  117. }
  118. \* runWorker(Worker w):Worker run() 方法内部调用该方法
  119. final void runWorker(Worker w) {
  120. Thread wt = Thread.currentThread();
  121. // 获取初始化 Worker时传递的线程
  122. // 此处分析的是 execute() 触发,如果是 submit() 触发,此处的Runnable实现类应该为 FutureTask,
  123. // task.run() 最终调用 FutureTask.run()方法,此处会对 Future的状态进行处理,实现阻塞获取的功能
  124. Runnable task = w.firstTask;
  125. w.firstTask = null;
  126. // unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,
  127. // 此处是调用Worker 类的 tryRelease()方法,将 state 置为 0,
  128. // 而 interruptIfStarted()中只有 state>=0 才允许调用中断
  129. w.unlock();
  130. boolean completedAbruptly = true;
  131. try {
  132. // 任务不为空,则持续执行
  133. // getTask():此处表示不断从阻塞队列中获取元素
  134. while (task != null || (task = getTask()) != null) {
  135. w.lock();
  136. // 线程池状态为stop时不接受新任务,并中断正在执行的人物
  137. // (Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)确保线程中断标志位为 true 且是 stop 状态以上,接着清除了中断标志
  138. // !wt.isInterrupted()则再一次检查保证线程需要设置中断标志位
  139. if ((runStateAtLeast(ctl.get(), STOP) ||
  140. (Thread.interrupted() &&
  141. runStateAtLeast(ctl.get(), STOP))) &&
  142. !wt.isInterrupted())
  143. wt.interrupt();
  144. try {
  145. // 默认没有实现功能,前置处理
  146. beforeExecute(wt, task);
  147. Throwable thrown = null;
  148. try {
  149. // 此处注意 task.run() 的task的不同实现
  150. // 在submit()触发的功能中,表示FutureTask
  151. task.run();
  152. } catch (RuntimeException x) {
  153. thrown = x; throw x;
  154. } catch (Error x) {
  155. thrown = x; throw x;
  156. } catch (Throwable x) {
  157. thrown = x; throw new Error(x);
  158. } finally {
  159. // 后置执行,没有处理
  160. afterExecute(task, thrown);
  161. }
  162. } finally {
  163. task = null;
  164. w.completedTasks++;
  165. w.unlock();
  166. }
  167. }
  168. completedAbruptly = false;
  169. } finally {
  170. // 将入参 worker 从数组 workers 里删除掉;
  171. // 根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组workers
  172. processWorkerExit(w, completedAbruptly);
  173. }
  174. }
  175. \* getTask():从阻塞队列中获取下一个有效任务。线程池定义的超时处理再该部分实现
  176. private Runnable getTask() {
  177. boolean timedOut = false;
  178. for (;;) {
  179. int c = ctl.get();
  180. int rs = runStateOf(c);
  181. // 校验线程池状态
  182. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  183. decrementWorkerCount();
  184. return null;
  185. }
  186. // 获取工作线程
  187. int wc = workerCountOf(c);
  188. // 对超时线程进行时间控制
  189. // allowCoreThreadTimeOut默认为false,表示核心线程不收控制
  190. // wc > corePoolSize:超过核心线程,即最大线程数,则触发控制
  191. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  192. // timedOut为true,说明上次阻塞操作已经超时,则工作线程数-1,并返回null
  193. if ((wc > maximumPoolSize || (timed && timedOut))
  194. && (wc > 1 || workQueue.isEmpty())) {
  195. if (compareAndDecrementWorkerCount(c))
  196. return null;
  197. continue;
  198. }
  199. try {
  200. // 此处就是对超时控制的处理,在从队列中获取数据时,阻塞获取
  201. Runnable r = timed ?
  202. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  203. workQueue.take();
  204. // 如果拿到任务,则直接返回去进行处理
  205. if (r != null)
  206. return r;
  207. // 走到这一步,说明超时,在下一步时候进行回收处理
  208. timedOut = true;
  209. } catch (InterruptedException retry) {
  210. timedOut = false;
  211. }
  212. }
  213. }
  214. \* addWorkerFailed(Worker w):添加工作线程失败
  215. private void addWorkerFailed(Worker w) {
  216. final ReentrantLock mainLock = this.mainLock;
  217. mainLock.lock();
  218. try {
  219. // 从列表中移除当前 Worker
  220. if (w != null)
  221. workers.remove(w);
  222. // 工作线程数递减
  223. decrementWorkerCount();
  224. // 尝试修改线程状态为 Terminate
  225. tryTerminate();
  226. } finally {
  227. mainLock.unlock();
  228. }
  229. }
  230. private void decrementWorkerCount() {
  231. do {} while (! compareAndDecrementWorkerCount(ctl.get()));
  232. }
  233. \* reject(Runnable command):拒绝策略
  234. final void reject(Runnable command) {
  235. // 直接执行拒绝策略
  236. handler.rejectedExecution(command, this);
  237. }

4,submit():同时对 Future 进行分析

  1. \* submit(Callable<T> task):触发线程执行
  2. public <T> Future<T> submit(Callable<T> task) {
  3. if (task == null) throw new NullPointerException();
  4. // 初始化化一个 FutureTask,实现了 Runnable 接口
  5. RunnableFuture<T> ftask = newTaskFor(task);
  6. // 直接执行 execute 方法
  7. execute(ftask);
  8. return ftask;
  9. }
  10. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  11. return new FutureTask<T>(callable);
  12. }
  13. \* runWorker() 方法中,触发 task.run(),实际调用的是 Future.run()方法
  14. public void run() {
  15. // 状态不为new,直接执行异常
  16. if (state != NEW ||
  17. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  18. null, Thread.currentThread()))
  19. return;
  20. try {
  21. Callable<V> c = callable;
  22. // 线程一切正常,准备执行
  23. if (c != null && state == NEW) {
  24. V result;
  25. boolean ran;
  26. try {
  27. // 直接调用Callable的call方法,并返回结果
  28. result = c.call();
  29. ran = true;
  30. } catch (Throwable ex) {
  31. result = null;
  32. ran = false;
  33. setException(ex);
  34. }
  35. // 设置结果
  36. if (ran)
  37. set(result);
  38. }
  39. } finally {
  40. runner = null;
  41. int s = state;
  42. if (s >= INTERRUPTING)
  43. handlePossibleCancellationInterrupt(s);
  44. }
  45. }
  46. \* set(V v):设置执行结果
  47. protected void set(V v) {
  48. // 设置状态为完成
  49. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  50. outcome = v;
  51. // 设置正常结束
  52. UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
  53. // 完成后续处理,唤醒等待节点
  54. finishCompletion();
  55. }
  56. }
  57. \* finishCompletion():该方法主要是唤醒等待节点,去返回结果中拿数据
  58. private void finishCompletion() {
  59. // 获取等待节点
  60. for (WaitNode q; (q = waiters) != null;) {
  61. if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
  62. for (;;) {
  63. // 等待节点线程优先,直接唤醒
  64. Thread t = q.thread;
  65. if (t != null) {
  66. q.thread = null;
  67. LockSupport.unpark(t);
  68. }
  69. // 递归下一个等待节点同步处理
  70. WaitNode next = q.next;
  71. if (next == null)
  72. break;
  73. q.next = null; // unlink to help gc
  74. q = next;
  75. }
  76. break;
  77. }
  78. }
  79. done();
  80. callable = null; // to reduce footprint
  81. }

5,Funture.get()

  1. \* get():阻塞获取数据
  2. public V get() throws InterruptedException, ExecutionException {
  3. // 获取Future对应的线程执行状态,如果没有执行完直接等待
  4. int s = state;
  5. if (s <= COMPLETING)
  6. s = awaitDone(false, 0L);
  7. // 执行完成后,解析结果集
  8. return report(s);
  9. }
  10. \* awaitDone(boolean timed, long nanos)
  11. private int awaitDone(boolean timed, long nanos) throws InterruptedException {
  12. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  13. WaitNode q = null;
  14. boolean queued = false;
  15. for (;;) {
  16. // 线程中断,直接移除
  17. if (Thread.interrupted()) {
  18. removeWaiter(q);
  19. throw new InterruptedException();
  20. }
  21. int s = state;
  22. // 此处说明执行完成
  23. if (s > COMPLETING) {
  24. if (q != null)
  25. q.thread = null;
  26. return s;
  27. }
  28. // 还有后续操作没有执行完成,暂时让出时间片段,稍后执行
  29. else if (s == COMPLETING)
  30. Thread.yield();
  31. // 表示状态为null,构建等待节点,准备等待
  32. else if (q == null)
  33. q = new WaitNode();
  34. // 使用CAS添加等待节点到等待队列
  35. else if (!queued)
  36. queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  37. q.next = waiters, q);
  38. // 此处表示设置了超时
  39. else if (timed) {
  40. // 如果超时时间没有获取到值,则直接退出
  41. nanos = deadline - System.nanoTime();
  42. if (nanos <= 0L) {
  43. removeWaiter(q);
  44. return state;
  45. }
  46. LockSupport.parkNanos(this, nanos);
  47. }
  48. else
  49. // 添加等待队列成功后,线程挂起等待,等待执行完成后进行唤醒,完成那部分已经分析
  50. LockSupport.park(this);
  51. }
  52. }
  53. \* report(int s):获取返回值
  54. private V report(int s) throws ExecutionException {
  55. // 获取返回值
  56. Object x = outcome;
  57. if (s == NORMAL)
  58. return (V)x;
  59. if (s >= CANCELLED)
  60. throw new CancellationException();
  61. throw new ExecutionException((Throwable)x);
  62. }

#

发表评论

表情:
评论列表 (有 0 条评论,178人围观)

还没有评论,来说两句吧...

相关阅读

    相关 并发编程-线

    文章目录 线程池 线程池原理 线程池分类 线程池 为什么需要使用线程池? 线程的创建和销毁都需要消耗系统资源,线程池可以复