A Java Fork/Join Framework(译文)

translate by jin.shengjie

摘要

  本篇论文介绍对支持并行程序Java框架的设计,具体实现和性能效率,所谓的并行程序就是将问题(递归)分解成多个并行解决的子任务,等待这些子任务的完成并组合最终的结果。这种通用设计是为Cilk编程语言设计的工作窃取框架的一种变种。 其中主要的实现技巧是围绕着有效的构建和管理任务队列和工作线程。 性能结果表明好的并行方式能够让多数程序加速执行,同时也表明程序有可能有改进的空间。

简介

  对于获取好的并行性能而言,Fork/Join并行方式是介于最简单和最有效率设计技巧中间的。Fork/Join的各种算法是常见分而治之算法(divide−and−conquer algorithms)的并行版本。常见的一种形式如下:

1
2
3
4
5
6
7
8
9
10
Result solve(Problem problem) {
if (problem is small){
directly solve problem
} else {
split problem into independent parts
fork new subtasks to solve each part;
join all subtasks;
compose result from subresults;
}
}

  fork操作启动一个新的并行fork/join子任务,join操作将会等到所有forked子任务都完成后才会继续当前任务。就像其他分而治之算法一样,Fork/Join算法多数情况下都需用到递归,即不停的将任务分解为小任务,直到任务小到可以用简单的,短小有序的方法来解决。一些关联的编程技巧和例子在《Java并发编程》第二版4.4章节有做讨论。本论文主要讨论ForkJoinTask的设计,实现和性能,ForkJoinTask是支持这种编程方式的Java框架。

设计

  Fork/join程序可以运行在任何一个支持构建子任务,这些子任务能够被并行执行,同时具有等待子任务返回任务结果机制的框架之上。然而java.lang.Thread这个类是支持fork/join程序的次优载体:

  • Fork/join 的任务具有简单正常的同步机制和管理需求。相比于通用目的的线程而言,fork/join任务的计算能力具备更高效率的执行策略。比如:fork/join任务除了等待子任务结束之外不需要被阻塞,因此,通用目的线程用来追踪线程的成本就会被浪费掉了。
  • 在某些给定任务粒度的情况下,创建和管理线程的成本就会大于任务做计算本身的时间成本。然而,当程序运行在特定的平台上时,这些粒度要做相应的调整,以便任务粒度大到可以覆盖掉线程开销来利用并行性。

  简而言之,标准的线程框架用来支持多数的fork/join程序还是太重了。但是同时因为线程是许多其他并发和并行程序的基石,所以仅仅为了支持这种fork/join方式而去移除线程开销和调优线程调度是不可能的(至少是不切实际的)。
实际上这种想法已经有很长的一段历史了,第一个提供这些问题的系统解决方案的框架是Cilk编程语言。Cilk和其他轻量级可执行框架将具有特定目的的fork/join 程序构建在操作系统的基本线程或进程机制之上。同样的机制也被运用到了Java语言当中,虽然在java中的线程正好反过来具有更底层操作系统能力。创建这种java轻量级执行框架的主要优势是能够用一种更便捷的写法来使用fork/join程序,同时能够运行在多数能够支持JVM的系统之上。
ForkJoinTask框架是基于在Cilk编程语言被使用的设计的一种变种。其他变种能够在Hood,Filamengs,stackthreads,以及那些构建于轻量级可执行任务的系统中看到。所有这些构建都是将任务映射成线程,就像操作系统以同样的方式将线程映射成CPU一样,但是fork/join程序在执行这种映射的时候能够充分利用简单性,规律性以及约束。所有的这些框架都已不同的写法来实现并行程序,且他们都为fork/join的设计而作了相应的优化:

  • 一个工作线程的线程池会被建立。每一个工作线程(在这里就是一个继承于ForkJoinTaskRuuner线程的实例)都是一个标准(“重”)的线程,这些线程用来处理队列中的任务。正常来说,工作线程的数量和系统中CPU的数量是一样的。再像Cilk这种native框架当中,他们会将工作线程映射成内核线程或轻量级进程,然后在映射到CPU。 在java中,JVM和操作系统必须将这些工作线程映射成CPU。然而,这对于操作系统而言是一份简单的任务,因为这些线程都是计算密集型的。任何一个合理的映射策略都会将这些线程映射到不同的CPU上。
  • 所有fork/join任务都是轻量级可执行类的实例,并不是线程的实例。在java中,独立可执行的任务必须要实现Runnable接口和定义一个run方法。在ForkJoinTask框架下,这些任务都是ForkJoinTask的子类而不是Thread的子类。
  • 一个特定目的的队列和调度规则被用来管理任务和通过工作线程来执行他们。这些机制通过那些任务类中提供的几个方法来触发,主要的方法有: fork, join, isDone(一个完成状态指示器), 还有一些便利性的方法,比如coInvoke方法用来fork然后同时join两个或者更多的任务。
  • 一个简单的控制和管理设施。用来建立工作池以及在一个正常的线程(比如java中的main方法)中启动一个给定的fork/join任务的执行。
    作为对于程序员来说一个fork/join框架是怎么展现的标砖样例,这里有一个计算斐波那契数列(Fibonacci)的方法:
