线程的并发工具类(1)----Fork/Join框架

ゞ 浴缸里的玫瑰 2023-07-14 15:00 17阅读 0赞

Fork/Join框架-分而治之

      • 1、分而治之
          • 1.1、什么是”分而治之”?
          • 1.2、什么是”工作密取”?
      • 2、Fork/Join具体实现
          • 2.1、Fork/Join使用的标准范式
          • 2.2、Fork/Join的同步用法同时演示返回结果值
          • 2.3、Fork/Join的异步用法同时演示不要求返回值

1、分而治之

1.1、什么是”分而治之”?

分而治之:规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。

Fork/Join充分提现了”分而治之”。
下面放张图说明下:
在这里插入图片描述
将一个大任务分解成若干个小任务,但不一定就是平均分割,可以任意分割。所以就会存在某一个线程提前完成任务,而别的任务还在工作中。这时为了提高程序的效率,提高资源的利用率,所以提出了另外一个概念:“工作密取”。

1.2、什么是”工作密取”?

还是上张图体现下:

在这里插入图片描述

就是当一个线程分配到的任务做完成以后,会去从别的线程里获取新的任务去执行。为什么不会发生冲突呢?看下图就知道了,为了避免执行任务冲突,空闲线程是从繁忙线程中的末尾去获取任务执行的,所以不会出现冲突的情况。

这样就会充分利用了资源,而不会造成资源的浪费。

2、Fork/Join具体实现

2.1、Fork/Join使用的标准范式

在这里插入图片描述

2.2、Fork/Join的同步用法同时演示返回结果值

统计整形数组中所有元素的和

  1. package com.ld.task;
  2. import java.util.concurrent.ForkJoinPool;
  3. import java.util.concurrent.RecursiveTask;
  4. /** * 计算数组中所有元素的和,传递给主线程(分而治之,同步调用) * 类似于二分法的用法 */
  5. public class UserForJoinByRecursiveTask extends RecursiveTask<Integer> {
  6. private final int THRESHOLD = MakeArray.length / 10;
  7. private int[] src;
  8. private int fromIndex;
  9. private int toIndex;
  10. public UserForJoinByRecursiveTask(int[] src, int fromIndex, int toIndex) {
  11. this.src = src;
  12. this.fromIndex = fromIndex;
  13. this.toIndex = toIndex;
  14. }
  15. @Override
  16. protected Integer compute() {
  17. int sum = 0;
  18. if (toIndex - fromIndex < THRESHOLD) {
  19. for (int i=fromIndex;i<=toIndex;i++){
  20. sum += src[i];
  21. }
  22. return sum;
  23. } else {
  24. //fromIndex.....mid.....toIndex
  25. int mid = (fromIndex+toIndex)/2; //这只是一种分的方式,也可以分解多个,数量不固定,invokeAll()方法支持数组、集合参数
  26. UserForJoinByRecursiveTask left = new UserForJoinByRecursiveTask(src,fromIndex,mid);
  27. UserForJoinByRecursiveTask right = new UserForJoinByRecursiveTask(src,mid,toIndex);
  28. invokeAll(left,right);
  29. return left.join()+right.join();
  30. }
  31. }
  32. public static void main(String[] args) {
  33. ForkJoinPool pool = new ForkJoinPool(6);
  34. UserForJoinByRecursiveTask task = new UserForJoinByRecursiveTask(MakeArray.getArr(),0,MakeArray.length-1);
  35. long start = System.currentTimeMillis();
  36. pool.invoke(task); //同步调用
  37. Integer sum = task.join();
  38. long end = System.currentTimeMillis();
  39. System.out.println("数组的和是"+sum+",forkJoin消耗的时间是"+(end-start));
  40. start = System.currentTimeMillis();
  41. int sum2 = 0;
  42. for (int num:MakeArray.getArr()) {
  43. sum2+=num;
  44. }
  45. end = System.currentTimeMillis();
  46. System.out.println("数组的和是"+sum+",普通循环消耗的时间是"+(end-start));
  47. }
  48. static class MakeArray {
  49. public final static int length = 100000000; //按需使用,,如果长度不太长不建议使用线程。cpu上下文切换消耗时间
  50. public static int[] getArr() {
  51. int[] arr = new int[length];
  52. for (int i = 0; i < arr.length; i++) {
  53. arr[i] = i + 1;
  54. }
  55. return arr;
  56. }
  57. }
  58. }

注意:pool.invoke()同步调用,会阻塞当前线程的执行,直到结果计算出来才继续向下执行。

2.3、Fork/Join的异步用法同时演示不要求返回值

遍历指定目录(含子目录)寻找指定类型文件

  1. package com.ld.task;
  2. import java.io.File;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.ForkJoinPool;
  6. import java.util.concurrent.RecursiveAction;
  7. /** * 遍历目录(含有子目录)找到指定文件。(分而治之ForkJoin异步用法:不要求有返回值,继承RecursiveAction) */
  8. public class UseForkJoin extends RecursiveAction {
  9. private File filePath;
  10. public UseForkJoin(File filePath) {
  11. this.filePath = filePath;
  12. }
  13. @Override
  14. protected void compute() {
  15. List<UseForkJoin> subTasks = new ArrayList<>();
  16. File[] files = filePath.listFiles();
  17. if (files != null) {
  18. for (File file : files) {
  19. if (file.isDirectory()) {
  20. subTasks.add(new UseForkJoin(file));
  21. } else {
  22. if (file.getAbsolutePath().endsWith("txt")) {
  23. System.out.println("文件:" + file.getAbsolutePath());
  24. }
  25. }
  26. }
  27. if (!subTasks.isEmpty()) {
  28. for (UseForkJoin useForkJoin : invokeAll(subTasks)) {
  29. useForkJoin.join();
  30. }
  31. }
  32. }
  33. }
  34. public static void main(String[] args) {
  35. ForkJoinPool forkJoinPool = new ForkJoinPool(6);
  36. UseForkJoin useForkJoin = new UseForkJoin(new File("d:/"));
  37. forkJoinPool.execute(useForkJoin);//异步调用
  38. // forkJoinPool.invoke(useForkJoin);//同步调用
  39. long start = System.currentTimeMillis();
  40. System.out.println("Task is running...");
  41. try {
  42. Thread.sleep(1000);
  43. System.out.println("执行主线程方法。。。。");
  44. Thread.sleep(5000);
  45. useForkJoin.join();
  46. long end = System.currentTimeMillis();
  47. System.out.println("Task is end.时间=" + (end - start));
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }

注意:forkJoinPool.execute()异步调用,不会阻塞当前线程的执行,直到等到调用join()方法获取结果,如果结果没有出来才会阻塞当前线程的执行,直到获取到结果,会继续向下执行。

发表评论

表情:
评论列表 (有 0 条评论,17人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线并发工具

    Fork-Join 什么是分而治之? 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到