并发编程之ForkJoin并行处理框架

以你之姓@ 2024-03-26 00:22 263阅读 0赞

文章目录

    • 前言
    • 何为ForkJoin
    • JDK ForkJoin运用
    • ForkJoin原理
    • ForkJoin源码解析
      • ForkJoinPool源码解析
      • ForkJoinTask源码解析
      • ForkJoinWorkerThread源码解析
      • ForkJoinTask实现类
    • ForkJoin实战
    • 结论

前言

随着当代应用系统要求的QPS/TPS日益增加,很多的业务编程都需要用到多线程并发编程才能满足实际的性能要求。在前面的博文中,我们讲解了很多的JUC并发编程工具类,能够在高并发环境下保证多线程数据的安全性;也提到了Thread Pool线程池可以减少频繁创建、销毁线程在尽可能多的压榨资源的同时提升系统运行效率。那么,现在有没有一种方式在有限的线程下能够尽快尽最大限度完成任务呢?答案就是今天的线程并行主角——ForkJoin 并行处理框架。

何为ForkJoin

ForkJoin 是 JUC并发包下的一个并行处理框架。大家都知道并发就多个线程争抢CPU资源,不一定每个线程可以同时执行,只是拥有同时执行的权利,具体执行需要看CPU资源。但并行则是多个线程同时运行,比如有5个CPU核心,那么同时会有5个线程并行运行。所以 ForkJoin 框架是为多线程提供并行处理的一种架构,我们可以运用ForkJoin 框架下的工具类实现任务的‘分而治之’。

JDK ForkJoin运用

说到ForkJoin并行处理框架,我们就不得不提到JDK1.8中的Parallel Stream并行流。Parallel Stream并行流底层也是采用ForkJoin框架中的ForkJoinPool 并行线程池进行处理的。
案例:

  1. /**
  2. * parallelStreamTest
  3. * @author senfel
  4. * @date 2023/3/23 9:43
  5. * @return void
  6. */
  7. @Test
  8. public void parallelStreamTest(){
  9. List<Integer> list = new ArrayList<>(Arrays.asList(13, 2, 5, 9, 1, 6, 0, 45, 11));
  10. list.parallelStream().forEach(i->{
  11. System.err.println(i);
  12. System.err.println(Thread.currentThread().getName());
  13. });
  14. }

执行结果:
在这里插入图片描述

如图所示并行流内部是由主线程和ForkJoinPool线程一起执行的,大大缩短执行时间。
我们先来源码赏析:

  1. //返回一个并行流
  2. default Stream<E> parallelStream() {
  3. return StreamSupport.stream(spliterator(), true);
  4. }

如源码所示传入默认true并行标识。

继续进入执行源码:

  1. @Override
  2. public <S> Void evaluateParallel(PipelineHelper<T> helper,
  3. Spliterator<S> spliterator) {
  4. if (ordered)
  5. new ForEachOrderedTask<>(helper, spliterator, this).invoke();
  6. else
  7. new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
  8. return null;
  9. }
  10. static final class ForEachTask<S, T> extends CountedCompleter<Void>
  11. abstract class CountedCompleter<T> extends ForkJoinTask<T>

如上源码所示parallelStream 内部执行还是用的 ForkJoinTask。因为ForkJoinPool中少量工作
线程能够运行大量的ForkJoinTask。

ForkJoin原理

ForkJoin框架是JDK1.7引入新特性,内部实现了ExecutorService接口,它使用了一个无限队列还缓存任务,在使用过程中传入线程数量,如果不传入默认CPU核心数量作为线程数量。

ForkJoin主要包含ForkJoinPool 并行线程池、ForkJoinTask并行任务、ForkJoinWorkerThread并行工作线程等工具类组成。ForkJoin框架通过这三个主要类实现任务的‘分而治之’。

