分支/合并框架Fork/Join

Fork/Join框架是Java7提供的一种用于并行执行任务的框架,它通过“Fork”将一个大任务拆分成若干个小任务,然后将这些小任务分配给每个可用的处理器来执行。当每个小任务执行完成后,它们的结果将被合并起来,形成原始的大任务的结果。Fork/Join框架包含一个名为ForkJoinPool的类,它是一个执行Fork/Join任务的线程池。

ForkJoinPool简介
ForkJoinPool是Java7提供的一种用于执行Fork/Join任务的线程池,它拥有普通线程池的所有特性,如定时、定期执行任务、线程池的大小控制等,但它也拥有自己的特性,如自动拆分大任务并行执行、支持结果合并等。ForkJoinPool使用Fork/Join框架执行任务,需要实现ForkJoinTask接口,这个接口是一个抽象类,它有两个子类:RecursiveAction和RecursiveTask,分别用于无结果和有结果的任务。

ForkJoinPool使用
ForkJoinPool可以让程序员使用起来更加简单和方便,它提供了多个构造函数,用于创建ForkJoinPool,可以指定线程池的大小,也可以使用默认的线程池大小。可以使用ForkJoinPool的execute()方法执行任务,也可以使用submit()方法提交任务,这两个方法都可以接受ForkJoinTask实例作为参数。ForkJoinPool还提供了shutdown()方法用于关闭线程池,它还提供了invoke()方法,用于同步执行ForkJoinTask。

ForkJoinPool实例
ForkJoinPool的使用实例如下:

创建一个线程池:
ForkJoinPool pool = new ForkJoinPool();

提交一个任务:
pool.submit(new MyTask());

new MyTask()的实现是实现了ForkJoinTask的子类的类。

ForkJoinTask的子类的任务就是对大的计算任务进行任务拆分,任务拆分就要使用递归了。
ForkJoinTask按照是否有返回值分为两个子类实现:
1)RecurisiveTask(有返回值)
2)RecurisiveAction(无返回值)

创建了ForkJoinTask的子类,就要把任务提交到ForkJoinPool,例如我们继承RecursiveTask,需实现它唯一的抽象方法
compute:
protected abstract R compute();

关闭线程池:
pool.shutdown();
ForkJoinPool优点
ForkJoinPool支持自动拆分大任务并行执行,它可以提高任务的执行效率,提升程序的性能,并且它可以支持无结果和有结果的任务,使得任务的开发更加简单。

ForkJoinPool缺点
ForkJoinPool比普通线程池复杂,它的使用也比较复杂,而且它只支持Fork/Join任务,不支持普通任务。

我们来看代码演示:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinSum extends RecursiveTask<Long> {

    private static final int MIN = 10000;

    private long[] num;
    private int start;
    private int end;

    public ForkJoinSum(long[] num) {
        this(num,0,num.length);
    }

    private ForkJoinSum(long[] num, int start, int end) {
        this.num = num;
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) {
        long[] longs = LongStream.range(1, 1000000).toArray();
        ForkJoinTask<Long> cal = new ForkJoinSum(longs);
        Long result = new ForkJoinPool().invoke(cal);
        System.out.println(result);

        long sum = 0;
        for(int i=0; i<longs.length; i++) {
            sum += longs[i];
        }
        System.out.println(sum);

        /* 输出
        499999500000
        499999500000
         */
    }

    /**
     * 重写方法,自定义任务分割逻辑
     * @return
     */
    @Override
    protected Long compute() {
        if(end-start<=MIN) {
            return sum(start,end);
        }

        ForkJoinSum left = new ForkJoinSum(num,start,start+(end-start)/2);
        left.fork();
        ForkJoinSum right = new ForkJoinSum(num,start+(end-start)/2,end);
        Long rv = right.compute();
        Long lv = left.join();
        return lv+rv;
    }

    private long sum(int start, int end) {
        long s =0;
        for(int i=start; i<end; i++) {
            s += num[i];
        }
        return s;
    }
}

从代码中可以看到,我们定义的计算任务,是放到了ForkJoinPool中。
在实际应用时,一个应用中使用多个ForkJoinPool是没有什么意义的。
一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。

总结:ForkJoinPool是Java7提供的一种用于执行Fork/Join任务的线程池,它拥有普通线程池的所有特性,并且支持自动拆分大任务并行执行,提高任务的执行效率;但是,它也有一些缺点,比如比较复杂,不支持普通任务等。