线程的并发工具类之forkjoin原理和使用(六)

快来打我* 2022-11-20 04:58 216阅读 0赞

今天分析 forkjoin原理和使用:

1、Fork-Join

java 下多线程的开发可以我们自己启用多线程,线程池,还可以使用 forkjoin , forkjoin 可以让我们不去了解诸如 Thread,Runnable 等相关的知识,只要遵循 forkjoin 的开发模式,就可以写出很好的多线程并发程序,

2、分而治之

同时 forkjoin 在处理某一类问题时非常的有用,哪一类问题?分而治之的问 题。十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、 深度优先、广度优先、Dijkstra 、动态规划、朴素贝叶斯分类,有几个属于分 而治之?3 个,快速排序、归并排序、二分查找,还有大数据中 M/R 都是。 分治法的设计思想是:将一个难以直接解决的大问题,分割成一些规模较小 的相同问题,以便各个击破,分而治之。 分治策略是:对于一个规模为 n 的问题,若该问题可以容易地解决(比如说 规模 n 较小)则直接解决,否则将其分解为 k 个规模较小的子问题, 这些子问题 互相独立且与原问题形式相同 ( 子问题相互之间有联系就会变为动态规范算法 ) , 递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计 策略叫做分治法。

3、Fork-Join 原理

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70

4、工作密取

即当前线程的 Task 已经全被执行完毕,则自动取到其他线程的 Task 池中取 出 Task 继续执行。 ForkJoinPool 中维护着多个线程(一般为 CPU 核数)在不断地执行 Task ,每 个线程除了执行自己职务内的 Task 之外,还会根据自己工作线程的闲置情况去 获取其他繁忙的工作线程的 Task ,如此一来就能能够减少线程阻塞或是闲置的时 间,提高 CPU 利用率。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 1

5、 Fork/Join 实战

Fork/Join 使用的标准范式

我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务 中执行 fork 和 join 的操作机制,通常我们不直接继承 ForkjoinTask 类,只需要直

接继承其子类。

  1. RecursiveAction ,用于没有返回结果的任务

  2. RecursiveTask ,用于有返回值的任务

task 要通过 ForkJoinPool 来执行,使用 submit 或 invoke 提交,两者的区 别是:invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码;

submit 是异步执行。 join()和 get 方法当任务完成的时候返回计算结果。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 2

在我们自己实现的 compute 方法里,首先需要判断任务是否足够小,如果 足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务 在调用 invokeAll 方法时,又会进入 compute 方法,看看当前子任务是否需要继 续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。

5、Fork/Join 的同步用法和异步用法

参见代如 下,同步执行:

  1. //初始化一个数组
  2. public class MakeArray {
  3. //数组长度
  4. public static final int ARRAY_LENGTH = 4000;
  5. public final static int THRESHOLD = 47;
  6. public static int[] makeArray() {
  7. //new一个随机数发生器
  8. Random r = new Random();
  9. int[] result = new int[ARRAY_LENGTH];
  10. for(int i=0;i<ARRAY_LENGTH;i++){
  11. //用随机数填充数组
  12. result[i] = r.nextInt(ARRAY_LENGTH*3);
  13. }
  14. return result;
  15. }
  16. }

forkjoin 类和main方法:

  1. import java.util.concurrent.ForkJoinPool;
  2. import java.util.concurrent.RecursiveTask;
  3. /**
  4. * ForkJoin执行累加
  5. */
  6. public class SumArray {
  7. //继承有返回结果的类
  8. private static class SumTask extends RecursiveTask<Integer>{
  9. /*阈值*/
  10. private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
  11. private int[] src;
  12. private int fromIndex;
  13. private int toIndex;
  14. public SumTask(int[] src, int fromIndex, int toIndex) {
  15. this.src = src;
  16. this.fromIndex = fromIndex;
  17. this.toIndex = toIndex;
  18. }
  19. @Override
  20. protected Integer compute() {
  21. /*任务的大小是否合适*/
  22. if (toIndex - fromIndex < THRESHOLD){
  23. // System.out.println(" from index = "+fromIndex
  24. // +" toIndex="+toIndex);
  25. int count = 0;
  26. for(int i= fromIndex;i<=toIndex;i++){
  27. // SleepTools.ms(1);
  28. count = count + src[i];
  29. }
  30. return count;
  31. }else{
  32. //fromIndex....mid.....toIndex
  33. int mid = (fromIndex+toIndex)/2;
  34. SumTask left = new SumTask(src,fromIndex,mid);
  35. SumTask right = new SumTask(src,mid+1,toIndex);
  36. invokeAll(left,right);
  37. return left.join()+right.join();
  38. }
  39. }
  40. }
  41. public static void main(String[] args) {
  42. int[] src = MakeArray.makeArray();
  43. /*new出池的实例*/
  44. ForkJoinPool pool = new ForkJoinPool();
  45. /*new出Task的实例*/
  46. SumTask innerFind = new SumTask(src,0,src.length-1);
  47. long start = System.currentTimeMillis();
  48. pool.invoke(innerFind);
  49. //System.out.println("Task is Running.....");
  50. System.out.println("The count is "+innerFind.join()
  51. +" spend time:"+(System.currentTimeMillis()-start)+"ms");
  52. }
  53. }