ForkJoin框架就好比我们的快速排序算法。快速排序其原理是将一个数组拆分为左右两个数组进行排序,左右两个数据内部再次拆分为左右两个数组内部进行排序,依次类推将一个大数组分为多个小数组进排序。同样的道理,ForkJoinTask的Fork()就是将一个大任务重复拆分为很多个子任务,join()方法则是将子任务执行结果汇聚给主任务。

ForkJoin源码解析

ForkJoin主要包含ForkJoinPool 并行线程池、ForkJoinTask并行任务、ForkJoinWorkerThread并行工作线程等工具类组成。

ForkJoinPool源码解析

ForkJoinPool并行线程池,内部实现了ExecutorService接口。
进入JUC ForkJoinPool查看源码

  1. /**
  2. * Creates a {
  3. @code ForkJoinPool} with parallelism equal to {
  4. @link
  5. * java.lang.Runtime#availableProcessors}, using the {@linkplain
  6. * #defaultForkJoinWorkerThreadFactory default thread factory},
  7. * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  8. *
  9. * @throws SecurityException if a security manager exists and
  10. * the caller is not permitted to modify threads
  11. * because it does not hold {
  12. @link
  13. * java.lang.RuntimePermission}{
  14. @code ("modifyThread")}
  15. */
  16. public ForkJoinPool() {
  17. this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
  18. defaultForkJoinWorkerThreadFactory, null, false);
  19. }
  20. /**
  21. * Creates a {
  22. @code ForkJoinPool} with the indicated parallelism
  23. * level, the {
  24. @linkplain
  25. * #defaultForkJoinWorkerThreadFactory default thread factory},
  26. * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  27. *
  28. * @param parallelism the parallelism level
  29. * @throws IllegalArgumentException if parallelism less than or
  30. * equal to zero, or greater than implementation limit
  31. * @throws SecurityException if a security manager exists and
  32. * the caller is not permitted to modify threads
  33. * because it does not hold {
  34. @link
  35. * java.lang.RuntimePermission}{
  36. @code ("modifyThread")}
  37. */
  38. public ForkJoinPool(int parallelism) {
  39. this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
  40. }

如源码所示,ForkJoinPool 可以它使用了一个无限队列还缓存任务,在使用过程中传入线程数量,如果不传入默认CPU核心数量作为线程数量。

继续查看源码submit()方法

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. /**
  8. * @throws RejectedExecutionException {
  9. @inheritDoc}
  10. * @throws NullPointerException {
  11. @inheritDoc}
  12. */
  13. public <T> Future<T> submit(Runnable task, T result) {
  14. if (task == null) throw new NullPointerException();
  15. RunnableFuture<T> ftask = newTaskFor(task, result);
  16. execute(ftask);
  17. return ftask;
  18. }
  19. /**
  20. * @throws RejectedExecutionException {
  21. @inheritDoc}
  22. * @throws NullPointerException {
  23. @inheritDoc}
  24. */
  25. public <T> Future<T> submit(Callable<T> task) {
  26. if (task == null) throw new NullPointerException();
  27. RunnableFuture<T> ftask = newTaskFor(task);
  28. execute(ftask);
  29. return ftask;
  30. }

发现ForkJoinPool的submit()方法,submit()方法主要是提交ForkJoinTask并行任务。

综上所述,ForkJoinPool 是有个并行线程池,由一个无限队列保存ForkJoinTask任务,而且执行线程数量可以传入并默认为CPU核心数。ForkJoinPool 也提供execute()执行方法,execute(task)执行方法来触发ForkJoinTask并行任务的执行。

ForkJoinTask源码解析

ForkJoinTask 是 ForkJoin框架的任务抽象类,内部封装了任务数据和计算并支持并行执行。ForkJoinTask 比线程轻量,可以在少量的ForkJoinWorkerThread运行大量的ForkJoinTask。

先查看源码

  1. public abstract class ForkJoinTask<V> implements Future<V>, Serializable

如源码所示ForkJoinTask是一个抽象类,并实现Future可以响应任务执行结果

