第一步分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小。
第二步執行任務並合並結果。分割的子任務分別放在雙端隊列裡,然後幾個啟動線程分別從雙端隊列裡獲取任務執行。子任務執行完的結果都統一放在一個隊列裡,啟動一個線程從隊列裡拿數據,然後合並這些數據。
Fork/Join使用兩個類來完成以上兩件事情:
1 package com.thread.test.thread;
2
3 import java.io.File;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.concurrent.ForkJoinPool;
7 import java.util.concurrent.RecursiveTask;
8
9 /**
10 * Created by windwant on 2016/6/3.
11 */
12 public class MyForkJoin {
13
14 public static void main(String[] args) {
15 MyTask task = new MyTask(new File("D:\\MPS"));
16 Integer sum = new ForkJoinPool().invoke(task);
17 System.out.println(sum);
18 }
19 }
20
21 class MyTask extends RecursiveTask<Integer>{
22
23 public Integer num = 0;
24
25 private File file;
26 MyTask(File file){
27 this.file = file;
28 }
29
30 @Override
31 protected Integer compute() {
32 List<MyTask> taskList = new ArrayList<MyTask>();
33 if(file.isDirectory()){
34 File[] list = file.listFiles();
35 for(File subf: list){
36 if(subf.isDirectory()){
37 MyTask mt = new MyTask(subf);
38 taskList.add(mt);
39 }else{
40 num++;
41 }
42 }
43 }else{
44 num = 1;
45 }
46
47 if(!taskList.isEmpty()){
48 //同下
49 // for(MyTask mtask: taskList){
50 // mtask.fork();
51 // }
52 // for(MyTask mtask: taskList){
53 // num += mtask.join();
54 // }
55
56 for(MyTask mtask: invokeAll(taskList)){
57 num += mtask.join();
58 }
59 }
60 return num;
61 }
62 }
項目地址:https://github.com/windwant/threadtest