Table of Contents

1. Introduction
2. Getting Started
2.1 Downloading and Installing
2.2 A Hello World Example
2.3 Getting Set-up in an IDE
2.4 What's new
3. Data Parallelism
3.1 Parallel Collections
3.1.1 GParsPool
3.1.2 GParsExecutorsPool
3.2 Map-Reduce
3.3 Asynchronous Invocation
3.4 Fork-Join
4. Groovy CSP
5. Actors
5.1 Actors Principles
5.2 Special Actors
5.3 Tips and Tricks
5.4 Classic Examples using Actors
6. Agent
7. Dataflow Concurrency
7.1 Dataflow tasks
7.2 Dataflow operators
7.3 Dataflow implementation
8. Tips

1. Introduction

The world of mainstream computing is changing rapidly these days. If you open the hood and look under the covers of your computer, you'll most likely see a dual-core processor there. Or a quad-core one, if you're lucky. We all run our software on multi-processors. The code we write today and tomorrow will probably never run on a single-processor system. Parallel hardware has become commonplace. Not so the software, at least not yet. People still write single-threaded code, although it will never be able to leverage the full power of future hardware. Some experiment with low-level concurrency primitives, like threads, locks or synchronized blocks. However, it has become obvious that common shared-memory multithreading causes more trouble than it solves. Low-level concurrency handling is usually hard to get right. And it's not much fun either. With such a radical change in hardware, software inevitably has to change dramatically too. Higher-level concurrency concepts like map/reduce, fork/join, actors or dataflow provide natural abstractions for different types of problem domains while leveraging the multi-core hardware underneath.

Meet GPars - an open-source concurrency library for Groovy that aims to give you multiple high-level abstractions for writing concurrent code in Groovy - map/reduce, fork/join, asynchronous closures, actors, agents, dataflow concurrency and other concepts, which aim to make your Groovy code concurrent with little effort. With GPars your Groovy code can easily utilize all the available processors on the target system. You can run multiple calculations at the same time, request network resources in parallel, safely solve hierarchical divide-and-conquer problems, perform functional style map/reduce collection processing or build your applications around the actor model.

The project is open sourced under the Apache 2 License. If you're working on a commercial, open-source, educational or any other type of software project in Groovy, download the binaries or integrate them from the Maven repository and get going. The way to writing highly concurrent Groovy code is wide open. Enjoy!

2. Getting Started

Let's make several assumptions before we really start.
  1. You know and love Groovy. Otherwise you'd hardly invest your valuable time into studying a Groovy concurrency library.
  2. You target multi-core hardware with your code.
  3. You use or want to use Groovy to write concurrent code.
  4. You have at least some understanding that in concurrent code some things can happen at any time in any order and often more of them at the same time.

That's about it. Let's roll the ball forward.

Brief overview

GPars aims to bring several useful concurrency abstractions to Groovy developers. It's becoming obvious that dealing with concurrency on the thread/synchronized/lock level, as provided by the JVM, is way too low-level to be safe and comfortable. Many high-level concepts, like actors or dataflow concurrency, have been around for quite some time, since parallel computers had been in use in computer centers long before multi-core chips hit the hardware mainstream. Now, however, it's time to adopt and test these abstractions for the mainstream software industry.

The concepts available in GPars can be categorized into three main groups:

  1. Code-level helpers - constructs that can be applied to small parts of the code-base such as individual algorithms or data structures without any major changes in the overall project architecture
  2. Architecture-level concepts - constructs that need to be taken into account when designing the project structure
  3. Shared Mutable State Protection - although about 95% of the current use of shared mutable state can be avoided using proper abstractions, good abstractions are still necessary for the remaining 5% of use cases, where shared mutable state can't be avoided

2.1 Downloading and Installing

There are several ways to add GPars to your project. Either download and add all the jar files manually, specify a dependency in Maven, Ivy or Gradle build files or use Grape. If you're building a Grails or a Griffon application, you can leverage the appropriate plugins to fetch the jar files for you.

Please visit the Integration page of the project for details.

2.2 A Hello World Example

Once you are set up, try the following script to test that your setup is functional.
import static groovyx.gpars.actor.Actors.actor

/**
 * A demo showing two cooperating actors. The decryptor decrypts received messages and replies them back.
 * The console actor sends a message to decrypt, prints out the reply and terminates both actors.
 * The main thread waits on both actors to finish using the join() method to prevent premature exit,
 * since both actors use the default actor group, which uses a daemon thread pool.
 * @author Dierk Koenig, Vaclav Pech
 */

def decryptor = actor {
    loop {
        react {message ->
            if (message instanceof String) reply message.reverse()
            else stop()
        }
    }
}

def console = actor {
    decryptor.send 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
        decryptor.send false
    }
}

[decryptor, console]*.join()

You should get a message "Decrypted message: Groovy is parallel" printed out on the console when you run the code.

2.3 Getting Set-up in an IDE

Adding the GPars jar files to your project or defining the appropriate dependencies in pom.xml should be enough to get you started with GPars in your IDE.

GPars DSL recognition

IntelliJ IDEA in both the free Community Edition and the commercial Ultimate Edition will recognize the GPars domain specific languages, complete methods like eachParallel(), reduce() or callAsync() and validate them. GPars uses the GroovyDSL mechanism, which teaches IntelliJ IDEA the DSLs as soon as the GPars jar file is added to the project.

2.4 What's new

The GPars 0.10 release introduces a lot of gradual enhancements and improvements on top of the previous 0.9 release.

Check out the JIRA release notes

Project changes

See http://gpars.codehaus.org/Breaking+Changes for the list of breaking changes.

Parallel collections

Fork / Join

Actors

myActor 'message'

GroovyCSP

Dataflow

Safe

myAgent increment

Other

Renaming hints

3. Data Parallelism

Focusing on data instead of processes helps a great deal to create robust concurrent programs. You as a programmer define your data together with the functions that should be applied to it and then let the underlying machinery process the data. Typically a set of concurrent tasks will be created and then submitted to a thread pool for processing.

In GPars the GParsPool and GParsExecutorsPool classes give you access to low-level data parallelism techniques. While the GParsPool class relies on the jsr-166y Fork/Join framework and so offers greater functionality and better performance, the GParsExecutorsPool uses good old Java executors and so is easier to set up in a managed or restricted environment.

There are three fundamental domains covered by the GPars low-level data parallelism:

  1. Processing collections concurrently
  2. Running functions (closures) asynchronously
  3. Performing Fork/Join (Divide/Conquer) algorithms

3.1 Parallel Collections

Dealing with data frequently involves manipulating collections. Lists, arrays, sets, maps, iterators, strings and a lot of other data types can be viewed as collections of items. The common pattern to process such collections is to take elements sequentially, one by one, and perform an action on each of the items in turn.

Take, for example, the min() function, which is supposed to return the smallest element of a collection. When you call the min() method on a collection of numbers, the caller thread will create an accumulator, or so-far-the-smallest-value, initialized to the maximum value of the given type or simply to the first element of the collection. The thread will then iterate through the elements of the collection and compare them with the value in the accumulator, replacing it whenever a smaller element is found. Once all elements have been processed, the minimum value is stored in the accumulator.

This algorithm, however simple, is a poor fit for multi-core hardware. Running the min() function on a dual-core chip can leverage at most 50% of the computing power of the chip. On a quad-core it would be only 25%. That's right, this algorithm effectively wastes 75% of the computing power of the chip.

Tree-like structures have proved to be more appropriate for parallel processing. The min() function in our example doesn't need to iterate through all the elements in a row and compare their values with the accumulator. Instead, it can rely on the multi-core nature of your hardware. A parallel_min() function could, for example, compare pairs (or tuples of a certain size) of neighboring values in the collection and promote the smallest value from each tuple into the next round of comparison. Searching for the minimum in different tuples can safely happen in parallel, so tuples in the same round can be processed by different cores at the same time without races or contention among threads.
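To make the tree-like idea concrete, here is a minimal sketch of such a parallel minimum. It is not how GPars implements min(); it only assumes the plain JDK Fork/Join classes from java.util.concurrent, and the MinTask and parallelMin names are made up for this illustration. Each task splits its range in half until only one or two elements remain, and the smaller value of every comparison is promoted to the parent.

```groovy
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// Hypothetical task: finds the minimum of values[from..<to] by splitting the range in two.
class MinTask extends RecursiveTask<Integer> {
    private final List<Integer> values
    private final int from
    private final int to

    MinTask(List<Integer> values, int from, int to) {
        this.values = values
        this.from = from
        this.to = to
    }

    @Override
    protected Integer compute() {
        if (to - from == 1) return values[from]                              // a single element is its own minimum
        if (to - from == 2) return Math.min(values[from], values[from + 1])  // compare one pair
        int middle = (from + to).intdiv(2)
        MinTask left = new MinTask(values, from, middle)
        MinTask right = new MinTask(values, middle, to)
        left.fork()                               // the left half may run on another core
        int rightMin = right.compute()            // the right half runs in the current thread
        return Math.min(left.join(), rightMin)    // promote the smaller of the two winners
    }
}

// Hypothetical helper, named after the parallel_min() function mentioned in the text.
def parallelMin(List<Integer> values) {
    if (values.isEmpty()) throw new IllegalArgumentException('Cannot find the minimum of an empty list')
    def pool = new ForkJoinPool()
    try {
        return pool.invoke(new MinTask(values, 0, values.size()))
    } finally {
        pool.shutdown()
    }
}

assert parallelMin([7, 3, 9, 1, 8, 2, 6]) == 1
```

The tasks only read the shared list and never write to it, which is why the comparisons in different halves need no locking.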

Meet Parallel Arrays

The jsr-166y library brings a very convenient abstraction called Parallel Arrays. GPars leverages the Parallel Arrays implementation in several ways. The GParsPool and GParsExecutorsPool classes provide parallel variants of the common Groovy iteration methods like each(), collect(), findAll() and such.

def selfPortraits = images.findAllParallel{it.contains me}.collectParallel {it.resize()}
It also allows for a more functional style map/reduce collection processing.
def smallestSelfPortrait = images.parallel.filter{it.contains me}.map{it.resize()}.min{it.sizeInMB}

3.1.1 GParsPool

Use of GParsPool - the JSR-166y based concurrent collection processor

Usage of GParsPool

The GParsPool class enables a ParallelArray-based (from JSR-166y) concurrency DSL for collections and objects.

Examples of use:

//summarize numbers concurrently (AtomicInteger comes from java.util.concurrent.atomic)
 GParsPool.withPool {
     final AtomicInteger result = new AtomicInteger(0)
     [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
     assert 15 == result.get()
 }

//multiply numbers asynchronously
 GParsPool.withPool {
     final List result = [1, 2, 3, 4, 5].collectParallel {it * 2}
     assert ([2, 4, 6, 8, 10].equals(result))
 }

The passed-in closure takes an instance of a ForkJoinPool as a parameter, which can be then used freely inside the closure.
//check whether all elements within a collection meet certain criteria
 GParsPool.withPool(5) {ForkJoinPool pool ->
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }
The GParsPool.withPool() method takes optional parameters for number of threads in the created pool and an unhandled exception handler.
withPool(10) {...}
withPool(20, exceptionHandler) {...}

The GParsPool.withExistingPool() takes an already existing ForkJoinPool instance to reuse. The DSL is valid only within the associated block of code and only for the thread that has called the withPool() or withExistingPool() methods. The withPool() method returns only after all the worker threads have finished their tasks and the pool has been destroyed, returning back the return value of the associated block of code. The withExistingPool() method doesn't wait for the pool threads to finish.

Alternatively, the GParsPool class can be statically imported with import static groovyx.gpars.GParsPool.*, which allows omitting the GParsPool class name.

withPool {
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }

The following methods are currently supported on all objects in Groovy:

Meta-class enhancer

As an alternative you can use the ParallelEnhancer class to enhance meta-classes of any classes or individual instances with the parallel methods.

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale']
ParallelEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

When using the ParallelEnhancer class, you're not restricted to a withPool() block with the use of the GParsPool DSLs. The enhanced classes or instances remain enhanced till they get garbage collected.

Exception handling

If an exception is thrown while processing any of the passed-in closures, the exception gets re-thrown from the xxxParallel methods.

Transparently parallel collections

On top of adding new xxxParallel() methods, GPars can also let you change the semantics of the original iteration methods. For example, you may be passing a collection into a library method, which will process your collection in a sequential way, let's say using the collect() method. By changing the semantics of the collect() method on your collection you can effectively parallelize the library's sequential code.

GParsPool.withPool {
    //The selectImportantNames() will process the name collections concurrently
    assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeTransparent())
}

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

Transparent parallelism is also available in combination with ParallelEnhancer.

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

def names = ['Joe', 'Alice', 'Dave', 'Jason']
ParallelEnhancer.enhanceInstance(names)
//The selectImportantNames() will process the name collections concurrently
assert ['ALICE', 'JASON'] == selectImportantNames(names.makeTransparent())

Dependency resolution

For the GParsPool class to work, the jsr166y-070108.jar must be on the classpath.

<dependency>
    <groupId>org.coconut.forkjoin</groupId>
    <artifactId>jsr166y</artifactId>
    <version>070108</version>
</dependency>

Avoid side-effects in functions

We have to warn you. Since the closures that are provided to the parallel methods like eachParallel() or collectParallel() may be run in parallel, you have to make sure that each of the closures is written in a thread-safe manner. The closures must hold no internal state, share no data and have no side-effects beyond the boundaries of the single element that they've been invoked on. Violations of these rules will open the door for race conditions and deadlocks, the most severe enemies of a modern multi-core programmer.

Don't do this:

def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}  //Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!
At least, you've been warned.
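A safer pattern is to let every closure return its own result and to build the resulting collection from the returned values, which is what collectParallel() does for you. The sketch below illustrates the idea with plain JDK executors only, so it runs without GPars on the classpath. The makeThumbnail closure and the image names are placeholders invented for this example.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical stand-in for real image processing.
def makeThumbnail = { String imageName -> 'thumb-' + imageName }

def images = ['a.png', 'b.png', 'c.png', 'd.png']
ExecutorService pool = Executors.newFixedThreadPool(4)
def thumbnails
try {
    // Each task only reads its own element and returns a value; no shared collection is mutated.
    def tasks = images.collect { image -> ({ makeThumbnail(image) } as Callable) }
    // invokeAll() returns the futures in the same order as the submitted tasks.
    thumbnails = pool.invokeAll(tasks)*.get()
} finally {
    pool.shutdown()
}

assert thumbnails == ['thumb-a.png', 'thumb-b.png', 'thumb-c.png', 'thumb-d.png']
```

The result list is assembled by the calling thread alone, so it doesn't have to be thread-safe.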

3.1.2 GParsExecutorsPool

Use of GParsExecutorsPool - the Java Executors' based concurrent collection processor

Usage of GParsExecutorsPool

The GParsExecutorsPool class enables a Java Executors-based concurrency DSL for collections and objects.

The GParsExecutorsPool class can be used as a pure-JDK-based collection parallel processor. Unlike the GParsPool class, GParsExecutorsPool doesn't require the jsr-166y jar file, but leverages the standard JDK executor services to parallelize closures processing a collection or an object iteratively. It needs to be stated, however, that GParsPool typically performs much better than GParsExecutorsPool does.

Examples of use:

//multiply numbers asynchronously
 GParsExecutorsPool.withPool {
     Collection<Future> result = [1, 2, 3, 4, 5].collectParallel{it * 10}
     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
 }

//multiply numbers asynchronously using an asynchronous closure
 GParsExecutorsPool.withPool {
     def closure={it * 10}
     def asyncClosure=closure.async()
     Collection<Future> result = [1, 2, 3, 4, 5].collect(asyncClosure)
     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
 }

The passed-in closure takes an ExecutorService instance as a parameter, which can then be used freely inside the closure.
//submit a long-running calculation directly to the executor service
 GParsExecutorsPool.withPool(5) {ExecutorService service ->
     service.submit({performLongCalculation()} as Runnable)
 }
The GParsExecutorsPool.withPool() method takes optional parameters for number of threads in the created pool and a thread factory.
withPool(10) {...}
withPool(20, threadFactory) {...}

The GParsExecutorsPool.withExistingPool() takes an already existing executor service instance to reuse. The DSL is valid only within the associated block of code and only for the thread that has called the withPool() or withExistingPool() method. The withPool() method returns only after all the worker threads have finished their tasks and the executor service has been destroyed, returning back the return value of the associated block of code. The withExistingPool() method doesn't wait for the executor service threads to finish.

Alternatively, the GParsExecutorsPool class can be statically imported with import static groovyx.gpars.GParsExecutorsPool.*, which allows omitting the GParsExecutorsPool class name.

withPool {
     def result = [1, 2, 3, 4, 5].findParallel{Number number -> number > 2}
     assert result in [3, 4, 5]
 }
The following methods on all objects, which support iterations in Groovy, are currently supported:

Meta-class enhancer

As an alternative you can use the GParsExecutorsPoolEnhancer class to enhance meta-classes for any classes or individual instances with asynchronous methods.

import groovyx.gpars.GParsExecutorsPoolEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
GParsExecutorsPoolEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale']
GParsExecutorsPoolEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.allParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

When using the GParsExecutorsPoolEnhancer class, you're not restricted to a withPool() block with the use of the GParsExecutorsPool DSLs. The enhanced classes or instances remain enhanced till they get garbage collected.

Exception handling

If exceptions are thrown while processing any of the passed-in closures, an instance of AsyncException wrapping all the original exceptions gets re-thrown from the xxxParallel methods.

Avoid side-effects in functions

Once again we need to warn you about using closures with side-effects affecting objects beyond the scope of the single currently processed element, or closures which keep state. Don't do that! It is dangerous to pass them to any of the xxxParallel() methods.

3.2 Map-Reduce

The Parallel Collection Map/Reduce DSL gives GPars a more functional flavor. In general, the Map/Reduce DSL may be used for the same purpose as the xxxParallel() family methods and has very similar semantics. On the other hand, Map/Reduce can perform considerably faster, if you need to chain multiple methods to process a single collection in multiple steps:
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
            .map {it.toURL().text.toUpperCase()}
            .filter {it.contains('GROOVY')}
            .map{it.split()}
            .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
            .sum()

The xxxParallel() methods have to follow the contract of their non-parallel peers. So a collectParallel() method must return a legal collection of items, which you can again treat as a Groovy collection. Internally the parallel collect method builds an efficient parallel structure, called a Parallel Array, performs the required operation concurrently and, before returning, destroys the Parallel Array while building the collection of results to return to you. A subsequent call to, let's say, findAllParallel() on the resulting collection would repeat the whole process of construction and destruction of a Parallel Array instance under the covers.

With Map/Reduce you turn your collection into a Parallel Array and back only once. The Map/Reduce family of methods do not return Groovy collections, but are free to pass along the internal Parallel Arrays directly. Invoking the parallel property on a collection will build a Parallel Array for the collection and return a thin wrapper around the Parallel Array instance. Then you can chain all the required methods, such as the map(), filter(), min() or sum() calls used in the examples of this chapter.

Returning back to a plain Groovy collection instance is always just a matter of retrieving the collection property.

def myNumbers = (1..1000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collection
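
If you'd like to try the convert-once, chain, convert-back pattern without GPars on the classpath, the following sketch mirrors the example above using Java 8+ parallel streams. Note that this swaps in a different technique: the stream merely stands in for the GPars Parallel Array wrapper and is not what the parallel property returns.

```groovy
import java.util.stream.Collectors

// Assumption: Java 8+ parallel streams stand in for the GPars Parallel Array wrapper.
def evenRoots = (1..1000).toList()
        .parallelStream()                  // build the parallel structure once
        .filter { it % 2 == 0 }            // chained steps stay inside the parallel structure
        .map { Math.sqrt(it) }
        .collect(Collectors.toList())      // convert back to a plain list once

assert evenRoots.size() == 500
assert evenRoots.first() == Math.sqrt(2)
```

No intermediate Groovy collection is created between the filter and map steps, which is the property that makes chained Map/Reduce calls cheaper than chained xxxParallel() calls.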

Avoid side-effects in functions

Once again we need to warn you. To avoid nasty surprises, please, keep your closures, which you pass to the Map/Reduce functions, stateless and clean from side-effects.

Availability

This feature is only available when using the Fork/Join-based GParsPool, not GParsExecutorsPool.

3.3 Asynchronous Invocation

The need to run long-lasting tasks in the background arises quite frequently. Your main thread of execution wants to start a few calculations, downloads, searches or such, but the results may not be needed immediately. GPars gives developers the tools to schedule asynchronous activities for processing in the background and to collect the results once they're needed.

Usage of GParsPool and GParsExecutorsPool asynchronous processing facilities

Both GParsPool and GParsExecutorsPool provide almost identical services in this domain, although they leverage different underlying machinery, based on which of the two classes the user chooses.

Closures enhancements

The following methods are added to closures inside the GPars(Executors)Pool.withPool() blocks:

Examples:

GParsPool.withPool() {
    Closure longLastingCalculation = {calculate()}
    Closure fastCalculation = longLastingCalculation.async()  //create a new closure, which starts the original closure on a thread pool
    Future result=fastCalculation()                           //returns almost immediately
    //do stuff while calculation performs …
    println result.get()
}

GParsPool.withPool() {
    /**
     * The callAsync() method is an asynchronous variant of the default call() method to invoke a closure.
     * It will return a Future for the result value.
     */
    assert 6 == {it * 2}.call(3)
    assert 6 == {it * 2}.callAsync(3).get()
}
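
Conceptually, an asynchronous variant of a closure is a new closure that submits the original one to a thread pool and hands back a Future. The sketch below shows that idea with a plain JDK executor. The asyncVersionOf helper is a made-up name for this illustration and is not the GPars implementation of async().

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Hypothetical helper: wraps a one-argument closure so that each call is submitted to the pool.
Closure asyncVersionOf(Closure original, ExecutorService pool) {
    return { argument -> pool.submit({ original.call(argument) } as Callable) }
}

ExecutorService pool = Executors.newFixedThreadPool(2)
try {
    Closure doubler = { it * 2 }
    Closure asyncDoubler = asyncVersionOf(doubler, pool)
    Future pending = asyncDoubler(3)     // returns without waiting for the calculation
    //do stuff while the calculation performs
    assert pending.get() == 6            // blocks only if the result isn't ready yet
} finally {
    pool.shutdown()
}
```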

Executor Service enhancements

The ExecutorService and jsr166y.forkjoin.ForkJoinPool classes are enhanced with the << (leftShift) operator to submit tasks to the pool and return a Future for the result.

Example:

GParsExecutorsPool.withPool {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}

Running functions (closures) in parallel

The GParsPool and GParsExecutorsPool classes also provide handy methods executeAsync() and executeAsyncAndWait() to easily run multiple closures asynchronously.

Example:

GParsPool.withPool {
    assertEquals([10, 20], GParsPool.executeAsyncAndWait({calculateA()}, {calculateB()}))         //waits for results
    assertEquals([10, 20], GParsPool.executeAsync({calculateA()}, {calculateB()})*.get())  //returns Futures instead and doesn't wait for results to be calculated
}

3.4 Fork-Join

Fork/Join or Divide and Conquer is a very powerful abstraction to solve hierarchical problems.

The abstraction

When talking about hierarchical problems, think about quick sort, merge sort, file system or general tree navigation and such.

The mighty JSR-166y library solves Fork / Join orchestration pretty nicely for us, but leaves a couple of rough edges, which can hurt you if you don't pay enough attention. You still deal with threads, pools or synchronization barriers.

The GPars abstraction convenience layer

GPars can hide the complexities of dealing with threads, pools and recursive tasks from you, yet let you leverage the powerful Fork/Join implementation in jsr166y.

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

withPool() {
    println """Number of files: ${
        runForkJoin(new File("./src")) {file ->
            long count = 0
            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    forkOffChild(it)       //fork a child task
                } else {
                    count++
                }
            }
            return count + (childrenResults.sum(0))      //use results of children tasks to calculate and store own result
        }
    }"""
}

The runForkJoin() factory method will use the supplied recursive code together with the provided values and build a hierarchical Fork/Join calculation. The number of values passed to the runForkJoin() method must match the number of expected parameters of the closure as well as the number of arguments passed into the forkOffChild() method.

def quicksort(numbers) {
    withPool {
        runForkJoin(0, numbers) {index, list ->
            def groups = list.groupBy {it <=> list[list.size().intdiv(2)]}
            if ((list.size() < 2) || (groups.size() == 1)) {
                return [index: index, list: list.clone()]
            }
            (-1..1).each {forkOffChild(it, groups[it] ?: [])}
            return [index: index, list: childrenResults.sort {it.index}.sum {it.list}]
        }.list
    }
}

Alternative approach

Alternatively, the underlying mechanism of nested Fork/Join worker tasks can be used directly. Custom-tailored workers can eliminate the performance overhead associated with parameter spreading imposed when using the generic workers. Also, custom workers can be implemented in Java and so further increase the performance of the algorithm.

public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;

    def FileCounter(final File file) {
        this.file = file
    }

    @Override
    protected Long computeTask() {
        long count = 0;
        file.eachFile {
            if (it.isDirectory()) {
                println "Forking a thread for $it"
                forkOffChild(new FileCounter(it))           //fork a child task
            } else {
                count++
            }
        }
        return count + ((childrenResults)?.sum() ?: 0)  //use results of children tasks to calculate and store own result
    }
}

withPool(1) {pool ->  //feel free to experiment with the number of fork/join threads in the pool
    println "Number of files: ${runForkJoin(new FileCounter(new File("..")))}"
}

The AbstractForkJoinWorker subclasses may be written in either Java or Groovy, giving you the option to easily optimize for execution speed, if raw performance of the worker becomes a bottleneck.

Fork / Join saves your resources

Fork/Join operations can be safely run with a small number of threads thanks to internally using the TaskBarrier class to synchronize the threads. While a thread is blocked inside an algorithm waiting for its sub-problems to be calculated, the thread is silently returned to the pool to take on any of the available sub-problems from the task queue and process them. Although the algorithm creates as many tasks as there are sub-directories and tasks wait for the sub-directory tasks to complete, as few as one thread is enough to keep the computation going and eventually calculate a valid result.
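
You can observe the one-thread-is-enough behavior without touching the file system. The following sketch assumes the JDK's java.util.concurrent.ForkJoinPool, the successor of jsr166y, rather than the GPars wrappers, and the LeafCounter class is a hypothetical worker written for this illustration. A nested list plays the role of the directory tree: nested lists are sub-directories, everything else is a file.

```groovy
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// Hypothetical task: counts the leaves of a nested list, forking one child task per nested list.
class LeafCounter extends RecursiveTask<Integer> {
    private final List node

    LeafCounter(List node) {
        this.node = node
    }

    @Override
    protected Integer compute() {
        int count = 0
        List<LeafCounter> children = []
        for (item in node) {
            if (item instanceof List) {
                LeafCounter child = new LeafCounter((List) item)
                child.fork()              // schedule the sub-problem
                children << child
            } else {
                count++                   // a leaf, like a plain file in the directory example
            }
        }
        for (child in children) {
            count += child.join()         // instead of idling, the worker can run the pending child task itself
        }
        return count
    }
}

def tree = [1, [2, 3, [4, [5, 6]]], [7], []]
def pool = new ForkJoinPool(1)            // a single worker thread
try {
    assert pool.invoke(new LeafCounter(tree)) == 7
} finally {
    pool.shutdown()
}
```

With a plain fixed-size thread pool of one thread, a parent task blocking on its children like this would never finish, since no thread would be left to run the children.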

Mergesort example

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

/**
 * Splits a list of numbers in half
 */
def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}

/**
 * Merges two sorted lists into one
 */
List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)

    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }

    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(3) {  //feel free to experiment with the number of fork/join threads in the pool
    println """Sorted numbers: ${
        runForkJoin(numbers) {nums ->
            println "Thread ${Thread.currentThread().name[-1]}: Sorting $nums"
            switch (nums.size()) {
                case 0..1:
                    return nums                                   //store own result
                case 2:
                    if (nums[0] <= nums[1]) return nums           //store own result
                    else return nums[-1..0]                       //store own result
                default:
                    def splitList = split(nums)
                    [splitList[0], splitList[1]].each {forkOffChild it}  //fork a child task
                    return merge(* childrenResults)      //use results of children tasks to calculate and store own result
            }
        }
    }"""
}

Mergesort example using a custom-tailored worker class

public final class SortWorker extends AbstractForkJoinWorker<List<Integer>> {
    private final List numbers

    def SortWorker(final List<Integer> numbers) {
        this.numbers = numbers.asImmutable()
    }

    /**
     * Splits a list of numbers in half
     */
    def split(List<Integer> list) {
        int listSize = list.size()
        int middleIndex = listSize / 2
        def list1 = list[0..<middleIndex]
        def list2 = list[middleIndex..listSize - 1]
        return [list1, list2]
    }

    /**
     * Merges two sorted lists into one
     */
    List<Integer> merge(List<Integer> a, List<Integer> b) {
        int i = 0, j = 0
        final int newSize = a.size() + b.size()
        List<Integer> result = new ArrayList<Integer>(newSize)

        while ((i < a.size()) && (j < b.size())) {
            if (a[i] <= b[j]) result << a[i++]
            else result << b[j++]
        }

        if (i < a.size()) result.addAll(a[i..-1])
        else result.addAll(b[j..-1])
        return result
    }

    /**
     * Sorts a small list or delegates to two children, if the list contains more than two elements.
     */
    @Override
    protected List<Integer> computeTask() {
        println "Thread ${Thread.currentThread().name[-1]}: Sorting $numbers"
        switch (numbers.size()) {
            case 0..1:
                return numbers                                   //store own result
            case 2:
                if (numbers[0] <= numbers[1]) return numbers     //store own result
                else return numbers[-1..0]                       //store own result
            default:
                def splitList = split(numbers)
                [new SortWorker(splitList[0]), new SortWorker(splitList[1])].each{forkOffChild it}  //fork a child task
                return merge(* childrenResults)      //use results of children tasks to calculate and store own result
        }
    }
}

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(1) {  //feel free to experiment with the number of fork/join threads in the pool
    println "Sorted numbers: ${runForkJoin(new SortWorker(numbers))}"
}

Availability

This feature is only available when using the Fork/Join-based GParsPool, not GParsExecutorsPool.

4. Groovy CSP

todo

5. Actors

The actor support in GPars was inspired by the Actors library in Scala but has meanwhile gone beyond that.

Actors allow for a messaging-based concurrency model, built from independent active objects that exchange messages and have no mutable shared state. Actors can help developers avoid issues like deadlocks, live-locks or starvation, so typical for shared memory, while leveraging the multi-core nature of today's hardware. A nice wrap-up of the key concepts behind actors was written recently by Ruben Vermeersch. Actors guarantee that at most one thread processes the actor's body at a time. Under the covers, the memory also gets synchronized each time a thread gets assigned to an actor, so the actor's state can be safely modified by code in the body without any extra synchronization or locking effort. Ideally, an actor's code should never be invoked directly from outside, so that all the code of the actor class is only ever executed by the thread handling the last received message, which makes all the actor's code implicitly thread-safe. If any of the actor's methods is allowed to be called by other objects directly, the thread-safety guarantee for the actor's code and state is no longer valid.

Actors can share a relatively small thread pool. This can go as far as having many concurrent actors that share a single pooled thread. They avoid the threading limitations of the JVM.

Actor code is processed in chunks separated by quiet periods of waiting for new events (messages). This can be naturally modeled through continuations . As JVM doesn't support continuations directly, they have to be simulated in the actors frameworks, which has slight impact on organization of the actors' code. However, the benefits in most cases outweigh the difficulties.

import groovyx.gpars.actor.AbstractPooledActor

class GameMaster extends AbstractPooledActor {
    int secretNum

    void afterStart() {
        secretNum = new Random().nextInt(10)
    }

    void act() {
        loop {
            react { int num ->
                if (num > secretNum)
                    reply 'too large'
                else if (num < secretNum)
                    reply 'too small'
                else {
                    reply 'you win'
                    stop()
                    System.exit 0
                }
            }
        }
    }
}

class Player extends AbstractPooledActor {
    String name
    AbstractPooledActor server
    int myNum

    void act() {
        loop {
            myNum = new Random().nextInt(10)
            server.send myNum
            react {
                switch (it) {
                    case 'too large': println "$name: $myNum was too large"; break
                    case 'too small': println "$name: $myNum was too small"; break
                    case 'you win': println "$name: I won $myNum"; stop(); break
                }
            }
        }
    }
}

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()

[master, player]*.join()

Example by Jordi Campos i Miralles, Departament de Matemàtica Aplicada i Anàlisi, MAiA Facultat de Matemàtiques, Universitat de Barcelona

Usage of Actors

GPars provides consistent Actor APIs and DSLs. Actors in principle perform three specific operations: they send messages, receive messages and create new actors. Although not specifically enforced by GPars, messages should be immutable or at least follow the hands-off policy, under which the sender never touches a message after it has been sent off.
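One way to follow the immutability advice is to declare message classes with Groovy's @Immutable annotation. The sketch below assumes a Groovy version that ships groovy.transform.Immutable. The Offer class and its fields are made up for illustration and are not part of GPars.

```groovy
import groovy.transform.Immutable

// Hypothetical message type; @Immutable makes all properties read-only
@Immutable
class Offer {
    String bidder
    int price
}

def offer = new Offer(bidder: 'Joe', price: 10)

// Any attempt to modify the message after construction is rejected
boolean rejected = false
try {
    offer.price = 5
} catch (ReadOnlyPropertyException e) {
    rejected = true
}

assert rejected
assert offer.price == 10
```

A message built this way can be handed to several actors without any of them being able to change what the others see.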

Sending messages

Messages can be sent to actors using the send() method. Alternatively, the << operator or the implicit call() method can be used. A family of sendAndWait() methods is available to block the caller until a reply from the actor is available. The reply is returned from the sendAndWait() method as a return value. The sendAndWait() methods may also return after a timeout expires or in case of termination of the called actor.

actor.send 'Message'
actor << 'Message'    //using the << operator
actor 'Message'       //using the implicit call() method
def reply1 = actor.sendAndWait('Message')
def reply2 = actor.sendAndWait(10, TimeUnit.SECONDS, 'Message')
def reply3 = actor.sendAndWait(10.seconds, 'Message')

The sendAndContinue() method allows the caller to continue its processing while the supplied closure is waiting for a reply from the actor.

friend.sendAndContinue 'I need money!', {money -> pocket money}
println 'I can continue while my friend is collecting money for me'

All send(), sendAndWait() or sendAndContinue() methods will throw an exception if invoked on a non-active actor.

Receiving messages

Non-blocking message retrieval

Calling the react() method, optionally with a timeout parameter, from within the actor's code will consume the next message from the actor's inbox, potentially waiting, if there is no message to be processed immediately.

println 'Waiting for a gift'
react {gift ->
    if (myWife.likes gift) reply 'Thank you!'
}

Under the covers the supplied closure is not invoked directly, but scheduled for processing by any thread in the thread pool once a message is available. After scheduling, the current thread is detached from the actor and freed to process any other actor that has already received a message.

To allow detaching actors from the threads, the react() method demands the code to be written in a special continuation style.

loop {
    println 'Waiting for a gift'
    react {gift ->
        if (myWife.likes gift) reply 'Thank you!'
        else {
            reply 'Try again, please'
            react {anotherGift ->
                if (myChildren.like anotherGift) reply 'Thank you!'
            }
            println 'Never reached'
        }
    }
    println 'Never reached'
}
println 'Never reached'

The react() and loop() methods never return normally and any code put after a call to either of the two methods will never be executed. The closure supplied to the react() or loop() methods is the code where the computation should continue, hence the name continuation style.

Blocking message retrieval

Unlike the react() method, which gives up the current thread until a message is available for an actor, the receive() method blocks waiting for a message. This allows for a non-continuation style code and also might have positive performance implications in certain scenarios.

Mixing react() and receive() calls within a single actor is also possible.

Actors.actor {
    def msg1 = receive()
    receive {msg2, msg3 ->
        [msg1, msg2, msg3]*.reply 'Hi!'
    }
    react {msg4 ->
        msg4.reply "You're the last today!"
    }
}.start()

Sending replies

The reply/replyIfExists methods are not only defined on the actors themselves, but also on the messages upon their reception, which is particularly handy when handling multiple messages in a single call. In such cases reply() invoked on the actor sends a reply to the author of the currently processed message (the last one), whereas reply() called on a message sends a reply to the author of that particular message only.

react {offerA ->
    react {offerB ->
        react {offerC ->
            //sent to each of the senders
            [offerA, offerB, offerC]*.reply 'Received your kind offer. Now processing it and comparing with others.'

            offerA.reply 'You were the fastest'  //sent to the author of offerA only

            def winnerOffer = [offerA, offerB, offerC].min {it.price}
            winnerOffer.reply 'I accept your reasonable offer'  //sent to the winner only
            ([offerA, offerB, offerC] - [winnerOffer])*.reply 'Maybe next time'  //sent to the losers only
        }
    }
}

The sender property

Messages upon retrieval offer the sender property to identify the originator of the message.

react {tweet ->
    if (isSpam(tweet)) ignoreTweetsFrom tweet.sender
}

Forwarding

When sending a message a different actor can be specified as the sender so that potential replies to the message will be forwarded to the specified actor and not to the actual originator.

def decryptor = actor {
    react {message ->
        reply message.reverse()
//        message.reply message.reverse()  //Alternatives to send replies
//        message.sender.send message.reverse()
    }
}

def console = actor {  //This actor will print out decrypted messages, since the replies are forwarded to it
    react {
        println 'Decrypted message: ' + it
    }
}

decryptor.send 'lellarap si yvoorG', console //Specify an actor to send replies to

Creating Actors

Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for some more messages to arrive.

For example, this is how you create an actor that prints out all messages that it receives.

import static groovyx.gpars.actor.Actors.*

def console = actor { loop { react { println it } } }

Notice the loop() method call, which ensures that the actor doesn't stop after having processed the first message.

Here's an example with a decryptor service, which can decrypt submitted messages and send the decrypted messages back to the originators.

import static groovyx.gpars.actor.Actors.*

final def decryptor = actor {
    loop {
        react {String message ->
            if ('stopService' == message) stop()
            else reply message.reverse()
        }
    }
}

actor {
    decryptor.send 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
        decryptor.send 'stopService'
    }
}

Here's an example of an actor that waits for up to 30 seconds to receive a message, prints it out and terminates.

import static groovyx.gpars.actor.Actors.*

def me = actor {
    delegate.metaClass.onTimeout = {-> friend.send('I see, busy as usual. Never mind.')}

    friend.send('Hi')
    react(30.seconds) {
        //continue conversation
    }
}

Undelivered messages

Sometimes messages cannot be delivered to the target actor. When special action needs to be taken for undelivered messages, at actor termination all unprocessed messages from its queue have their onDeliveryError() method called. The onDeliveryError() method or closure defined on the message can, for example, send a notification back to the original sender of the message.

final AbstractPooledActor me
me = Actors.actor {
    def message1 = 1
    def message2 = 2

    message1.metaClass.onDeliveryError = {->
        me << "Could not deliver $delegate"
    }

    message2.metaClass.onDeliveryError = {->
        me << "Could not deliver $delegate"
    }

    actor1 << message1
    actor2 << message2
    …
}

Joining actors

Actors provide a join() method to allow callers to wait for the actor to terminate. A variant accepting a timeout is also available. The Groovy spread-dot operator comes in handy when joining multiple actors at a time.

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()

[master, player]*.join()

Custom schedulers

Actors leverage the standard JDK concurrency library by default. To provide a custom thread scheduler use the appropriate constructor parameter when creating an actor group. The supplied scheduler will orchestrate threads in the group's thread pool.

Please also see the numerous Actor Demos.

5.1 Actors Principles

Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for some more messages to arrive. Actors become detached from the underlying threads and so a relatively small thread pool can serve a potentially unlimited number of actors. Virtually unlimited scalability in the number of actors is the main advantage of event-based actors, which are detached from the underlying physical threads.

Here are some examples of how to use actors. This is how you create an actor that prints out all messages that it receives.

import static groovyx.gpars.actor.Actors.*

def console = actor { loop { react { println it } } }

Notice the loop() method call, which ensures that the actor doesn't stop after having processed the first message.

As an alternative you can extend the AbstractPooledActor class and override the act() method. Once you instantiate the actor, you need to start it so that it attaches itself to the thread pool and can start accepting messages. The actor() factory method will take care of starting the actor.

class CustomActor extends AbstractPooledActor {
    @Override protected void act() {
        loop {
            react {
                println it
            }
        }
    }
}

def console = new CustomActor()
console.start()

Messages can be sent to the actor using multiple methods:

console.send('Message')
console 'Message'
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}
console.sendAndWait 'Message'

Creating an asynchronous service

import static groovyx.gpars.actor.Actors.*

final def decryptor = actor {
    loop {
        react {String message ->
            reply message.reverse()
        }
    }
}

def console = actor {
    decryptor.send 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}

console.join()

As you can see, you create new actors with the actor() method, passing in the actor's body as a closure parameter. Inside the actor's body you can use loop() to iterate, react() to receive messages and reply() to send a message to the actor that has sent the currently processed message. When the decryptor actor doesn't find a message in its message queue, the react() method gives up the thread and returns it back to the thread pool for other actors to pick up. Only after a new message arrives in the actor's message queue does the closure of the react() method get scheduled for processing with the pool. Event-based actors internally simulate continuations: the actor's work is split into sequentially run chunks, which get invoked once a message is available in the inbox. Each chunk for a single actor can be performed by a different thread from the thread pool.

Groovy's flexible syntax with closures allows our library to offer multiple ways to define actors. For instance, here's an example of an actor that waits for up to 30 seconds to receive a message, prints it out and terminates. Actors allow the time DSL defined by the org.codehaus.groovy.runtime.TimeCategory class to be used for timeout specification to the react() method.

import static groovyx.gpars.actor.Actors.*

def me = actor {
    delegate.metaClass.onTimeout = {-> friend.send('I see, busy as usual. Never mind.')}

    friend.send('Hi')
    react(30.seconds) {
        //continue conversation
    }
}

me.join()

Notice the possibility to use Groovy meta-programming to define actor's lifecycle notification methods (e.g. onTimeout()) dynamically.
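Timeout expressions such as 30.seconds come from Groovy's time DSL. As a minimal sketch outside of any actor, and assuming a Groovy version where the category lives in groovy.time.TimeCategory, this is what such expressions evaluate to. The variable names are illustrative only.

```groovy
import groovy.time.TimeCategory

// Inside use(TimeCategory) integers gain properties such as seconds and minutes
long shortTimeout = use(TimeCategory) { 30.seconds.toMilliseconds() }
long longTimeout = use(TimeCategory) { 2.minutes.toMilliseconds() }

assert shortTimeout == 30000L
assert longTimeout == 120000L
```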

Actors guarantee thread-safety for non-thread-safe code

Actors guarantee that at most one thread processes the actor's body at a time. Under the covers, the memory also gets synchronized each time a thread gets assigned to an actor, so the actor's state can be safely modified by code in the body without any extra synchronization or locking effort.

class MyCounterActor extends AbstractPooledActor {
    private Integer counter = 0

    protected void act() {
        loop {
            react {
                counter++
            }
        }
    }
}

Ideally, an actor's code should never be invoked directly from outside, so that all the code of the actor class can only be executed by the thread handling the last received message and all the actor's code is therefore implicitly thread-safe. If any of the actor's methods is allowed to be called by other objects directly, the thread-safety guarantee for the actor's code and state is no longer valid.

Simple calculator

Here is a slightly more realistic example of an event-driven actor that receives two numeric messages, sums them up and sends the result to the console actor.

import static groovyx.gpars.actor.Actors.*

//not necessary, just showing that a single-threaded pool can still handle multiple actors
defaultActorPGroup.resize 1

final def console = actor {
    loop {
        react {
            println 'Result: ' + it
        }
    }
}

final def calculator = actor {
    react {a ->
        react {b ->
            console.send(a + b)
        }
    }
}

calculator.send 2
calculator.send 3

calculator.join()

You can group reception of multiple messages in a single react() call.

final def calculator = actor {
    react {a, b ->
        console.send(a + b)
    }
}

Notice that event-driven actors require special care regarding the react() method. Since event-driven actors need to split the code into independent chunks assignable to different threads sequentially and continuations are not natively supported on the JVM, the chunks are created artificially with tasks and exceptions. As a result, the react() and loop() methods never return normally and actors' code must be structured accordingly. Again, this is in line with what Scala actors do.
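To make the "tasks and exceptions" idea more tangible, here is a deliberately simplified, single-threaded sketch of the technique. It is not the GPars implementation. TinyActor, Unwind, trace and pump() are hypothetical names invented for this illustration, and no thread pool is involved. The react() method remembers the supplied closure as the next chunk and then throws an exception to abandon the current call stack, which is why code placed after react() never runs.

```groovy
// Control-flow exception used only to unwind the stack after react()
class Unwind extends RuntimeException {}

class TinyActor {
    private final Deque inbox = new ArrayDeque()
    private Closure continuation          // chunk waiting for the next message
    final List trace = []                 // records what the actor body did

    TinyActor(Closure body) {
        body.delegate = this
        body.resolveStrategy = Closure.DELEGATE_FIRST
        runChunk(body, [])                // run the body up to its first react()
    }

    // Remember where to continue, then abandon the current call stack
    void react(Closure handler) {
        continuation = handler
        throw new Unwind()
    }

    void send(Object message) {
        inbox.addLast(message)
        pump()
    }

    // Runs waiting chunks for as long as both a chunk and a message are available
    private void pump() {
        while (continuation != null && !inbox.isEmpty()) {
            Closure chunk = continuation
            continuation = null
            runChunk(chunk, [inbox.removeFirst()])
        }
    }

    private void runChunk(Closure chunk, List args) {
        try {
            chunk.call(*args)
        } catch (Unwind ignored) {
            // expected: react() was called and registered the next chunk
        }
    }
}

def actor = new TinyActor({
    trace << 'started'
    react {a ->
        trace << ('got ' + a)
        react {b ->
            trace << ('sum ' + (a + b))
        }
        trace << 'never reached'
    }
    trace << 'never reached'
})

actor.send 2
actor.send 3
assert actor.trace == ['started', 'got 2', 'sum 5']
```

In a real event-driven actor each chunk would be submitted to the shared thread pool instead of being run by the sender, which is what lets different threads execute consecutive chunks of the same actor.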

Concurrent Merge Sort Example

For comparison I'm also including a more involved example performing a concurrent merge sort of a list of integers using actors. You can see that thanks to the flexibility of Groovy we came pretty close to the Scala model, although I still miss Scala pattern matching for message handling.

import static groovyx.gpars.actor.Actors.*
import groovyx.gpars.group.DefaultPGroup

Closure createMessageHandler(def parentActor) {
    return {
        react {List<Integer> message ->
            assert message != null
            switch (message.size()) {
                case 0..1:
                    parentActor.send(message)
                    break
                case 2:
                    if (message[0] <= message[1]) parentActor.send(message)
                    else parentActor.send(message[-1..0])
                    break
                default:
                    def splitList = split(message)

                    def child1 = actor(createMessageHandler(delegate))
                    def child2 = actor(createMessageHandler(delegate))
                    child1.send(splitList[0])
                    child2.send(splitList[1])

                    react {message1 ->
                        react {message2 ->
                            parentActor.send merge(message1, message2)
                        }
                    }
            }
        }
    }
}

def console = new DefaultPGroup(1).actor {
    react {
        println "Sorted array:\t${it}"
        System.exit 0
    }
}

def sorter = actor(createMessageHandler(console))
sorter.send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3])
console.join()

Since actors reuse threads from a pool, the script will work with virtually any size of a thread pool, no matter how many actors are created along the way.

For brevity I didn't include the two helper methods split() and merge() in the code snippet. You can find them below.

def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}

List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)

    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }

    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}
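When experimenting with the concurrent version, it can help to have a plain sequential merge sort to compare results against. The following is a minimal sketch that needs no actors. The names mergeSorted() and sequentialSort() are made up for this illustration and the helpers are written independently of the split() and merge() methods above.

```groovy
// Merges two already sorted lists into one sorted list
List<Integer> mergeSorted(List<Integer> a, List<Integer> b) {
    List<Integer> result = []
    int i = 0, j = 0
    while (i < a.size() && j < b.size()) {
        result << (a[i] <= b[j] ? a[i++] : b[j++])
    }
    // At most one of the two tails is non-empty at this point
    result.addAll(a.subList(i, a.size()))
    result.addAll(b.subList(j, b.size()))
    return result
}

// Splits the list in the middle, sorts both halves recursively and merges them
List<Integer> sequentialSort(List<Integer> list) {
    if (list.size() < 2) return list
    int middle = list.size().intdiv(2)
    return mergeSorted(sequentialSort(list[0..<middle]),
                       sequentialSort(list[middle..<list.size()]))
}

def input = [1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3]
assert sequentialSort(input) == [1, 2, 3, 3, 3, 4, 5, 5, 6, 7, 8, 9]
```

The actor-based sorter performs the same divide, sort and merge steps. The difference is that each recursive call becomes a child actor and each returned value becomes a message sent to the parent.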

Actor lifecycle methods

Each Actor can define lifecycle observing methods, which will be called whenever a certain lifecycle event occurs.

You can either define the methods statically in your Actor class or add them dynamically to the actor's metaclass:

def myActor = actor {
    delegate.metaClass.onException = {
        log.error('Exception occurred', it)
    }

    …
}

Pool management

Actors can be organized into groups and by default there's always an application-wide pooled actor group available. Just like the Actors abstract factory can be used to create actors in the default group, custom groups can be used as abstract factories to create new actor instances belonging to these groups.

def myGroup = new DefaultPGroup()

def actor1 = myGroup.actor { … }

def actor2 = myGroup.actor { … }

The actors belonging to the same group share the underlying thread pool of that group. The pool by default contains n + 1 threads, where n stands for the number of CPUs detected by the JVM. The pool size can be set explicitly either by setting the gpars.poolsize system property or individually for each actor group by specifying the appropriate constructor parameter.

def myGroup = new DefaultPGroup(10)  //the pool will contain 10 threads
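The default sizing rule can be written down in a few lines. The sketch below only illustrates the rule as described here (n + 1 threads unless the gpars.poolsize system property says otherwise). It is not taken from the GPars sources, and resolvePoolSize() is a hypothetical helper name.

```groovy
// Sketch of the sizing rule described above; this is not GPars source code
int resolvePoolSize() {
    int cpus = Runtime.runtime.availableProcessors()
    // Integer.getInteger() reads a numeric system property, falling back to the default
    return Integer.getInteger('gpars.poolsize', cpus + 1)
}

// Without the property the default of n + 1 applies
System.clearProperty('gpars.poolsize')
assert resolvePoolSize() == Runtime.runtime.availableProcessors() + 1

// With the property set, its value wins
System.setProperty('gpars.poolsize', '4')
assert resolvePoolSize() == 4
System.clearProperty('gpars.poolsize')
```

System properties are usually supplied on the command line with -D when the JVM is launched, so that they are in place before any pool gets created.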

The thread pool can be manipulated through the appropriate DefaultPGroup class, which delegates to the Pool interface of the thread pool. For example, the resize() method allows you to change the pool size any time and the resetDefaultSize() sets it back to the default value. The shutdown() method can be called when you need to safely finish all tasks, destroy the pool and stop all the threads in order to exit JVM in an organized manner.

… (n+1 threads in the default pool after startup)

Actors.defaultActorPGroup.resize 1 //use one-thread pool

… (1 thread in the pool)

Actors.defaultActorPGroup.resetDefaultSize()

… (n+1 threads in the pool)

Actors.defaultActorPGroup.shutdown()

As an alternative to the DefaultPGroup, which creates a pool of daemon threads, the NonDaemonPGroup class can be used when non-daemon threads are required.

def daemonGroup = new DefaultPGroup()

def actor1 = daemonGroup.actor { … }

def nonDaemonGroup = new NonDaemonPGroup()

def actor2 = nonDaemonGroup.actor { … }

class MyActor extends AbstractPooledActor {
    def MyActor() {
        this.actorGroup = nonDaemonGroup
    }

    void act() {...}
}

Actors belonging to the same group share the underlying thread pool. With pooled actor groups you can split your actors to leverage multiple thread pools of different sizes and so assign resources to different components of your system and tune their performance.

def coreActors = new NonDaemonPGroup(5)  //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1)  //1 daemon thread pool

def priceCalculator = coreActors.actor { … }

def paymentProcessor = coreActors.actor { … }

def emailNotifier = helperActors.actor { … }

def cleanupActor = helperActors.actor { … }

//increase size of the core actor group
coreActors.resize 6

//shutdown the group's pool once you no longer need the group to release resources
helperActors.shutdown()

Do not forget to shut down custom pooled actor groups once you no longer need them and their actors, to preserve system resources.

Common trap: App terminates while actors do not receive messages

Most likely you're using daemon threads and pools, which is the default setting, and your main thread finishes. Calling actor.join() on any, some or all of your actors would block the main thread until the actor terminates and thus keep all your actors running. Alternatively use instances of NonDaemonPGroup and assign some of your actors to these groups.

def nonDaemonGroup = new NonDaemonPGroup()
def myActor = nonDaemonGroup.actor {...}

alternatively

def nonDaemonGroup = new NonDaemonPGroup()

class MyActor extends AbstractPooledActor {
    def MyActor() {
        this.actorGroup = nonDaemonGroup
    }

    void act() {...}
}

def myActor = new MyActor()
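The difference between the two kinds of groups comes down to the daemon flag of the pool threads. The following sketch uses only the JDK, not GPars, to show that flag. The daemonFactory variable is a hypothetical stand-in for whatever a daemon-based pool does internally.

```groovy
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory

// Hypothetical factory that marks every thread it creates as a daemon
ThreadFactory daemonFactory = { Runnable task ->
    Thread thread = new Thread(task)
    thread.daemon = true
    return thread
} as ThreadFactory

Runnable work = { println 'working' } as Runnable

Thread daemonThread = daemonFactory.newThread(work)
Thread regularThread = Executors.defaultThreadFactory().newThread(work)

// The JVM exits as soon as only daemon threads are left
assert daemonThread.daemon
// A running non-daemon thread keeps the JVM alive until it finishes
assert !regularThread.daemon
```

This is why a script whose main thread ends can terminate even though daemon-pooled actors are still waiting for messages.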

5.2 Special Actors

Dynamic Dispatch Actor

The DynamicDispatchActor class is a pooled actor allowing for an alternative structure of the message handling code. In general DynamicDispatchActor repeatedly scans for messages and dispatches arrived messages to one of the onMessage(message) methods defined on the actor. The DynamicDispatchActor leverages the Groovy dynamic method dispatch mechanism under the covers.

import groovyx.gpars.actor.DynamicDispatchActor

final class MyActor extends DynamicDispatchActor {

    void onMessage(String message) {
        println 'Received string'
    }

    void onMessage(Integer message) {
        println 'Received integer'
    }

    void onMessage(Object message) {
        println 'Received object'
    }

    void onMessage(List message) {
        println 'Received list'
        stop()
    }
}

final def actor = new MyActor().start()

actor 1
actor ''
actor 1.0
actor(new ArrayList())

actor.join()

In some scenarios, typically when no implicit conversation-history-dependent state needs to be preserved for the actor, the dynamic dispatch code structure may be more intuitive than the traditional one using nested loop and react statements.
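The dispatch to the right onMessage() variant relies on Groovy choosing an overloaded method by the runtime type of the argument. The sketch below shows that mechanism in isolation, without any actor involved. The Dispatcher class is a made-up illustration, not a GPars type.

```groovy
// Plain Groovy class with overloaded handlers; no actor involved
class Dispatcher {
    String onMessage(String message) { 'string' }
    String onMessage(Integer message) { 'integer' }
    String onMessage(List message) { 'list' }
    String onMessage(Object message) { 'object' }
}

def dispatcher = new Dispatcher()

// The elements are only known as Object here; Groovy picks the overload at runtime
List<Object> inbox = [1, '', 1.0, new ArrayList()]
def handledBy = inbox.collect { dispatcher.onMessage(it) }

// 1.0 is a BigDecimal, so it falls through to the Object variant
assert handledBy == ['integer', 'string', 'object', 'list']
```

In Java the same calls would all bind to onMessage(Object), since Java resolves overloads from the static type of the argument.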

The DynamicDispatchActor class also provides a handy facility to add message handlers dynamically at actor construction time using the when handlers:

final Actor actor = new DynamicDispatchActor({
    when {String msg -> println 'A String'; reply 'Thanks'}
    when {Double msg -> println 'A Double'; reply 'Thanks'}
    when {msg -> println 'A something ...'; reply 'What was that?'}
})
actor.start()

Obviously the two approaches can be combined:

final class MyActor extends DynamicDispatchActor {

    def MyActor(final closure) { super(closure); }

    void onMessage(String message) {
        println 'Received string'
    }

    void onMessage(Integer message) {
        println 'Received integer'
    }

    void onMessage(Object message) {
        println 'Received object'
    }

    void onMessage(List message) {
        println 'Received list'
        stop()
    }
}

final def actor = new MyActor({
    when {BigDecimal num -> println 'Received BigDecimal'}
    if (needHandleFloats) when {Float num -> println 'Got a float'}
}).start()

Reactive Actor

The ReactiveActor class, typically constructed by calling Actors.reactor() or DefaultPGroup.reactor(), allows for a more event-driven approach. When a reactive actor receives a message, the supplied block of code, which makes up the reactive actor's body, is run with the message as a parameter. The result returned from the code is sent in reply.

import groovyx.gpars.group.DefaultPGroup

final def group = new DefaultPGroup()

final def doubler = group.reactor { 2 * it }.start()

group.actor { println 'Double of 10 = ' + doubler.sendAndWait(10) }.start()

group.actor { println 'Double of 20 = ' + doubler.sendAndWait(20) }.start()

group.actor { println 'Double of 30 = ' + doubler.sendAndWait(30) }.start()

for(i in (1..10)) { println "Double of $i = ${doubler.sendAndWait(i)}" }

doubler.stop()
doubler.join()

Here's an example of an actor, which submits a batch of numbers to a ReactiveActor for processing and then prints the results gradually as they arrive.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

final def doubler = Actors.reactor { 2 * it }.start()

Actor actor = Actors.actor {
    (1..10).each {doubler << it}
    int i = 0
    loop {
        i += 1
        if (i > 10) stop()
        else {
            react {message ->
                println "Double of $i = $message"
            }
        }
    }
}.start()

actor.join()
doubler.stop()
doubler.join()

Essentially reactive actors provide a convenience shortcut for an actor that would wait for messages in a loop, process them and send back the result. This is schematically how the reactive actor looks inside:

public class ReactiveActor extends AbstractPooledActor {
    Closure body

    void act() {
        loop {
            react {message ->
                reply body(message)
            }
        }
    }
}

5.3 Tips and Tricks

Structuring actor's code

When extending the AbstractPooledActor class, you can call any actor's methods from within the act() method and use the react() or loop() methods in them.

class MyActor extends AbstractPooledActor {

    protected void act() {
        handleA()
    }

    private void handleA() {
        react {a ->
            handleB(a)
        }
    }

    private void handleB(int a) {
        react {b ->
            println a + b
            reply a + b
        }
    }
}

Bear in mind that the methods handleA() and handleB() in all our examples never return, since they call react(), which itself never returns.

Alternatively, when using the actor() factory method, you can add event-handling code through the meta class as closures.

Actor actor2 = actor {
    delegate.metaClass {
        handleA = {->
            react {a ->
                 handleB(a)
            }
        }

        handleB = {a ->
            react {b ->
                println a + b
                reply a + b
            }
        }
    }

    handleA()
}

Closures that have the actor set as their delegate can also be used to structure event-handling code.

Closure handleB = {a ->
    react {b ->
        println a + b
        reply a + b
    }
}

Closure handleA = {->
    react {a ->
        handleB(a)
    }
}

Actor actor3 = actor {
    handleA.delegate = delegate
    handleB.delegate = delegate

    handleA()
}

Event-driven loops

When coding event-driven actors you have to keep in mind that calls to the react() and loop() methods never return. This becomes a bit of a challenge once you try to implement any types of loops in your actors. On the other hand, if you leverage the fact that react() never returns, you may call methods recursively without fear of filling up the stack. Look at the examples below, which respectively use the three described techniques for structuring actor's code.

A subclass of AbstractPooledActor

class MyLoopActor extends AbstractPooledActor {

    protected void act() {
        outerLoop()
    }

    private void outerLoop() {
        react {a ->
            println 'Outer: ' + a
            if (a != 0) innerLoop()
            else println 'Done'
        }
    }

    private void innerLoop() {
        react {b ->
            println 'Inner ' + b
            if (b == 0) outerLoop()
            else innerLoop()
        }
    }
}

Enhancing the actor's metaClass

Actor actor = actor {
    outerLoop()
}

actor.metaClass {
    outerLoop = {->
        react {a ->
            println 'Outer: ' + a
            if (a != 0) innerLoop()
            else println 'Done'
        }
    }

    innerLoop = {->
        react {b ->
            println 'Inner ' + b
            if (b == 0) outerLoop()
            else innerLoop()
        }
    }
}

Using Groovy closures

Closure innerLoop

Closure outerLoop = {->
    react {a ->
        println 'Outer: ' + a
        if (a != 0) innerLoop()
        else println 'Done'
    }
}

innerLoop = {->
    react {b ->
        println 'Inner ' + b
        if (b == 0) outerLoop()
        else innerLoop()
    }
}

Actor actor = actor {
    outerLoop()
}
outerLoop.delegate = actor
innerLoop.delegate = actor

Plus don't forget about the possibility to use the actor's loop() method to create a loop that never stops before the actor terminates.

class MyLoopActor extends AbstractPooledActor {

    protected void act() {
        loop {
            outerLoop()
        }
    }

    private void outerLoop() {
        react {a ->
            println 'Outer: ' + a
            if (a != 0) innerLoop()
            else println 'Done for now, but will loop again'
        }
    }

    private void innerLoop() {
        react {b ->
            println 'Inner ' + b
            if (b == 0) outerLoop()
            else innerLoop()
        }
    }
}

5.4 Classic Examples using Actors

A few examples of using actors

Examples

Sleeping Barber

Problem description

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.actor.AbstractPooledActor
import groovyx.gpars.actor.Actor

final def group = new DefaultPGroup()

final def barber = group.actor {
    final def random = new Random()
    loop {
        react {message ->
            switch (message) {
                case Enter:
                    message.customer.send new Start()
                    println "Barber: Processing customer ${message.customer.name}"
                    doTheWork(random)
                    message.customer.send new Done()
                    message.reply new Next()
                    break
                case Wait:
                    println "Barber: No customers. Going to have a sleep"
                    break
            }
        }
    }
}.start()

private def doTheWork(Random random) {
    Thread.sleep(random.nextInt(10) * 1000)
}

final Actor waitingRoom

waitingRoom = group.actor {
    final int capacity = 5
    final List<Customer> waitingCustomers = []
    boolean barberAsleep = true

    loop {
        react {message ->
            switch (message) {
                case Enter:
                    if (waitingCustomers.size() == capacity) {
                        reply new Full()
                    } else {
                        waitingCustomers << message.customer
                        if (barberAsleep) {
                            assert waitingCustomers.size() == 1
                            barberAsleep = false
                            waitingRoom.send new Next()
                        } else reply new Wait()
                    }
                    break
                case Next:
                    if (waitingCustomers.size() > 0) {
                        def customer = waitingCustomers.remove(0)
                        barber.send new Enter(customer: customer)
                    } else {
                        barber.send new Wait()
                        barberAsleep = true
                    }
            }
        }
    }

}.start()

class Customer extends AbstractPooledActor {
    String name
    Actor localBarbers

    void act() {
        localBarbers << new Enter(customer: this)
        loop {
            react {message ->
                switch (message) {
                    case Full:
                        println "Customer: $name: The waiting room is full. I am leaving."
                        stop()
                        break
                    case Wait:
                        println "Customer: $name: I will wait."
                        break
                    case Start:
                        println "Customer: $name: I am now being served."
                        break
                    case Done:
                        println "Customer: $name: I have been served."
                        break
                }
            }
        }
    }
}

class Enter { Customer customer }
class Full {}
class Wait {}
class Next {}
class Start {}
class Done {}

new Customer(name: 'Joe', localBarbers: waitingRoom).start()
new Customer(name: 'Dave', localBarbers: waitingRoom).start()
new Customer(name: 'Alice', localBarbers: waitingRoom).start()

System.in.read()

Dining Philosophers

Problem description

import groovyx.gpars.actor.AbstractPooledActor
import groovyx.gpars.actor.Actors

Actors.defaultActorPGroup.resize 5

final class Philosopher extends AbstractPooledActor {
    private Random random = new Random()

    String name
    def forks = []

    void act() {
        assert 2 == forks.size()
        loop {
            think()
            forks*.send new Take()
            react {a ->
                react {b ->
                    if ([a, b].any {Rejected.isCase it}) {
                        println "$name: \tOops, can't get my forks! Giving up."
                        [a, b].find {Accepted.isCase it}?.reply new Finished()
                    } else {
                        eat()
                        reply new Finished()
                    }
                }
            }
        }
    }

    void think() {
        println "$name: \tI'm thinking"
        Thread.sleep random.nextInt(5000)
        println "$name: \tI'm done thinking"
    }

    void eat() {
        println "$name: \tI'm EATING"
        Thread.sleep random.nextInt(2000)
        println "$name: \tI'm done EATING"
    }
}

final class Fork extends AbstractPooledActor {

    String name
    boolean available = true

    void act() {
        loop {
            react {message ->
                switch (message) {
                    case Take:
                        if (available) {
                            available = false
                            reply new Accepted()
                        } else reply new Rejected()
                        break
                    case Finished:
                        assert !available
                        available = true
                        break
                    default: throw new IllegalStateException("Cannot process the message: $message")
                }
            }
        }
    }
}

final class Take {}
final class Accepted {}
final class Rejected {}
final class Finished {}

def forks = [
        new Fork(name: 'Fork 1'),
        new Fork(name: 'Fork 2'),
        new Fork(name: 'Fork 3'),
        new Fork(name: 'Fork 4'),
        new Fork(name: 'Fork 5')
]

def philosophers = [
        new Philosopher(name: 'Joe', forks: [forks[0], forks[1]]),
        new Philosopher(name: 'Dave', forks: [forks[1], forks[2]]),
        new Philosopher(name: 'Alice', forks: [forks[2], forks[3]]),
        new Philosopher(name: 'James', forks: [forks[3], forks[4]]),
        new Philosopher(name: 'Phil', forks: [forks[4], forks[0]]),
]

forks*.start()
philosophers*.start()

System.in.read()

Word sort

Given a folder name, the script will sort words in all files in the folder. The SortMaster actor creates a given number of WordSortActors, splits the files to sort among them and collects the results.

Inspired by the Scala Concurrency blog post by Michael Galpin.

import groovyx.gpars.actor.AbstractPooledActor
import java.util.concurrent.CountDownLatch

//Messages
private final class FileToSort { String fileName }
private final class SortResult { String fileName; List<String> words }

//Worker actor
final class WordSortActor extends AbstractPooledActor {

    private List<String> sortedWords(String fileName) {
        parseFile(fileName).sort {it.toLowerCase()}
    }

    private List<String> parseFile(String fileName) {
        List<String> words = []
        new File(fileName).splitEachLine(' ') {words.addAll(it)}
        return words
    }

    void act() {
        loop {
            react {message ->
                switch (message) {
                    case FileToSort:
                        println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}"
                        reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName))
                }
            }
        }
    }
}

//Master actor
final class SortMaster extends AbstractPooledActor {

    String docRoot = '/'
    int numActors = 1

    List<List<String>> sorted = []
    private CountDownLatch startupLatch = new CountDownLatch(1)
    private CountDownLatch doneLatch

    private void beginSorting() {
        int cnt = sendTasksToWorkers()
        doneLatch = new CountDownLatch(cnt)
    }

    private List createWorkers() {
        return (1..numActors).collect {new WordSortActor().start()}
    }

    private int sendTasksToWorkers() {
        List workers = createWorkers()
        int cnt = 0
        //distribute the files among the workers in a round-robin fashion
        new File(docRoot).eachFile {
            workers[cnt % numActors] << new FileToSort(fileName: it)
            cnt += 1
        }
        return cnt
    }

    public void waitUntilDone() {
        startupLatch.await()
        doneLatch.await()
    }

    void act() {
        beginSorting()
        startupLatch.countDown()
        loop {
            react {
                switch (it) {
                    case SortResult:
                        sorted << it.words
                        doneLatch.countDown()
                        println "Received results for file=${it.fileName}"
                }
            }
        }
    }
}

//start the actors to sort words
def master = new SortMaster(docRoot: 'C:/dev/TeamCity/logs/', numActors: 5).start()
master.waitUntilDone()
println 'Done'
println master.sorted

Load Balancer

Demonstrates work balancing among an adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes its assignment, it asks the load balancer for a new task.

If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. If the number of tasks in the task queue exceeds a certain limit, a new worker is created to increase the size of the worker pool.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.AbstractPooledActor

/**
 * Demonstrates work balancing among adaptable set of workers.
 * The load balancer receives tasks and queues them in a temporary task queue.
 * When a worker finishes his assignment, it asks the load balancer for a new task.
 * If the load balancer doesn't have any tasks available in the task queue, the worker is stopped.
 * If the number of tasks in the task queue exceeds certain limit, a new worker is created
 * to increase size of the worker pool.
 */

final class LoadBalancer extends AbstractPooledActor {
    int workers = 0
    List taskQueue = []
    private static final QUEUE_SIZE_TRIGGER = 10

    void act() {
        loop {
            def message = receive()
            switch (message) {
                case NeedMoreWork:
                    if (taskQueue.size() == 0) {
                        println 'No more tasks in the task queue. Terminating the worker.'
                        message.reply DemoWorker.EXIT
                        workers -= 1
                    } else message.reply taskQueue.remove(0)
                    break
                case WorkToDo:
                    taskQueue << message
                    if ((workers == 0) || (taskQueue.size() >= QUEUE_SIZE_TRIGGER)) {
                        println 'Need more workers. Starting one.'
                        workers += 1
                        new DemoWorker(this).start()
                    }
            }
            println "Active workers=${workers}\tTasks in queue=${taskQueue.size()}"
        }
    }
}

final class DemoWorker extends AbstractPooledActor {
    final static Object EXIT = new Object()
    private static final Random random = new Random()

    Actor balancer

    def DemoWorker(balancer) {
        this.balancer = balancer
    }

    void act() {
        loop {
            this.balancer << new NeedMoreWork()
            react {
                switch (it) {
                    case WorkToDo:
                        processMessage(it)
                        break
                    case EXIT: stop()
                }
            }
        }
    }

    private void processMessage(message) {
        synchronized (random) {
            Thread.sleep random.nextInt(5000)
        }
    }
}

final class WorkToDo {}
final class NeedMoreWork {}

final Actor balancer = new LoadBalancer().start()

//produce tasks
for (i in 1..20) {
    Thread.sleep 100
    balancer << new WorkToDo()
}

//produce tasks in a parallel thread
Thread.start {
    for (i in 1..10) {
        Thread.sleep 1000
        balancer << new WorkToDo()
    }
}

Thread.sleep 35000  //let the queues get empty
balancer << new WorkToDo()
balancer << new WorkToDo()
Thread.sleep 10000

6. Agent

The Agent class is a thread-safe, non-blocking wrapper for shared mutable state, inspired by Agents in Clojure.

Many concurrency problems disappear when your architecture eliminates the need for shared mutable state. Indeed, concepts like actors, CSP or dataflow concurrency avoid or isolate mutable state completely. In some cases, however, sharing mutable data is either inevitable or makes the design more natural and understandable. Think, for example, of a shopping cart in a typical e-commerce application, where multiple AJAX requests may hit the cart with read or write requests concurrently.

Introduction

In the Clojure programming language you can find the concept of Agents, the purpose of which is to protect mutable data that needs to be shared across threads. Agents hide the data and protect it from direct access. Clients can only send commands (functions) to the agent. The commands will be serialized and processed against the data one by one in turn. Since the commands are executed serially, they do not need to care about concurrency and can assume the data is all theirs when run.

Although implemented differently, GPars Agents, represented by the Agent class, fundamentally behave like actors. They accept messages and process them asynchronously. The messages, however, must be commands (functions or Groovy closures) and will be executed inside the agent. After reception the received function is run against the internal state of the Agent and the return value of the function is considered to be the new internal state of the Agent.

Essentially, agents safeguard mutable values by allowing only a single agent-managed thread to make modifications to them. The mutable values are not directly accessible from outside. Instead, requests have to be sent to the agent, which guarantees to process them sequentially on behalf of the callers and so keeps the values consistent.

Schematically:

agent = new Agent(0)  //created a new Agent wrapping an integer with initial value 0
agent.send {increment()}  //asynchronous send operation, sending the increment() function
…
//after some delay to process the message the internal Agent's state has been updated
…
assert agent.val == 1

To wrap integers, we can certainly use AtomicXXX types on the Java platform, but when the state is a more complex object we need more support.
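The schematic above can be turned into a small runnable sketch. Integers are immutable, so a command cannot change the wrapped value in place; the sketch assumes that commands may call the agent's updateValue() method to replace the internal state, which holds for GPars releases that ship the Agent class but should be checked against the version you use. The counter name is purely illustrative.

```groovy
import groovyx.gpars.agent.Agent

// An Agent wrapping an immutable Integer with the initial value 0
def counter = new Agent<Integer>(0)

// Each command receives the current state as 'it' and replaces it.
// updateValue() is assumed to be reachable from commands (see the note above).
10.times {
    counter << { updateValue(it + 1) }
}

// The val request queues behind the ten commands, so it sees all of them
assert counter.val == 10
```

Because the ten commands run one after another inside the agent, no increment can be lost, even if they were sent from several threads.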

Concepts

GPars provides an Agent class, which is a special-purpose thread-safe non-blocking implementation inspired by Agents in Clojure.

An Agent wraps a reference to mutable state, held inside a single field, and accepts code (closures / commands) as messages, which can be sent to the Agent just like to any other actor using the '<<' operator, the send() methods or the implicit call() method. At some point after reception of a closure / command, the closure is invoked against the internal mutable field and can make changes to it. The closure is guaranteed to be run without intervention from other threads and so may freely alter the internal state of the Agent held in the internal data field.

The whole update process is of the fire-and-forget type, since once the message (closure) is sent to the Agent, the caller thread can go off to do other things and come back later to check the current value with Agent.val or Agent.valAsync(closure).

Basic rules

Examples

Shared list of members

The Agent wraps a list of members, who have been added to the jug. To add a new member a message (command to add a member) has to be sent to the jugMembers Agent.

import groovyx.gpars.agent.Agent
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

/**
 * Create a new Agent wrapping a list of strings
 */
def jugMembers = new Agent<List<String>>(['Me'])  //add Me

jugMembers.send {it.add 'James'}  //add James

final Thread t1 = Thread.start {
    jugMembers.send {it.add 'Joe'}  //add Joe
}

final Thread t2 = Thread.start {
    jugMembers << {it.add 'Dave'}  //add Dave
    jugMembers {it.add 'Alice'}    //add Alice (using the implicit call() method)
}

[t1, t2]*.join()
println jugMembers.val
jugMembers.valAsync {println "Current members: $it"}

jugMembers.await()

Shared conference counting number of registrations

The Conference class allows registration and un-registration, however these methods can only be called from the commands sent to the conference Agent.

import groovyx.gpars.agent.Agent

/**
 * Conference stores number of registrations and allows parties to register and unregister.
 * It inherits from the Agent class and adds the register() and unregister() private methods,
 * which callers may use in the commands they submit to the Conference.
 */
class Conference extends Agent<Long> {
    def Conference() { super(0) }
    private def register(long num) { data += num }
    private def unregister(long num) { data -= num }
}

final Agent conference = new Conference()  //new Conference created

/**
 * Three external parties will try to register/unregister concurrently
 */

final Thread t1 = Thread.start {
    conference << {register(10L)}  //send a command to register 10 attendees
}

final Thread t2 = Thread.start {
    conference << {register(5L)}  //send a command to register 5 attendees
}

final Thread t3 = Thread.start {
    conference << {unregister(3L)}  //send a command to unregister 3 attendees
}

[t1, t2, t3]*.join()

assert 12L == conference.val

Factory methods

Agent instances can also be created using the Agent.agent() factory method.

def jugMembers = Agent.agent(['Me'])  //add Me

Grouping

By default all Agent instances belong to the same group sharing its daemon thread pool.

Custom groups can also create instances of Agent. These instances will belong to the group, which created them, and will share a thread pool. To create an Agent instance belonging to a group, call the agent() factory method on the group. This way you can organize and tune performance of agents.

final def group = new NonDaemonPGroup(5)  //create a group around a thread pool
def jugMembers = group.agent(['Me'])  //add Me

The default thread pool for agents contains daemon threads. Make sure that your custom thread pools use daemon threads, too, which can be achieved either by using DefaultPGroup or by providing your own thread factory to a thread pool constructor. If your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, make sure you shut down the group or the thread pool explicitly by calling its shutdown() method, otherwise your application will not exit.
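A minimal sketch of the shutdown advice for non-daemon groups might look as follows. It reuses the NonDaemonPGroup, agent(), await() and shutdown() calls mentioned in this section; the import assumes that NonDaemonPGroup lives in the same package as DefaultPGroup.

```groovy
import groovyx.gpars.group.NonDaemonPGroup

// A group backed by five non-daemon threads
final def group = new NonDaemonPGroup(5)
def members
try {
    def jugMembers = group.agent(['Me'])   // the agent shares the group's thread pool
    jugMembers << { it.add 'James' }
    jugMembers.await()                      // wait until the command has been processed
    members = jugMembers.val
} finally {
    group.shutdown()                        // without this the non-daemon threads keep the JVM alive
}

assert members == ['Me', 'James']
```

Placing shutdown() in a finally block releases the threads even if one of the calls above throws.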

Direct pool replacement

Alternatively, by calling the attachToThreadPool() method on an Agent instance a custom thread pool can be specified for it.

def jugMembers = new Agent<List<String>>(['Me'])  //add Me

final ExecutorService pool = Executors.newFixedThreadPool(10)
jugMembers.attachToThreadPool(new DefaultPool(pool))

Remember, like actors, a single Agent instance (aka agent) can never use more than one thread at a time.

The shopping cart example

import groovyx.gpars.agent.Agent

class ShoppingCart {
    private def cartState = new Agent([:])
//----------------- public methods below here ----------------------------------
    public void addItem(String product, int quantity) {
        cartState << {it[product] = quantity}  //the << operator sends
                                               //a message to the Agent
    }

    public void removeItem(String product) {
        cartState << {it.remove(product)}
    }

    public Object listContent() {
        return cartState.val
    }

    public void clearItems() {
        cartState << performClear
    }

    public void increaseQuantity(String product, int quantityChange) {
        cartState << this.&changeQuantity.curry(product, quantityChange)
    }
//----------------- private methods below here ---------------------------------
    private void changeQuantity(String product, int quantityChange, Map items) {
        items[product] = (items[product] ?: 0) + quantityChange
    }

    private Closure performClear = { it.clear() }
}
//----------------- script code below here -------------------------------------
final ShoppingCart cart = new ShoppingCart()
cart.addItem 'Pilsner', 10
cart.addItem 'Budweisser', 5
cart.addItem 'Staropramen', 20

cart.removeItem 'Budweisser'
cart.addItem 'Budweisser', 15

println "Contents ${cart.listContent()}"

cart.increaseQuantity 'Budweisser', 3
println "Contents ${cart.listContent()}"

cart.clearItems()
println "Contents ${cart.listContent()}"

You might have noticed two implementation strategies in the code.

1. Public methods may internally just send the required code off to the Agent, instead of executing the same functionality directly.

And so sequential code like

public void addItem(String product, int quantity) {
    cartState[product] = quantity
}

becomes

public void addItem(String product, int quantity) {
    cartState << {it[product] = quantity}
}

2. Public methods may send references to internal private methods or closures, which hold the desired functionality to perform.

public void clearItems() {
    cartState << performClear
}

private Closure performClear = { it.clear() }

Currying might be necessary if the closure takes other arguments besides the current internal state instance. See the increaseQuantity method.

The printer service example

Another example is a non-thread-safe printer service shared by multiple threads. The printer needs to have the document and quality properties set before printing, so there is an obvious potential for race conditions if it is not guarded properly. Callers don't want to block until the printer is available, which the fire-and-forget nature of actors solves very elegantly.

import groovyx.gpars.agent.Agent

/**
 * A non-thread-safe service that slowly prints documents one at a time
 */
class PrinterService {
    String document
    String quality

    public void printDocument() {
        println "Printing $document in $quality quality"
        Thread.sleep 5000
        println "Done printing $document"
    }
}

def printer = new Agent<PrinterService>(new PrinterService())

final Thread thread1 = Thread.start {
    for (num in (1..3)) {
        final String text = "document $num"
        printer << {printerService ->
            printerService.document = text
            printerService.quality = 'High'
            printerService.printDocument()
        }
        Thread.sleep 200
    }
    println 'Thread 1 is ready to do something else. All print tasks have been submitted'
}

final Thread thread2 = Thread.start {
    for (num in (1..4)) {
        final String text = "picture $num"
        printer << {printerService ->
            printerService.document = text
            printerService.quality = 'Medium'
            printerService.printDocument()
        }
        Thread.sleep 500
    }
    println 'Thread 2 is ready to do something else. All print tasks have been submitted'
}

[thread1, thread2]*.join()
printer.await()

For latest update, see the respective Demos.

Reading the value

To follow the Clojure philosophy closely, the Agent class gives reads higher priority than writes. By using the instantVal property your read request will bypass the incoming message queue of the Agent and return the current snapshot of the internal state. The val property will wait in the message queue for processing, just like the non-blocking variant valAsync(Closure cl), which will invoke the provided closure with the internal state as a parameter.

Bear in mind that the instantVal property may return results that are correct yet look random, since the internal state of the Agent at the time of instantVal execution is non-deterministic and depends on the messages that have been processed before the thread scheduler executes the body of instantVal.

The await() method blocks the calling thread until all the messages previously submitted to the Agent have been processed.
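The differences between the read operations can be illustrated with a short sketch that uses only the members named above (instantVal, val, valAsync() and await()). The list contents are illustrative.

```groovy
import groovyx.gpars.agent.Agent

def jugMembers = new Agent<List<String>>(['Me'])
jugMembers << { it.add 'James' }

// instantVal bypasses the message queue: the command above may or may not have run yet,
// so the size printed here can be either 1 or 2
println "Snapshot size: ${jugMembers.instantVal.size()}"

// val waits in the queue behind the pending command and blocks the caller until served
def members = jugMembers.val
assert members == ['Me', 'James']

// valAsync also waits in the queue, but passes the state to a closure instead of blocking
jugMembers.valAsync { println "Current members: $it" }

// await() blocks until everything submitted so far has been processed
jugMembers.await()
```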

State copy strategy

To avoid leaking the internal state, the Agent class allows you to specify a copy strategy as the second constructor argument. With the copy strategy specified, the internal state is processed by the copy strategy closure, and the output of the copy strategy is returned to the caller instead of the actual internal state. This applies to instantVal, val as well as to valAsync().
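As a rough sketch, a copy strategy for a list-wrapping agent could simply clone the list. The clone-based closure is only one possible strategy, chosen here for illustration; a deep copy may be needed when the list elements themselves are mutable.

```groovy
import groovyx.gpars.agent.Agent

// The second constructor argument is the copy strategy closure
def jugMembers = new Agent<List<String>>(['Me'], { it?.clone() })

jugMembers << { it.add 'James' }
jugMembers.await()

// The caller receives a copy, so changing it does not touch the agent's state
def snapshot = jugMembers.val
snapshot << 'Intruder'

assert snapshot == ['Me', 'James', 'Intruder']
assert jugMembers.val == ['Me', 'James']
```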

Error handling

Exceptions thrown from within the submitted commands are stored inside the agent and can be obtained from the errors property. The property gets cleared once read.

def jugMembers = new Agent<List>()
assert jugMembers.errors.empty

jugMembers.send {throw new IllegalStateException('test1')}
jugMembers.send {throw new IllegalArgumentException('test2')}
jugMembers.await()

List errors = jugMembers.errors
assert 2 == errors.size()
assert errors[0] instanceof IllegalStateException
assert 'test1' == errors[0].message
assert errors[1] instanceof IllegalArgumentException
assert 'test2' == errors[1].message

assert jugMembers.errors.empty  //the errors property has been cleared by the previous read

Fair and Non-fair agents

Agents can be either fair or non-fair. Fair agents give up the thread after processing each message, whereas non-fair agents keep a thread until their message queue is empty. As a result, non-fair agents tend to perform better than fair ones. The default setting for all Agent instances is to be non-fair; however, an instance can be made fair by calling its makeFair() method.

def jugMembers = new Agent<List>(['Me'])  //add Me
jugMembers.makeFair()

7. Dataflow Concurrency

Dataflow concurrency offers an alternative concurrency model, which is inherently safe and robust.

Introduction

Check out the small example written in Groovy using GPars, which sums results of calculations performed by three concurrently run tasks:

import groovyx.gpars.dataflow.DataFlowVariable
import static groovyx.gpars.dataflow.DataFlow.task

final def x = new DataFlowVariable()
final def y = new DataFlowVariable()
final def z = new DataFlowVariable()

task {
    z << x.val + y.val
    println "Result: ${z.val}"
}

task {
    x << 10
}

task {
    y << 5
}

Or the same algorithm rewritten using the DataFlows class.

import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.task

final def df = new DataFlows()

task {
    df.z = df.x + df.y
    println "Result: ${df.z}"
}

task {
    df.x = 10
}

task {
    df.y = 5
}

We start three logical tasks, which can run in parallel and perform their particular activities. The tasks need to exchange data and they do so using Dataflow Variables. Think of Dataflow Variables as one-shot channels safely and reliably transferring data from producers to their consumers.

Dataflow Variables have pretty straightforward semantics. When a task needs to read a value from a DataFlowVariable (through the val property), it will block until the value has been set by another task or thread (using the '<<' operator). Each DataFlowVariable can be set only once in its lifetime. Notice that you don't have to bother with ordering and synchronizing the tasks or threads and their access to shared variables. The values are transferred among tasks at the right time, and the data flows seamlessly among tasks / threads without your intervention or care.

Implementation detail: The three tasks in the example do not necessarily need to be mapped to three physical threads. Tasks represent so-called "green" or "logical" threads and can be mapped under the covers to any number of physical threads. The actual mapping depends on the scheduler, but the outcome of dataflow algorithms doesn't depend on the actual scheduling.

Benefits

Here's what you gain by using Dataflow Concurrency (by Jonas Bonér):

This doesn't sound bad, does it?

Concepts

Dataflow programming

Quoting Wikipedia

Operations (in Dataflow programs) consist of "black boxes" with inputs and outputs, all of which are always explicitly defined. They run as soon as all of their inputs become valid, as opposed to when the program encounters them. Whereas a traditional program essentially consists of a series of statements saying "do this, now do this", a dataflow program is more like a series of workers on an assembly line, who will do their assigned task as soon as the materials arrive. This is why dataflow languages are inherently parallel; the operations have no hidden state to keep track of, and the operations are all "ready" at the same time.

Principles

With Dataflow Concurrency you can safely share variables across tasks. These variables (in Groovy instances of the DataFlowVariable class) can only be assigned (using the '<<' operator) a value once in their lifetime. The values of the variables, on the other hand, can be read multiple times (in Groovy through the val property), even before the value has been assigned. In such cases the reading task is suspended until the value is set by another task. So you can simply write your code for each task sequentially using Dataflow Variables and the underlying mechanics will make sure you get all the values you need in a thread-safe manner.
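The single-assignment rule can be tried out in a few lines. The sketch below binds the variable from a plain thread rather than a task so that it does not depend on the dataflow thread pool. It assumes that an attempt to bind a second, different value is rejected with an IllegalStateException, which is worth verifying against your GPars version.

```groovy
import groovyx.gpars.dataflow.DataFlowVariable

final def x = new DataFlowVariable()

// Bind the value from another thread after a short delay
Thread.start {
    Thread.sleep 100
    x << 10
}

assert x.val == 10   // blocks until the other thread has bound the value
assert x.val == 10   // a bound value can be read any number of times

boolean rejected = false
try {
    x << 20          // a second bind with a different value violates single assignment
} catch (IllegalStateException expected) {
    rejected = true  // assumed exception type, see the note above
}

assert rejected
assert x.val == 10   // the original value is untouched
```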

In brief, you generally perform three operations with Dataflow variables:

And these are the three essential rules your programs have to follow:

Dataflow Streams

Before you check out the samples of using Dataflow Variables, Tasks and Operators, you should know a bit about streams to have a full picture of Dataflow Concurrency. Besides dataflow variables there's also the concept of DataFlowStreams that you can leverage. You may think of them as thread-safe buffers or queues. Check out a typical producer-consumer demo:

import static groovyx.gpars.dataflow.DataFlow.task

import groovyx.gpars.dataflow.DataFlowStream

def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow']
final def buffer = new DataFlowStream()

task {
    for (word in words) {
        buffer << word.toUpperCase()  //add to the buffer
    }
}

task {
    while(true) println buffer.val  //read from the buffer in a loop
}
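The consumer in the demo never terminates on its own. One common way to end such a loop, sketched here under the assumption that the producer knows when it is done, is to send an agreed end-of-stream marker through the same stream. The END constant is a hypothetical marker introduced only for this illustration and is not part of GPars. Plain threads are used in place of tasks to keep the sketch independent of the dataflow thread pool.

```groovy
import groovyx.gpars.dataflow.DataFlowStream

def words = ['Groovy', 'fantastic', 'concurrency']
final def buffer = new DataFlowStream()
final String END = 'END-OF-STREAM'   // hypothetical marker, must not occur in the data

// Producer: writes the words followed by the marker
Thread.start {
    for (word in words) {
        buffer << word.toUpperCase()
    }
    buffer << END
}

// Consumer: reads until the marker arrives
def received = []
while (true) {
    def word = buffer.val            // blocks until the next value is available
    if (word == END) break
    received << word
}

assert received == ['GROOVY', 'FANTASTIC', 'CONCURRENCY']
```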

Bind handlers

def a = new DataFlowVariable()
a >> {println "The variable has just been bound to $it"}
a.whenBound {println "Just to confirm that the variable has been really set to $it"}
...

Bind handlers can be registered on Dataflow Variables using either the >> operator or the whenBound() method. They will be run once a value is bound to the variable.

Further reading

Scala Dataflow library by Jonas Bonér

JVM concurrency presentation slides by Jonas Bonér

Dataflow Concurrency library for Ruby

7.1 Dataflow tasks

The Dataflow tasks give you an easy-to-grasp abstraction of mutually-independent logical tasks or threads, which can run concurrently and exchange data solely through Dataflow Variables and Streams.

Check out the examples.

A simple mashup example

In the example we're downloading the front pages of three popular web sites, each in their own task, while in a separate task we're filtering out sites talking about Groovy today and forming the output. The output task synchronizes automatically with the three download tasks on the three Dataflow variables through which the content of each website is passed to the output task.

import static groovyx.gpars.GParsPool.*
import groovyx.gpars.dataflow.DataFlowVariable
import static groovyx.gpars.dataflow.DataFlow.task

/**
 * A simple mashup sample, downloads content of three websites
 * and checks how many of them refer to Groovy.
 */

def dzone = new DataFlowVariable()
def jroller = new DataFlowVariable()
def theserverside = new DataFlowVariable()

task {
    println 'Started downloading from DZone'
    dzone << 'http://www.dzone.com'.toURL().text
    println 'Done downloading from DZone'
}

task {
    println 'Started downloading from JRoller'
    jroller << 'http://www.jroller.com'.toURL().text
    println 'Done downloading from JRoller'
}

task {
    println 'Started downloading from TheServerSide'
    theserverside << 'http://www.theserverside.com'.toURL().text
    println 'Done downloading from TheServerSide'
}

task {
    withPool {
        println "Number of Groovy sites today: " +
                ([dzone, jroller, theserverside].findAllParallel {
                    it.val.toUpperCase().contains 'GROOVY'
                }).size()
    }
    System.exit 0
}

Grouping tasks

Dataflow tasks can be organized into groups to allow for performance fine-tuning. Groups provide a handy task() factory method to create tasks attached to the groups.

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    task {
        …
    }

    task {
        …
    }
}

The default thread pool for dataflow tasks contains non-daemon threads, which means your application will not exit before all tasks complete. When grouping tasks, make sure that your custom thread pools either use daemon threads, which can be achieved by using DefaultPGroup or by providing your own thread factory to a thread pool constructor, or, in case your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, that you shut down the group or the thread pool explicitly by calling its shutdown() method. Otherwise your application will not exit.

A mashup variant with methods

To avoid giving you the wrong impression about structuring Dataflow code, here's a rewrite of the mashup example, with a downloadPage() method performing the actual download in a separate task and returning a DataFlowVariable instance, so that the main application thread can eventually get hold of the downloaded content. Dataflow variables can be passed around as parameters or return values.

package groovyx.gpars.samples.dataflow

import static groovyx.gpars.GParsExecutorsPool.*
import groovyx.gpars.dataflow.DataFlowVariable
import static groovyx.gpars.dataflow.DataFlow.task

/**
 * A simple mashup sample, downloads content of three websites and checks how many of them refer to Groovy.
 */
final List urls = ['http://www.dzone.com', 'http://www.jroller.com', 'http://www.theserverside.com']

task {
    def pages = urls.collect { downloadPage(it) }
    withPool {
        println "Number of Groovy sites today: " +
                (pages.findAllParallel {
                    it.val.toUpperCase().contains 'GROOVY'
                }).size()
    }
    System.exit 0
}

def downloadPage(def url) {
    def page = new DataFlowVariable()
    task {
        println "Started downloading from $url"
        page << url.toURL().text
        println "Done downloading from $url"
    }
    return page
}

A physical calculation example

Dataflow programs naturally scale with the number of processors. Up to a certain level, the more processors you have, the faster the program runs. Check out, for example, the following script, which calculates parameters of a simple physical experiment and prints out the results. Each task performs its part of the calculation. It may depend on values calculated by other tasks, and its own result may in turn be needed by others. With Dataflow Concurrency you can split the work between tasks or reorder the tasks themselves as you like, and the dataflow mechanics will ensure the calculation is accomplished correctly.

import groovyx.gpars.dataflow.DataFlowActor
import groovyx.gpars.dataflow.DataFlowVariable
import static groovyx.gpars.dataflow.DataFlow.task

final def mass = new DataFlowVariable()
final def radius = new DataFlowVariable()
final def volume = new DataFlowVariable()
final def density = new DataFlowVariable()
final def acceleration = new DataFlowVariable()
final def time = new DataFlowVariable()
final def velocity = new DataFlowVariable()
final def decelerationForce = new DataFlowVariable()
final def deceleration = new DataFlowVariable()
final def distance = new DataFlowVariable()
final def author = new DataFlowVariable()  //bound by the task reading the user's name

task {
    println """
Calculating distance required to stop a moving ball.
====================================================
The ball has a radius of ${radius.val} meters and is made of a material with ${density.val} kg/m3 density,
which means that the ball has a volume of ${volume.val} m3 and a mass of ${mass.val} kg.
The ball has been accelerating with ${acceleration.val} m/s2 from 0 for ${time.val} seconds and so reached a velocity of ${velocity.val} m/s.

Given our ability to push the ball backwards with a force of ${decelerationForce.val} N (Newton), we can cause a deceleration
of ${deceleration.val} m/s2 and so stop the ball at a distance of ${distance.val} m.

=======================================================================================================================
This example has been calculated asynchronously in multiple tasks using GPars DataFlow concurrency in Groovy.
Author: ${author.val}
"""

    System.exit 0
}

task {
    mass << volume.val * density.val
}

task {
    volume << 4 / 3 * Math.PI * (radius.val ** 3)  //volume of a sphere
}

task {
    radius << 2.5
    density << 998.2071  //water
    acceleration << 9.80665  //free fall
    decelerationForce << 900
}

task {
    println 'Enter your name:'
    def name = new InputStreamReader(System.in).readLine()
    author << (name?.trim()?.size() > 0 ? name : 'anonymous')
}

task {
    time << 10
    velocity << acceleration.val * time.val
}

task {
    deceleration << decelerationForce.val / mass.val
}

task {
    distance << deceleration.val * ((velocity.val / deceleration.val) ** 2) * 0.5
}

Note: I did my best to make all the physical calculations right. Feel free to change the values and see how long a distance you need to stop the rolling ball.

Deterministic deadlocks

If you happen to introduce a deadlock in your dependencies, the deadlock will occur each time you run the code. No randomness allowed. That's one of the benefits of Dataflow concurrency. Irrespective of the actual thread scheduling scheme, if you don't get a deadlock in tests, you won't get them in production.

task {
    println a.val
    b << 'Hi there'
}

task {
    println b.val
    a << 'Hello man'
}

DataFlows map

As a handy shortcut the DataFlows class can help you reduce the amount of code you have to write to leverage Dataflow variables.

def df = new DataFlows()
df.x = 'value1'
assert df.x == 'value1'

DataFlow.task {df.y = 'value2'}

assert df.y == 'value2'

Think of DataFlows as a map with Dataflow Variables as keys storing their bound values as appropriate map values. The semantics of reading a value (e.g. df.x) and binding a value (e.g. df.x = 'value') remain identical to the semantics of plain Dataflow Variables (x.val and x << 'value' respectively).

7.2 Dataflow operators

Dataflow Operators provide a full Dataflow implementation with all the usual ceremony.

Concepts

operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    …
    bindOutput 0, x + y + z
}

/**
 * CACHE
 *
 * Caches sites' contents. Accepts requests for url content, outputs the content. Outputs requests for download
 * if the site is not in cache yet.
 */
operator(inputs: [urlRequests], outputs: [downloadRequests, sites]) {request ->

    if (!request.content) {
        println "[Cache] Retrieving ${request.site}"
        def content = cache[request.site]
        if (content) {
            println "[Cache] Found in cache"
            bindOutput 1, [site: request.site, word:request.word, content: content]
        } else {
            def downloads = pendingDownloads[request.site]
            if (downloads != null) {
                println "[Cache] Awaiting download"
                downloads << request
            } else {
                pendingDownloads[request.site] = []
                println "[Cache] Asking for download"
                bindOutput 0, request
            }
        }
    } else {
        println "[Cache] Caching ${request.site}"
        cache[request.site] = request.content
        bindOutput 1, request
        def downloads = pendingDownloads[request.site]
        if (downloads != null) {
            for (downloadRequest in downloads) {
                println "[Cache] Waking up"
                bindOutput 1, [site: downloadRequest.site, word:downloadRequest.word, content: request.content]
            }
            pendingDownloads.remove(request.site)
        }
    }
}

The standard error handling will print out an error message to standard error output and stop the operator in case an uncaught exception is thrown from within the operator's body. To alter the behavior, you can redefine the reportError() method on the operator:

op.metaClass.reportError = {Throwable e ->
    //handle the exception
    stop()  //You can also stop the operator
}

Parallelize operators

By default an operator's body is processed by a single thread at a time. While this is a safe setting allowing the operator's body to be written in a non-thread-safe manner, once an operator becomes "hot" and data start to accumulate in the operator's input queues, you might consider allowing multiple threads to run the operator's body concurrently. Bear in mind that in such a case you need to avoid or protect shared resources from multi-threaded access. To enable multiple threads to run the operator's body concurrently, pass an extra maxForks parameter when creating an operator:

def op = operator(inputs: [a, b, c], outputs: [d, e], maxForks: 2) {x, y, z ->
        bindOutput 0, x + y + z
        bindOutput 1, x * y * z
    }

The value of the maxForks parameter indicates the maximum number of threads running the operator concurrently. Only positive numbers are allowed, with 1 being the default.
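The following is a minimal sketch of a forked operator that touches shared state. It assumes the DataFlowStream class and the static DataFlow.operator() factory of the GPars version this guide describes. The names numbers, squares and processed are hypothetical and exist only for illustration. Because two threads may run the body at once, the shared counter is an AtomicInteger rather than a plain int, and the order of the results is not guaranteed.

```groovy
import groovyx.gpars.dataflow.DataFlowStream
import java.util.concurrent.atomic.AtomicInteger
import static groovyx.gpars.dataflow.DataFlow.operator

// Hypothetical channels and counter, used only for this illustration
final DataFlowStream numbers = new DataFlowStream()
final DataFlowStream squares = new DataFlowStream()
final AtomicInteger processed = new AtomicInteger()

def op = operator(inputs: [numbers], outputs: [squares], maxForks: 2) {x ->
    processed.incrementAndGet()  //shared state, so a thread-safe counter is used
    bindOutput 0, x * x
}

(1..5).each { numbers << it }

//Read five results; with maxForks > 1 they may arrive in any order
def results = (1..5).collect { squares.val }
op.stop()

assert results.sort() == [1, 4, 9, 16, 25]
assert processed.get() == 5
```

Sorting the results before comparing them is deliberate: once the body runs on more than one thread, the output order no longer has to match the input order.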

Grouping operators

Dataflow operators can be organized into groups to allow for performance fine-tuning. Groups provide a handy operator() factory method to create operators attached to the groups.

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

The default thread pool for dataflow operators contains daemon threads, which means your application will exit as soon as the main thread finishes and will not wait for the operators to complete. When grouping operators, make sure that your custom thread pools use daemon threads, too. You can achieve this by using DefaultPGroup or by providing your own thread factory to a thread pool constructor. If your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, make sure you shut down the group or the thread pool explicitly by calling its shutdown() method. Otherwise your application will not exit.
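As an illustration of the thread factory option, here is a sketch that uses only JDK classes. The daemonFactory name is hypothetical. How the resulting executor is then handed to a GPars pool depends on the pool class you choose and is not shown here.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit

//Hypothetical factory: marks every worker as a daemon thread,
//so the pool alone cannot keep the JVM alive
ThreadFactory daemonFactory = { Runnable work ->
    Thread thread = new Thread(work)
    thread.daemon = true
    return thread
} as ThreadFactory

ExecutorService pool = Executors.newFixedThreadPool(2, daemonFactory)

//Ask a worker thread whether it really is a daemon
boolean workerIsDaemon = pool.submit({ Thread.currentThread().daemon } as Callable).get(5, TimeUnit.SECONDS)
assert workerIsDaemon

pool.shutdown()
```

With non-daemon workers, the final shutdown() call would be mandatory for the JVM to exit. With daemon workers it is merely good housekeeping.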

7.3 Dataflow implementation

Dataflow Concurrency in GPars builds on top of its actor support. All dataflow tasks share a thread pool, so the number of tasks created through the DataFlow.task() factory method does not need to correspond to the number of physical threads required from the system. The PGroup.task() factory method can be used to attach the created task to a group. Since each group defines its own thread pool, you can easily organize tasks around different thread pools just like you do with actors.

Combining actors and Dataflow Concurrency

The good news is that you can combine actors and Dataflow Concurrency in any way you see fit for the particular problem at hand. You can freely use Dataflow Variables from actors.

final DataFlowVariable a = new DataFlowVariable()

final AbstractPooledActor doubler = Actors.actor {
    react {message ->
        a << 2 * message
    }
}

final AbstractPooledActor fakingDoubler = actor {
    react {
        doubler.send it  //send a number to the doubler
        println "Result ${a.val}"  //wait for the result to be bound to 'a'
    }
}

fakingDoubler << 10

In the example you see the "fakingDoubler" using both messages and a DataFlowVariable to communicate with the doubler actor.

Using plain java threads

The DataFlowVariable and DataFlowStream classes can be used from any thread of your application, not only from the tasks created by DataFlow.task(). Consider the following example:

import groovyx.gpars.dataflow.DataFlowVariable

final DataFlowVariable a = new DataFlowVariable<String>()
final DataFlowVariable b = new DataFlowVariable<String>()

Thread.start {
    println "Received: $a.val"
    Thread.sleep 2000
    b << 'Thank you'
}

Thread.start {
    Thread.sleep 2000
    a << 'An important message from the second thread'
    println "Reply: $b.val"
}

We're creating two plain java.lang.Thread instances, which exchange data using the two data flow variables. Obviously, neither the actor lifecycle methods, nor the send/react functionality or thread pooling take effect in such scenarios.

8. Tips

General GPars Tips

Grouping

High-level concurrency concepts, like Agents, Actors or Dataflow tasks and operators can be grouped around shared thread pools. The PGroup class and its sub-classes represent convenient GPars wrappers around thread pools. Objects created using the group's factory methods will share the group's thread pool.

def group1 = new DefaultPGroup()
def group2 = new NonDaemonPGroup()

group1.with {
    task {...}
    task {...}
    def op = operator(...) {...}
    def actor = actor{...}
    def anotherActor = group2.actor{...}  //will belong to group2
    def agent = safe(0)
}

When customizing the thread pools for groups, consider using the existing GPars implementations - the DefaultPool or ResizeablePool classes. Alternatively, you may create your own implementation of the groovyx.gpars.scheduler.Pool interface and pass it to the DefaultPGroup or NonDaemonPGroup constructors.
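A short sketch of passing one of the existing pools to a group might look as follows. It assumes that ResizeablePool offers a (daemon, poolSize) constructor and that DefaultPGroup accepts a Pool instance, as in the GPars version this guide describes. Verify both against the API documentation of your release. The result variable is hypothetical.

```groovy
import groovyx.gpars.dataflow.DataFlowVariable
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

//Assumed constructor arguments: daemon flag and initial pool size
def group = new DefaultPGroup(new ResizeablePool(true, 2))

//Hypothetical variable used to report back from the task
final DataFlowVariable result = new DataFlowVariable()

//The task runs on the group's own pool, so it sees a daemon thread
group.task { result << Thread.currentThread().daemon }

assert result.val

group.shutdown()
```

Reading result.val blocks until the task has bound a value, so the assertion does not race with the task.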