7.6 Pipeline DSL - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.0.0

7.6 Pipeline DSL

A DSL for building operator pipelines

Building dataflow networks can be further simplified. GPars offers handy shortcuts for the common scenario of building (mostly linear) pipelines of operators.

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowReadChannel

def toUpperCase = {s -> s.toUpperCase()}

final encrypt = new DataflowQueue()
final DataflowReadChannel encrypted = encrypt | toUpperCase | {it.reverse()} | {'###encrypted###' + it + '###'}

encrypt << "I need to keep this message secret!"
encrypt << "GPars can build linear operator pipelines really easily"

println encrypted.val
println encrypted.val

This saves you from directly creating, wiring and manipulating all the channels and operators that form the pipeline. The pipe operator lets you hook the output of one function/operator/process to the input of another one, just like chaining system processes on the command line.

The pipe operator is a handy shorthand for a more generic chainWith() method:

def toUpperCase = {s -> s.toUpperCase()}

final encrypt = new DataflowQueue()
final DataflowReadChannel encrypted = encrypt.chainWith toUpperCase chainWith {it.reverse()} chainWith {'###encrypted###' + it + '###'}

encrypt << "I need to keep this message secret!"
encrypt << "GPars can build linear operator pipelines really easily"

println encrypted.val
println encrypted.val

Combining pipelines with straight operators

Since each operator pipeline has an entry and an exit channel, pipelines can be wired into more complex operator networks. Only your imagination can limit your ability to mix pipelines with channels and operators in the same network definitions.

def toUpperCase = {s -> s.toUpperCase()}
def save = {text ->
    //Just pretending to be saving the text to disk, database or whatever
    println 'Saving ' + text
}

final toEncrypt = new DataflowQueue()
final DataflowReadChannel encrypted = toEncrypt.chainWith toUpperCase chainWith {it.reverse()} chainWith {'###encrypted###' + it + '###'}

final DataflowQueue fork1 = new DataflowQueue()
final DataflowQueue fork2 = new DataflowQueue()
splitter(encrypted, [fork1, fork2])  //Split the data flow

fork1.chainWith save  //Hook in the save operation

//Hook in a sneaky decryption pipeline
final DataflowReadChannel decrypted = fork2.chainWith {it[15..-4]}.chainWith {it.reverse()}.chainWith {it.toLowerCase()}.chainWith {'Groovy leaks! Check out a decrypted secret message: ' + it}

toEncrypt << "I need to keep this message secret!"
toEncrypt << "GPars can build operator pipelines really easy"

println decrypted.val
println decrypted.val

The type of the channel is preserved across the whole pipeline. For example, if you start chaining off a synchronous channel, all the channels in the pipeline will be synchronous. In that case the whole chain blocks, including the writer that writes into the channel at the head, until someone reads data off the tail of the pipeline.

final SyncDataflowQueue queue = new SyncDataflowQueue()
final result = queue.chainWith {it * 2}.chainWith {it + 1}.chainWith {it * 100}

Thread.start {
    5.times {
        println result.val
    }
}

queue << 1
queue << 2
queue << 3
queue << 4
queue << 5

Joining pipelines

Two pipelines (or channels) can be connected using the into() method:

def toUpperCase = {s -> s.toUpperCase()}

final encrypt = new DataflowQueue()
final DataflowWriteChannel messagesToSave = new DataflowQueue()
encrypt.chainWith toUpperCase chainWith {it.reverse()} into messagesToSave

task {
    encrypt << "I need to keep this message secret!"
    encrypt << "GPars can build operator pipelines really easy"
}

task {
    2.times {
        println "Saving " + messagesToSave.val
    }
}

The output of the encryption pipeline is directly connected to the input of the saving pipeline (a single channel in our case).

Forking the data flow

When you need to copy the output of a pipeline/channel into more than one following pipeline/channel, the split() method will help you:

final encrypt = new DataflowQueue()
final DataflowWriteChannel messagesToSave = new DataflowQueue()
final DataflowWriteChannel messagesToLog = new DataflowQueue()

encrypt.chainWith(toUpperCase).chainWith {it.reverse()}.split(messagesToSave, messagesToLog)
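
Feeding the head channel then delivers an identical copy of each transformed value to both outputs. A brief usage sketch, assuming the toUpperCase closure defined earlier:

encrypt << "secret"
assert 'TERCES' == messagesToSave.val
assert 'TERCES' == messagesToLog.val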

Tapping into the pipeline

Like split(), the tap() method allows you to fork the data flow into multiple channels. Tapping, however, is slightly more convenient in some scenarios, since it treats one of the two forks as the successor of the pipeline.

queue.chainWith {it * 2}.tap(logChannel).chainWith {it + 1}.tap(logChannel).into(printChannel)
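
A minimal self-contained sketch (the logChannel and printChannel names above are placeholders; here plain DataflowQueue instances are assumed):

final DataflowQueue queue = new DataflowQueue()
final DataflowQueue logChannel = new DataflowQueue()

//Each tapped value is copied to logChannel and also flows on down the chain
final result = queue.chainWith {it * 2}.tap(logChannel).chainWith {it + 1}

queue << 5
assert 11 == result.val       //(5 * 2) + 1 reached the tail of the pipeline
assert 10 == logChannel.val   //the tap saw the intermediate value 5 * 2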

Merging channels

Merging allows you to join multiple read channels as inputs for a single dataflow operator. The function passed as the second argument needs to accept as many arguments as there are channels being merged; each argument will hold a value from the corresponding channel.

maleChannel.merge(femaleChannel) {m, f -> m.marry(f)}.into(mortgageCandidatesChannel)
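
The snippet above assumes domain objects with a marry() method. A minimal runnable sketch with numeric channels (the channel names are illustrative) would look like this:

final DataflowQueue left = new DataflowQueue()
final DataflowQueue right = new DataflowQueue()
final DataflowQueue sums = new DataflowQueue()

//Values are paired up positionally, one from each input channel
left.merge(right) {a, b -> a + b}.into(sums)

left << 1
left << 2
right << 10
right << 20
assert 11 == sums.val
assert 22 == sums.val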

Separation

Separation is the opposite operation to merge. The supplied closure returns a list of values, each of which will be output into the output channel with the corresponding position index.

queue1.separate([queue2, queue3, queue4]) {a -> [a-1, a, a+1]}
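
Feeding the snippet above makes the semantics concrete (a brief self-contained sketch):

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue queue2 = new DataflowQueue()
final DataflowQueue queue3 = new DataflowQueue()
final DataflowQueue queue4 = new DataflowQueue()

//Element i of the returned list goes to the i-th output channel
queue1.separate([queue2, queue3, queue4]) {a -> [a - 1, a, a + 1]}

queue1 << 10
assert 9 == queue2.val
assert 10 == queue3.val
assert 11 == queue4.val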

Choices

The binaryChoice() and choice() methods allow you to send a value to one out of two (or many) output channels, as indicated by the return value from a closure.

queue1.binaryChoice(queue2, queue3) {a -> a > 0}
queue1.choice([queue2, queue3, queue4]) {a -> a % 3}
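
For illustration, a minimal sketch of binaryChoice() routing (the channel names are chosen here for clarity):

final DataflowQueue numbers = new DataflowQueue()
final DataflowQueue positives = new DataflowQueue()
final DataflowQueue nonPositives = new DataflowQueue()

//A true result routes the value to the first channel, false to the second
numbers.binaryChoice(positives, nonPositives) {it > 0}

numbers << 7
numbers << -3
assert 7 == positives.val
assert -3 == nonPositives.val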

Filtering

The filter() method allows you to filter data in the pipeline using boolean predicates.

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue queue2 = new DataflowQueue()

final odd = {num -> num % 2 != 0}

queue1.filter(odd) into queue2
(1..5).each {queue1 << it}
assert 1 == queue2.val
assert 3 == queue2.val
assert 5 == queue2.val

Null values

If a chained function returns a null value, it is normally passed along the pipeline as a valid value. To indicate to the operator that no value should be passed further down the pipeline, a NullObject.nullObject instance must be returned.

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue queue2 = new DataflowQueue()

final odd = {num ->
    if (num == 5) return null  //null values are normally passed on
    if (num % 2 != 0) return num
    else return NullObject.nullObject  //this value gets blocked
}

queue1.chainWith odd into queue2
(1..5).each {queue1 << it}
assert 1 == queue2.val
assert 3 == queue2.val
assert null == queue2.val

Customizing the thread pools

All of the Pipeline DSL methods allow a custom thread pool or PGroup to be specified (a concrete pool example follows the listing below):

channel | {it * 2}

channel.chainWith(closure)
channel.chainWith(pool) {it * 2}
channel.chainWith(group) {it * 2}

channel.into(otherChannel)
channel.into(pool, otherChannel)
channel.into(group, otherChannel)

channel.split(otherChannel1, otherChannel2)
channel.split(otherChannels)
channel.split(pool, otherChannel1, otherChannel2)
channel.split(pool, otherChannels)
channel.split(group, otherChannel1, otherChannel2)
channel.split(group, otherChannels)

channel.tap(otherChannel)
channel.tap(pool, otherChannel)
channel.tap(group, otherChannel)

channel.merge(otherChannel)
channel.merge(otherChannels)
channel.merge(pool, otherChannel)
channel.merge(pool, otherChannels)
channel.merge(group, otherChannel)
channel.merge(group, otherChannels)

channel.filter(closure)
channel.filter(pool, closure)
channel.filter(group, closure)

channel.binaryChoice(trueBranch, falseBranch)
channel.binaryChoice(pool, trueBranch, falseBranch)
channel.binaryChoice(group, trueBranch, falseBranch)

channel.choice(branches)
channel.choice(pool, branches)
channel.choice(group, branches)

channel.separate(outputs)
channel.separate(pool, outputs)
channel.separate(group, outputs)
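
To make the pool variants concrete, here is a minimal sketch (assuming a two-thread daemon DefaultPool; the channel and values are illustrative):

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.scheduler.DefaultPool

final pool = new DefaultPool(true, 2)  //a daemon pool with two threads
final DataflowQueue channel = new DataflowQueue()

//The operator created by chainWith() runs on the supplied pool
final result = channel.chainWith(pool) {it * 2}

channel << 21
assert 42 == result.val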

Overriding the default PGroup

To avoid having to specify the PGroup for each Pipeline DSL method separately, you may override the value of the default Dataflow PGroup.

Dataflow.usingGroup(group) {
    channel.choice(branches)
}
//Is identical to
channel.choice(group, branches)

The Dataflow.usingGroup() method resets the value of the default dataflow PGroup for the given code block to the value specified.
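
For instance, a minimal runnable sketch (assuming a two-thread DefaultPGroup; the arithmetic is arbitrary):

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup

final group = new DefaultPGroup(2)
final DataflowQueue queue = new DataflowQueue()

def result = null
Dataflow.usingGroup(group) {
    //Operators created inside the block are bound to 'group'
    result = queue.chainWith {it + 100}.chainWith {it * 10}
}

queue << 1
assert 1010 == result.val
group.shutdown()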

The pipeline builder

The Pipeline class offers an intuitive builder for operator pipelines. The greatest benefit of using the Pipeline class compared to chaining the channels directly is the ease with which a custom thread pool/group can be applied to all the operators along the constructed chain. The available methods and overloaded operators are identical to the ones available on channels directly.

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.Pipeline
import groovyx.gpars.scheduler.DefaultPool
import groovyx.gpars.scheduler.Pool

final DataflowQueue queue = new DataflowQueue()
final DataflowQueue result1 = new DataflowQueue()
final DataflowQueue result2 = new DataflowQueue()
final Pool pool = new DefaultPool(false, 2)

final negate = {-it}

final Pipeline pipeline = new Pipeline(pool, queue)

pipeline | {it * 2} | {it + 1} | negate
pipeline.split(result1, result2)

queue << 1
queue << 2
queue << 3

assert -3 == result1.val
assert -5 == result1.val
assert -7 == result1.val

assert -3 == result2.val
assert -5 == result2.val
assert -7 == result2.val

pool.shutdown()

Passing construction parameters through the Pipeline DSL

You will frequently need to pass additional initialization parameters to the operators, such as listeners to attach or a value for maxForks. Just like when building operators directly, the Pipeline DSL methods accept an optional map of parameters to pass in.

new Pipeline(group, queue1).merge([maxForks: 4, listeners: [listener]], queue2) {a, b -> a + b}.into queue3
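
As a sketch of what such a listener might look like, assuming the DataflowEventAdapter convenience class and that chainWith() accepts the same optional parameter map as merge() above (the channel names are illustrative):

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.operator.DataflowEventAdapter
import groovyx.gpars.dataflow.operator.DataflowProcessor

final listener = new DataflowEventAdapter() {
    @Override
    Object messageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message) {
        println "Received: $message"  //observe each incoming message
        return message                //pass the message on unchanged
    }
}

final DataflowQueue queue1 = new DataflowQueue()

//maxForks allows up to four concurrent invocations of the chained closure
final DataflowReadChannel result = queue1.chainWith([maxForks: 4, listeners: [listener]]) {it * 2}

queue1 << 3
assert 6 == result.val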