线程的并发工具类(1)----Fork/Join框架
Fork/Join框架-分而治之
- 1、分而治之
- 1.1、什么是”分而治之”?
- 1.2、什么是”工作密取”?
- 2、Fork/Join具体实现
- 2.1、Fork/Join使用的标准范式
- 2.2、Fork/Join的同步用法同时演示返回结果值
- 2.3、Fork/Join的异步用法同时演示不要求返回值
1、分而治之
1.1、什么是”分而治之”?
分而治之:规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。
Fork/Join充分提现了”分而治之”。
下面放张图说明下:
将一个大任务分解成若干个小任务,但不一定就是平均分割,可以任意分割。所以就会存在某一个线程提前完成任务,而别的任务还在工作中。这时为了提高程序的效率,提高资源的利用率,所以提出了另外一个概念:“工作密取”。
1.2、什么是”工作密取”?
还是上张图体现下:
就是当一个线程分配到的任务做完成以后,会去从别的线程里获取新的任务去执行。为什么不会发生冲突呢?看下图就知道了,为了避免执行任务冲突,空闲线程是从繁忙线程中的末尾去获取任务执行的,所以不会出现冲突的情况。
这样就会充分利用了资源,而不会造成资源的浪费。
2、Fork/Join具体实现
2.1、Fork/Join使用的标准范式
2.2、Fork/Join的同步用法同时演示返回结果值
统计整形数组中所有元素的和
package com.ld.task;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/** * 计算数组中所有元素的和,传递给主线程(分而治之,同步调用) * 类似于二分法的用法 */
public class UserForJoinByRecursiveTask extends RecursiveTask<Integer> {
private final int THRESHOLD = MakeArray.length / 10;
private int[] src;
private int fromIndex;
private int toIndex;
public UserForJoinByRecursiveTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
int sum = 0;
if (toIndex - fromIndex < THRESHOLD) {
for (int i=fromIndex;i<=toIndex;i++){
sum += src[i];
}
return sum;
} else {
//fromIndex.....mid.....toIndex
int mid = (fromIndex+toIndex)/2; //这只是一种分的方式,也可以分解多个,数量不固定,invokeAll()方法支持数组、集合参数
UserForJoinByRecursiveTask left = new UserForJoinByRecursiveTask(src,fromIndex,mid);
UserForJoinByRecursiveTask right = new UserForJoinByRecursiveTask(src,mid,toIndex);
invokeAll(left,right);
return left.join()+right.join();
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(6);
UserForJoinByRecursiveTask task = new UserForJoinByRecursiveTask(MakeArray.getArr(),0,MakeArray.length-1);
long start = System.currentTimeMillis();
pool.invoke(task); //同步调用
Integer sum = task.join();
long end = System.currentTimeMillis();
System.out.println("数组的和是"+sum+",forkJoin消耗的时间是"+(end-start));
start = System.currentTimeMillis();
int sum2 = 0;
for (int num:MakeArray.getArr()) {
sum2+=num;
}
end = System.currentTimeMillis();
System.out.println("数组的和是"+sum+",普通循环消耗的时间是"+(end-start));
}
static class MakeArray {
public final static int length = 100000000; //按需使用,,如果长度不太长不建议使用线程。cpu上下文切换消耗时间
public static int[] getArr() {
int[] arr = new int[length];
for (int i = 0; i < arr.length; i++) {
arr[i] = i + 1;
}
return arr;
}
}
}
注意:pool.invoke()同步调用,会阻塞当前线程的执行,直到结果计算出来才继续向下执行。
2.3、Fork/Join的异步用法同时演示不要求返回值
遍历指定目录(含子目录)寻找指定类型文件
package com.ld.task;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/** * 遍历目录(含有子目录)找到指定文件。(分而治之ForkJoin异步用法:不要求有返回值,继承RecursiveAction) */
public class UseForkJoin extends RecursiveAction {
private File filePath;
public UseForkJoin(File filePath) {
this.filePath = filePath;
}
@Override
protected void compute() {
List<UseForkJoin> subTasks = new ArrayList<>();
File[] files = filePath.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
subTasks.add(new UseForkJoin(file));
} else {
if (file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
for (UseForkJoin useForkJoin : invokeAll(subTasks)) {
useForkJoin.join();
}
}
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(6);
UseForkJoin useForkJoin = new UseForkJoin(new File("d:/"));
forkJoinPool.execute(useForkJoin);//异步调用
// forkJoinPool.invoke(useForkJoin);//同步调用
long start = System.currentTimeMillis();
System.out.println("Task is running...");
try {
Thread.sleep(1000);
System.out.println("执行主线程方法。。。。");
Thread.sleep(5000);
useForkJoin.join();
long end = System.currentTimeMillis();
System.out.println("Task is end.时间=" + (end - start));
} catch (Exception e) {
e.printStackTrace();
}
}
}
注意:forkJoinPool.execute()异步调用,不会阻塞当前线程的执行,直到等到调用join()方法获取结果,如果结果没有出来才会阻塞当前线程的执行,直到获取到结果,会继续向下执行。
还没有评论,来说两句吧...