class Fib extends FJTask {
    static final int threshold = 13;
    volatile int number; // arg/result

    Fib(int n) { number = n; }

    int getAnswer() {
        if (!isDone())
            throw new IllegalStateException();
        return number;
    }

    public void run() {
        int n = number;
        if (n <= threshold) // granularity ctl
            number = seqFib(n);
        else {
            Fib f1 = new Fib(n − 1);
            Fib f2 = new Fib(n − 2);
            coInvoke(f1, f2);
            number = f1.number + f2.number;
        }
    }

    public static void main(String[] args) {
        try {
            int groupSize = 2; // for example
            FJTaskRunnerGroup group =
                    new FJTaskRunnerGroup(groupSize);
            Fib f = new Fib(35); // for example
            group.invoke(f);
            int result = f.getAnswer();
            System.out.println("Answer: " +
                    result);
        } catch (InterruptedException ex) {}
    }

    int seqFib(int n) {
        if (n <= 1)
            return n;
        else
            return seqFib(n−1) + seqFib(n−2);
    }
}

  这个版本的代码在第四章节描述的平台上相对于每个任务都新起一个java.lang.Thread来说在效率上至少要快上30倍。这样的代码同时还维护了多线程java程序本质上的可移植性。对于程序员来说只要两个参数需要去调整:

  1. 构建工作线程的数量,一般来说都是和平台可用的CPU数量是一致的。(或者更少:其他的CPU用来处理其他不相关的任务,又或者在极少数情况下需要多于CPU数量的工作线程:用来抵消非计算密集型任务留下的间隙)
  2. 一个粒度参数。 这个参数用来代表创建任务的成本将高于潜在的并行性所带来的优势。这个参数实际上是具有算法相关性而不是平台相关性。通常来说可能都是设置一个在单核处理器上能够获得很好效果的阈值,但是在多可CPU上,该值还是需要不断的去探查才能得到。作为一个副效应,这个方式可以很好的和JVM的动态编译想配合,JVM动态编译指的是会优化小的方法会优于单块程序。也就是说,利用数据本地化的有点,能够是fork/join算法优于其他的算法,即便是在单核处理器上。

工作偷窃

 fork/join框架的核心在于他的轻量级调度机制。 FJTask采用了一种由Cilk语言最先开创的工作偷窃调度机制的基本策略。

  • 每个工作线程在它自己的调度队列里面维护一个可运行的任务
  • 队列是作为双端队列来维护的,同时支持LIFO(后进先出)和FIFO(先进先出)操作
  • 在给定的工作线程中由任务生成的子任务都会被放进工作线程自己的双端队列当中
  • 工作线程通过LIFO的方式来处理他们自己的双端队列,也就是通过pop的形式取出最后放进去的任务
  • 当一个工作线程自己的双端队列当中没有任务可以执行的时候,它就会尝试随机地从其他工作线程的任务队列中通过FIFO的方式来获取任务,这种方式也可以称作为偷任务
  • 当一个工作线程遇到了join操作,???
  • 当一个工作线程没有自己的任务可做并且 从其他线程队列当中偷窃任务失败的时候,它将会暂时停歇然后继续尝试做任务直到左右的线程都是相同的空闲状态。在这种情况下所有的线程都会被阻塞直到其他线程从顶级层级被激活。???

  就像在其他文档当中描述的那样,每个工作线程通过LIFO的规则从自己的工作队列当中过去任务,然而去偷窃任务的时候用的是FIFO的规则,这是一种对广泛使用递归方式的fork/join的优化设计。不正式而言,这种设计提供了两种基本的优势:

  1. 作为双端队列的拥有者,偷窃者通过在反方向的操作将会减少拥有者和偷窃者在获取任务时候的冲突。同时这样也利用到了递归分而治之算法会导致先产生大任务的特点。因此,通过偷窃线程,最先被偷窃到的任务是有可能被分担到更多的工作的,也会导致进一步递归的分解
    2.作为这些规则的结果,相对于那些粗粒度的或者没有利用递归分解任务的程序而言,那些具有相对小任务粒度的程序会有运行更快的趋势。即使在多数的fork/join程序当中有相对少的任务会被偷窃,创建细粒度任务意味着当工作线程更有获取到它的可能性。