执行结果:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 3

异步执行:继承此类 RecursiveAction 没有返回值

  1. /**
  2. *类说明:遍历指定目录(含子目录)找寻指定类型文件
  3. */
  4. public class FindDirsFiles extends RecursiveAction {
  5. private File path;
  6. public FindDirsFiles(File path) {
  7. this.path = path;
  8. }
  9. @Override
  10. protected void compute() {
  11. List<FindDirsFiles> subTasks = new ArrayList<>();
  12. File[] files = path.listFiles();
  13. if (files!=null){
  14. for (File file : files) {
  15. if (file.isDirectory()) {
  16. // 对每个子目录都新建一个子任务。
  17. subTasks.add(new FindDirsFiles(file));
  18. } else {
  19. // 遇到文件,检查。
  20. if (file.getAbsolutePath().endsWith("txt")){
  21. System.out.println("文件:" + file.getAbsolutePath());
  22. }
  23. }
  24. }
  25. if (!subTasks.isEmpty()) {
  26. // 在当前的 ForkJoinPool 上调度所有的子任务。
  27. for (FindDirsFiles subTask : invokeAll(subTasks)) {
  28. subTask.join();
  29. }
  30. }
  31. }
  32. }
  33. public static void main(String [] args){
  34. try {
  35. // 用一个 ForkJoinPool 实例调度总任务
  36. ForkJoinPool pool = new ForkJoinPool();
  37. FindDirsFiles task = new FindDirsFiles(new File("E:/"));
  38. /*异步提交*/
  39. pool.execute(task);
  40. /*主线程做自己的业务工作*/
  41. System.out.println("Task is Running......");
  42. Thread.sleep(10000);
  43. int otherWork = 0;
  44. for(int i=0;i<100;i++){
  45. otherWork = otherWork+i;
  46. }
  47. System.out.println("Main Thread done sth......,otherWork="
  48. +otherWork);
  49. // task.join();//阻塞方法 如果执行此方法,则任务执行完 才继续往下走
  50. System.out.println("Task end");
  51. } catch (Exception e) {
  52. // TODO Auto-generated catch block
  53. e.printStackTrace();
  54. }
  55. }
  56. }

执行结果:

20201107164428853.png

6、forkjoin 实现一个归并排序算法:

  1. import java.util.Arrays;
  2. import java.util.concurrent.ForkJoinPool;
  3. import java.util.concurrent.RecursiveTask;
  4. /**
  5. * forkjoin实现的归并排序
  6. */
  7. public class FkSort {
  8. private static class SumTask extends RecursiveTask<int[]>{
  9. private final static int THRESHOLD = 2;
  10. private int[] src;
  11. public SumTask(int[] src) {
  12. this.src = src;
  13. }
  14. @Override
  15. protected int[] compute() {
  16. if(src.length<=THRESHOLD){
  17. return InsertionSort.sort(src);
  18. }else{
  19. //fromIndex....mid.....toIndex
  20. int mid = src.length / 2;
  21. SumTask leftTask = new SumTask(Arrays.copyOfRange(src, 0, mid));
  22. SumTask rightTask = new SumTask(Arrays.copyOfRange(src, mid, src.length));
  23. invokeAll(leftTask,rightTask);
  24. int[] leftResult = leftTask.join();
  25. int[] rightResult = rightTask.join();
  26. return SumTask.merge(leftResult,rightResult);
  27. }
  28. }
  29. /**
  30. * 归并排序——将两段排序好的数组结合成一个排序数组
  31. *
  32. * @param left
  33. * @param right
  34. * @return
  35. */
  36. public static int[] merge(int[] left, int[] right) {
  37. int[] result = new int[left.length + right.length];
  38. for (int index = 0, i = 0, j = 0; index < result.length; index++) {
  39. if (i >= left.length)/*左边数组已经取完,完全取右边数组的值即可*/
  40. result[index] = right[j++];
  41. else if (j >= right.length)/*右边数组已经取完,完全取左边数组的值即可*/
  42. result[index] = left[i++];
  43. else if (left[i] > right[j])/*左边数组的元素值大于右边数组,取右边数组的值*/
  44. result[index] = right[j++];
  45. else/*右边数组的元素值大于左边数组,取左边数组的值*/
  46. result[index] = left[i++];
  47. }
  48. return result;
  49. }
  50. }
  51. public static void main(String[] args) {
  52. ForkJoinPool pool = new ForkJoinPool();
  53. int[] src = MakeArray.makeArray();
  54. SumTask innerFind = new SumTask(src);
  55. long start = System.currentTimeMillis();
  56. int[] invoke = pool.invoke(innerFind);
  57. for(int number:invoke){
  58. System.out.println(number);
  59. }
  60. System.out.println(" spend time:"+(System.currentTimeMillis()-start)+"ms");
  61. }
  62. }

执行结果:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L25hbmRhbzE1OA_size_16_color_FFFFFF_t_70 4

到此forkjoin 解析完了。

发表评论

表情:
评论列表 (有 0 条评论,216人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线并发工具

    Fork-Join 什么是分而治之? 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到