继续查看源码

  1. // public methods
  2. /**
  3. * Arranges to asynchronously execute this task in the pool the
  4. * current task is running in, if applicable, or using the {
  5. @link
  6. * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
  7. * it is not necessarily enforced, it is a usage error to fork a
  8. * task more than once unless it has completed and been
  9. * reinitialized. Subsequent modifications to the state of this
  10. * task or any data it operates on are not necessarily
  11. * consistently observable by any thread other than the one
  12. * executing it unless preceded by a call to {
  13. @link #join} or
  14. * related methods, or a call to {
  15. @link #isDone} returning {@code
  16. * true}.
  17. *
  18. * @return {
  19. @code this}, to simplify usage
  20. */
  21. public final ForkJoinTask<V> fork() {
  22. Thread t;
  23. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
  24. ((ForkJoinWorkerThread)t).workQueue.push(this);
  25. else
  26. ForkJoinPool.common.externalPush(this);
  27. return this;
  28. }
  29. /**
  30. * Returns the result of the computation when it {
  31. @link #isDone is
  32. * done}. This method differs from {
  33. @link #get()} in that
  34. * abnormal completion results in {
  35. @code RuntimeException} or
  36. * {
  37. @code Error}, not {
  38. @code ExecutionException}, and that
  39. * interrupts of the calling thread do <em>not</em> cause the
  40. * method to abruptly return by throwing {
  41. @code
  42. * InterruptedException}.
  43. *
  44. * @return the computed result
  45. */
  46. public final V join() {
  47. int s;
  48. if ((s = doJoin() & DONE_MASK) != NORMAL)
  49. reportException(s);
  50. return getRawResult();
  51. }

如源码所示,ForkJoinTask提供fork()方法与Join()方法。fork()方法将主任务分解为子任务加入ForkJoinPool线程池阻塞队列等待执行,Join()方法则是监听ForkJoinTask子任务并在子任务完成后将结果返回给主任务。

ForkJoinWorkerThread源码解析

ForkJoinWorkerThread是ForkJoin框架中的并行工作线程,在一个ForkJoinPool并行线程池下会有定量的ForkJoinWorkerThread并行工作线程,而在一个ForkJoinWorkerThread工作线程下会有多个ForkJoinTask并行任务。

进入ForkJoinWorkerThread查看源码

  1. public class ForkJoinWorkerThread extends Thread

ForkJoinWorkerThread继承Thread 是一个不择不扣的线程。

继续查看源码

  1. public void run() {
  2. if (workQueue.array == null) {
  3. // only run once
  4. Throwable exception = null;
  5. try {
  6. onStart();
  7. pool.runWorker(workQueue);
  8. } catch (Throwable ex) {
  9. exception = ex;
  10. } finally {
  11. try {
  12. onTermination(exception);
  13. } catch (Throwable ex) {
  14. if (exception == null)
  15. exception = ex;
  16. } finally {
  17. pool.deregisterWorker(this, exception);
  18. }
  19. }
  20. }
  21. }

由ForkJoinWorkerThread线程run()方法可以知道运行线程是从workQueue工作阻塞队列里面取出任务进行执行,当然deregisterWorker()则是善后工作。

ForkJoinTask实现类

在这里插入图片描述

如图所示ForkJoinTask抽象类提供了三种实现类,分别是:
1、RecursiveTask 有返回值的并行任务
2、RecursiveAction 无返回值的并行任务
3、CountedCompleter 带有钩子函数的并行任务,完成任务后可以触发其他任务

ForkJoin实战