实现

  这个框架用了大约800行存java代码来实现,主要都在FJTaskRunner这个类当中,它是java.lang.Thread的子类。 FJTasks本身仅仅包含了完成状态及时通过委托他们当前工作线程来执行其他操作。FJTaskRunnerGroup类用来构建工作线程,维护一些共享状态以及帮组协调启动和关闭。
更过的实现文档在util.concurrent包中可以获得。本节内容仅仅讨论实现这个框架过程中遇到的问题和解决方案,问题包含: 支持有效的双端队列操作(push,pop,take); 管理线程获取新任务的偷窃协议。

双端队列

  为了有效和可扩展性执行任务,任务管理必须被设计的尽可能的快。创建,推送以及后来的获取(不常发生的偷窃)任务和顺序执行程序的过程调用一样都是需要开销的。更少的开销能够让程序员采取更小的任务粒度,更小的任务粒度反过来又能获得好的并行性。
任务分配本身是JVM的责任。 java垃圾收集器允许我们创建一个特殊目的的内存分配器来维护任务。 如此一来,相比通过其他语言来实现FJTasks,通过java语言来实现将会大幅减少复杂度和代码量。
双端队列的基本结构是通过单数组来构建的普通结构,每个双端队列有两个索引; top 索引的行为就像一个基于数组栈指示器,做pushpop操作的时候会改变该值。 base索引只有take操作的时候才会改变。由于FJTaskRunner里面的操作都是和双端队列里面的细节紧密相关的(比如,fork就是简单的调用push操作),所以双端队列这个数据机构就嵌套在这个类当中,而不是独立一个组件。
  由于双端队列这个数组会同时被多个线程访问,有时候还不能完全具有同步机制,又由于一个独立的JAVA数组元素不能被申明成volatile,所以每个数组元素实际上一个固定的引用,这个引用对象维护着一个volatile引用。 当初做出这个决定主要是为了兼容JAVA内存规则,但是这种间接的引用最后被证明在测试平台上能够提高性能,推测起来大概是由于访问附近元素减少了缓存竞争,而这种附近元素的分布由于间接引用的关系在内存变得更加均匀。
  实现双端队列的主要困难围绕了同步机制和同步机制的终止。即使在已经优化过同步机制设施的JVM上,为每个pushpop操作获取锁都会成为一个瓶颈。 然而,在Cilk语言中采用策略的改版提供了一个解决办法,这个解决办法基于以下几个准则:

  • pushpop操作只能有拥有者线程来触发
  • 通过在take操作上加上进入锁,很容易在同一时刻限制只有偷窃线程来执行take操作。(必要时,双端队列的锁也能禁用take操作) 因此,冲突控制的问题就缩小为两个部分的同步机制问题了
  • 只有在双端队列即将要变空的情况下,poptake操作才会有冲突。其他情况下,都能保证这两个操作是在相互独立的元素上进行操作的

  如果双端队列里面确保多于一个元素的话,将popbase这两个索引定义为volatile变量能够保证poptake这两个操作不需要加锁。这是通过一种类似于戴克斯特拉算法来做到的,在这种算法当中, push会提前递减top变量:if (--top >= base) ...以及take会提前增加base变量:if(++base < top) ...
在这两个情况下,必须要在随后比较这两个索引来判断操作是否会导致双端队列为空。一个部队称的规则被用来处理潜在的冲突: pop操作会再检查状态并且在获得了双端队列锁之后继续操作(这把锁也是take操作需要持有的锁), 只有在双端队列真的为空的情况下,pop操作才会回退。相反, take操作会立即回退,通常来说他们会去尝试从其他的队列中去偷窃任务。这种不对称也是和Cilk语言中的协议差别最大的一点。???
  除非双端队列将要溢出,否则volatile变量索引的使用使得在做push操作的时候也不需要同步机制。如果双端队列真的将要溢出,push操作必须要获取到双端队列锁来进行扩容数组。否则,简单的确保只有在双端队列数组在take操作的时候被填满时才去更新。