Java 7 之 fork-join 框架
java
2016-06-28

前言

如果让我个人理解什么是fork-join,我立刻会想到hadoop的map/reduce。他们是同一种模 型。更早之前,笔者就看过关于fork-join的相关文章,但是理解的都不够深刻。自从深入 研究了java多线程这块的相关技术,再回头来看fork-join,觉得真是个了不起的技术。

概述

fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处 理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在 于能够使用所有可用的运算能力来提升你的应用的性能。

类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线 程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作 而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任 务。 fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。 ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

Divide and conquer

合并排序是 divide-and-conquer 算法的一个例子,在这种算法中将一个问题递归分解成子 问题,再将子问题的解决方案组合得到最终结果。divide-and-conquer 算法也可用于顺序 环境中,但是在并行环境中更加有效,因为可以并行处理子问题。

并行 divide-and-conquer 算法首先对问题进行评估,确定其大小是否更适合使用顺序解决 方案;通常,可通过将问题大小与某个阙值进行比较完成。如果问题大到需要并行分解,算 法会将其分解成两个或更多子问题,并以并行方式对子问题递归调用算法本身,然后等待子 问题的结果,最后合并这些结果。用于选择顺序和并行执行方法的理想阙值是协调并行任务 的成本。如果协调成本为 0,更多的更细粒度的任务会提供更好的并行性;在需要转向顺序 方法之前,协调成本越低,就可以划分更细粒度的任务。

递归(旧的方法)

如果从一个数组中,选一个最大值。我一般都会采用递归调用。这样的话只有当前线程的一 个方法是运行的,其他方法都阻塞在线程栈中。所以在多核情况下,其他CPU都是空闲,没 有得到充分利用。

并行计算(fork-join)

类似hadoop的map/reduce,可以将任务拆分成多个块,然后最终将这些子结果集合并成最终 结果集。它的行为表现为当前任务是暂停的,并行执行两个子任务,而当前任务等待两个子 任务的完成。然后就可以将两个子任务的结果进行合并。这种并行分解方法常常称作 fork-join,因为执行一个任务将首先分解(fork)为多个子任务,然后再合并(join) (完成后)。

使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下 面所示的伪代码类似:


if(当前这个任务工作量足够小)
  直接完成这个任务
else
  将这个任务或这部分工作分解成两个部分

分别触发(invoke)这两个子任务的执行,并等待结果

示例代码如下:

importjava.util.concurrent.ForkJoinPool;
importjava.util.concurrent.RecursiveTask;

/**
 *@authorpiaohailin
 *@date2014-3-26
 */
 public class Fibonacci extends RecursiveTask<Long> {
   privatestaticfinallongserialVersionUID=7875142223684511653L;
   privatefinallongn;

   Fibonacci(longn){
     this.n=n;
   }

   protected Longcompute(){
     if(n<=1){
       returnn;
     }
     Fibonacci f1 = new Fibonacci(n-1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n-2);
     return f2.compute() + f1.join();
 }

 public static void main(String[] args) {
   Fibonacci task = new Fibonacci(10);
   ForkJoinPool pool = new ForkJoinPool(4);
   pool.invoke(task);
   System.out.println(task.getRawResult());
 }

}


工作窃取

fork-join 框架通过一种称作工作窃取(work stealing) 的技术减少了工作队列的争用情 况。每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的 (Java 6 在类库中添加了几种 deque 实现,包括ArrayDeque 和 LinkedBlockingDeque)。 当一个任务划分一个新线程时,它将自己推到 deque 的头部。当一个任务执行与另一个未 完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任 务完成(像Thread.join() 的操作一样)。当线程的任务队列为空,它将尝试从另一个线程 的 deque 的尾部 窃取另一个任务。

可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争 用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque头部永远不会发生争用; 因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的 争用(在 fork-join 框架中结合 deque实现会使这些访问模式进一步减少协调成本)。跟 传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进 先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另 一个线程需要窃取任务时,它将得到一个能够分解成多个小任务的任务,从而避免了在未来 窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最 小。

标准实现

除了能够使用fork/join框架来实现能够在多处理系统中被并行执行的定制化算法(如前文 中的ForkBlur.java例子),在Java SE中一些比较常用的功能点也已经使用fork/join框架 来实现了。在Java SE 8中,java.util.Arrays类的一系列parallelSort()方法就使用了 fork/join来实现。这些方法与sort()系列方法很类似,但是通过使用fork/join框架,借助 了并发来完成相关工作。在多处理器系统中,对大数组的并行排序会比串行排序更快。这些 方法究竟是如何运用fork/join框架并不在本教程的讨论范围内。想要了解更多的信息,请 参见Java API文档。 其他采用了fork/join框架的方法还包括java.util.streams包中的一 些方法,此包是作为Java SE 8发行版中Project Lambda的一部分。想要了解更多信息,请 参见Lambda Expressions一节。

参考资料 http://www.ibm.com/developerworks/cn/java/j-jtp11137.html

其它文章