The actor support in GPars was inspired by the Actors library in Scala, but has since gone beyond it.
Actors allow for a messaging-based concurrency model, built from independent active objects that exchange messages and have no mutable shared state. Actors can help developers avoid issues such as deadlocks, livelocks or starvation, so typical for shared memory, while leveraging the multi-core nature of today's hardware.
A nice wrap-up of the key concepts behind actors was written recently by Ruben Vermeersch.
Actors guarantee that at most one thread processes the actor's body at a time. Under the covers, memory is also synchronized each time a thread gets assigned to an actor, so the actor's state can be safely modified by code in the body without any extra synchronization or locking effort.
Ideally, an actor's code should never be invoked directly from outside, so that all the code of the actor class is only ever executed by the thread handling the last received message, which makes all the actor's code implicitly thread-safe.
If any of the actor's methods can be called directly by other objects, the thread-safety guarantee for the actor's code and state is no longer valid.
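To make the guarantee above concrete, here is a minimal sketch of a mailbox that is drained by at most one pooled thread at a time. It is not how GPars is implemented: `MiniActor`, its `send()` method and the `scheduled` flag are hypothetical names invented for this illustration, and only JDK classes are used.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical illustration only, not a GPars class
class MiniActor {
    private final Queue<Object> inbox = new ConcurrentLinkedQueue<Object>()
    private final AtomicBoolean scheduled = new AtomicBoolean(false)
    private final ExecutorService pool
    private final Closure handler

    MiniActor(ExecutorService pool, Closure handler) {
        this.pool = pool
        this.handler = handler
    }

    // Any thread may send; the message is queued and a drain task is requested
    void send(Object message) {
        inbox.add(message)
        schedule()
    }

    private void schedule() {
        // The flag admits at most one drain task per actor at any moment
        if (scheduled.compareAndSet(false, true)) pool.execute { drain() }
    }

    private void drain() {
        try {
            Object message
            while ((message = inbox.poll()) != null) handler.call(message)
        } finally {
            scheduled.set(false)
            // A message may have slipped in after the last poll()
            if (!inbox.isEmpty()) schedule()
        }
    }
}

def pool = Executors.newFixedThreadPool(4)
int counter = 0                          // plain, unsynchronized state
def done = new CountDownLatch(1000)
def counterActor = new MiniActor(pool, { counter++; done.countDown() })

// Four sender threads hammer the same actor concurrently
def senders = (1..4).collect { Thread.start { 250.times { counterActor.send('tick') } } }
senders*.join()
done.await()
pool.shutdown()
println "counter = $counter"
```

Because the handler only ever runs inside one drain task at a time, the unsynchronized `counter` is never updated by two threads at once, even though four threads send and four pool threads are available.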
Actors can share a relatively small thread pool. This can go as far as having many concurrent actors that share a single pooled thread. They avoid the threading limitations of the JVM.
Actor code is processed in chunks separated by quiet periods of waiting for new events (messages). This can be naturally modeled through continuations. As the JVM doesn't support continuations directly, they have to be simulated in actor frameworks, which has a slight impact on the organization of the actors' code. However, the benefits in most cases outweigh the difficulties.
import groovyx.gpars.actor.AbstractPooledActor
class GameMaster extends AbstractPooledActor {
int secretNum
void afterStart() {
secretNum = new Random().nextInt(10)
}
void act() {
loop {
react { int num ->
if ( num > secretNum )
reply 'too large'
else if ( num < secretNum )
reply 'too small'
else {
reply 'you win'
stop()
System.exit 0
}
}
}
}
}
class Player extends AbstractPooledActor {
String name
AbstractPooledActor server
int myNum
void act() {
loop {
myNum = new Random().nextInt(10)
server.send myNum
react {
switch( it ) {
case 'too large': println "$name: $myNum was too large"; break
case 'too small': println "$name: $myNum was too small"; break
case 'you win': println "$name: I won $myNum"; stop(); break
}
}
}
}
}
def master = new GameMaster().start()
def player = new Player( name: 'Player', server: master ).start()
[master, player]*.join()
Example by _Jordi Campos i Miralles, Departament de Matemàtica Aplicada i Anàlisi, MAiA Facultat de Matemàtiques, Universitat de Barcelona_
Usage of Actors
GPars provides consistent Actor APIs and DSLs. Actors in principle perform three specific operations: sending messages, receiving messages and creating new actors. Although not specifically enforced by GPars, messages should be immutable or at least follow the hands-off policy, in which the sender never touches a message after it has been sent off.
Sending messages
Messages can be sent to actors using the send() method. Alternatively, the << operator or the implicit call() method can be used. A family of sendAndWait() methods is available to block the caller until a reply from the actor is available.
The reply is returned from the sendAndWait() method as a return value.
The sendAndWait() methods may also return after a timeout expires or in case of termination of the called actor.
actor.send 'Message'
actor << 'Message' //using the << operator
actor 'Message' //using the implicit call() method
def reply1 = actor.sendAndWait('Message')
def reply2 = actor.sendAndWait(10, TimeUnit.SECONDS, 'Message')
def reply3 = actor.sendAndWait(10.seconds, 'Message')
The sendAndContinue() method allows the caller to continue its processing while the supplied closure is waiting for a reply from the actor.
friend.sendAndContinue 'I need money!', {money -> pocket money}
println 'I can continue while my friend is collecting money for me'
All send(), sendAndWait() or sendAndContinue() methods will throw an exception if invoked on a non-active actor.
Receiving messages
Non-blocking message retrieval
Calling the react() method, optionally with a timeout parameter, from within the actor's code will consume the next message from the actor's inbox, potentially waiting if there is no message to be processed immediately.
println 'Waiting for a gift'
react {gift ->
if (myWife.likes gift) reply 'Thank you!'
}
Under the covers the supplied closure is not invoked directly, but scheduled for processing by any thread in the thread pool once a message is available. After scheduling, the current thread is detached from the actor and freed to process any other actor that has already received a message.
To allow detaching actors from threads, the react() method demands that the code be written in a special continuation style.
loop {
println 'Waiting for a gift'
react {gift ->
if (myWife.likes gift) reply 'Thank you!'
else {
reply 'Try again, please'
react {anotherGift ->
if (myChildren.like anotherGift) reply 'Thank you!'
}
println 'Never reached'
}
}
println 'Never reached'
}
println 'Never reached'
The react() and loop() methods never return normally, and any code placed after a call to either of the two methods will never be executed.
The closure supplied to the react() or loop() method is the code where the computation should continue, hence the name continuation style.
Blocking message retrieval
Unlike the react() method, which gives up the current thread until a message is available for an actor, the receive() method blocks waiting for a message.
This allows for non-continuation-style code and might also have positive performance implications in certain scenarios.
Mixing react() and receive() calls within a single actor is also possible.
Actors.actor {
def msg1 = receive()
receive {msg2, msg3 ->
[msg1, msg2, msg3]*.reply 'Hi!'
}
react {msg4 ->
msg4.reply "You're the last today!"
}
}.start()
Sending replies
The reply/replyIfExists methods are not only defined on the actors themselves, but also on the messages upon their reception, which is particularly handy when handling multiple messages in a single call. In such cases reply() invoked on the actor sends a reply to the author of the currently processed message (the last one received), whereas reply() called on a message sends a reply to the author of that particular message only.
react {offerA ->
react {offerB ->
react {offerC ->
//sent to each of the senders
[offerA, offerB, offerC]*.reply 'Received your kind offer. Now processing it and comparing with others.'
offerA.reply 'You were the fastest' //sent to the author of offerA only
def winnerOffer = [offerA, offerB, offerC].min {it.price}
winnerOffer.reply 'I accept your reasonable offer' //sent to the winner only
([offerA, offerB, offerC] - [winnerOffer])*.reply 'Maybe next time' //sent to the losers only
}
}
}
The sender property
Upon retrieval, messages offer the sender property to identify the originator of the message.
react {tweet ->
if (isSpam(tweet)) ignoreTweetsFrom tweet.sender
}
Forwarding
When sending a message, a different actor can be specified as the sender, so that potential replies to the message will be forwarded to the specified actor and not to the actual originator.
def decryptor = actor {
react {message ->
reply message.reverse()
// message.reply message.reverse() //Alternatives to send replies
// message.sender.send message.reverse()
}
}
def console = actor { //This actor will print out decrypted messages, since the replies are forwarded to it
react {
println 'Decrypted message: ' + it
}
}
decryptor.send 'lellarap si yvoorG', console //Specify an actor to send replies to
Creating Actors
Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for more messages to arrive.
For example, this is how you create an actor that prints out all messages that it receives.
import static groovyx.gpars.actor.Actors.*
def console = actor {
loop {
react {
println it
}
}
}
Notice the loop() method call, which ensures that the actor doesn't stop after having processed the first message.
Here's an example with a decryptor service, which can decrypt submitted messages and send the decrypted messages back to the originators.
import static groovyx.gpars.actor.Actors.*
final def decryptor = actor {
loop {
react {String message->
if ('stopService' == message) stop()
else reply message.reverse()
}
}
}
actor {
decryptor.send 'lellarap si yvoorG'
react {
println 'Decrypted message: ' + it
decryptor.send 'stopService'
}
}
Here's an example of an actor that waits for up to 30 seconds to receive a message, prints it out and terminates.
import static groovyx.gpars.actor.Actors.*
def me = actor {
delegate.metaClass.onTimeout = {-> friend.send('I see, busy as usual. Never mind.')}
friend.send('Hi')
react(30.seconds) {
//continue conversation
}
}
Undelivered messages
Sometimes messages cannot be delivered to the target actor. When special action needs to be taken for undelivered messages, at actor termination all unprocessed messages from its queue have their onDeliveryError() method called. The onDeliveryError() method or closure defined on the message can, for example, send a notification back to the original sender of the message.
final AbstractPooledActor me
me = Actors.actor {
def message1 = 1
def message2 = 2
message1.metaClass.onDeliveryError = {->
me << "Could not deliver $delegate"
}
message2.metaClass.onDeliveryError = {->
me << "Could not deliver $delegate"
}
actor1 << message1
actor2 << message2
…
}
Joining actors
Actors provide a join() method to allow callers to wait for the actor to terminate. A variant accepting a timeout is also available. The Groovy spread-dot operator comes in handy when joining multiple actors at a time.
def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()
[master, player]*.join()
Custom schedulers
Actors leverage the standard JDK concurrency library by default. To provide a custom thread scheduler use the appropriate constructor parameter when creating an actor group. The supplied scheduler will orchestrate threads in the group's thread pool.
Please also see the numerous Actor Demos.
Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for more messages to arrive. Actors become detached from the underlying threads, so a relatively small thread pool can serve a potentially unlimited number of actors. Virtually unlimited scalability in the number of actors is the main advantage of _event-based actors_, which are detached from the underlying physical threads.
Here are some examples of how to use actors. This is how you create an actor that prints out all messages that it receives.
import static groovyx.gpars.actor.Actors.*
def console = actor {
loop {
react {
println it
}
}
}
Notice the loop() method call, which ensures that the actor doesn't stop after having processed the first message.
As an alternative you can extend the AbstractPooledActor class and override the act() method. Once you instantiate the actor, you need to start it so that it attaches itself to the thread pool and can start accepting messages.
The actor() factory method will take care of starting the actor.
class CustomActor extends AbstractPooledActor {
@Override protected void act() {
loop {
react {
println it
}
}
}
}
def console = new CustomActor()
console.start()
Messages can be sent to the actor using multiple methods:
console.send('Message')
console 'Message'
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}
console.sendAndWait 'Message'
Creating an asynchronous service
import static groovyx.gpars.actor.Actors.*
final def decryptor = actor {
loop {
react {String message->
reply message.reverse()
}
}
}
def console = actor {
decryptor.send 'lellarap si yvoorG'
react {
println 'Decrypted message: ' + it
}
}
console.join()
As you can see, you create new actors with the actor() method, passing in the actor's body as a closure parameter. Inside the actor's body you can use loop() to iterate, react() to receive messages and reply() to send a message to the actor that sent the currently processed message. When the decryptor actor doesn't find a message in its message queue, the react() method gives up the thread and returns it to the thread pool for other actors to pick up. Only after a new message arrives in the actor's message queue does the closure of the react() method get scheduled for processing with the pool. Event-based actors internally simulate continuations: the actor's work is split into sequentially run chunks, which get invoked once a message is available in the inbox. Each chunk for a single actor can be performed by a different thread from the thread pool.
Groovy's flexible syntax with closures allows our library to offer multiple ways to define actors. For instance, here's an example of an actor that waits for up to 10 seconds to receive a message, prints it out and terminates. Actors allow the time DSL defined by the org.codehaus.groovy.runtime.TimeCategory class to be used for timeout specification to the react() method.
import static groovyx.gpars.actor.Actors.*
def me = actor {
delegate.metaClass.onTimeout = {->friend.send('I see, busy as usual. Never mind.')}
friend.send('Hi')
react(10.seconds) {
//continue conversation
}
}
me.join()
Notice the possibility to use Groovy meta-programming to define an actor's lifecycle notification methods (e.g. onTimeout()) dynamically.
Actors guarantee thread-safety for non-thread-safe code
Actors guarantee that at most one thread processes the actor's body at a time. Under the covers, memory is also synchronized each time a thread gets assigned to an actor, so the actor's state can be safely modified by code in the body without any extra synchronization or locking effort.
class MyCounterActor extends AbstractPooledActor {
private Integer counter = 0
protected void act() {
loop {
react {
counter++
}
}
}
}
Ideally, an actor's code should never be invoked directly from outside, so that all the code of the actor class is only ever executed by the thread handling the last received message, which makes all the actor's code implicitly thread-safe.
If any of the actor's methods can be called directly by other objects, the thread-safety guarantee for the actor's code and state is no longer valid.
Simple calculator
A little bit more realistic example of an event-driven actor that receives two numeric messages, sums them up and sends the result to the console actor.
import static groovyx.gpars.actor.Actors.*
//not necessary, just showing that a single-threaded pool can still handle multiple actors
defaultActorPGroup.resize 1
final def console = actor {
loop {
react {
println 'Result: ' + it
}
}
}
final def calculator = actor {
react {a ->
react {b ->
console.send(a + b)
}
}
}
calculator.send 2
calculator.send 3
calculator.join()
You can group reception of multiple messages in a single react() call.
final def calculator = actor {
react {a, b ->
console.send(a + b)
}
}
Notice that event-driven actors require special care regarding the react() method. Since event-driven actors need to split the code into independent chunks assignable to different threads sequentially and continuations are not natively supported on the JVM, the chunks are created artificially with tasks and exceptions. As a result the react() and loop() methods never return normally and actors' code must be structured accordingly. Again, this is in line with what Scala actors do.
Concurrent Merge Sort Example
For comparison I'm also including a more involved example performing a concurrent merge sort of a list of integers using actors. You can see that thanks to the flexibility of Groovy we came pretty close to the Scala model, although I still miss Scala pattern matching for message handling.
import static groovyx.gpars.actor.Actors.*
import groovyx.gpars.group.DefaultPGroup
Closure createMessageHandler(def parentActor) {
return {
react {List<Integer> message ->
assert message != null
switch (message.size()) {
case 0..1:
parentActor.send(message)
break
case 2:
if (message[0] <= message[1]) parentActor.send(message)
else parentActor.send(message[-1..0])
break
default:
def splitList = split(message)
def child1 = actor(createMessageHandler(delegate))
def child2 = actor(createMessageHandler(delegate))
child1.send(splitList[0])
child2.send(splitList[1])
react {message1 ->
react {message2 ->
parentActor.send merge(message1, message2)
}
}
}
}
}
}
def console = new DefaultPGroup(1).actor {
react {
println "Sorted array:\t${it}"
System.exit 0
}
}
def sorter = actor(createMessageHandler(console))
sorter.send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3])
console.join()
Since actors reuse threads from a pool, the script will work with virtually any size of thread pool, no matter how many actors are created along the way.
For brevity I didn't include the two helper methods split() and merge() in the code snippet. You can find them below.
def split(List<Integer> list) {
int listSize = list.size()
int middleIndex = listSize / 2
def list1 = list[0..<middleIndex]
def list2 = list[middleIndex..listSize - 1]
return [list1, list2]
}
List<Integer> merge(List<Integer> a, List<Integer> b) {
int i = 0, j = 0
final int newSize = a.size() + b.size()
List<Integer> result = new ArrayList<Integer>(newSize)
while ((i < a.size()) && (j < b.size())) {
if (a[i] <= b[j]) result << a[i++]
else result << b[j++]
}
if (i < a.size()) result.addAll(a[i..-1])
else result.addAll(b[j..-1])
return result
}
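As a quick sanity check of the two helpers, the following sketch exercises them without any actors. The helpers are repeated (slightly condensed) so that the snippet runs on its own in plain Groovy; `sortSequentially()` is a hypothetical single-threaded counterpart of the actor-based sorter, added only to show the same split/merge recursion.

```groovy
// Helpers repeated from the text above, with the integer division made explicit
def split(List<Integer> list) {
    int middleIndex = list.size().intdiv(2)
    [list[0..<middleIndex], list[middleIndex..-1]]
}

List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    List<Integer> result = new ArrayList<Integer>(a.size() + b.size())
    // Take the smaller head element until one list is exhausted
    while (i < a.size() && j < b.size()) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }
    // Append whatever is left of the other list
    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    result
}

// Hypothetical sequential version of the recursion the actors perform
List<Integer> sortSequentially(List<Integer> list) {
    if (list.size() <= 1) return list
    def (left, right) = split(list)
    merge(sortSequentially(left), sortSequentially(right))
}

assert split([1, 2, 3, 4, 5]) == [[1, 2], [3, 4, 5]]
assert merge([1, 4], [2, 3]) == [1, 2, 3, 4]
assert sortSequentially([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3]) == [1, 2, 3, 3, 3, 4, 5, 5, 6, 7, 8, 9]
```

In the actor version each recursive call is replaced by a child actor, and each returned list by a message sent to the parent.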
Actor lifecycle methods
Each Actor can define lifecycle observing methods, which will be called whenever a certain lifecycle event occurs.
- afterStop(List undeliveredMessages) - called right after the actor is stopped, passing in all the unprocessed messages from the queue.
- onInterrupt(InterruptedException e) - called when the actor's thread gets interrupted. Thread interruption will result in stopping the actor in any case.
- onTimeout() - called when no messages are sent to the actor within the timeout specified for the currently blocking react method. Timeout will result in stopping the actor.
- onException(Throwable e) - called when an exception occurs in the actor's event handler. Actor will stop after return from this method.
You can either define the methods statically in your Actor class or add them dynamically to the actor's metaclass:
def myActor = actor {
delegate.metaClass.onException = {
log.error('Exception occurred', it)
}
…
}
Pool management
Actors can be organized into groups and by default there's always an application-wide pooled actor group available. Just as the Actors abstract factory can be used to create actors in the default group, custom groups can be used as abstract factories to create new actor instances belonging to these groups.
def myGroup = new DefaultPGroup()
def actor1 = myGroup.actor {
…
}
def actor2 = myGroup.actor {
…
}
The actors belonging to the same group share the underlying thread pool of that group. The pool by default contains n + 1 threads, where n stands for the number of CPUs detected by the JVM. The pool size can be set explicitly either by setting the gpars.poolsize system property or individually for each actor group by specifying the appropriate constructor parameter.
def myGroup = new DefaultPGroup(10) //the pool will contain 10 threads
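The sizing rule just described can be written down as a small calculation. This is only a sketch of the rule as stated in the text, not the GPars source: `resolvePoolSize()` is a hypothetical helper, and treating the system property as taking precedence over the n + 1 default is an assumption.

```groovy
// Hypothetical helper: mirrors the rule described above, not the GPars source
int resolvePoolSize(String configured, int cpus) {
    // Assumption: an explicit gpars.poolsize value wins over the n + 1 default
    configured ? configured.trim().toInteger() : cpus + 1
}

int cpus = Runtime.runtime.availableProcessors()
int size = resolvePoolSize(System.getProperty('gpars.poolsize'), cpus)
println "Detected $cpus CPUs, pool size would be $size"

assert resolvePoolSize(null, 4) == 5     // default: n + 1
assert resolvePoolSize('10', 4) == 10    // explicit setting
```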
The thread pool can be manipulated through the appropriate DefaultPGroup class, which delegates to the Pool interface of the thread pool. For example, the resize() method allows you to change the pool size at any time and the resetDefaultSize() method sets it back to the default value. The shutdown() method can be called when you need to safely finish all tasks, destroy the pool and stop all the threads in order to exit the JVM in an organized manner.
… (n+1 threads in the default pool after startup)
Actors.defaultActorPGroup.resize 1 //use one-thread pool
… (1 thread in the pool)
Actors.defaultActorPGroup.resetDefaultSize()
… (n+1 threads in the pool)
Actors.defaultActorPGroup.shutdown()
As an alternative to the DefaultPGroup, which creates a pool of daemon threads, the NonDaemonPGroup class can be used when non-daemon threads are required.
def daemonGroup = new DefaultPGroup()
def actor1 = daemonGroup.actor {
…
}
def nonDaemonGroup = new NonDaemonPGroup()
def actor2 = nonDaemonGroup.actor {
…
}
class MyActor extends AbstractPooledActor {
def MyActor() {
this.actorGroup = nonDaemonGroup
}
void act() {...}
}
Actors belonging to the same group share the underlying thread pool. With pooled actor groups you can split your actors to leverage multiple thread pools of different sizes and so assign resources to different components of your system and tune their performance.
def coreActors = new NonDaemonPGroup(5) //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1) //1 daemon thread pool
def priceCalculator = coreActors.actor {
…
}
def paymentProcessor = coreActors.actor {
…
}
def emailNotifier = helperActors.actor {
…
}
def cleanupActor = helperActors.actor {
…
}
//increase size of the core actor group
coreActors.resize 6
//shutdown the group's pool once you no longer need the group to release resources
helperActors.shutdown()
Do not forget to shut down custom pooled actor groups once you no longer need them and their actors, to preserve system resources.
Common trap: App terminates while actors do not receive messages
Most likely you're using daemon threads and pools, which is the default setting, and your main thread finishes. Calling actor.join() on any, some or all of your actors would block the main thread until the actor terminates and thus keep all your actors running.
Alternatively use instances of NonDaemonPGroup and assign some of your actors to these groups.
def nonDaemonGroup = new NonDaemonPGroup()
def myActor = nonDaemonGroup.actor {...}
alternatively
def nonDaemonGroup = new NonDaemonPGroup()
class MyActor extends AbstractPooledActor {
def MyActor() {
this.actorGroup = nonDaemonGroup
}
void act() {...}
}
def myActor = new MyActor()
Dynamic Dispatch Actor
The DynamicDispatchActor class is a pooled actor allowing for an alternative structure of the message handling code. In general DynamicDispatchActor repeatedly scans for messages and dispatches arrived messages to one of the onMessage(message) methods defined on the actor. The DynamicDispatchActor leverages the Groovy dynamic method dispatch mechanism under the covers.
import groovyx.gpars.actor.DynamicDispatchActor
final class MyActor extends DynamicDispatchActor {
void onMessage(String message) {
println 'Received string'
}
void onMessage(Integer message) {
println 'Received integer'
}
void onMessage(Object message) {
println 'Received object'
}
void onMessage(List message) {
println 'Received list'
stop()
}
}
final def actor = new MyActor().start()
actor 1
actor ''
actor 1.0
actor(new ArrayList())
actor.join()
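The dispatch mechanism that this actor relies on can be observed in plain Groovy, without GPars. In the sketch below, `Handlers` is a hypothetical stand-in for the actor class above; its overloads return a label instead of printing so that the chosen overload can be checked.

```groovy
// Hypothetical stand-in for the onMessage() handlers of a DynamicDispatchActor
class Handlers {
    String onMessage(String message) { 'string' }
    String onMessage(Integer message) { 'integer' }
    String onMessage(List message) { 'list' }
    String onMessage(Object message) { 'object' }
}

def handlers = new Handlers()
// The declared type of every element is Object; Groovy picks the overload
// from the runtime type of the argument
List<Object> inbox = [1, '', 1.0, new ArrayList()]
def dispatched = inbox.collect { Object message -> handlers.onMessage(message) }

assert dispatched == ['integer', 'string', 'object', 'list']
```

The literal 1.0 is a BigDecimal in Groovy, so it falls through to the Object overload, just as in the actor example.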
In some scenarios, typically when no implicit conversation-history-dependent state needs to be preserved for the actor, the dynamic dispatch code structure may be more intuitive than the traditional one using nested loop and react statements.
The DynamicDispatchActor class also provides a handy facility to add message handlers dynamically at actor construction time using the when handlers:
final Actor actor = new DynamicDispatchActor({
when {String msg -> println 'A String'; reply 'Thanks'}
when {Double msg -> println 'A Double'; reply 'Thanks'}
when {msg -> println 'A something ...'; reply 'What was that?'}
})
actor.start()
Obviously the two approaches can be combined:
final class MyActor extends DynamicDispatchActor {
def MyActor(final closure) { super(closure); }
void onMessage(String message) {
println 'Received string'
}
void onMessage(Integer message) {
println 'Received integer'
}
void onMessage(Object message) {
println 'Received object'
}
void onMessage(List message) {
println 'Received list'
stop()
}
}
final def actor = new MyActor({
when {BigDecimal num -> println 'Received BigDecimal'}
if (needHandleFloats) when {Float num -> println 'Got a float'}
}).start()
Reactive Actor
The ReactiveActor class, typically constructed by calling Actors.reactor() or DefaultPGroup.reactor(), allows for a more event-driven approach. When a reactive actor receives a message, the supplied block of code, which makes up the reactive actor's body, is run with the message as a parameter. The result returned from the code is sent in reply.
import groovyx.gpars.group.DefaultPGroup
final def group = new DefaultPGroup()
final def doubler = group.reactor {
2 * it
}.start()
group.actor {
println 'Double of 10 = ' + doubler.sendAndWait(10)
}.start()
group.actor {
println 'Double of 20 = ' + doubler.sendAndWait(20)
}.start()
group.actor {
println 'Double of 30 = ' + doubler.sendAndWait(30)
}.start()
for(i in (1..10)) {
println "Double of $i = ${doubler.sendAndWait(i)}"
}
doubler.stop()
doubler.join()
Here's an example of an actor that submits a batch of numbers to a ReactiveActor for processing and then prints the results gradually as they arrive.
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors
final def doubler = Actors.reactor {
2 * it
}.start()
Actor actor = Actors.actor {
(1..10).each {doubler << it}
int i = 0
loop {
i += 1
if (i > 10) stop()
else {
react {message ->
println "Double of $i = $message"
}
}
}
}.start()
actor.join()
doubler.stop()
doubler.join()
Essentially, reactive actors provide a convenience shortcut for an actor that would wait for messages in a loop, process them and send back the result. This is schematically how the reactive actor looks inside:
public class ReactiveActor extends AbstractPooledActor {
Closure body
void act() {
loop {
react {message ->
reply body(message)
}
}
}
}
Structuring actor's code
When extending the AbstractPooledActor class, you can call any actor's methods from within the act() method and use the react() or loop() methods in them.
class MyActor extends AbstractPooledActor {
protected void act() {
handleA()
}
private void handleA() {
react {a ->
handleB(a)
}
}
private void handleB(int a) {
react {b ->
println a + b
reply a + b
}
}
}
Bear in mind that the methods handleA() and handleB() in all our examples never return, since they call react(), which itself never returns.
Alternatively, when using the actor() factory method, you can add event-handling code through the meta class as closures.
Actor actor2 = actor {
delegate.metaClass {
handleA = {->
react {a ->
handleB(a)
}
}
handleB = {a ->
react {b ->
println a + b
reply a + b
}
}
}
handleA()
}
Closures that have the actor set as their delegate can also be used to structure event-handling code.
Closure handleB = {a ->
react {b ->
println a + b
reply a + b
}
}
Closure handleA = {->
react {a ->
handleB(a)
}
}
Actor actor3 = actor {
handleA.delegate = delegate
handleB.delegate = delegate
handleA()
}
Event-driven loops
When coding event-driven actors you have to have in mind that calls to react() and loop() methods never return. This becomes a bit of a challenge once you try to implement any types of loops in your actors.
On the other hand, if you leverage the fact that react() never returns, you may call methods recursively without fear of filling up the stack. Look at the examples below, which respectively use the three described techniques for structuring an actor's code.
A subclass of AbstractPooledActor
class MyLoopActor extends AbstractPooledActor {
protected void act() {
outerLoop()
}
private void outerLoop() {
react {a ->
println 'Outer: ' + a
if (a!=0) innerLoop()
else println 'Done'
}
}
private void innerLoop() {
react {b ->
println 'Inner ' + b
if (b == 0) outerLoop()
else innerLoop()
}
}
}
Enhancing the actor's metaClass
Actor actor = actor {
outerLoop()
}
actor.metaClass {
outerLoop = {->
react {a ->
println 'Outer: ' + a
if (a!=0) innerLoop()
else println 'Done'
}
}
innerLoop = {->
react {b ->
println 'Inner ' + b
if (b==0) outerLoop()
else innerLoop()
}
}
}
Using Groovy closures
Closure innerLoop
Closure outerLoop = {->
react {a ->
println 'Outer: ' + a
if (a!=0) innerLoop()
else println 'Done'
}
}
innerLoop = {->
react {b ->
println 'Inner ' + b
if (b==0) outerLoop()
else innerLoop()
}
}
Actor actor = actor {
outerLoop()
}
outerLoop.delegate = actor
innerLoop.delegate = actor
Plus don't forget about the possibility to use the actor's loop() method to create a loop that keeps running until the actor terminates.
class MyLoopActor extends AbstractPooledActor {
protected void act() {
loop {
outerLoop()
}
}
private void outerLoop() {
react {a ->
println 'Outer: ' + a
if (a!=0) innerLoop()
else println 'Done for now, but will loop again'
}
}
private void innerLoop() {
react {b ->
println 'Inner ' + b
if (b == 0) outerLoop()
else innerLoop()
}
}
}
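Why the mutual recursion between outerLoop() and innerLoop() does not fill up the stack can be seen in a toy event loop. This is a sketch under simplifying assumptions, not GPars code: `TinyLoop` is a hypothetical class, it runs on a single thread, and its `react()` merely records the continuation and returns, whereas the GPars react() never returns normally.

```groovy
// Hypothetical single-threaded event loop, for illustration only
class TinyLoop {
    Closure pending              // continuation registered by the latest react()
    List<String> log = []

    // Unlike the GPars react(), this one simply records the continuation and returns
    void react(Closure continuation) { pending = continuation }

    void run(List messages) {
        for (message in messages) {
            if (pending == null) break      // no continuation left: the "actor" is done
            Closure current = pending
            pending = null
            current.call(message)           // the stack unwinds before the next message
        }
    }
}

def loop = new TinyLoop()
Closure outerLoop, innerLoop
outerLoop = { -> loop.react { a -> if (a != 0) innerLoop() else loop.log << 'Done' } }
innerLoop = { -> loop.react { b -> if (b == 0) outerLoop() else innerLoop() } }

outerLoop()
// 100000 non-zero messages keep the inner loop "recursing", then two zeros end both loops
loop.run([1] * 100000 + [0, 0])
assert loop.log == ['Done']
```

Each call to innerLoop() only registers the next continuation, so the call depth stays constant no matter how many messages are processed.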
A few examples on Actors use
Examples
- Sleeping Barber
- Dining Philosophers
- Word Sort
- Load Balancer
Sleeping Barber
Problem description
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.actor.AbstractPooledActor
import groovyx.gpars.actor.Actor
final def group = new DefaultPGroup()
final def barber = group.actor {
final def random = new Random()
loop {
react {message ->
switch (message) {
case Enter:
message.customer.send new Start()
println "Barber: Processing customer ${message.customer.name}"
doTheWork(random)
message.customer.send new Done()
message.reply new Next()
break
case Wait:
println "Barber: No customers. Going to have a sleep"
break
}
}
}
}.start()
private def doTheWork(Random random) {
Thread.sleep(random.nextInt(10) * 1000)
}
final Actor waitingRoom
waitingRoom = group.actor {
final int capacity = 5
final List<Customer> waitingCustomers = []
boolean barberAsleep = true
loop {
react {message ->
switch (message) {
case Enter:
if (waitingCustomers.size() == capacity) {
reply new Full()
} else {
waitingCustomers << message.customer
if (barberAsleep) {
assert waitingCustomers.size() == 1
barberAsleep = false
waitingRoom.send new Next()
}
else reply new Wait()
}
break
case Next:
if (waitingCustomers.size()>0) {
def customer = waitingCustomers.remove(0)
barber.send new Enter(customer:customer)
} else {
barber.send new Wait()
barberAsleep = true
}
}
}
}
}.start()
class Customer extends AbstractPooledActor {
String name
Actor localBarbers
void act() {
localBarbers << new Enter(customer:this)
loop {
react {message ->
switch (message) {
case Full:
println "Customer: $name: The waiting room is full. I am leaving."
stop()
break
case Wait:
println "Customer: $name: I will wait."
break
case Start:
println "Customer: $name: I am now being served."
break
case Done:
println "Customer: $name: I have been served."
break }
}
}
}
}
class Enter { Customer customer }
class Full {}
class Wait {}
class Next {}
class Start {}
class Done {}
new Customer(name:'Joe', localBarbers:waitingRoom).start()
new Customer(name:'Dave', localBarbers:waitingRoom).start()
new Customer(name:'Alice', localBarbers:waitingRoom).start()
System.in.read()
Dining Philosophers
Problem description
import groovyx.gpars.actor.AbstractPooledActor
import groovyx.gpars.actor.Actors
Actors.defaultActorPGroup.resize 5
final class Philosopher extends AbstractPooledActor {
private Random random = new Random()
String name
def forks = []
void act() {
assert 2 == forks.size()
loop {
think()
forks*.send new Take()
react {a ->
react {b ->
if ([a, b].any {Rejected.isCase it}) {
println "$name: \tOops, can't get my forks! Giving up."
[a, b].find {Accepted.isCase it}?.reply new Finished()
} else {
eat()
reply new Finished()
}
}
}
}
}
void think() {
println "$name: \tI'm thinking"
Thread.sleep random.nextInt(5000)
println "$name: \tI'm done thinking"
}
void eat() {
println "$name: \tI'm EATING"
Thread.sleep random.nextInt(2000)
println "$name: \tI'm done EATING"
}
}final class Fork extends AbstractPooledActor { String name
boolean available = true void act() {
loop {
react {message ->
switch (message) {
case Take:
if (available) {
available = false
reply new Accepted()
} else reply new Rejected()
break
case Finished:
assert !available
available = true
break
default: throw new IllegalStateException("Cannot process the message: $message")
}
}
}
}
}

final class Take {}
final class Accepted {}
final class Rejected {}
final class Finished {}

def forks = [
new Fork(name:'Fork 1'),
new Fork(name:'Fork 2'),
new Fork(name:'Fork 3'),
new Fork(name:'Fork 4'),
new Fork(name:'Fork 5')
]

def philosophers = [
new Philosopher(name:'Joe', forks:[forks[0], forks[1]]),
new Philosopher(name:'Dave', forks:[forks[1], forks[2]]),
new Philosopher(name:'Alice', forks:[forks[2], forks[3]]),
new Philosopher(name:'James', forks:[forks[3], forks[4]]),
new Philosopher(name:'Phil', forks:[forks[4], forks[0]]),
]

forks*.start()
philosophers*.start()

System.in.read()
Word sort
Given a folder name, the script sorts the words in all files in that folder. The SortMaster actor creates a given number of WordSortActors, splits the files to be sorted among them and collects the results.

Inspired by the Scala Concurrency blog post by Michael Galpin.

import groovyx.gpars.actor.AbstractPooledActor
import java.util.concurrent.CountDownLatch

//Messages
private final class FileToSort { String fileName }
private final class SortResult { String fileName; List<String> words }

//Worker actor
final class WordSortActor extends AbstractPooledActor {

private List<String> sortedWords(String fileName) {
parseFile(fileName).sort {it.toLowerCase()}
}

private List<String> parseFile(String fileName) {
List<String> words = []
new File(fileName).splitEachLine(' ') {words.addAll(it)}
return words
}

void act() {
loop {
react {message ->
switch (message) {
case FileToSort:
println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}"
reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName))
}
}
}
}
}

//Master actor
final class SortMaster extends AbstractPooledActor {

String docRoot = '/'
int numActors = 1

List<List<String>> sorted = []
private CountDownLatch startupLatch = new CountDownLatch(1)
private CountDownLatch doneLatch

private void beginSorting() {
int cnt = sendTasksToWorkers()
doneLatch = new CountDownLatch(cnt)
}

private List createWorkers() {
return (1..numActors).collect {new WordSortActor().start()}
}

private int sendTasksToWorkers() {
List<PooledActor> workers = createWorkers()
int cnt = 0
new File(docRoot).eachFile {
workers[cnt % numActors] << new FileToSort(fileName: it)
cnt += 1
}
return cnt
}

public void waitUntilDone() {
startupLatch.await()
doneLatch.await()
}

void act() {
beginSorting()
startupLatch.countDown()
loop {
react {
switch (it) {
case SortResult:
sorted << it.words
doneLatch.countDown()
println "Received results for file=${it.fileName}"
}
}
}
}
}

//start the actors to sort words
def master = new SortMaster(docRoot: 'C:/dev/TeamCity/logs/', numActors: 5).start()
master.waitUntilDone()
println 'Done'
println master.sorted
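
The way SortMaster splits the files among its workers is a plain round-robin over the worker list (the `workers[cnt % numActors]` expression above), and each worker sorts case-insensitively. A minimal sketch of just those two steps, without any actors, might look as follows. The `distribute` and `sortWords` helpers and the sample file names are hypothetical and only serve the illustration; they are not part of the original script.

```groovy
// Hypothetical helper: assigns each file name to a worker index in round-robin order,
// mirroring the workers[cnt % numActors] expression used by SortMaster.
def distribute(List<String> fileNames, int numActors) {
    def assignment = [:]
    fileNames.eachWithIndex { String fileName, int cnt ->
        int worker = cnt % numActors
        if (!assignment.containsKey(worker)) assignment[worker] = []
        assignment[worker] << fileName
    }
    return assignment
}

// Hypothetical helper: case-insensitive sort, as done by WordSortActor.sortedWords().
// sort(false) leaves the input list untouched and returns a new sorted list.
def sortWords(List<String> words) {
    words.sort(false) { it.toLowerCase() }
}

// Sample data (made up for this sketch)
println distribute(['a.log', 'b.log', 'c.log', 'd.log', 'e.log'], 2)
println sortWords(['pear', 'Apple', 'banana'])
```

With two workers and five files, the first worker receives three files and the second two, so the load differs by at most one file regardless of file size. Balancing by file size would need a different scheme, such as the pull-based one in the Load Balancer example.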
Load Balancer
Demonstrates work balancing among an adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes its assignment, it asks the load balancer for a new task.

If the load balancer doesn't have any tasks available in the task queue, the worker is stopped.
If the number of tasks in the task queue reaches a certain limit, a new worker is created to increase the size of the worker pool.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.AbstractPooledActor

/**
* Demonstrates work balancing among an adaptable set of workers.
* The load balancer receives tasks and queues them in a temporary task queue.
* When a worker finishes its assignment, it asks the load balancer for a new task.
* If the load balancer doesn't have any tasks available in the task queue, the worker is stopped.
* If the number of tasks in the task queue reaches a certain limit, a new worker is created
* to increase the size of the worker pool.
*/

final class LoadBalancer extends AbstractPooledActor {
int workers = 0
List taskQueue = []
private static final QUEUE_SIZE_TRIGGER = 10

void act() {
loop {
def message = receive()
switch (message) {
case NeedMoreWork:
if (taskQueue.size() == 0) {
println 'No more tasks in the task queue. Terminating the worker.'
message.reply DemoWorker.EXIT
workers -= 1
} else message.reply taskQueue.remove(0)
break
case WorkToDo:
taskQueue << message
if ((workers == 0) || (taskQueue.size() >= QUEUE_SIZE_TRIGGER)) {
println 'Need more workers. Starting one.'
workers += 1
new DemoWorker(this).start()
}
}
println "Active workers=${workers}\tTasks in queue=${taskQueue.size()}"
}
}
}

final class DemoWorker extends AbstractPooledActor {
final static Object EXIT = new Object()
private static final Random random = new Random()

Actor balancer

def DemoWorker(balancer) {
this.balancer = balancer
}

void act() {
loop {
this.balancer << new NeedMoreWork()
react {
switch (it) {
case WorkToDo:
processMessage(it)
break
case EXIT: stop()
}
}
}
}

private void processMessage(message) {
synchronized (random) {
Thread.sleep random.nextInt(5000)
}
}
}
final class WorkToDo {}
final class NeedMoreWork {}

final Actor balancer = new LoadBalancer().start()

//produce tasks
for (i in 1..20) {
Thread.sleep 100
balancer << new WorkToDo()
}

//produce tasks in a parallel thread
Thread.start {
for (i in 1..10) {
Thread.sleep 1000
balancer << new WorkToDo()
}
}

Thread.sleep 35000 //let the queues get empty
balancer << new WorkToDo()
balancer << new WorkToDo()
Thread.sleep 10000
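
The two decisions the load balancer makes (when to start a worker and when to stop one) can be looked at separately from the actor plumbing. The sketch below is a plain, single-threaded model of that bookkeeping. The `BalancerModel` class and its method names are hypothetical and exist only for this illustration; only the `QUEUE_SIZE_TRIGGER` value and the two conditions are taken from the LoadBalancer class above.

```groovy
// Hypothetical, single-threaded model of the LoadBalancer bookkeeping (no actors involved).
class BalancerModel {
    static final int QUEUE_SIZE_TRIGGER = 10
    int workers = 0
    List taskQueue = []

    // Mirrors the WorkToDo branch: queue the task and report whether a new worker should start.
    boolean taskArrived(task) {
        taskQueue << task
        if (workers == 0 || taskQueue.size() >= QUEUE_SIZE_TRIGGER) {
            workers += 1
            return true
        }
        return false
    }

    // Mirrors the NeedMoreWork branch: hand out the oldest task,
    // or return null to signal that the asking worker should exit.
    def workRequested() {
        if (taskQueue.isEmpty()) {
            workers -= 1
            return null
        }
        return taskQueue.remove(0)
    }
}

def model = new BalancerModel()
println model.taskArrived('task 1')   // the first task always starts a worker
println model.taskArrived('task 2')   // queue is still short, no new worker
println model.workRequested()
println model.workRequested()
println model.workRequested()         // queue is empty, the worker is told to exit
println model.workers
```

In the actor version the same state needs no locking, because the balancer's `workers` counter and `taskQueue` are only ever touched by the thread currently processing the balancer's message.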