3.1 Parallel Collections - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.0.0
3.1 Parallel Collections
Dealing with data frequently involves manipulating collections. Lists, arrays, sets, maps, iterators, strings and lots of other data types can be viewed as collections of items. The common pattern to process such collections is to take elements sequentially, one-by-one, and perform an action on each item in a row.

Take, for example, the min() function, which is supposed to return the smallest element of a collection. When you call the min() method on a collection of numbers, the caller thread creates an accumulator, or so-far-the-smallest-value, initialized to the maximum value of the given type. Then the thread iterates through the elements of the collection and compares each of them with the value in the accumulator. Once all elements have been processed, the minimum value is stored in the accumulator.

This algorithm, however simple, is totally wrong on multi-core hardware. Running the min() function on a dual-core chip can leverage at most 50% of the computing power of the chip. On a quad-core it would be only 25%. Correct, this algorithm effectively wastes 75% of the computing power of the chip.

Tree-like structures proved to be more appropriate for parallel processing. The min() function in our example doesn't need to iterate through all the elements in a row and compare their values with the accumulator. What it can do instead is rely on the multi-core nature of your hardware. A parallel_min() function could, for example, compare pairs (or tuples of a certain size) of neighboring values in the collection and promote the smallest value from each tuple into the next round of comparison. Searching for the minimum in different tuples can safely happen in parallel, and so tuples in the same round can be processed by different cores at the same time without races or contention among threads.
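To make the data flow concrete, here is a minimal, deliberately sequential Groovy sketch of those comparison rounds; the helper name parallelMinSketch is ours, and a real implementation would hand the independent pairs of each round to different cores:

//A sketch of the tuple-based rounds, using pairs of size two.
//Each round compares neighboring pairs; the winners advance to the next round.
def parallelMinSketch(List values) {
    while (values.size() > 1) {
        //The pairs within one round are independent of each other,
        //so each pair could be compared on a different core.
        values = values.collate(2).collect { pair -> pair.min() }
    }
    values[0]
}

assert 1 == parallelMinSketch([8, 3, 5, 1, 9, 2])

Each round halves the number of remaining candidates, so the whole search finishes in a logarithmic number of rounds.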
Meet Parallel Arrays

The jsr-166y library brings a very convenient abstraction called Parallel Arrays. GPars leverages the Parallel Arrays implementation in several ways. The GParsPool and GParsExecutorsPool classes provide parallel variants of the common Groovy iteration methods like each(), collect(), findAll() and such.

def selfPortraits = images.findAllParallel{it.contains me}.collectParallel {it.resize()}
Additionally, a map/reduce-style DSL is available on collections through the parallel property:

def smallestSelfPortrait = images.parallel.filter{it.contains me}.map{it.resize()}.min{it.sizeInMB}
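For a more self-contained taste of the same chaining style, here is a hedged sketch with plain numbers (the values and assertion are ours):

GParsPool.withPool {
    //filter the odd numbers, square them and sum the squares: 1 + 9 + 25
    def total = [1, 2, 3, 4, 5].parallel.filter {it % 2 == 1}.map {it * it}.sum()
    assert 35 == total
}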
3.1.1 GParsPool
Use of GParsPool - the JSR-166y based concurrent collection processor

Usage of GParsPool
The GParsPool class enables a ParallelArray-based (from JSR-166y) concurrency DSL for collections and objects.

Examples of use:

//summarize numbers concurrently
GParsPool.withPool {
    final AtomicInteger result = new AtomicInteger(0)
    [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
    assert 15 == result
}

//multiply numbers asynchronously
GParsPool.withPool {
    final List result = [1, 2, 3, 4, 5].collectParallel {it * 2}
    assert ([2, 4, 6, 8, 10].equals(result))
}
//check whether all elements within a collection meet certain criteria
GParsPool.withPool(5) {ForkJoinPool pool ->
    assert [1, 2, 3, 4, 5].everyParallel {it > 0}
    assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
}
The withPool() method takes optional parameters for the number of threads and an exception handler:

withPool(10) {...}
withPool(20, exceptionHandler) {...}
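As a hedged illustration (the handler body and message are ours), the exception handler is a java.lang.Thread.UncaughtExceptionHandler that receives exceptions escaping the pool's worker threads:

//coerce a closure to the JDK's UncaughtExceptionHandler interface
def handler = {Thread t, Throwable e ->
    System.err.println "Worker ${t.name} failed: ${e.message}"
} as Thread.UncaughtExceptionHandler

GParsPool.withPool(20, handler) {
    [1, 2, 3, 4, 5].eachParallel { println it * it }
}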
Called without arguments, withPool() uses a default-sized pool:

withPool {
    assert [1, 2, 3, 4, 5].everyParallel {it > 0}
    assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
}

The following methods are currently supported on all objects in Groovy (a brief example follows the list):
- eachParallel()
- eachWithIndexParallel()
- collectParallel()
- collectManyParallel()
- findAllParallel()
- findAnyParallel()
- findParallel()
- everyParallel()
- anyParallel()
- grepParallel()
- groupByParallel()
- foldParallel()
- minParallel()
- maxParallel()
- sumParallel()
- splitParallel()
- countParallel()
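A hedged taste of a few of the listed methods, using numbers of our own choosing:

GParsPool.withPool {
    def numbers = [1, 2, 3, 4, 5]
    assert 1 == numbers.minParallel()
    assert 5 == numbers.maxParallel()
    assert 15 == numbers.sumParallel()
    //foldParallel() reduces the collection with the supplied two-argument closure
    assert 15 == numbers.foldParallel {a, b -> a + b}
}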
Meta-class enhancer
As an alternative you can use the ParallelEnhancer class to enhance meta-classes of any classes or individual instances with the parallel methods.

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale']
ParallelEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')
Exception handling
If an exception is thrown while processing any of the passed-in closures, the first exception gets re-thrown from the xxxParallel methods and the algorithm stops as soon as possible.

The exception handling mechanism of GParsPool builds on the one built into the Fork/Join framework. Since Fork/Join algorithms are by nature hierarchical, once any part of the algorithm fails, there's usually little benefit from continuing the computation, since some branches of the algorithm will never return a result.

Bear in mind that the GParsPool implementation doesn't give any guarantees about its behavior after a first unhandled exception occurs, beyond stopping the algorithm and re-throwing the first detected exception to the caller. This behavior, after all, is consistent with what the traditional sequential iteration methods do.
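As a hedged sketch of this behavior (the failing closure and the message are ours), the first detected exception surfaces directly at the call site:

GParsPool.withPool {
    try {
        [1, 2, 3, 4, 5].eachParallel {
            if (it == 3) throw new IllegalStateException("failed on $it")
            println it
        }
    } catch (IllegalStateException e) {
        //the first exception detected by the pool is re-thrown to the caller
        println "Caught the first exception: ${e.message}"
    }
}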
Transparently parallel collections
On top of adding new xxxParallel() methods, GPars can also let you change the semantics of the original iteration methods. For example, you may be passing a collection into a library method, which will process your collection in a sequential way, let's say using the collect() method. By changing the semantics of the collect() method on your collection you can effectively parallelize the library's sequential code.

GParsPool.withPool {
    //The selectImportantNames() will process the name collections concurrently
    assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent())
}

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll {it.size() > 4}
}
The makeConcurrent() method switches the iteration methods of a collection to their concurrent semantics, while makeSequential() restores the original sequential behavior:

import static groovyx.gpars.GParsPool.withPool

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: '
list.each { print it + ',' }
println()

withPool {
    println 'Sequential: '
    list.each { print it + ',' }
    println()

    list.makeConcurrent()

    println 'Concurrent: '
    list.each { print it + ',' }
    println()

    list.makeSequential()

    println 'Sequential: '
    list.each { print it + ',' }
    println()
}

println 'Sequential: '
list.each { print it + ',' }
println()
Alternatively, the asConcurrent() convenience method wraps a code block within which the collection iterates concurrently, restoring the sequential semantics afterwards:

import static groovyx.gpars.GParsPool.withPool

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: '
list.each { print it + ',' }
println()

withPool {
    println 'Sequential: '
    list.each { print it + ',' }
    println()

    list.asConcurrent {
        println 'Concurrent: '
        list.each { print it + ',' }
        println()
    }

    println 'Sequential: '
    list.each { print it + ',' }
    println()
}

println 'Sequential: '
list.each { print it + ',' }
println()
The makeConcurrent() method is also available on collections enhanced with ParallelEnhancer:

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll {it.size() > 4}
}

def names = ['Joe', 'Alice', 'Dave', 'Jason']
ParallelEnhancer.enhanceInstance(names)
//The selectImportantNames() will process the name collections concurrently
assert ['ALICE', 'JASON'] == selectImportantNames(names.makeConcurrent())
The asConcurrent() method works with ParallelEnhancer-enhanced collections, too:

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: '
list.each { print it + ',' }
println()

ParallelEnhancer.enhanceInstance(list)

println 'Sequential: '
list.each { print it + ',' }
println()

list.asConcurrent {
println 'Concurrent: '
list.each { print it + ',' }
println()
}
list.makeSequential()

println 'Sequential: '
list.each { print it + ',' }
println()
Avoid side-effects in functions
We have to warn you. Since the closures that are provided to the parallel methods like eachParallel() or collectParallel() may be run in parallel, you have to make sure that each of the closures is written in a thread-safe manner. The closures must hold no internal state, share no data and have no side-effects beyond the boundaries of the single element that they've been invoked on. Violating these rules opens the door to race conditions and deadlocks, the most severe enemies of the modern multi-core programmer.

Don't do this:

def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}
//Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!
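A minimal sketch of a safer alternative, reusing the images and thumbnail names from the snippet above: let collectParallel() gather the results instead of mutating a shared list.

GParsPool.withPool {
    //collectParallel() assembles the result collection for you,
    //so no worker thread ever touches shared mutable state
    def thumbnails = images.collectParallel {it.thumbnail}
}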
3.1.2 GParsExecutorsPool
Use of GParsExecutorsPool - the Java Executors based concurrent collection processor

Usage of GParsExecutorsPool
The GParsExecutorsPool class enables a Java Executors-based concurrency DSL for collections and objects. It can be used as a pure-JDK-based parallel collection processor. Unlike the GParsPool class, GParsExecutorsPool doesn't require the jsr-166y jar file, but leverages the standard JDK executor services to parallelize closures processing collections or objects iteratively. It needs to be stated, however, that GParsPool typically performs much better than GParsExecutorsPool does.

Examples of use:

//multiply numbers asynchronously
GParsExecutorsPool.withPool {
    Collection<Future> result = [1, 2, 3, 4, 5].collectParallel {it * 10}
    assert new HashSet([10, 20, 30, 40, 50]) == new HashSet((Collection)result*.get())
}

//multiply numbers asynchronously using an asynchronous closure
GParsExecutorsPool.withPool {
    def closure = {it * 10}
    def asyncClosure = closure.async()
    Collection<Future> result = [1, 2, 3, 4, 5].collect(asyncClosure)
    assert new HashSet([10, 20, 30, 40, 50]) == new HashSet((Collection)result*.get())
}
//submit a task for processing by the underlying executor service
GParsExecutorsPool.withPool(5) {ExecutorService service ->
    service.submit({performLongCalculation()} as Runnable)
}
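Since submit() returns a java.util.concurrent.Future, the result of a calculation can also be retrieved later; a hedged sketch (the calculation value is ours):

GParsExecutorsPool.withPool(5) {ExecutorService service ->
    def future = service.submit({42} as java.util.concurrent.Callable)
    assert 42 == future.get()  //blocks until the submitted task completes
}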
The withPool() method takes optional parameters for the number of threads and a thread factory:

withPool(10) {...}
withPool(20, threadFactory) {...}
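As a hedged illustration (the factory body and the daemon-thread choice are ours), the thread factory is a plain java.util.concurrent.ThreadFactory:

import java.util.concurrent.ThreadFactory

def threadFactory = {Runnable task ->
    def thread = new Thread(task)
    thread.daemon = true  //illustrative choice: don't block JVM shutdown
    thread
} as ThreadFactory

GParsExecutorsPool.withPool(20, threadFactory) {
    [1, 2, 3].eachParallel { println "${Thread.currentThread().name}: $it" }
}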
withPool {
def result = [1, 2, 3, 4, 5].findParallel{Number number -> number > 2}
assert result in [3, 4, 5]
}
- eachParallel()
- eachWithIndexParallel()
- collectParallel()
- findAllParallel()
- findParallel()
- allParallel()
- anyParallel()
- grepParallel()
- groupByParallel()
Meta-class enhancer
As an alternative you can use the GParsExecutorsPoolEnhancer class to enhance meta-classes of any classes or individual instances with asynchronous methods.

import groovyx.gpars.GParsExecutorsPoolEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
GParsExecutorsPoolEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale']
GParsExecutorsPoolEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.allParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')
Exception handling
If exceptions are thrown while processing any of the passed-in closures, an instance of AsyncException wrapping all the original exceptions gets re-thrown from the xxxParallel methods.
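A hedged sketch of what that looks like to the caller (the failing closure is ours):

GParsExecutorsPool.withPool {
    try {
        [1, 2, 3].eachParallel {
            if (it == 2) throw new RuntimeException("failed on $it")
        }
    } catch (groovyx.gpars.AsyncException e) {
        //e wraps all exceptions thrown by the worker closures
        println "Caught: ${e.message}"
    }
}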
Avoid side-effects in functions

Once again we need to warn you about using closures with side-effects affecting objects beyond the scope of the single currently processed element, or closures which keep internal state. Don't do that! It is dangerous to pass them to any of the xxxParallel() methods.
3.1.3 Memoize

The memoize function enables caching of a function's return values. Repeated calls to the memoized function with the same argument values will, instead of invoking the calculation encoded in the original function, retrieve the result value from an internal transparent cache. Provided the calculation is considerably slower than retrieving a cached value, this allows users to trade memory for performance. Check out the example below, where we attempt to scan multiple websites for particular content.

The memoize functionality of GPars has been contributed to Groovy in version 1.8, and if you run on Groovy 1.8 or later, it is recommended to use the Groovy functionality. Memoize in GPars is almost identical, except that it searches the memoize caches concurrently using the surrounding thread pool and so may give performance benefits in some scenarios.

The GPars memoize functionality has been renamed to avoid future conflicts with the memoize functionality in Groovy. GPars now calls the methods with a preceding letter g, such as gmemoize().
Examples of use
GParsPool.withPool {
    def urls = ['http://www.dzone.com', 'http://www.theserverside.com', 'http://www.infoq.com']
    Closure download = {url ->
        println "Downloading $url"
        url.toURL().text.toUpperCase()
    }
    Closure cachingDownload = download.gmemoize()

    println 'Groovy sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GROOVY')}
    println 'Grails sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRAILS')}
    println 'Griffon sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRIFFON')}
    println 'Gradle sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRADLE')}
    println 'Concurrency sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('CONCURRENCY')}
    println 'GPars sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GPARS')}
}
Fibonacci example
A purely functional, recursive implementation that closely follows the definition of Fibonacci numbers is exponentially complex:

Closure fib = {n -> n > 1 ? call(n - 1) + call(n - 2) : n}
Memoizing the closure, on the other hand, caches the intermediate results and reduces the complexity to linear:

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.gmemoize()
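A hedged usage sketch: since GPars memoize searches its caches using the surrounding thread pool, we call gmemoize() inside a withPool block (the assertion value follows from the definition, fib(10) == 55):

GParsPool.withPool {
    Closure fib
    fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.gmemoize()
    //each fib(n) is now computed at most once
    assert 55 == fib(10)
}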
Available variants
memoize
The basic variant, which keeps values in the internal cache for the whole lifetime of the memoized function. Provides the best performance characteristics of all the variants.

memoizeAtMost
Allows the user to set a hard limit on the number of items cached. Once the limit has been reached, each subsequently added value will eliminate the oldest value from the cache using the LRU (Least Recently Used) strategy.

So for our Fibonacci number example, we could safely reduce the cache size to two items:

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.memoizeAtMost(2)

Setting an upper limit on the cache size can serve two purposes:
- Keep the memory footprint of the cache within defined boundaries
- Preserve desired performance characteristics of the function. Caches that are too large may take longer to retrieve the cached value than it would have taken to calculate the result directly.