案例:
比如我们需要计算1-10000这10000个数相加之和是多少,此时我们就可以采用ForkJoin框架来计算。经分析运算需要返回结果,则选用可以返回任务执行结果的 RecursiveTask来演示。
1、创建MyForkJoinTask继承RecursiveTask

  1. /**
  2. * MyForkJoinTask
  3. * @author senfel
  4. * @version 1.0
  5. * @date 2023/3/23 14:07
  6. */
  7. public class MyForkJoinTask extends RecursiveTask<Integer> {
  8. /**
  9. * 开始元素
  10. */
  11. private Integer startElement;
  12. /**
  13. * 结束元素
  14. */
  15. private Integer endElement;
  16. public MyForkJoinTask(Integer startElement, Integer endElement) {
  17. this.startElement = startElement;
  18. this.endElement = endElement;
  19. }
  20. /**
  21. * compute
  22. * @author senfel
  23. * @date 2023/3/23 14:08
  24. * @return java.lang.Integer
  25. */
  26. @Override
  27. protected Integer compute() {
  28. int result = 0;
  29. if(startElement > endElement){
  30. int temp = endElement;
  31. endElement = startElement;
  32. startElement = temp;
  33. }
  34. //间隔两个数之内不进行拆分
  35. if(endElement - startElement <= 2){
  36. for(int i = endElement;i >= startElement;i--) {
  37. result += i;
  38. }
  39. }else{
  40. //获取两个数中间值
  41. int middle = (endElement + startElement) / 2;
  42. MyForkJoinTask taskByLeft = new MyForkJoinTask(startElement, middle);
  43. MyForkJoinTask taskByRight = new MyForkJoinTask(middle+1, endElement);
  44. //执行子任务
  45. taskByLeft.fork();
  46. taskByRight.fork();
  47. //等待子任务完成
  48. Integer joinByLeft = taskByLeft.join();
  49. Integer joinByRight = taskByRight.join();
  50. result = joinByLeft+joinByRight;
  51. }
  52. return result;
  53. }
  54. }

2、测试用例直接用当前CPU核心数线程,将并行任务放入并行线程池执行

  1. /**
  2. * forkJoinTest
  3. * @author senfel
  4. * @date 2023/3/23 13:39
  5. * @return void
  6. */
  7. @Test
  8. public void forkJoinTest() throws Exception{
  9. //可执行线程
  10. int processors = Runtime.getRuntime().availableProcessors();
  11. System.err.println("当前计算机可执行线程数:"+processors);
  12. long startTime = System.currentTimeMillis();
  13. //创建一个并行线程池
  14. ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
  15. //创建并行任务
  16. MyForkJoinTask task = new MyForkJoinTask(1,10000);
  17. //用并行线程池执行任务
  18. ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
  19. //获取执行结果
  20. Integer result = submit.get();
  21. long endTime = System.currentTimeMillis();
  22. System.err.println("1-10000所有数字之和为:"+result+","+processors+"个线程运行计算时间为:"+(endTime-startTime)+"毫秒。");
  23. }

3、运行结果

当前计算机可执行线程数:12
1-10000所有数字之和为:50005000,运行计算时间为:22毫秒。

结论

通过本篇文章我们知道了ForkJoin并行框架一个将任务‘分而治之’的并行架构,我们可以通过ForkJoinTask 实现一个具体的并行任务类,并用fork()\join()方法实现任务的分解和结果合并,再将ForkJoinTask 通过 ForkJoinPool并行线程池进行submit()创建ForkJoinWorkerThread执行。

ForkJoinTask抽象类主要有三种实现类RecursiveTask有返回值的并行任务 、RecursiveAction无返回值的并行任务 与CountedCompleter 带有钩子函数的并行任务。

在实际生产业务开发过程中,遇见可以并行计算的场景可以用ForkJoin框架来提升运算速度。

发表评论

表情:
评论列表 (有 0 条评论,263人围观)

还没有评论,来说两句吧...

相关阅读

    相关 并发编程ForkJoin

    大家好,我是小黑,一个在互联网苟且偷生的农民工。 > 在JDK1.7中引入了一种新的Fork/Join线程池,它可以将一个大的任务拆分成多个小的任务并行执行并汇总执行结果。

    相关 ForkJoin框架

    介绍 Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情: Fo

    相关 ForkJoin框架

    介绍 Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情: Fo