(Quick Reference)

5.5 Classic Examples - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.2.0

5.5 Classic Examples

A few examples on Actors use

Examples

  • The Sieve of Eratosthenes
  • Sleeping Barber
  • Dining Philosophers
  • Word Sort
  • Load Balancer

The Sieve of Eratosthenes

Problem description

import groovyx.gpars.actor.DynamicDispatchActor

/** * Demonstrates concurrent implementation of the Sieve of Eratosthenes using actors * * In principle, the algorithm consists of concurrently run chained filters, * each of which detects whether the current number can be divided by a single prime number. * (generate nums 1, 2, 3, 4, 5, ...) -> (filter by mod 2) -> (filter by mod 3) -> (filter by mod 5) -> (filter by mod 7) -> (filter by mod 11) -> (caution! Primes falling out here) * The chain is built (grows) on the fly, whenever a new prime is found. */

int requestedPrimeNumberBoundary = 1000

final def firstFilter = new FilterActor(2).start()

/** * Generating candidate numbers and sending them to the actor chain */ (2..requestedPrimeNumberBoundary).each { firstFilter it } firstFilter.sendAndWait 'Poison'

/** * Filter out numbers that can be divided by a single prime number */ final class FilterActor extends DynamicDispatchActor { private final int myPrime private def follower

def FilterActor(final myPrime) { this.myPrime = myPrime; }

/** * Try to divide the received number with the prime. If the number cannot be divided, send it along the chain. * If there's no-one to send it to, I'm the last in the chain, the number is a prime and so I will create and chain * a new actor responsible for filtering by this newly found prime number. */ def onMessage(int value) { if (value % myPrime != 0) { if (follower) follower value else { println "Found $value" follower = new FilterActor(value).start() } } }

/** * Stop the actor on poisson reception */ def onMessage(def poisson) { if (follower) { def sender = sender follower.sendAndContinue(poisson, {this.stop(); sender?.send('Done')}) //Pass the poisson along and stop after a reply } else { //I am the last in the chain stop() reply 'Done' } } }

Sleeping Barber

Problem description

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.actor.Actor

final def group = new DefaultPGroup()

final def barber = group.actor { final def random = new Random() loop { react {message -> switch (message) { case Enter: message.customer.send new Start() println "Barber: Processing customer ${message.customer.name}" doTheWork(random) message.customer.send new Done() reply new Next() break case Wait: println "Barber: No customers. Going to have a sleep" break } } } }

private def doTheWork(Random random) { Thread.sleep(random.nextInt(10) * 1000) }

final Actor waitingRoom

waitingRoom = group.actor { final int capacity = 5 final List<Customer> waitingCustomers = [] boolean barberAsleep = true

loop { react {message -> switch (message) { case Enter: if (waitingCustomers.size() == capacity) { reply new Full() } else { waitingCustomers << message.customer if (barberAsleep) { assert waitingCustomers.size() == 1 barberAsleep = false waitingRoom.send new Next() } else reply new Wait() } break case Next: if (waitingCustomers.size()>0) { def customer = waitingCustomers.remove(0) barber.send new Enter(customer:customer) } else { barber.send new Wait() barberAsleep = true } } } }

}

class Customer extends DefaultActor { String name Actor localBarbers

void act() { localBarbers << new Enter(customer:this) loop { react {message -> switch (message) { case Full: println "Customer: $name: The waiting room is full. I am leaving." stop() break case Wait: println "Customer: $name: I will wait." break case Start: println "Customer: $name: I am now being served." break case Done: println "Customer: $name: I have been served." stop(); break

} } } } }

class Enter { Customer customer } class Full {} class Wait {} class Next {} class Start {} class Done {}

def customers = [] customers << new Customer(name:'Joe', localBarbers:waitingRoom).start() customers << new Customer(name:'Dave', localBarbers:waitingRoom).start() customers << new Customer(name:'Alice', localBarbers:waitingRoom).start()

sleep 15000 customers << new Customer(name: 'James', localBarbers: waitingRoom).start() sleep 5000 customers*.join() barber.stop() waitingRoom.stop()

Dining Philosophers

Problem description

import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actors

Actors.defaultActorPGroup.resize 5

final class Philosopher extends DefaultActor { private Random random = new Random()

String name def forks = []

void act() { assert 2 == forks.size() loop { think() forks*.send new Take() def messages = [] react {a -> messages << [a, sender] react {b -> messages << [b, sender] if ([a, b].any {Rejected.isCase it}) { println "$name: tOops, can't get my forks! Giving up." final def accepted = messages.find {Accepted.isCase it[0]} if (accepted!=null) accepted[1].send new Finished() } else { eat() reply new Finished() } } } } }

void think() { println "$name: tI'm thinking" Thread.sleep random.nextInt(5000) println "$name: tI'm done thinking" }

void eat() { println "$name: tI'm EATING" Thread.sleep random.nextInt(2000) println "$name: tI'm done EATING" } }

final class Fork extends DefaultActor {

String name boolean available = true

void act() { loop { react {message -> switch (message) { case Take: if (available) { available = false reply new Accepted() } else reply new Rejected() break case Finished: assert !available available = true break default: throw new IllegalStateException("Cannot process the message: $message") } } } } }

final class Take {} final class Accepted {} final class Rejected {} final class Finished {}

def forks = [ new Fork(name:'Fork 1'), new Fork(name:'Fork 2'), new Fork(name:'Fork 3'), new Fork(name:'Fork 4'), new Fork(name:'Fork 5') ]

def philosophers = [ new Philosopher(name:'Joe', forks:[forks[0], forks[1]]), new Philosopher(name:'Dave', forks:[forks[1], forks[2]]), new Philosopher(name:'Alice', forks:[forks[2], forks[3]]), new Philosopher(name:'James', forks:[forks[3], forks[4]]), new Philosopher(name:'Phil', forks:[forks[4], forks[0]]), ]

forks*.start() philosophers*.start()

sleep 10000 forks*.stop() philosophers*.stop()

Word sort

Given a folder name, the script will sort words in all files in the folder. The SortMaster actor creates a given number of WordSortActors , splits among them the files to sort words in and collects the results.

Inspired by Scala Concurrency blog post by Michael Galpin

//Messages
private final class FileToSort { String fileName }
private final class SortResult { String fileName; List<String> words }

//Worker actor class WordSortActor extends DefaultActor {

private List<String> sortedWords(String fileName) { parseFile(fileName).sort {it.toLowerCase()} }

private List<String> parseFile(String fileName) { List<String> words = [] new File(fileName).splitEachLine(' ') {words.addAll(it)} return words }

void act() { loop { react {message -> switch (message) { case FileToSort: println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}" reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName)) } } } } }

//Master actor final class SortMaster extends DefaultActor {

String docRoot = '/' int numActors = 1

List<List<String>> sorted = [] private CountDownLatch startupLatch = new CountDownLatch(1) private CountDownLatch doneLatch

private void beginSorting() { int cnt = sendTasksToWorkers() doneLatch = new CountDownLatch(cnt) }

private List createWorkers() { return (1..numActors).collect {new WordSortActor().start()} }

private int sendTasksToWorkers() { List<Actor> workers = createWorkers() int cnt = 0 new File(docRoot).eachFile { workers[cnt % numActors] << new FileToSort(fileName: it) cnt += 1 } return cnt }

public void waitUntilDone() { startupLatch.await() doneLatch.await() }

void act() { beginSorting() startupLatch.countDown() loop { react { switch (it) { case SortResult: sorted << it.words doneLatch.countDown() println "Received results for file=${it.fileName}" } } } } }

//start the actors to sort words def master = new SortMaster(docRoot: 'c:/tmp/Logs/', numActors: 5).start() master.waitUntilDone() println 'Done'

File file = new File("c:/tmp/Logs/sorted_words.txt") file.withPrintWriter { printer -> master.sorted.each { printer.println it } }

Load Balancer

Demonstrates work balancing among adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes his assignment, it asks the load balancer for a new task.

If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. If the number of tasks in the task queue exceeds certain limit, a new worker is created to increase size of the worker pool.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

/** * Demonstrates work balancing among adaptable set of workers. * The load balancer receives tasks and queues them in a temporary task queue. * When a worker finishes his assignment, it asks the load balancer for a new task. * If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. * If the number of tasks in the task queue exceeds certain limit, a new worker is created * to increase size of the worker pool. */

final class LoadBalancer extends DefaultActor { int workers = 0 List taskQueue = [] private static final QUEUE_SIZE_TRIGGER = 10

void act() { loop { react { message -> switch (message) { case NeedMoreWork: if (taskQueue.size() == 0) { println 'No more tasks in the task queue. Terminating the worker.' reply DemoWorker.EXIT workers -= 1 } else reply taskQueue.remove(0) break case WorkToDo: taskQueue << message if ((workers == 0) || (taskQueue.size() >= QUEUE_SIZE_TRIGGER)) { println 'Need more workers. Starting one.' workers += 1 new DemoWorker(this).start() } } println "Active workers=${workers}tTasks in queue=${taskQueue.size()}" } } } }

final class DemoWorker extends DefaultActor { final static Object EXIT = new Object() private static final Random random = new Random()

Actor balancer

def DemoWorker(balancer) { this.balancer = balancer }

void act() { loop { this.balancer << new NeedMoreWork() react { switch (it) { case WorkToDo: processMessage(it) break case EXIT: terminate() } } }

}

private void processMessage(message) { synchronized (random) { Thread.sleep random.nextInt(5000) } } } final class WorkToDo {} final class NeedMoreWork {}

final Actor balancer = new LoadBalancer().start()

//produce tasks for (i in 1..20) { Thread.sleep 100 balancer << new WorkToDo() }

//produce tasks in a parallel thread Thread.start { for (i in 1..10) { Thread.sleep 1000 balancer << new WorkToDo() } }

Thread.sleep 35000 //let the queues get empty balancer << new WorkToDo() balancer << new WorkToDo() Thread.sleep 10000

balancer.stop() balancer.join()