7. Dataflow Concurrency

Dataflow concurrency offers an alternative concurrency model, which is inherently safe and robust.

Introduction

Check out the small example written in Groovy using GPars, which sums results of calculations performed by three concurrently run tasks:

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def x = new DataflowVariable()
final def y = new DataflowVariable()
final def z = new DataflowVariable()

task { z << x.val + y.val }

task { x << 10 }

task { y << 5 }

println "Result: ${z.val}"

Or the same algorithm rewritten using the Dataflows class.

import groovyx.gpars.dataflow.Dataflows
import static groovyx.gpars.dataflow.Dataflow.task

final def df = new Dataflows()

task { df.z = df.x + df.y }

task { df.x = 10 }

task { df.y = 5 }

println "Result: ${df.z}"

We start three logical tasks, which can run in parallel and perform their particular activities. The tasks need to exchange data and they do so using Dataflow Variables. Think of Dataflow Variables as one-shot channels safely and reliably transferring data from producers to their consumers.

The Dataflow Variables have straightforward semantics. When a task needs to read a value from a DataflowVariable (through the val property), it will block until the value has been set by another task or thread (using the '<<' operator). Each DataflowVariable can be set only once in its lifetime. Notice that you don't have to bother with ordering and synchronizing the tasks or threads and their access to shared variables; the values are transferred among tasks at the right time, without any intervention on your part.

Implementation detail: The three tasks in the example do not necessarily need to be mapped to three physical threads. Tasks represent so-called "green" or "logical" threads and can be mapped under the covers to any number of physical threads. The actual mapping depends on the scheduler, but the outcome of dataflow algorithms doesn't depend on the actual scheduling.

The bind operation of dataflow variables silently accepts re-binding to a value, which is equal to an already bound value. Call bindUnique to reject equal values on already-bound variables.
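To illustrate, here's a minimal sketch of the rules just described (the variable names are made up; bind and bindUnique are the calls referenced above):

import groovyx.gpars.dataflow.DataflowVariable

def relaxed = new DataflowVariable()
relaxed << 10
relaxed.bind 10         //silently accepted - the value equals the one already bound
//relaxed.bind 20       //would fail - a different value cannot be bound to an already-bound variable

def strict = new DataflowVariable()
strict.bindUnique 42
//strict.bindUnique 42  //would fail - bindUnique rejects any re-binding, even to an equal value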

Benefits

Here's what you gain by using Dataflow Concurrency (by Jonas Bonér):

This doesn't sound bad, does it?

Concepts

Dataflow programming

Quoting Wikipedia

Operations (in Dataflow programs) consist of "black boxes" with inputs and outputs, all of which are always explicitly defined. They run as soon as all of their inputs become valid, as opposed to when the program encounters them. Whereas a traditional program essentially consists of a series of statements saying "do this, now do this", a dataflow program is more like a series of workers on an assembly line, who will do their assigned task as soon as the materials arrive. This is why dataflow languages are inherently parallel; the operations have no hidden state to keep track of, and the operations are all "ready" at the same time.

Principles

With Dataflow Concurrency you can safely share variables across tasks. These variables (in Groovy instances of the DataflowVariable class) can only be assigned (using the '<<' operator) a value once in their lifetime. The values of the variables, on the other hand, can be read multiple times (in Groovy through the val property), even before the value has been assigned. In such cases the reading task is suspended until the value is set by another task. So you can simply write your code for each task sequentially using Dataflow Variables and the underlying mechanics will make sure you get all the values you need in a thread-safe manner.

In brief, you generally perform three operations with Dataflow variables:

And these are the three essential rules your programs have to follow:

Dataflow Queues and Broadcasts

Before you go on to check the samples of using Dataflow Variables, Tasks and Operators, you should know a bit about streams and queues to get the full picture of Dataflow Concurrency. Besides dataflow variables, there are also the concepts of DataflowQueues and DataflowBroadcasts that you can leverage in your code. You may think of them as thread-safe buffers or queues for message transfer among concurrent tasks or threads. Check out a typical producer-consumer demo:

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow']
final def buffer = new DataflowQueue()

task {
    for (word in words) {
        buffer << word.toUpperCase()   //add to the buffer
    }
}

task {
    while (true) println buffer.val    //read from the buffer in a loop
}

Both DataflowBroadcasts and DataflowQueues, just like DataflowVariables, implement the DataflowChannel interface with common methods allowing users to write values to them and read values from them. The ability to treat both types identically through the DataflowChannel interface comes in handy once you start using them to wire tasks, operators or selectors together.

The DataflowChannel interface combines two interfaces, each serving its own purpose: DataflowReadChannel, which holds the methods necessary for reading values from a channel, and DataflowWriteChannel, which holds the methods for writing values into a channel.

You may prefer using these dedicated interfaces instead of the general DataflowChannel interface, to better express the intended usage.
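As a sketch of that intent, a producer could be declared against DataflowWriteChannel only and a consumer against DataflowReadChannel only (the produce and consume methods below are hypothetical helpers, not part of GPars):

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel

//the producer only needs write access to the channel
void produce(DataflowWriteChannel channel) {
    channel << 'ready'
}

//the consumer only needs read access to the channel
String consume(DataflowReadChannel channel) {
    channel.val
}

def queue = new DataflowQueue()
produce queue
assert consume(queue) == 'ready'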

Point-to-point communication

The DataflowQueue class can be viewed as a point-to-point (1 to 1, many to 1) communication channel. It allows one or more producers to send messages to a single reader. If multiple readers read from the same DataflowQueue, they will each consume different messages. Or to put it a different way, each message is consumed by exactly one reader. You can easily imagine a simple load-balancing scheme built around a shared DataflowQueue, with readers being added dynamically when the consumer part of your algorithm needs to scale up. This is also a useful default choice when connecting tasks or operators.
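A hypothetical load-balancing sketch along these lines might look as follows (the channel names and worker count are arbitrary; each queued item is consumed by exactly one worker):

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

final def requests = new DataflowQueue()
final def results = new DataflowQueue()

(1..6).each { requests << it }      //enqueue the work items

//two workers compete for values from the shared queue
2.times { id ->
    task {
        3.times {
            def item = requests.val                    //each value is delivered to exactly one reader
            results << "worker $id processed item $item"
        }
    }
}

6.times { println results.val }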

Publish-subscribe communication

The DataflowBroadcast class offers a publish-subscribe (1 to many, many to many) communication model. One or more producers write messages, while all registered readers will receive all the messages. Each message is thus consumed by all readers with a valid subscription at the moment when the message is being written to the channel. The readers subscribe by calling the createReadChannel() method.

DataflowWriteChannel broadcastStream = new DataflowBroadcast()
DataflowReadChannel stream1 = broadcastStream.createReadChannel()
DataflowReadChannel stream2 = broadcastStream.createReadChannel()
broadcastStream << 'Message1'
broadcastStream << 'Message2'
broadcastStream << 'Message3'
assert stream1.val == stream2.val
assert stream1.val == stream2.val
assert stream1.val == stream2.val

Under the hood DataflowBroadcast uses the DataflowStream class to implement the message delivery.

DataflowStream

The DataflowStream class represents a deterministic dataflow channel. It is built around the concept of a functional queue and so provides a lock-free, thread-safe implementation for message passing. Essentially, you may think of DataflowStream as a 1 to many communication channel, since when a reader consumes a message, other readers will still be able to read it. Also, all messages arrive at all readers in the same order. Since DataflowStream is implemented as a functional queue, its API requires that users traverse the values in the stream themselves. On the other hand, DataflowStream offers handy methods for value filtering or transformation together with interesting performance characteristics.

The DataflowStream class, unlike the other communication elements, does not implement the DataflowChannel interface, since the semantics of its use is different. Use the DataflowStreamReadAdapter and DataflowStreamWriteAdapter classes to wrap instances of the DataflowStream class in DataflowReadChannel or DataflowWriteChannel implementations.

import groovyx.gpars.dataflow.stream.DataflowStream
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

/**
 * Demonstrates a concurrent implementation of the Sieve of Eratosthenes using dataflow tasks
 *
 * In principle, the algorithm consists of concurrently run chained filters,
 * each of which detects whether the current number can be divided by a single prime number.
 * (generate nums 1, 2, 3, 4, 5, ...) -> (filter by mod 2) -> (filter by mod 3) -> (filter by mod 5) -> (filter by mod 7) -> (filter by mod 11) -> (caution! Primes falling out here)
 * The chain is built (grows) on the fly, whenever a new prime is found
 */

/**
 * We need a resizeable thread pool, since tasks consume threads while blocked waiting for values at DataflowQueue.val
 */
group = new DefaultPGroup(new ResizeablePool(true))

final int requestedPrimeNumberCount = 100

/**
 * Generating candidate numbers
 */
final DataflowStream candidates = new DataflowStream()
group.task {
    candidates.generate(2, {it + 1}, {it < 1000})
}

/**
 * Chain a new filter for a particular prime number to the end of the Sieve
 * @param inChannel The current end channel to consume
 * @param prime The prime number to divide future prime candidates with
 * @return A new channel ending the whole chain
 */
def filter(DataflowStream inChannel, int prime) {
    inChannel.filter { number ->
        group.task {
            number % prime != 0
        }
    }
}

/**
 * Consume Sieve output and add additional filters for all found primes
 */
def currentOutput = candidates
requestedPrimeNumberCount.times {
    int prime = currentOutput.first
    println "Found: $prime"
    currentOutput = filter(currentOutput, prime)
}

For convenience and for the ability to use DataflowStream with other dataflow constructs, like e.g. operators, you can wrap it with DataflowStreamReadAdapter for read access or DataflowStreamWriteAdapter for write access. The DataflowStream class is designed for single-threaded producers and consumers. If multiple threads are supposed to read or write values to the stream, their access to the stream must be serialized externally, or the adapters should be used.

DataflowStream Adapters

Since the DataflowStream API, as well as the semantics of its use, is very different from those defined by Dataflow(Read/Write)Channel, adapters have to be used in order to allow DataflowStreams to be used with other dataflow elements. The DataflowStreamReadAdapter class wraps a DataflowStream with the methods necessary to read values, while the DataflowStreamWriteAdapter class provides the write methods around the wrapped DataflowStream.

It is important to mention that the DataflowStreamWriteAdapter is thread-safe, allowing multiple threads to add values to the wrapped DataflowStream through the adapter. On the other hand, to minimize overhead and stay in line with the DataflowStream semantics, the DataflowStreamReadAdapter is not thread-safe and should only be used from within a single thread. If multiple threads need to read from a DataflowStream, they should each create their own wrapping DataflowStreamReadAdapter.

Thanks to the adapters DataflowStream can be used for communication between operators or selectors, which expect Dataflow(Read/Write)Channels .

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.stream.DataflowStream
import groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter
import groovyx.gpars.dataflow.stream.DataflowStreamWriteAdapter
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.operator

/**
 * Demonstrates the use of DataflowStreamAdapters to allow dataflow operators to use DataflowStreams
 */

final DataflowStream a = new DataflowStream()
final DataflowStream b = new DataflowStream()
def aw = new DataflowStreamWriteAdapter(a)
def bw = new DataflowStreamWriteAdapter(b)
def ar = new DataflowStreamReadAdapter(a)
def br = new DataflowStreamReadAdapter(b)

def result = new DataflowQueue()

def op1 = operator(ar, bw) {
    bindOutput it
}
def op2 = selector([br], [result]) {
    result << it
}

aw << 1
aw << 2
aw << 3
assert([1, 2, 3] == [result.val, result.val, result.val])
op1.stop()
op2.stop()
op1.join()
op2.join()

Also the ability to select a value from multiple DataflowChannels can only be used through an adapter around a DataflowStream :

import groovyx.gpars.dataflow.Select
import groovyx.gpars.dataflow.stream.DataflowStream
import groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter
import groovyx.gpars.dataflow.stream.DataflowStreamWriteAdapter
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates the use of DataflowStreamAdapters to allow dataflow select to select on DataflowStreams
 */

final DataflowStream a = new DataflowStream()
final DataflowStream b = new DataflowStream()
def aw = new DataflowStreamWriteAdapter(a)
def bw = new DataflowStreamWriteAdapter(b)
def ar = new DataflowStreamReadAdapter(a)
def br = new DataflowStreamReadAdapter(b)

final Select<?> select = select(ar, br)
task {
    aw << 1
    aw << 2
    aw << 3
}
assert 1 == select().value
assert 2 == select().value
assert 3 == select().value
task {
    bw << 4
    aw << 5
    bw << 6
}
def result = (1..3).collect {select()}.sort {it.value}
assert result*.value == [4, 5, 6]
assert result*.index == [1, 0, 1]

If you don't need any of the functional queue DataflowStream-special functionality, like generation, filtering or mapping, you may consider using the DataflowBroadcast class instead, which offers the publish-subscribe communication model through the DataflowChannel interface.

Bind handlers

def a = new DataflowVariable()
a >> {println "The variable has just been bound to $it"}
a.whenBound {println "Just to confirm that the variable has been really set to $it"}
...

Bind handlers can be registered on all dataflow channels (variables, queues or broadcasts), either using the >> operator or the whenBound() method. They will be run once a value is bound to the variable.

Dataflow queues and broadcasts also support a wheneverBound method to register a closure or a message handler to run each time a value is bound to them.

def queue = new DataflowQueue()
queue.wheneverBound {println "A value $it arrived to the queue"}

Dataflow variables and broadcasts are one of several possible ways to implement Parallel Speculations . For details, please check out Parallel Speculations in the Parallel Collections section of the User Guide.

Further reading

Scala Dataflow library by Jonas Bonér

JVM concurrency presentation slides by Jonas Bonér

Dataflow Concurrency library for Ruby

7.1 Tasks

Dataflow tasks give you an easy-to-grasp abstraction of mutually independent logical tasks or threads, which can run concurrently and exchange data solely through Dataflow Variables, Queues, Broadcasts and Streams. Dataflow tasks, with their easy-to-express mutual dependencies and inherently sequential bodies, can also be used as a practical implementation of UML Activity Diagrams.

Check out the examples.

A simple mashup example

In the example we're downloading the front pages of three popular web sites, each in its own task, while in a separate task we're checking which of the downloaded sites mention Groovy today and printing the result. The output task synchronizes automatically with the three download tasks on the three Dataflow variables through which the content of each website is passed to the output task.

import static groovyx.gpars.GParsPool.*
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * A simple mashup sample, downloads content of three websites
 * and checks how many of them refer to Groovy.
 */

def dzone = new DataflowVariable()
def jroller = new DataflowVariable()
def theserverside = new DataflowVariable()

task {
    println 'Started downloading from DZone'
    dzone << 'http://www.dzone.com'.toURL().text
    println 'Done downloading from DZone'
}

task {
    println 'Started downloading from JRoller'
    jroller << 'http://www.jroller.com'.toURL().text
    println 'Done downloading from JRoller'
}

task {
    println 'Started downloading from TheServerSide'
    theserverside << 'http://www.theserverside.com'.toURL().text
    println 'Done downloading from TheServerSide'
}

task {
    withPool {
        println "Number of Groovy sites today: " +
                ([dzone, jroller, theserverside].findAllParallel {
                    it.val.toUpperCase().contains 'GROOVY'
                }).size()
    }
}.join()

Grouping tasks

Dataflow tasks can be organized into groups to allow for performance fine-tuning. Groups provide a handy task() factory method to create tasks attached to the groups. Using groups allows you to organize tasks or operators around different thread pools (wrapped inside the group). While the Dataflow.task() command schedules the task on a default thread pool (java.util.concurrent.Executor, fixed size=#cpu+1, daemon threads), you may prefer being able to define your own thread pool(s) to run your tasks.

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    task {
        …
    }

    task {
        …
    }
}

The default thread pool for dataflow tasks contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all tasks to complete. When grouping tasks, make sure that your custom thread pools also use daemon threads. This can be achieved either by using DefaultPGroup or by providing your own thread factory to the thread pool constructor. If your thread pools use non-daemon threads, such as when using the NonDaemonPGroup class, make sure you shut down the group or the thread pool explicitly by calling its shutdown() method, otherwise your application will not exit.
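For example, a minimal sketch of the non-daemon case might look like this (assuming a NonDaemonPGroup created with its default constructor):

import groovyx.gpars.group.NonDaemonPGroup

def group = new NonDaemonPGroup()          //non-daemon threads keep the JVM alive

def t = group.task {
    println 'Running on a non-daemon thread pool'
}

t.join()            //wait for the task to finish
group.shutdown()    //without this call the application would not exit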

A mashup variant with methods

To avoid giving you a wrong impression about structuring Dataflow code, here's a rewrite of the mashup example, with a downloadPage() method performing the actual download in a separate task and returning a DataflowVariable instance, so that the main application thread can eventually get hold of the downloaded content. Dataflow variables can obviously be passed around as parameters or return values.

package groovyx.gpars.samples.dataflow

import static groovyx.gpars.GParsExecutorsPool.*
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * A simple mashup sample, downloads content of three websites and checks how many of them refer to Groovy.
 */
final List urls = ['http://www.dzone.com', 'http://www.jroller.com', 'http://www.theserverside.com']

task {
    def pages = urls.collect { downloadPage(it) }
    withPool {
        println "Number of Groovy sites today: " +
                (pages.findAllParallel {
                    it.val.toUpperCase().contains 'GROOVY'
                }).size()
    }
}.join()

def downloadPage(def url) {
    def page = new DataflowVariable()
    task {
        println "Started downloading from $url"
        page << url.toURL().text
        println "Done downloading from $url"
    }
    return page
}

A physical calculation example

Dataflow programs naturally scale with the number of processors. Up to a certain level, the more processors you have, the faster the program runs. Check out, for example, the following script, which calculates parameters of a simple physical experiment and prints out the results. Each task performs its part of the calculation and may depend on values calculated by other tasks, while its own result may in turn be needed by some of the other tasks. With Dataflow Concurrency you can split the work between tasks or reorder the tasks themselves as you like, and the dataflow mechanics will ensure the calculation is accomplished correctly.

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def mass = new DataflowVariable()
final def radius = new DataflowVariable()
final def volume = new DataflowVariable()
final def density = new DataflowVariable()
final def acceleration = new DataflowVariable()
final def time = new DataflowVariable()
final def velocity = new DataflowVariable()
final def decelerationForce = new DataflowVariable()
final def deceleration = new DataflowVariable()
final def distance = new DataflowVariable()
final def author = new DataflowVariable()

def t = task {
    println """
Calculating distance required to stop a moving ball.
====================================================
The ball has a radius of ${radius.val} meters and is made of a material with ${density.val} kg/m3 density,
which means that the ball has a volume of ${volume.val} m3 and a mass of ${mass.val} kg.
The ball has been accelerating with ${acceleration.val} m/s2 from 0 for ${time.val} seconds and so reached a velocity of ${velocity.val} m/s.

Given our ability to push the ball backwards with a force of ${decelerationForce.val} N (Newton), we can cause a deceleration
of ${deceleration.val} m/s2 and so stop the ball at a distance of ${distance.val} m.

=======================================================================================================================
This example has been calculated asynchronously in multiple tasks using GPars Dataflow concurrency in Groovy.
Author: ${author.val}
"""

    System.exit 0
}

task {
    mass << volume.val * density.val
}

task {
    volume << Math.PI * (radius.val ** 3)
}

task {
    radius << 2.5
    density << 998.2071     //water
    acceleration << 9.80665 //free fall
    decelerationForce << 900
}

task {
    println 'Enter your name:'
    def name = new InputStreamReader(System.in).readLine()
    author << (name?.trim()?.size() > 0 ? name : 'anonymous')
}

task {
    time << 10
    velocity << acceleration.val * time.val
}

task {
    deceleration << decelerationForce.val / mass.val
}

task {
    distance << deceleration.val * ((velocity.val / deceleration.val) ** 2) * 0.5
}

t.join()

Note: I did my best to make all the physical calculations right. Feel free to change the values and see how long a distance you need to stop the rolling ball.

Deterministic deadlocks

If you happen to introduce a deadlock in your dependencies, the deadlock will occur each time you run the code. No randomness allowed. That's one of the benefits of Dataflow concurrency. Irrespective of the actual thread scheduling scheme, if you don't get a deadlock in tests, you won't get one in production.

task {
    println a.val
    b << 'Hi there'
}

task {
    println b.val
    a << 'Hello man'
}

Dataflows map

As a handy shortcut the Dataflows class can help you reduce the amount of code you have to write to leverage Dataflow variables.

def df = new Dataflows()
df.x = 'value1'
assert df.x == 'value1'

Dataflow.task {df.y = 'value2'}

assert df.y == 'value2'

Think of Dataflows as a map with Dataflow Variables as keys storing their bound values as appropriate map values. The semantics of reading a value (e.g. df.x) and binding a value (e.g. df.x = 'value') remain identical to the semantics of plain Dataflow Variables (x.val and x << 'value' respectively).

Mixing Dataflows and Groovy with blocks

When inside a with block of a Dataflows instance, the dataflow variables stored inside the Dataflows instance can be accessed directly without the need to prefix them with the Dataflows instance identifier.

new Dataflows().with {
    x = 'value1'
    assert x == 'value1'

    Dataflow.task {y = 'value2'}

    assert y == 'value2'
}

Returning a value from a task

Typically dataflow tasks communicate through dataflow variables. On top of that, tasks can also return values, again through a dataflow variable. When you invoke the task() factory method, you get back an instance of DataflowVariable, on which you can listen for the task's return value, just like when using any other DataflowVariable.

final DataflowVariable t1 = task {
    return 10
}
final DataflowVariable t2 = task {
    return 20
}
def results = [t1, t2]*.val
println 'Both sub-tasks finished and returned values: ' + results

Obviously the value can also be obtained without blocking the caller using the whenBound() method.

def task = task {
    println 'The task is running and calculating the return value'
    30
}
task >> {value -> println "The task finished and returned $value"}

Joining tasks

Using the join() operation on the result dataflow variable of a task you can block until the task finishes.

task {
    final DataflowVariable t1 = task {
        println 'First sub-task running.'
    }
    final DataflowVariable t2 = task {
        println 'Second sub-task running'
    }
    [t1, t2]*.join()
    println 'Both sub-tasks finished'
}.join()

7.2 Selects

Frequently a value needs to be obtained from one of several dataflow channels (variables, queues, broadcasts or streams). The Select class is suitable for such scenarios. Select can scan multiple dataflow channels and pick one channel from all the input channels which currently have a value available for read. The value from that channel is read and returned to the caller together with the index of the originating channel. Picking the channel is either random, or based on channel priority, in which case channels with a lower position index in the Select constructor have higher priority.

Selecting a value from multiple channels

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Shows a basic use of Select, which monitors a set of input channels for values and makes these values
 * available on its output irrespective of their original input channel.
 * Note that dataflow variables and queues can be combined for Select.
 *
 * You might also consider checking out the prioritySelect method, which prioritizes values by the index of their input channel
 */
def a = new DataflowVariable()
def b = new DataflowVariable()
def c = new DataflowQueue()

task {
    sleep 3000
    a << 10
}

task {
    sleep 1000
    b << 20
}

task {
    sleep 5000
    c << 30
}

def select = select([a, b, c])
println "The fastest result is ${select().value}"

Note that the return type from select() is SelectResult, holding the value as well as the originating channel index.
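A small sketch of inspecting the returned SelectResult (assuming SelectResult lives in the groovyx.gpars.dataflow package; channel names are made up):

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.SelectResult
import static groovyx.gpars.dataflow.Dataflow.select

def a = new DataflowQueue()
def b = new DataflowQueue()
b << 'hello'

def sel = select(a, b)
SelectResult result = sel.select()                              //blocks until one of the channels has a value
println "Got '${result.value}' from channel ${result.index}"    //the value arrived through channel 1 (b)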

There are multiple ways to read values from a Select:

def sel = select(a, b, c, d)
def result = sel.select()                                       //Random selection
def result = sel()                                              //Random selection (a short-hand variant)
def result = sel.select([true, true, false, true])              //Random selection with guards specified
def result = sel([true, true, false, true])                     //Random selection with guards specified (a short-hand variant)
def result = sel.prioritySelect()                               //Priority selection
def result = sel.prioritySelect([true, true, false, true])      //Priority selection with guards specified

By default the Select blocks the caller until a value to read is available. Alternatively, Select allows the value to be sent to a provided MessageStream (e.g. an actor) without blocking the caller.

def handler = actor {...}
def sel = select(a, b, c, d)

sel.select(handler)                                        //Random selection
sel(handler)                                               //Random selection (a short-hand variant)
sel.select(handler, [true, true, false, true])             //Random selection with guards specified
sel(handler, [true, true, false, true])                    //Random selection with guards specified (a short-hand variant)
sel.prioritySelect(handler)                                //Priority selection
sel.prioritySelect(handler, [true, true, false, true])     //Priority selection with guards specified

Guards

Guards allow the caller to omit some input channels from the selection. Guards are specified as a List of boolean flags passed to the select() or prioritySelect() methods.

def sel = select(leaders, seniors, experts, juniors)
def teamLead = sel([true, true, false, false]).value        //Only 'leaders' and 'seniors' qualify for becoming a teamLead here

A typical use for guards is to make Selects flexible enough to adapt to changes in the user's state.

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
 */
final DataflowQueue operations = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()

def t = task {
    final def select = select(operations, numbers)
    3.times {
        def instruction = select([true, false]).value
        def num1 = select([false, true]).value
        def num2 = select([false, true]).value
        final def formula = "$num1 $instruction $num2"
        println "$formula = ${new GroovyShell().evaluate(formula)}"
    }
}

task {
    operations << '+'
    operations << '+'
    operations << '*'
}

task {
    numbers << 10
    numbers << 20
    numbers << 30
    numbers << 40
    numbers << 50
    numbers << 60
}

t.join()

Priority Select

When certain channels should have precedence over others when selecting, the prioritySelect methods should be used instead.

/**
 * Shows a basic use of Priority Select, which monitors a set of input channels for values and makes these values
 * available on its output irrespective of their original input channel.
 * Note that dataflow variables, queues and broadcasts can be combined for Select.
 * Unlike plain select method call, the prioritySelect call gives precedence to input channels with lower index.
 * Available messages from high priority channels will be served before messages from lower-priority channels.
 * Messages received through a single input channel will have their mutual order preserved.
 *
 */
def critical = new DataflowVariable()
def ordinary = new DataflowQueue()
def whoCares = new DataflowQueue()

task {
    ordinary << 'All working fine'
    whoCares << 'I feel a bit tired'
    ordinary << 'We are on target'
}

task {
    ordinary << 'I have just started my work. Busy. Will come back later...'
    sleep 5000
    ordinary << 'I am done for now'
}

task {
    whoCares << 'Huh, what is that noise'
    ordinary << 'Here I am to do some clean-up work'
    whoCares << 'I wonder whether unplugging this cable will eliminate that nasty sound.'
    critical << 'The server room goes on UPS!'
    whoCares << 'The sound has disappeared'
}

def select = select([critical, ordinary, whoCares])
println 'Starting to monitor our IT department'
sleep 3000
10.times {println "Received: ${select.prioritySelect().value}"}

7.3 Operators

Dataflow Operators and Selectors provide a full Dataflow implementation with all the usual ceremony.

Concepts

Full dataflow concurrency builds on the concept of channels connecting operators and selectors, which consume values coming through input channels, transform them into new values and output the new values into their output channels. While operators wait for all input channels to have a value available for read before they start processing them, selectors are triggered by a value available on any of their input channels.

operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    …
    bindOutput 0, x + y + z
}

/**
 * CACHE
 *
 * Caches sites' contents. Accepts requests for url content, outputs the content. Outputs requests for download
 * if the site is not in cache yet.
 */
operator(inputs: [urlRequests], outputs: [downloadRequests, sites]) {request ->

    if (!request.content) {
        println "[Cache] Retrieving ${request.site}"
        def content = cache[request.site]
        if (content) {
            println "[Cache] Found in cache"
            bindOutput 1, [site: request.site, word: request.word, content: content]
        } else {
            def downloads = pendingDownloads[request.site]
            if (downloads != null) {
                println "[Cache] Awaiting download"
                downloads << request
            } else {
                pendingDownloads[request.site] = []
                println "[Cache] Asking for download"
                bindOutput 0, request
            }
        }
    } else {
        println "[Cache] Caching ${request.site}"
        cache[request.site] = request.content
        bindOutput 1, request
        def downloads = pendingDownloads[request.site]
        if (downloads != null) {
            for (downloadRequest in downloads) {
                println "[Cache] Waking up"
                bindOutput 1, [site: downloadRequest.site, word: downloadRequest.word, content: request.content]
            }
            pendingDownloads.remove(request.site)
        }
    }
}

The standard error handling will print an error message to standard error output and stop the operator in case an uncaught exception is thrown from within the operator's body. To alter this behavior, you can redefine the reportError() method on the operator:

op.metaClass.reportError = {Throwable e ->
    //handle the exception
    stop()  //You can also stop the operator
}

Types of operators

There are specialized versions of operators serving specific purposes:

Chaining operators

Operators are typically combined into networks, when some operators consume output by other operators.

operator(inputs:[a, b], outputs:[c, d]) {...}
splitter(c, [e, f])
selector(inputs:[e, d], outputs:[]) {...}

You may alternatively refer to output channels through operators themselves:

def op1 = operator(inputs:[a, b], outputs:[c, d]) {...}
def sp1 = splitter(op1.outputs[0], [e, f])                            //takes the first output of op1
selector(inputs:[sp1.outputs[0], op1.outputs[1]], outputs:[]) {...}   //takes the first output of sp1 and the second output of op1

Parallelize operators

By default an operator's body is processed by a single thread at a time. While this is a safe setting allowing the operator's body to be written in a non-thread-safe manner, once an operator becomes "hot" and data start to accumulate in the operator's input queues, you might consider allowing multiple threads to run the operator's body concurrently. Bear in mind that in such a case you need to avoid or protect shared resources from multi-threaded access. To enable multiple threads to run the operator's body concurrently, pass an extra maxForks parameter when creating an operator:

def op = operator(inputs: [a, b, c], outputs: [d, e], maxForks: 2) {x, y, z ->
    bindOutput 0, x + y + z
    bindOutput 1, x * y * z
}

The value of the maxForks parameter indicates the maximum number of threads running the operator concurrently. Only positive numbers are allowed, with 1 being the default value.

Please always make sure the group serving the operator holds enough threads to support all requested forks. Using groups allows you to organize tasks or operators around different thread pools (wrapped inside the group), so you can provide a pool sized for the level of parallelism you request.

def group = new DefaultPGroup(10)
group.operator(inputs: [a, b, c], outputs: [d, e], maxForks: 5) {x, y, z -> ...}

The default group uses a resizeable thread pool and so will never run out of threads.

Synchronizing the output

When enabling internal parallelization of an operator by setting maxForks to a value greater than 1, it is important to remember that, without explicit or implicit synchronization in the operator's body, race conditions may occur. In particular, bear in mind that values written to multiple output channels are not guaranteed to be written atomically and in the same order to all the channels.

operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    bindOutput 0, msg
    bindOutput 1, msg
}
inputChannel << 1
inputChannel << 2
inputChannel << 3
inputChannel << 4
inputChannel << 5
This may result in the output channels holding the values in a mixed-up order, for example:
a -> 1, 3, 2, 4, 5
b -> 2, 1, 3, 5, 4

Explicit synchronization is one way to get all output channels bound correctly and to protect operator state that is not thread-local:

def lock = new Object()
operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()

    synchronized(lock) {
        doSomethingThatMustNotBeAccessedByMultipleThreadsAtTheSameTime()
        bindOutput 0, msg
        bindOutput 1, 2*msg
    }
}

Obviously you need to weigh the pros and cons here, since synchronization may defeat the purpose of setting maxForks to a value greater than 1.

To set the values of all the operator's output channels in one atomic step, you may also consider calling either the bindAllOutputsAtomically method, passing in a single value to write to all output channels, or the bindAllOutputValuesAtomically method, which takes multiple values, each of which will be written to the output channel with the same position index.

operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()
    bindAllOutputValuesAtomically msg, 2*msg
}

Using the bindAllOutputs or the bindAllOutputValues methods will not guarantee atomicity of writes across all the output channels when using internal parallelism. If preserving the order of messages in multiple output channels is not an issue, bindAllOutputs as well as bindAllOutputValues will provide better performance than the atomic variants.
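A brief sketch of the non-atomic variants mentioned above (the channel names are made up; with a single fork, as here, cross-channel ordering is not an issue anyway):

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

def input = new DataflowQueue()
def a = new DataflowQueue()
def b = new DataflowQueue()

def op = operator(inputs: [input], outputs: [a, b]) { msg ->
    bindAllOutputs msg                    //the same value is written to every output channel
    //bindAllOutputValues msg, 2 * msg    //alternatively: one value per output channel, matched by position
}

(1..5).each { input << it }
5.times { println "a: ${a.val}, b: ${b.val}" }
op.stop()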

Stopping operators

Dataflow operators and selectors can be stopped in two ways:

  1. by calling the stop() method on all operators that need to be stopped
  2. by sending a poison message.

Using the stop() method:

def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }

def op2 = selector(inputs: [d], outputs: [f, out]) { }

def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

[op1, op2, op3]*.stop()  //Stop all operators by calling the stop() method on them
op1.join()
op2.join()
op3.join()

Using the poison message:

def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }

def op2 = selector(inputs: [d], outputs: [f, out]) { }

def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

a << PoisonPill.instance //Send the poison

op1.join()
op2.join()
op3.join()

After receiving a poison message an operator stops. Before doing so, it makes sure the poison is first sent to all of its output channels, so that the poison can spread to the connected operators.

Grouping operators

Dataflow operators can be organized into groups to allow for performance fine-tuning. Groups provide a handy operator() factory method to create operators attached to the groups.

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

The default thread pool for dataflow operators contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all operators to complete. When grouping operators, make sure that your custom thread pools also use daemon threads. This can be achieved either by using DefaultPGroup or by providing your own thread factory to the thread pool constructor. If your thread pools use non-daemon threads, such as when using the NonDaemonPGroup class, make sure you shut down the group or the thread pool explicitly by calling its shutdown() method, otherwise your application will not exit.

Selectors

A selector's body should be a closure consuming either one or two arguments.

selector (inputs : [a, b, c], outputs : [d, e]) {value ->
    ....
}

The two-argument closure will get the value plus the index of the input channel the value came through. This allows the selector to distinguish between values coming through different input channels.

selector (inputs : [a, b, c], outputs : [d, e]) {value, index ->
    ....
}

Priority Selector

When priorities need to be preserved among input channels, a DataflowPrioritySelector should be used.

prioritySelector(inputs : [a, b, c], outputs : [d, e]) {value, index ->
    …
}

The priority selector will always prefer values from channels with lower position index over values coming through the channels with higher position index.

Join selector

A selector without a body closure specified will copy all incoming values to all of its output channels.

def join = selector (inputs : [programmers, analysis, managers], outputs : [employees, colleagues])

Internal parallelism

The maxForks attribute, allowing for internal selector parallelism, is also available.

selector (inputs : [a, b, c], outputs : [d, e], maxForks : 5) {value ->
    ....
}

Guards

Just like Selects, Selectors also allow users to temporarily include/exclude individual input channels from selection. The guards input property can be used to set the initial mask on all input channels, and the setGuards and setGuard methods are then available in the selector's body.

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
 */
final DataflowQueue operations = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()

def instruction
def nums = []

selector(inputs: [operations, numbers], outputs: [], guards: [true, false]) {value, index ->   //the initial guards are set here
    if (index == 0) {
        instruction = value
        setGuard(0, false)   //setGuard() used here
        setGuard(1, true)
    }
    else nums << value
    if (nums.size() == 2) {
        setGuards([true, false])   //setGuards() used here
        final def formula = "${nums[0]} $instruction ${nums[1]}"
        println "$formula = ${new GroovyShell().evaluate(formula)}"
        nums.clear()
    }
}

task {
    operations << '+'
    operations << '+'
    operations << '*'
}

task {
    numbers << 10
    numbers << 20
    numbers << 30
    numbers << 40
    numbers << 50
    numbers << 60
}

Avoid combining guards with a maxForks value greater than 1. Although the selector is thread-safe and won't be damaged in any way, the guards are likely not to be set the way you expect. The multiple threads running the selector's body concurrently will tend to overwrite each other's settings of the guards property.

7.4 Dataflow implementation

The Dataflow Concurrency in GPars builds on top of its actor support. All dataflow tasks share a thread pool, so the number of tasks created through the Dataflow.task() factory method doesn't need to correspond to the number of physical threads required from the system. The PGroup.task() factory method can be used to attach a created task to a group. Since each group defines its own thread pool, you can easily organize tasks around different thread pools just like you do with actors.
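As a minimal sketch, a task can be attached to a custom group like this (the pool size of 4 is an arbitrary illustrative choice):

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup(4)   //a group wrapping its own 4-thread pool

def result = group.task {
    'calculated on one of the group pool threads'
}

println result.val
group.shutdown()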

Combining actors and Dataflow Concurrency

The good news is that you can combine actors and Dataflow Concurrency in any way you see fit for your particular problem at hand. You can freely use Dataflow Variables from actors.

final DataflowVariable a = new DataflowVariable()

final Actor doubler = Actors.actor {
    react {message ->
        a << 2 * message
    }
}

final Actor fakingDoubler = actor {
    react {
        doubler.send it            //send a number to the doubler
        println "Result ${a.val}"  //wait for the result to be bound to 'a'
    }
}

fakingDoubler << 10

In the example you see the "fakingDoubler" using both messages and a DataflowVariable to communicate with the doubler actor.

Using plain java threads

The DataflowVariable as well as the DataflowQueue classes can obviously be used from any thread of your application, not only from the tasks created by Dataflow.task() . Consider the following example:

import groovyx.gpars.dataflow.DataflowVariable

final DataflowVariable a = new DataflowVariable<String>()
final DataflowVariable b = new DataflowVariable<String>()

Thread.start {
    println "Received: $a.val"
    Thread.sleep 2000
    b << 'Thank you'
}

Thread.start {
    Thread.sleep 2000
    a << 'An important message from the second thread'
    println "Reply: $b.val"
}

We're creating two plain java.lang.Thread instances, which exchange data using the two dataflow variables. Obviously, neither the actor lifecycle methods, nor the send/react functionality, nor thread pooling take effect in such scenarios.

7.5 Classic examples

The Sieve of Eratosthenes implementation using dataflow tasks

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates a concurrent implementation of the Sieve of Eratosthenes using dataflow tasks
 */

final int requestedPrimeNumberCount = 1000

final DataflowQueue initialChannel = new DataflowQueue()

/**
 * Generating candidate numbers
 */
task {
    (2..10000).each {
        initialChannel << it
    }
}

/**
 * Chain a new filter for a particular prime number to the end of the Sieve
 * @param inChannel The current end channel to consume
 * @param prime The prime number to divide future prime candidates with
 * @return A new channel ending the whole chain
 */
def filter(inChannel, int prime) {
    def outChannel = new DataflowQueue()

    task {
        while (true) {
            def number = inChannel.val
            if (number % prime != 0) {
                outChannel << number
            }
        }
    }
    return outChannel
}

/**
 * Consume Sieve output and add additional filters for all found primes
 */
def currentOutput = initialChannel
requestedPrimeNumberCount.times {
    int prime = currentOutput.val
    println "Found: $prime"
    currentOutput = filter(currentOutput, prime)
}

The Sieve of Eratosthenes implementation using a combination of dataflow tasks and operators

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates a concurrent implementation of the Sieve of Eratosthenes using dataflow tasks and operators
 */

final int requestedPrimeNumberCount = 100

final DataflowQueue initialChannel = new DataflowQueue()

/**
 * Generating candidate numbers
 */
task {
    (2..1000).each {
        initialChannel << it
    }
}

/**
 * Chain a new filter for a particular prime number to the end of the Sieve
 * @param inChannel The current end channel to consume
 * @param prime The prime number to divide future prime candidates with
 * @return A new channel ending the whole chain
 */
def filter(inChannel, int prime) {
    def outChannel = new DataflowQueue()

    operator([inputs: [inChannel], outputs: [outChannel]]) {
        if (it % prime != 0) {
            bindOutput it
        }
    }
    return outChannel
}

/**
 * Consume Sieve output and add additional filters for all found primes
 */
def currentOutput = initialChannel
requestedPrimeNumberCount.times {
    int prime = currentOutput.val
    println "Found: $prime"
    currentOutput = filter(currentOutput, prime)
}