
3.2 Map-Reduce - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.2.0

3.2 Map-Reduce

The Parallel Collection Map/Reduce DSL gives GPars a more functional flavor. In general, the Map/Reduce DSL may be used for the same purposes as the xxxParallel() family of methods and has very similar semantics. However, Map/Reduce can perform considerably faster when you need to chain multiple methods to process a single collection in several steps:
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
            .map {it.toURL().text.toUpperCase()}
            .filter {it.contains('GROOVY')}
            .map{it.split()}
            .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
            .sum()

The xxxParallel() methods have to follow the contract of their non-parallel peers. So a collectParallel() method must return a legal collection of items, which you can again treat as a Groovy collection. Internally, the parallel collect method builds an efficient parallel structure called a Parallel Array, performs the required operation concurrently and, before returning, destroys the Parallel Array while building the collection of results to return to you. A subsequent call to, say, findAllParallel() on the resulting collection would repeat the whole process of constructing and destroying a Parallel Array instance under the covers.
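The difference can be sketched as follows, assuming GPars is on the classpath. The numbers list and the squaring and filtering closures are made up for illustration. Both chains should produce the same values, but the first one goes through a Parallel Array once per call:

```groovy
import static groovyx.gpars.GParsPool.withPool

def numbers = (1..10).toList()   // hypothetical input data

def results = withPool {
    return [
        // Each xxxParallel() call returns a plain Groovy collection, so a Parallel Array
        // is built and discarded for collectParallel() and again for findAllParallel().
        parallelMethods: numbers.collectParallel { it * it }.findAllParallel { it % 2 == 0 },
        // The Map/Reduce chain converts to a Parallel Array once and back once.
        mapReduce      : numbers.parallel.map { it * it }.filter { it % 2 == 0 }.collection
    ]
}

assert results.parallelMethods == results.mapReduce
println results.mapReduce
```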

With Map/Reduce you turn your collection into a Parallel Array and back only once. The Map/Reduce family of methods does not return Groovy collections, but is free to pass along the internal Parallel Arrays directly. Invoking the parallel property on a collection builds a Parallel Array for the collection and returns a thin wrapper around the Parallel Array instance. You can then chain all the required methods, such as:

  • map()
  • reduce()
  • filter()
  • size()
  • sum()
  • min()
  • max()
  • sort()
  • groupBy()
  • combine()

Returning to a plain Groovy collection instance is always just a matter of retrieving the collection property:

def myNumbers = (1..1000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collection
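To illustrate a few of the chainable methods listed above, here is a small sketch, assuming GPars is on the classpath. The stats map and its key names are hypothetical and only collect the results in one place:

```groovy
import static groovyx.gpars.GParsPool.withPool

def stats = withPool {
    // The wrapper returned by filter() holds the even numbers 2, 4, 6, 8, 10
    def evens = (1..10).parallel.filter { it % 2 == 0 }

    return [
        count       : evens.size(),
        total       : evens.sum(),
        smallest    : evens.min(),
        largest     : evens.max(),
        // reduce() folds the mapped values pairwise with the supplied closure
        sumOfSquares: evens.map { it * it }.reduce { a, b -> a + b },
        // the collection property hands back a plain Groovy collection
        items       : evens.collection
    ]
}

println stats
```

The wrapper is only unwrapped at the very end, through the collection property. The other calls return either another wrapper or a single value.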

Avoid side-effects in functions

Once again, a warning: to avoid nasty surprises, keep the closures that you pass to the Map/Reduce functions stateless and free of side effects.
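A minimal sketch of what this means in practice, assuming GPars is on the classpath. The numbers list is hypothetical, and the risky variant is left commented out on purpose:

```groovy
import static groovyx.gpars.GParsPool.withPool

def numbers = (1..1000).toList()   // hypothetical input data

def total = withPool {
    // Risky: this closure would mutate a shared variable from several threads at once,
    // so updates could be lost and the final value would be unpredictable.
    //   int shared = 0
    //   numbers.parallel.map { shared += it }.collection

    // Safer: the closures only read their arguments and return new values,
    // and the accumulation is left to reduce().
    return numbers.parallel.map { it * 2 }.reduce { a, b -> a + b }
}

println total
```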

Availability

This feature is only available when using the Fork/Join-based GParsPool, not GParsExecutorsPool.

Classical Example

A classical example, inspired by http://github.com/thevery, counting occurrences of words in a string:

import static groovyx.gpars.GParsPool.withPool

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) {
    withPool {
        // tokenize() splits the text into words before the collection is made parallel
        return arg.tokenize().parallel
            .map{[it, 1]}
            .groupBy{it[0]}.getParallel()
            .map {it.value=it.value.size();it}
            .sort{-it.value}.collection
    }
}

The same example, now implemented with the more general combine operation:

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) {
    withPool {
        // tokenize() splits the text into words before the collection is made parallel
        return arg.tokenize().parallel
            .map{[it, 1]}
            .combine(0) {sum, value -> sum + value}.getParallel()
            .sort{-it.value}.collection
    }
}

Combine

The combine operation expects on its input a list of tuples (two-element lists) considered to be key-value pairs (such as [[key1, value1], [key2, value2], [key1, value3], [key3, value4] … ]) with potentially repeating keys. When invoked, combine merges the values for identical keys using the provided accumulator function and produces a map from the original (unique) keys to their accumulated values. E.g. [[a, b], [c, d], [a, e], [c, f]] will be combined into [a : b+e, c : d+f], where the '+' operation on the values needs to be provided by the user as the accumulation closure.

The accumulation function argument needs to specify a function to use for combining (accumulating) the values belonging to the same key. An initial accumulator value needs to be provided as well. Since the combine method processes items in parallel, the initial accumulator value will be reused multiple times. Thus the provided value must allow for reuse. It should be either a cloneable or immutable value or a closure returning a fresh initial accumulator each time requested. Good combinations of accumulator functions and reusable initial values include:

accumulator = {List acc, value -> acc << value} initialValue = []
accumulator = {List acc, value -> acc << value} initialValue = {-> []}
accumulator = {int sum, int value -> sum + value} initialValue = 0
accumulator = {int sum, int value -> sum + value} initialValue = {-> 0}
accumulator = {ShoppingCart cart, Item value -> cart.addItem(value)} initialValue = {-> new ShoppingCart()}
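As an illustration of the closure-based initial value, the following sketch collects words into lists keyed by their first letter. It assumes GPars is on the classpath. The words list and the byInitial name are hypothetical:

```groovy
import static groovyx.gpars.GParsPool.withPool

def words = ['apple', 'avocado', 'banana', 'blueberry', 'cherry']   // hypothetical input

def byInitial = withPool {
    return words.parallel
        // build [key, value] pairs keyed by the first letter of each word
        .map { [it[0], it] }
        // the initial-value closure is asked for a fresh empty list whenever one is needed
        .combine({ -> [] }) { List acc, value -> acc << value }
}

// Items are processed in parallel, so sort each list before relying on its order
println byInitial.collectEntries { key, value -> [key, value.sort()] }
```

Passing a closure rather than a shared [] literal makes it explicit that each accumulator starts from its own list.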

The return type is a map. E.g. [['he', 1], ['she', 2], ['he', 2], ['me', 1], ['she', 5], ['he', 1]] with an initial value of 0 will be combined into ['he' : 4, 'she' : 7, 'me' : 1].
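The pronoun example above can be written as a short script. This is a sketch that assumes GPars is on the classpath. The pairs and totals names are hypothetical:

```groovy
import static groovyx.gpars.GParsPool.withPool

// Key-value pairs with repeating keys, written as two-element lists
def pairs = [['he', 1], ['she', 2], ['he', 2], ['me', 1], ['she', 5], ['he', 1]]

def totals = withPool {
    // 0 is immutable, so it can safely be reused as the initial accumulator for every key
    return pairs.parallel.combine(0) { sum, value -> sum + value }
}

println totals
```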

The keys will be mutually compared using their equals and hashCode methods. Consider using @Canonical or @EqualsAndHashCode to annotate classes that you use as keys. Just like with all hash maps in Groovy, be sure you're using a String not a GString as a key!
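Both points can be checked with plain Groovy, without GPars. In the sketch below ModelKey is a hypothetical key class and the map contents are made up:

```groovy
import groovy.transform.EqualsAndHashCode

// ModelKey is a hypothetical key class; the annotation generates equals() and hashCode()
@EqualsAndHashCode
class ModelKey {
    String model
    String color
}

def first = new ModelKey(model: 'Da', color: 'white')
def second = new ModelKey(model: 'Da', color: 'white')
def sameKey = (first == second) && (first.hashCode() == second.hashCode())

// A GString and a String holding the same characters are different hash map keys
def word = 'he'
def counts = [:]
counts.put("key-${word}", 1)                  // stored under a GString
def missed = counts.get('key-he')             // a String lookup does not find the GString key
counts.put("key-${word}".toString(), 1)       // stored under a String
def found = counts.get('key-he')

println "sameKey=$sameKey missed=$missed found=$found"
```

Calling toString() on an interpolated string before using it as a key in the map() step avoids the mismatch.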

For more involved scenarios in which you combine() complex objects, a good strategy is to have a class that can serve as the key for the common use cases and to apply different keys for the uncommon ones.

import groovy.transform.ToString
import groovy.transform.TupleConstructor

import static groovyx.gpars.GParsPool.withPool

@TupleConstructor
@ToString
class PricedCar implements Cloneable {
    String model
    String color
    Double price

    boolean equals(final o) {
        if (this.is(o)) return true
        if (getClass() != o.class) return false

        final PricedCar pricedCar = (PricedCar) o

        if (color != pricedCar.color) return false
        if (model != pricedCar.model) return false

        return true
    }

    int hashCode() {
        int result
        result = (model != null ? model.hashCode() : 0)
        result = 31 * result + (color != null ? color.hashCode() : 0)
        return result
    }

    @Override
    protected Object clone() {
        return super.clone()
    }
}

def cars = [new PricedCar('F550', 'blue', 2342.223),
        new PricedCar('F550', 'red', 234.234),
        new PricedCar('Da', 'white', 2222.2),
        new PricedCar('Da', 'white', 1111.1)]

withPool {
    //Combine by model
    def result = cars.parallel.map {
        [it.model, it]
    }.combine(new PricedCar('', 'N/A', 0.0)) {sum, value ->
        sum.model = value.model
        sum.price += value.price
        sum
    }.values()

    println result

    //Combine by model and color (the PricedCar's equals and hashCode)
    result = cars.parallel.map {
        [it, it]
    }.combine(new PricedCar('', 'N/A', 0.0)) {sum, value ->
        sum.model = value.model
        sum.color = value.color
        sum.price += value.price
        sum
    }.values()

    println result
}