7.9 Kanban Flow - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.0.0
7.9 Kanban Flow
APIs: KanbanFlow | KanbanLink | KanbanTray | ProcessingNodeKanbanFlow
A KanbanFlow is a composed object that uses dataflow abstractions to define dependencies between multiple concurrent producer and consumer operators.Each link between a producer and a consumer is defined by a KanbanLink .Inside each KanbanLink, the communication between producer and consumer follows the KanbanFlow pattern as described in The KanbanFlow Pattern (recommended read). They use objects of type KanbanTray to send products downstream and signal requests for further products back to the producer.The figure below shows a KanbanLink with one producer, one consumer and five trays numbered 0 to 4. Tray number 0 has been used to take a product from producer to consumer, has been emptied by the consumer and is now sent back to the producer's input queue. Trays 1 and 2 wait carry products waiting for consumption, trays 3 and 4 wait to be used by producers.
import static groovyx.gpars.dataflow.ProcessingNode.node import groovyx.gpars.dataflow.KanbanFlowdef producer = node { down -> down 1 } def consumer = node { up -> println up.take() }new KanbanFlow().with { link producer to consumer start() // run for a while stop() }
send()
method, the <<
operator, or use the tray as a method object.
The following lines are equivalent:
node { down -> down.send 1 } node { down -> down << 1 } node { down -> down 1 }
take()
method, the empty
tray is automatically released.
You should call take()
only once!
If you prefer to not using an empty tray for sending products downstream (as typically
the case when a ProcessingNode acts as a filter), you must
release the tray in order to keep it in play. Otherwise, the number of trays in the
system decreases. You can release a tray either by calling the release()
method
or by using the ~
operator (think "shake it off").
The following lines are equivalent:
node { down -> down.release() } node { down -> ~down }
Trays are automatically released, if you call any of thetake()
orsend()
methods.
Various linking structures
In addition to a linear chains, a KanbanFlow can also link a single producer to multiple consumers (tree) or multiple producers to a single consumer (collector) or any combination of the above that results in a directed acyclic graph (DAG).The KanbanFlowTest class has many examples for such structures, including scenarios where a single producer delegates work to multiple consumers with- a work-stealing strategy where all consumers get their pick from the downstream,
- a master-slave strategy where a producer chooses from the available consumers, and
- a broadcast strategy where a producer sends all products to all consumers.
Composing KanbanFlows
Just as KanbanLink objects can be chained together to form a KanbanFlow , flows themselves can be composed again to form new greater flows from existing smaller ones.def firstFlow = new KanbanFlow() def producer = node(counter) def consumer = node(repeater) firstFlow.link(producer).to(consumer)def secondFlow = new KanbanFlow() def producer2 = node(repeater) def consumer2 = node(reporter) secondFlow.link(producer2).to(consumer2)flow = firstFlow + secondFlowflow.start()
Customizing concurrency characteristics
The amount of concurrency in a kanban system is determined by the number of trays (sometimes called WIP = work in progress). With no trays in the streams, the system does nothing.- With one tray only, the system is confined to sequential execution.
- With more trays, concurrency begins.
- With more trays than available processing units, the system begins to waste resources.
flow.start(0) // start without trays flow.start(1) // start with one tray per link in the flow flow.start() // start with the optimal number of trays
pooledGroup
property.Test: KanbanFlowTest
Demos:
DemoKanbanFlow
DemoKanbanFlowBroadcast
DemoKanbanFlowCycle
DemoKanbanLazyPrimeSequenceLoops