3.6 Fork-Join - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.2.0
3.6 Fork-Join
Fork/Join or Divide and Conquer is a very powerful abstraction to solve hierarchical problems.

The abstraction
When talking about hierarchical problems, think of quick sort, merge sort, file system or general tree navigation, and the like.

- Fork/Join algorithms essentially split the problem at hand into several smaller sub-problems and recursively apply the same algorithm to each of the sub-problems.
- Once the sub-problem is small enough, it is solved directly.
- The solutions of all sub-problems are combined to solve their parent problem, which in turn helps solve its own parent problem.
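
The three steps above can be sketched with the plain JDK Fork/Join classes in `java.util.concurrent`, without GPars. This is only a minimal illustration: the class names `SumSketch` and `SumTask` and the `THRESHOLD` cut-off are hypothetical and chosen for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumSketch {
    // Hypothetical cut-off: ranges this small are solved directly.
    static final int THRESHOLD = 4;

    static final class SumTask extends RecursiveTask<Long> {
        private final int[] data;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // Step 2: a small enough sub-problem is solved directly.
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            // Step 1: split the problem and apply the same algorithm to each half.
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();
            right.fork();
            // Step 3: combine the children's solutions into the parent's solution.
            return right.join() + left.join();
        }
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[100];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println("Sum: " + sum(data));
    }
}
```

Note how much of the code deals with task classes, forking and joining. That bookkeeping is what a convenience layer can take over.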
Check out the fancy interactive Fork/Join visualization demo, which will show you how threads cooperate to solve a common divide-and-conquer algorithm.

The mighty JSR-166y library solves Fork/Join orchestration pretty nicely for us, but leaves a couple of rough edges, which can hurt you if you don't pay enough attention. You still deal with threads, pools or synchronization barriers.
The GPars abstraction convenience layer
GPars can hide the complexities of dealing with threads, pools and recursive tasks from you, yet let you leverage the powerful Fork/Join implementation in jsr166y.

    import static groovyx.gpars.GParsPool.runForkJoin
    import static groovyx.gpars.GParsPool.withPool

    withPool() {
        println """Number of files: ${
            runForkJoin(new File("./src")) {file ->
                long count = 0
                file.eachFile {
                    if (it.isDirectory()) {
                        println "Forking a child task for $it"
                        forkOffChild(it)       //fork a child task
                    } else {
                        count++
                    }
                }
                return count + (childrenResults.sum(0))
                //use results of children tasks to calculate and store own result
            }
        }"""
    }

    def quicksort(numbers) {
        withPool {
            runForkJoin(0, numbers) {index, list ->
                def groups = list.groupBy {it <=> list[list.size().intdiv(2)]}
                if ((list.size() < 2) || (groups.size() == 1)) {
                    return [index: index, list: list.clone()]
                }
                (-1..1).each {forkOffChild(it, groups[it] ?: [])}
                return [index: index, list: childrenResults.sort {it.index}.sum {it.list}]
            }.list
        }
    }

An important point to note here is that forkOffChild() doesn't wait for the child to run. It merely schedules it for execution some time in the future. If a child fails by throwing an exception, you should not expect the exception to be thrown from the forkOffChild() method itself. The exception is likely to happen long after the parent has returned from the call to forkOffChild().

It is the getChildrenResults() method that will re-throw exceptions that happened in the child sub-tasks back to the parent task.
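
The same deferred-failure behaviour can be observed in the plain JDK Fork/Join API, where `fork()` plays the scheduling role and `join()` is the point at which a child's exception reaches the parent. The sketch below is an analogy rather than GPars code, and the class names `DeferredFailureSketch`, `FailingChild` and `Parent` are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class DeferredFailureSketch {
    // A child task that always fails.
    static final class FailingChild extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("child failed");
        }
    }

    static final class Parent extends RecursiveTask<String> {
        @Override
        protected String compute() {
            FailingChild child = new FailingChild();
            // fork() only schedules the child; it returns normally
            // even though the child is going to throw.
            child.fork();
            try {
                // The failure surfaces here, when the result is collected.
                child.join();
                return "no failure observed";
            } catch (RuntimeException e) {
                return "caught at join: " + e.getClass().getSimpleName();
            }
        }
    }

    static String run() {
        return ForkJoinPool.commonPool().invoke(new Parent());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The practical consequence is the same in both APIs: error handling belongs around the place where child results are collected, not around the call that schedules the child.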
Alternative approach
Alternatively, the underlying mechanism of nested Fork/Join worker tasks can be used directly. Custom-tailored workers can eliminate the performance overhead associated with parameter spreading imposed when using the generic workers. Also, custom workers can be implemented in Java and so further increase the performance of the algorithm.

    public final class FileCounter extends AbstractForkJoinWorker<Long> {
        private final File file;

        def FileCounter(final File file) {
            this.file = file
        }

        @Override
        protected Long computeTask() {
            long count = 0;
            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a thread for $it"
                    forkOffChild(new FileCounter(it))           //fork a child task
                } else {
                    count++
                }
            }
            return count + ((childrenResults)?.sum() ?: 0)  //use results of children tasks to calculate and store own result
        }
    }

    withPool(1) {pool ->  //feel free to experiment with the number of fork/join threads in the pool
        println "Number of files: ${runForkJoin(new FileCounter(new File("..")))}"
    }

Fork / Join saves your resources
Fork/Join operations can safely run with a small number of threads, because the TaskBarrier class is used internally to synchronize the threads. While a thread is blocked inside an algorithm waiting for its sub-problems to be calculated, the thread is silently returned to the pool to take on any of the available sub-problems from the task queue and process them. Although the algorithm creates as many tasks as there are sub-directories, and tasks wait for the sub-directory tasks to complete, as few as one thread is enough to keep the computation going and eventually calculate a valid result.

Mergesort example

    import static groovyx.gpars.GParsPool.runForkJoin
    import static groovyx.gpars.GParsPool.withPool

    /**
     * Splits a list of numbers in half
     */
    def split(List<Integer> list) {
        int listSize = list.size()
        int middleIndex = listSize / 2
        def list1 = list[0..<middleIndex]
        def list2 = list[middleIndex..listSize - 1]
        return [list1, list2]
    }

    /**
     * Merges two sorted lists into one
     */
    List<Integer> merge(List<Integer> a, List<Integer> b) {
        int i = 0, j = 0
        final int newSize = a.size() + b.size()
        List<Integer> result = new ArrayList<Integer>(newSize)

        while ((i < a.size()) && (j < b.size())) {
            if (a[i] <= b[j]) result << a[i++]
            else result << b[j++]
        }

        if (i < a.size()) result.addAll(a[i..-1])
        else result.addAll(b[j..-1])
        return result
    }

    final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

    withPool(3) {  //feel free to experiment with the number of fork/join threads in the pool
        println """Sorted numbers: ${
            runForkJoin(numbers) {nums ->
                println "Thread ${Thread.currentThread().name[-1]}: Sorting $nums"
                switch (nums.size()) {
                    case 0..1:
                        return nums                                   //store own result
                    case 2:
                        if (nums[0] <= nums[1]) return nums           //store own result
                        else return nums[-1..0]                       //store own result
                    default:
                        def splitList = split(nums)
                        [splitList[0], splitList[1]].each {forkOffChild it}  //fork a child task
                        return merge(* childrenResults)      //use results of children tasks to calculate and store own result
                }
            }
        }"""
    }

Mergesort example using a custom-tailored worker class

    public final class SortWorker extends AbstractForkJoinWorker<List<Integer>> {
        private final List numbers

        def SortWorker(final List<Integer> numbers) {
            this.numbers = numbers.asImmutable()
        }

        /**
         * Splits a list of numbers in half
         */
        def split(List<Integer> list) {
            int listSize = list.size()
            int middleIndex = listSize / 2
            def list1 = list[0..<middleIndex]
            def list2 = list[middleIndex..listSize - 1]
            return [list1, list2]
        }

        /**
         * Merges two sorted lists into one
         */
        List<Integer> merge(List<Integer> a, List<Integer> b) {
            int i = 0, j = 0
            final int newSize = a.size() + b.size()
            List<Integer> result = new ArrayList<Integer>(newSize)

            while ((i < a.size()) && (j < b.size())) {
                if (a[i] <= b[j]) result << a[i++]
                else result << b[j++]
            }

            if (i < a.size()) result.addAll(a[i..-1])
            else result.addAll(b[j..-1])
            return result
        }

        /**
         * Sorts a small list or delegates to two children, if the list contains more than two elements.
         */
        @Override
        protected List<Integer> computeTask() {
            println "Thread ${Thread.currentThread().name[-1]}: Sorting $numbers"
            switch (numbers.size()) {
                case 0..1:
                    return numbers                                   //store own result
                case 2:
                    if (numbers[0] <= numbers[1]) return numbers     //store own result
                    else return numbers[-1..0]                       //store own result
                default:
                    def splitList = split(numbers)
                    [new SortWorker(splitList[0]), new SortWorker(splitList[1])].each{forkOffChild it}  //fork a child task
                    return merge(* childrenResults)      //use results of children tasks to calculate and store own result
            }
        }
    }

    final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

    withPool(1) {  //feel free to experiment with the number of fork/join threads in the pool
        println "Sorted numbers: ${runForkJoin(new SortWorker(numbers))}"
    }

Running child tasks directly
The forkOffChild() method has a sibling, the runChildDirectly() method, which will run the child task directly and immediately within the current thread instead of scheduling the child task for asynchronous processing on the thread pool. Typically you'll call forkOffChild() on all sub-tasks but the last, which you invoke directly without the scheduling overhead.

    Closure fib = {number ->
        if (number <= 2) {
            return 1
        }
        forkOffChild(number - 1)                        // This task will run asynchronously, probably in a different thread
        final def result = runChildDirectly(number - 2) // This task is run directly within the current thread
        return (Integer) getChildrenResults().sum() + result
    }

    withPool {
        assert 55 == runForkJoin(10, fib)
    }
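
For comparison, the same "fork all but the last, run the last one directly" pattern can be written against the plain JDK Fork/Join API. Here `fork()` roughly corresponds to forkOffChild(), and calling `compute()` on the second sub-task roughly corresponds to runChildDirectly(). This is a sketch under that assumed correspondence, and the class names `FibSketch` and `Fib` are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FibSketch {
    static final class Fib extends RecursiveTask<Integer> {
        private final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 2) {
                return 1;
            }
            // Scheduled for asynchronous processing, possibly on another thread.
            Fib forked = new Fib(n - 1);
            forked.fork();
            // Run directly in the current thread, skipping the scheduling overhead.
            int direct = new Fib(n - 2).compute();
            // Collect the forked child's result and combine.
            return forked.join() + direct;
        }
    }

    static int fib(int n) {
        return ForkJoinPool.commonPool().invoke(new Fib(n));
    }

    public static void main(String[] args) {
        System.out.println("fib(10) = " + fib(10));
    }
}
```

Running the last sub-task directly keeps the current thread busy with useful work instead of having it schedule a task and then immediately wait for it.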