Reductions
The ham-fisted project extends the concept of Clojure's reduce in a few ways, taking influence from Java streams and Clojure transducers. The most important extension is a formal definition of a parallel reduction (analogous to pmap for map).
Most interesting is the 3-argument form, (reduce rfn init coll). Problems exist with the 2-argument form, (reduce rfn coll), because the reduction function rfn's leftmost argument is sometimes a value from the collection and at other times an accumulated value. Some reductions have the property that the accumulator is in the set of objects in the collection (such as numeric +), but these reductions are not the most general. They are a special case of a reduction whose accumulator may be an entirely different type than the values in the collection.
Parallelizable Containers
Efficient parallel reductions depend on parallelizable containers.
Java has three types of containers that operate efficiently in parallel.
- Finite random access containers (ex: an array)
- Containers that can provide spliterators (ex: hashtable)
- A concatenation of containers suitable for parallel computation over the parts
These are the three container types we can parallelize: random access containers; maps and sets (or, more generally, anything with a correct spliterator implementation); and concatenations of sub-containers, where each sub-container need not itself have a parallelizable reduction.
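For instance, one example of each category (a hedged sketch; lznc refers to ham-fisted.lazy-noncaching as required in the REPL session later in this document, and lznc/concat is assumed from the concat discussion below):
(vec (range 1000))                    ;; finite random access - implements java.util.RandomAccess
(java.util.HashMap. {:a 1, :b 2})     ;; a map whose entry set provides a splittable spliterator
(lznc/concat (range 500) (range 500)) ;; a concatenation of sub-containers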
Parallelized Reduction
A parallelized reduction works by splitting up the elements of the data source. Many reduction contexts operate simultaneously, each performing a serial reduction, and a separate step merges the results back together. This may be thought of as the "true" map-reduce; either way, it is useful to compare a parallelized reduction in detail to a serial reduction.
To perform a parallel reduction, four things must be provided:
- init-val-fn - a function that produces an initial accumulator for each reduction context.
- rfn - a function that takes an accumulator and a value and updates the accumulator. This is the typical reduction function passed as the first argument to Clojure's reduce.
- merge-fn - a function that takes two accumulators and merges them to produce one result accumulator.
- coll - a collection of items to reduce to a single output.
Here are the function signatures (keep in mind preduce : reduce :: pmap : map):
(defn preduce [init-val-fn rfn merge-fn coll] ...)
Notably, Java streams have a 'collect' method that takes the same four arguments, where the collection is the this object:
interface Stream<T> {
  <R> R collect(Supplier<R> supplier,
                BiConsumer<R, ? super T> accumulator,
                BiConsumer<R, R> combiner);
}
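As a hedged interop sketch (not library code), here is that three-argument collect driven from Clojure, accumulating stream elements into ArrayLists and merging them with addAll:
(import '[java.util.function Supplier BiConsumer]
        '[java.util ArrayList])

(.collect (.boxed (java.util.stream.LongStream/range 0 8))
          (reify Supplier                    ;; supplier: a fresh accumulator
            (get [_] (ArrayList.)))
          (reify BiConsumer                  ;; accumulator: add one element
            (accept [_ acc v] (.add ^ArrayList acc v)))
          (reify BiConsumer                  ;; combiner: merge two accumulators
            (accept [_ a b] (.addAll ^ArrayList a b))))
;; => [0 1 2 3 4 5 6 7]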
The parallelizable reduction behaves as a serial reduction when init-val-fn is called exactly once with no arguments and the entire collection is passed, along with rfn, to reduce:
(reduce rfn (init-val-fn) coll)
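As a toy illustration of this contract (not the library's implementation), splitting a small collection into two fixed halves, reducing each with its own accumulator, and merging the partial results yields the same answer as the serial reduction:
(let [init-val-fn (constantly 0)
      rfn         +
      merge-fn    +
      [left right] (split-at 5 (range 10))]
  (merge-fn (reduce rfn (init-val-fn) left)
            (reduce rfn (init-val-fn) right)))
;; => 45, identical to (reduce + 0 (range 10))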
From there preduce essentially switches on the type of coll and performs one of four distinct types of reductions:
- serial
- parallel over an index space
- parallel over a spliterator space
- parallel over sub-containers
Map, Filter, Concat Chains
It is common in functional programming to implement data transformations as chains of map, filter, and concat operations. Analyzing sequences of these operations yields insight into reduction in general and into the parallelization of reductions.
The first insight is found in the Clojure transducer pathways and involves collapsing the reduction function when possible for map and filter applications. Let's start with a reduction of the form (->> coll (map x) (filter y) (reduce ...)).
The filter operator can specialize its reduce implementation by producing a new reduction function and reducing over its source collection:
// Sketch of the reduce method on a hypothetical Filter object holding 'source'
// and 'pred' fields; extending clojure.lang.AFn (rather than implementing raw IFn)
// means only the two-argument invoke needs to be overridden.
public Object reduce(IFn rfn, Object init) {
  return source.reduce(new AFn() {
    public Object invoke(Object acc, Object v) {
      if (pred.test(v))
        return rfn.invoke(acc, v);
      else
        return acc;
    }
  }, init);
}
This results in 'collapsing' the reduction: the source performs the iteration across its elements while we simply create a slightly more complex reduction function, rfn, dynamically. A similar pathway exists for map, as we can always delegate up the chain, making a slightly more complex reduction function, as long as we are reducing over a single source of data. This optimization leads to many fewer function calls and intermediate collections when compared with naive implementations of map and filter. Clojure's transducers do this automatically.
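The same collapsing can be sketched directly in Clojure. Below is a minimal, hedged version (collapsing-map is not the library's implementation) of a reducible map wrapper that hands a composed reduction function back to its source:
(defn collapsing-map
  "Reducible view of (map f source) that delegates iteration to source."
  [f source]
  (reify clojure.lang.IReduceInit
    (reduce [_ rfn init]
      (reduce (fn [acc v] (rfn acc (f v))) init source))))

(reduce + 0 (collapsing-map inc (range 10)))
;; => 55, with no intermediate sequence allocated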
Collapsing the reduction also allows us to parallelize reductions like the one stated initially: if the filter object has a parallelReduction method that performs the identical collapsing pathway, and the source is parallelizable, then the reduction itself can still parallelize:
public Object parallelReduction(IFn initValFn, IFn rfn, IFn mergeFn) {
return source.parallelReduction(initValFn, new IFn() {...}, mergeFn);
}
If the source collection itself allows for parallel reduction, then it's possible to achieve similar 'collapsing' in preduce. Clojure's transducers do not have this particular optimization for parallel reduction, but Java streams do.
Also worth noting, these optimizations are only available if we use the 4-argument form of reduce and if we assume that map, filter, and concat are lazy and non-caching.
With those assumptions in place, it is possible to parallelize a reduction over the entries, keys, or values of a map using simple primitive composition:
user> (require '[ham-fisted.api :as hamf])
nil
user> (require '[ham-fisted.lazy-noncaching :as lznc])
nil
user> (def data (hamf/immut-map (lznc/map #(vector % %) (range 20000))))
#'user/data
user> (type data)
ham_fisted.PersistentHashMap
user> (hamf/preduce + + + (lznc/map key data))
199990000
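Assuming lznc/filter composes the same way as lznc/map (a hedged extension of the session above, not an actual transcript), a filtered chain remains parallelizable as well:
(hamf/preduce + + + (lznc/filter even? (lznc/map key data)))
;; expected => 99990000 (the sum of the even keys 0, 2, ..., 19998)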
Stream-based Reductions
Java streams have a notion of parallel reduction built-in. Their design suffers from two flaws, one minor and one major.
The first, minor flaw is that you can ask a stream for a parallel version of itself, and it will give you one if possible or else return a copy of itself. Unfortunately this only works on the first stream in a pipeline, so for instance:
coll.stream().map().filter().parallel().collect();
yields a serial reduction while:
coll.stream().parallel().map().filter().collect();
yields a parallel reduction.
This is unfortunate because it means you must go back in time to get a parallel version of the stream if you want to perform a parallel collection, something that may or may not be easily done at the point when you decide you do in fact want a parallel reduction (especially in library code).
The second, more major flaw is that stream-based parallelization does not allow the user to pass in their own fork-join pool at any point. This limits use to the built-in pool, where it's bad form to park threads or do blocking operations.
reducers.clj And Parallel Folds
Clojure has an alpha namespace, reducers.clj, that provides a parallel reduction. The signature of its fold function is:
(defn fold
  "Reduces a collection using a (potentially parallel) reduce-combine
  strategy. The collection is partitioned into groups of approximately
  n (default 512), each of which is reduced with reducef (with a seed
  value obtained by calling (combinef) with no arguments). The results
  of these reductions are then reduced with combinef (default
  reducef). combinef must be associative, and, when called with no
  arguments, (combinef) must produce its identity element. These
  operations may be performed in parallel, but the results will
  preserve order."
  {:added "1.5"}
  ([reducef coll] (fold reducef reducef coll))
  ([combinef reducef coll] (fold 512 combinef reducef coll))
  ([n combinef reducef coll]
   (coll-fold coll n combinef reducef)))
In this case overloads of combinef and reducef provide the initial accumulator (called the identity element), the rfn, the finalization, and the merge function. combinef called with no arguments provides each thread context's accumulator, and called with two arguments it merges two accumulators. reducef called with two arguments performs the reduction of a value into the accumulator, and called with one argument it finalizes both the potentially stateful reducing function and the accumulator. The system prescribes the parallelization strategy, but users can override a protocol to do it themselves.
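As a minimal usage example, + can serve as both reducef and combinef when summing a vector, since (+) with no arguments returns 0, its identity element:
(require '[clojure.core.reducers :as r])

(r/fold + (vec (range 100001)))
;; => 5000050000, potentially computed over parallel partitions of ~512 elements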
The reducers system has the same major drawback as the Java stream system, namely that users cannot provide their own pool for parallelization.
An interesting decision was made here as to whether one can actually parallelize the reduction or not. Transducers, the elements providing reducef, may be stateful, such as (take 15). One interesting difference is that state is held in a closure within the reduction function, as opposed to providing a custom accumulator that wraps the user's accumulator but also tracks state.
One aspect we haven't discussed, but which is also handled here in an interesting manner, is that whether a reduction can be parallelized or not is a function of both the container and the reducer. reducers.clj does a sort of double dispatch: the transducer may or may not choose to implement the parallel reduction, called coll-fold, and is queried first; if it allows parallel reduction, then the collection itself is dispatched. Overall this is a great, safe choice because it disallows parallel dispatch entirely if either the transducer or the collection does not support it.
Parallel Reducers
If we combine all three functions, init-val-fn, rfn, and merge-fn, into one object, then we get a ParallelReducer, defined in protocols.clj. This protocol allows the user to pass a single object into a parallelized reduction as opposed to three functions, which is useful when we want to have many reducers reduce over a single source of data. A finalize method is added in order to allow compositions of reducers and to allow reducers to hide state and information from end users:
(defprotocol ParallelReducer
"Parallel reducers are simple a single object that you can pass into preduce as
opposed to 3 separate functions."
(->init-val-fn [item]
"Returns the initial values for a parallel reduction. This function
takes no arguments and returns the initial reduction value.")
(->rfn [item]
"Returns the reduction function for a parallel reduction. This function takes
two arguments, the initial value and a value from the collection and returns a new
initial value.")
(->merge-fn [item]
"Returns the merge function for a parallel reduction. This function takes
two initial values and returns a new initial value.")
(finalize [item v]
"A finalize function called on the result of the reduction after it is
reduced but before it is returned to the user. Identity is a reasonable default."))
There are defaults of the reducer protocol for IFn, which assume the function can be called with no arguments for an initial value and with two arguments for both reduction and merge. This works for things like + and *. Additionally, implementations are provided for the ham_fisted Sum (Kahan compensated) and SimpleSum DoubleConsumer classes.
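As a hedged sketch written against the ParallelReducer protocol exactly as quoted above (the mean-reducer name is illustrative and not part of the library, and the real library may split these methods across several protocols), a custom reducer can hide a [sum count] accumulator behind finalize:
(def mean-reducer
  (reify ParallelReducer
    (->init-val-fn [_] (fn [] [0.0 0]))                   ;; per-context accumulator
    (->rfn [_] (fn [[s n] v] [(+ s (double v)) (inc n)])) ;; accumulate one value
    (->merge-fn [_] (fn [[s1 n1] [s2 n2]] [(+ s1 s2) (+ n1 n2)]))
    (finalize [_ acc] (let [[s n] acc] (when (pos? n) (/ s n))))))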
With the three functions bundled into one logical protocol or object, it is easy to create complex (aggregate) and efficient parallelized reductions:
user> (require '[ham-fisted.reduce :as hamf-rf])
nil
user> (hamf-rf/preduce-reducers {:sum + :product *} (range 1 20))
{:product 121645100408832000, :sum 190}
user>
This goes over the data in parallel, exactly once.
Consumers, Transducers, and rfn Chains
If we look at the reduction in terms of a push model as opposed to a pull model, where the stream pushes data into a consumer, then we can implement similar chains of map and filter. These are based on creating a new consumer that wraps the older consumer together with the filter predicate or mapping function. In this way one can implement a pipeline on the input stream, or perhaps diverging pipelines on each reduction function in a multiple-reducer scenario. Since the init and merge functions operate in accumulator space, which remains unchanged, one can build up increasingly sophisticated reduction functions and still perform a parallelized reduction. Naturally, everything is composed in reverse (push instead of pull), which is the reason comp works in reverse when working with transducers.
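For example, with transducers the left-to-right order of comp matches the ->> threading order, because each step wraps the next consumer rather than the previous collection:
(->> (range 10) (map inc) (filter odd?) (reduce + 0))      ;; => 25
(transduce (comp (map inc) (filter odd?)) + 0 (range 10))  ;; => 25, same order of operations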
In fact, given that the covers are pulled back on composing reduction functions, the definition of the single-argument arity of clojure.core/filter becomes clearer:
(defn filter
  "Returns a lazy sequence of the items in coll for which
  (pred item) returns logical true. pred must be free of side-effects.
  Returns a transducer when no collection is provided."
  {:added "1.0"
   :static true}
  ([pred]
   (fn [rf]
     (fn
       ([] (rf))
       ([result] (rf result))
       ([result input]
        (if (pred input)
          (rf result input)
          result))))))
It returns a function that, when given a reduction function, returns a new reduction function which, when called in the two-argument form, is identical to the Java result above (although expressed in pure Clojure).
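Calling the transducer with a reduction function makes this concrete; the wrapped function can be handed straight to reduce:
(def wrapped-rfn ((filter odd?) +))       ;; an rfn that skips even inputs
(reduce wrapped-rfn 0 (range 10))         ;; => 25
(transduce (filter odd?) + 0 (range 10))  ;; => 25, the same with finalization applied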
Start with the idea that a reduction begins at the collection, flows downward through the pipeline, and bottoms out at the reducer. The lazy-noncaching namespace and Java streams implement parallelization flowing from the container downward, while consumer chains and transducers build the pipeline flowing up from the reducer itself. Building the pipeline downward from the source or upward from the final reduction produces subtly different properties. Regardless, every system must disable parallelization where it would cause an incorrect answer, such as with a stateful transducer.
Broadly speaking, however, it can be faster to enable full parallelization and filter out invalid results than to force the problem into an early serialization and thus lose much of its parallelization potential. When concerned with performance, attempt to move as much of the transformation as possible into a parallelizable domain.
For the take-n use case specifically mentioned above, and potentially for others, we can parallelize the reduction and perform the take-n both in the parallelized phase and in the merge phase, assuming we are using an ordered parallelization. That case does not itself necessarily force a serialized reduction, but there are of course transformations and reductions that do. There are intermediate points, however, that are perhaps somewhat wasteful in terms of CPU load but do allow for more parallelization, a tradeoff that is sometimes worth it. Generically speaking, we can visualize this sort of tradeoff as a triangle whose three points are data locality, parallelism, and redundancy. If we are willing to trade some CPU efficiency for some redundancy, for instance, then we often get more parallelization. Likewise, if we are willing to save/load data from 'far' away from the CPU, then we can cut down on redundancy but at the cost of locality. For more on this line of thinking, please take a moment and read at least some of Jonathan Ragan-Kelley's excellent PhD thesis; a better explanation of the above line of reasoning begins on page 20.
Primitive Typed Serial Reductions
This comes last for a good reason :-) - it doesn't make a huge difference in performance, but it is worth noting that allowing objects to implement typed reductions:
default Object doubleReduction(IFn.ODO op, Object init);
default Object longReduction(IFn.OLO op, Object init);
where the next incoming value is a primitive but the accumulator is still a generic object allows us to use things like DoubleConsumers and LongConsumers and avoid boxing in the stream. Furthermore, if the aforementioned map and filter primitives are careful about their rfn composition, we can maintain a completely primitive-typed pipeline through an entire processing chain.
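As a point of reference, Clojure itself can emit such primitive-taking reduction functions: hinting the value argument as a double produces a function object implementing the IFn$ODO interface referenced above (a hedged aside; wiring it into a typed reduction pipeline is the library's concern):
(def typed-rfn (fn [acc ^double v] (conj acc v)))
(instance? clojure.lang.IFn$ODO typed-rfn)
;; => true - invokePrim(Object, double) receives the value unboxed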
One Final Note About Performance
Collapsing reductions brings the source iteration pathway closer to the final reduction pathway in terms of machine stack space, which allows HotSpot to optimize the entire chain more readily. Regardless of how good HotSpot gets, however, parallelizing will nearly always result in a larger win; the two work together to enable peak performance on the JVM given arbitrary, partially typed compositions of sequences and reductions.
When increasing the data size yet again, one can of course use the same design to distribute the computations to different machines. As some people have figured out, however, simply implementing the transformations you need efficiently reduces or completely eliminates the need to distribute computation in the first place, leading to a simpler, easier to test, and more robust system.
Ideally we can make achieving great performance for various algorithms clear and easy, and thus avoid the myriad issues of distributed computing in the first place.