ham-fisted.reduce

Protocol-based parallel reduction architecture and helper functions.

->consumer

(->consumer cfn)

Return an instance of a consumer, double consumer, or long consumer.

bind-double-consumer-reducer!

(bind-double-consumer-reducer! cls-type ctor)
(bind-double-consumer-reducer! ctor)

Bind a classtype as a double consumer parallel reducer - the consumer must implement DoubleConsumer, ham_fisted.Reducible, and IDeref.

compose-reducers

(compose-reducers reducers)
(compose-reducers options reducers)

Given a map or sequence of reducers return a new reducer that produces a map or vector of results.

If data is a sequence then context is guaranteed to be an object array.

Options:

  • :rfn-datatype - One of nil, :int64, or :float64. Indicates that the rfns all uniformly accept longs (:int64), doubles (:float64), or generic objects (nil). Defaults to nil.
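
As a minimal sketch of the map form, the snippet below composes two plain functions used as reducers, in the same way `*` is used as a reducer in other examples in this namespace. The alias `hamf-rf` and the names `sum-and-product` and `composed-result` are illustrative, and it is an assumption that a bare `+` or `*` supplies its own initial value through its zero-argument arity.

```clojure
(require '[ham-fisted.reduce :as hamf-rf])

;; Two plain functions stand in for reducers; (+) => 0 and (*) => 1
;; are assumed to supply the initial accumulator values.
(def sum-and-product
  (hamf-rf/compose-reducers {:sum + :product *}))

;; A single pass over the input feeds both reducers.
(def composed-result
  (hamf-rf/preduce-reducer sum-and-product (vec (range 1 6))))

(println (:sum composed-result) (:product composed-result))
```

Because the composed reducer is itself a reducer, it can be handed to any function in this namespace that accepts one.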

consume!

(consume! consumer coll)

Consume a collection. This is simply a reduction where the return value is ignored.

Returns the consumer.
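
A small sketch of the intended use, assuming a plain java.util.function.Consumer created with reify is accepted as the consumer. The names `seen` and `recorder` are hypothetical and exist only for this illustration.

```clojure
(require '[ham-fisted.reduce :as hamf-rf])
(import '[java.util ArrayList]
        '[java.util.function Consumer])

;; Hypothetical sink that records every element it is handed.
(def ^ArrayList seen (ArrayList.))

(def recorder
  (reify Consumer
    (accept [_ v] (.add seen v))))

;; The return value is the consumer; the useful effect is the side effect.
(hamf-rf/consume! recorder [1 2 3])

(println (vec seen))
```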

consumer-accumulator

(consumer-accumulator c v)

Generic reduction function using a consumer

consumer-preducer

(consumer-preducer constructor)

Bind a consumer as a parallel reducer.

Consumer must implement java.util.function.Consumer, ham_fisted.Reducible and clojure.lang.IDeref.

Returns instance of type bound.

See documentation for declare-double-consumer-preducer!.
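
The three required interfaces can be sketched with a small deftype. `DistinctCount` below is a hypothetical consumer that counts distinct values; it is an illustration under the stated interface requirements, not code from the library, and it assumes the merged consumer is dereferenced to produce the final value, as in the double consumer example elsewhere on this page.

```clojure
(require '[ham-fisted.reduce :as hamf-rf])
(import '[java.util HashSet]
        '[java.util.function Consumer]
        '[ham_fisted Reducible]
        '[clojure.lang IDeref])

;; Hypothetical consumer that counts distinct values.
(deftype DistinctCount [^HashSet seen]
  Consumer
  (accept [_ v] (.add seen v))
  Reducible
  ;; Merge another partial result into this one and return this.
  (reduce [this other]
    (.addAll seen (.-seen ^DistinctCount other))
    this)
  IDeref
  (deref [_] (long (.size seen))))

;; The constructor is a zero-argument function so that each reduction
;; thread can create its own fresh consumer.
(def n-distinct
  (hamf-rf/preduce-reducer
   (hamf-rf/consumer-preducer #(DistinctCount. (HashSet.)))
   (mapv #(rem % 13) (range 10000))))

(println n-distinct)
```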

consumer-reducer

(consumer-reducer ctor)

Make a parallel consumer reducer given a function that takes no arguments and is guaranteed to produce a consumer which also implements Reducible and IDeref.

double-accumulator

macro

(double-accumulator accvar varvar & code)

Type-hinted double reduction accumulator:

ham-fisted.api> (reduce (double-accumulator acc v (+ (double acc) v))
                        0.0
                        (range 1000))
499500.0

double-consumer-accumulator

Converts from a double consumer to a double reduction accumulator that returns the consumer:

ham-fisted.api> (reduce double-consumer-accumulator
                             (Sum$SimpleSum.)
                             (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0

double-consumer-preducer

(double-consumer-preducer constructor)

Return a preducer for a double consumer.

Consumer must implement java.util.function.DoubleConsumer, ham_fisted.Reducible and clojure.lang.IDeref.

user> (require '[ham-fisted.api :as hamf])
nil
user> (import '[java.util.function DoubleConsumer])
java.util.function.DoubleConsumer
user> (import [ham_fisted Reducible])
ham_fisted.Reducible
user> (import '[clojure.lang IDeref])
clojure.lang.IDeref
user> (deftype MeanR [^{:unsynchronized-mutable true :tag 'double} sum
                      ^{:unsynchronized-mutable true :tag 'long} n-elems]
        DoubleConsumer
        (accept [this v] (set! sum (+ sum v)) (set! n-elems (unchecked-inc n-elems)))
        Reducible
        (reduce [this o]
          (set! sum (+ sum (.-sum ^MeanR o)))
          (set! n-elems (+ n-elems (.-n-elems ^MeanR o)))
          this)
        IDeref (deref [this] (/ sum n-elems)))
user.MeanR
user> (hamf/declare-double-consumer-preducer! MeanR (MeanR. 0 0))
nil
user> (hamf/preduce-reducer (double-consumer-preducer #(MeanR. 0 0)) (hamf/range 200000))
99999.5

double-consumer-reducer

(double-consumer-reducer ctor)

Make a parallel double consumer reducer given a function that takes no arguments and is guaranteed to produce a double consumer which also implements Reducible and IDeref

immut-map-kv

(immut-map-kv keyfn valfn data)
(immut-map-kv ks vs)

indexed-accum

macro

(indexed-accum accvar idxvar varvar & code)

Create an indexed accumulator that receives an additional long index during a reduction:

ham-fisted.api> (reduce (indexed-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0] [1 1] [2 2] [3 3] [4 4]]

indexed-double-accum

macro

(indexed-double-accum accvar idxvar varvar & code)

Create an indexed double accumulator that receives an additional long index during a reduction:

ham-fisted.api> (reduce (indexed-double-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0.0] [1 1.0] [2 2.0] [3 3.0] [4 4.0]]

indexed-long-accum

macro

(indexed-long-accum accvar idxvar varvar & code)

Create an indexed long accumulator that receives an additional long index during a reduction:

ham-fisted.api> (reduce (indexed-long-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0] [1 1] [2 2] [3 3] [4 4]]

long-accumulator

macro

(long-accumulator accvar varvar & code)

Type-hinted long reduction accumulator:

ham-fisted.api> (reduce (long-accumulator acc v (+ (long acc) v))
                        0
                        (range 1000))
499500

long-consumer-accumulator

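Converts from a long consumer to a long reduction accumulator that returns the consumer. The transcript below uses the double variant, double-consumer-accumulator; the long variant is expected to be used the same way with a LongConsumer: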

ham-fisted.api> (reduce double-consumer-accumulator
                             (Sum$SimpleSum.)
                             (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0

long-consumer-reducer

(long-consumer-reducer ctor)

Make a parallel long consumer reducer given a function that takes no arguments and is guaranteed to produce a long consumer which also implements Reducible and IDeref.

options->parallel-options

(options->parallel-options options)

Convert an options map to a parallel options object.

Options:

  • :pool - supply the forkjoinpool to use.
  • :max-batch-size - Defaults to 64000, used for index-based parallel pathways to control the size of each parallelized batch.
  • :ordered? - When true process inputs and provide results in order.
  • :parallelism - The amount of parallelism to expect. Defaults to the number of threads in the fork-join pool provided.
  • :cat-parallelism - Either :seq-wise or :elem-wise. When parallelizing over a concatenation of containers, either parallelize within each container (:elem-wise) or use one thread per container (:seq-wise). Defaults to :seq-wise, which doesn't require each container to support parallelization itself but relies on the sequence of containers being long enough to saturate the processor.
  • :put-timeout-ms - The time to wait to put data into the queue. This is a safety mechanism so that if the processing system fails we don't just keep putting things into a queue.
  • :unmerged-result? - Use with care. For parallel reductions do not perform the merge step but instead return the sequence of partially reduced results.
  • :n-lookahead - How far to look ahead for pmap and upmap to add new jobs to the queue. Defaults to `(* 2 parallelism)`.
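
A hedged sketch of building the options object once and reusing it across many small reductions. It assumes that preduce accepts the resulting object wherever an options map is accepted; the names `popts` and `batch-sums` are illustrative only.

```clojure
(require '[ham-fisted.reduce :as hamf-rf])

;; Build the options object once; the keys come from the list above.
(def popts
  (hamf-rf/options->parallel-options {:ordered? true
                                      :max-batch-size 1000}))

;; Reuse it for several small reductions instead of converting a map each time.
(def batch-sums
  (mapv #(hamf-rf/preduce (constantly 0) + + popts %)
        [[1 2 3] [4 5 6] [7 8 9]]))

(println batch-sums)
```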

parallel-reducer

(parallel-reducer init-fn rfn merge-fn fin-fn)
(parallel-reducer init-fn rfn merge-fn)

Implement a parallel reducer by explicitly passing in the various required functions.

  • 'init-fn' - Takes no arguments and returns a new accumulation target.
  • 'rfn' - clojure rf function - takes two arguments, the accumulation target and a new value and produces a new accumulation target.
  • 'merge-fn' - Given two accumulation targets returns a new combined accumulation target.
  • 'fin-fn' - optional - Given an accumulation target returns the desired final type.

user> (hamf-rf/preduce-reducer
       (hamf-rf/parallel-reducer
        hamf/mut-set
        #(do (.add ^java.util.Set %1 %2) %1)
        hamf/union
        hamf/sort)
       (lznc/map (fn ^long [^long v] (rem v 13)) (hamf/range 1000000)))
[0 1 2 3 4 5 6 7 8 9 10 11 12]

preduce

(preduce init-val-fn rfn merge-fn coll)
(preduce init-val-fn rfn merge-fn options coll)

Parallelized reduction. Currently coll must either be random access or a lznc map/filter chain based on one or more random access entities, hashmaps and sets from this library or any java.util set, hashmap or concurrent versions of these. If input cannot be parallelized this lowers to a normal serial reduction.

For potentially small-n invocations, providing the parallel options object explicitly can noticeably improve performance, since converting an options map to the parallel options object takes a bit of time.

  • init-val-fn - Potentially called in reduction threads to produce each initial value.
  • rfn - normal clojure reduction function. Typehinting the second argument to double or long will sometimes produce a faster reduction.
  • merge-fn - Merge two reduction results into one.

Options:

  • :pool - The fork-join pool to use. Defaults to common pool which assumes reduction is cpu-bound.
  • :parallelism - What parallelism to use - defaults to pool's getParallelism method.
  • :max-batch-size - Rough maximum batch size for indexed or grouped reductions. This can both even out batch times and ensure you don't get into safepoint trouble with jdk-8.
  • :min-n - Minimum number of elements before initiating a parallelized reduction. Defaults to 1000, but you should tune this for your specific reduction.
  • :ordered? - True if results should be in order. Unordered results are sometimes slightly faster, but again you should test for your specific situation.
  • :cat-parallelism - Either :seq-wise or :elem-wise, defaults to :seq-wise. Test for your specific situation, this really is data-dependent. This controls how a concat primitive parallelizes the reduction across its containers. Elemwise means each container's reduction is individually parallelized while seqwise indicates to do a pmap style initial reduction across containers then merge the results.
  • :put-timeout-ms - Number of milliseconds to wait for queue space before throwing an exception in unordered reductions. Defaults to 50000.
  • :unmerged-result? - Defaults to false. When true, the sequence of results is returned directly without any merge steps in a lazy-noncaching container. Beware the noncaching aspect -- repeatedly evaluating this result may kick off the parallelized reduction multiple times. To ensure caching if unsure call seq on the result.
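
The three functions and an options map fit together roughly as follows. This is a minimal sketch rather than a benchmark; the alias `hamf-rf`, the name `sum-of-squares`, and the :min-n value are illustrative choices.

```clojure
(require '[ham-fisted.reduce :as hamf-rf])

(def sum-of-squares
  (hamf-rf/preduce
   ;; init-val-fn: may be called once per reduction thread
   (constantly 0)
   ;; rfn: the second argument is hinted as a long
   (fn [acc ^long v] (+ (long acc) (* v v)))
   ;; merge-fn: combine two partial sums
   +
   {:min-n 100}
   (vec (range 1000))))

(println sum-of-squares)
```

The result does not depend on whether the reduction is actually parallelized, because addition is associative and each batch starts from a fresh 0.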

preduce-reducer

(preduce-reducer reducer options coll)
(preduce-reducer reducer coll)

Given an instance of ham-fisted.protocols/ParallelReducer, perform a parallel reduction.

When the result is requested unmerged (:unmerged-result?), finalize is called on each partial result in a lazy, noncaching way. In this case you can use a non-parallelized reducer and simply get a sequence of results as opposed to one.

  • reducer - instance of ParallelReducer
  • options - Same options as preduce.
  • coll - something potentially with a parallelizable reduction.

See options for ham-fisted.reduce/preduce.

Additional Options:

  • :skip-finalize? - when true, the reducer's finalize method is not called on the result.

preduce-reducers

(preduce-reducers reducers options coll)
(preduce-reducers reducers coll)

Given a map or sequence of ham-fisted.protocols/ParallelReducer, produce a map or sequence of reduced values. Reduces over input coll once in parallel if coll is large enough. See options for ham-fisted.reduce/preduce.

ham-fisted.api> (preduce-reducers {:sum (Sum.) :mult *} (range 20))
{:mult 0, :sum #<Sum@5082c3b7: {:sum 190.0, :n-elems 20}>}

reduce-reducer

(reduce-reducer reducer coll)

Serially reduce a reducer.

ham-fisted.api> (reduce-reducer (Sum.) (range 1000))
#<Sum@afbedb: {:sum 499500.0, :n-elems 1000}>

reduce-reducers

(reduce-reducers reducers coll)

Serially reduce a map or sequence of reducers into a map or sequence of results.

ham-fisted.api> (reduce-reducers {:a (Sum.) :b *} (range 1 21))
{:b 2432902008176640000, :a #<Sum@6bcebeb1: {:sum 210.0, :n-elems 20}>}

reducer->completef

(reducer->completef reducer)

Return fold-compatible pair of reducef, completef given a parallel reducer. Note that folded reducers are not finalized as of this time:

ham-fisted.api> (def data (vec (range 200000)))
#'ham-fisted.api/data
ham-fisted.api> (r/fold (reducer->completef (Sum.)) (reducer->rfn (Sum.)) data)
#<Sum@858c206: {:sum 1.99999E10, :n-elems 200000}>

reducer->rf

(reducer->rf reducer)

Given a reducer, return a transduce-compatible rf:

ham-fisted.api> (transduce (clojure.core/map #(+ % 2)) (reducer->rf (Sum.)) (range 200))
{:sum 20300.0, :n-elems 200}

reducer-with-finalize

(reducer-with-finalize reducer fin-fn)

reducer-xform->reducer

(reducer-xform->reducer reducer xform)

Given a reducer and a transducer xform, produce a new reducer which will apply the transducer pipeline before its reduction function.

ham-fisted.api> (reduce-reducer (reducer-xform->reducer (Sum.) (clojure.core/filter even?))
                                (range 1000))
#<Sum@479456: {:sum 249500.0, :n-elems 500}>

!! - If you use a stateful transducer here then you must not use the reducer in a parallelized reduction.
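
The reason for this warning can be illustrated in plain Clojure, without the library. The sketch assumes the transducer is applied to the reduction function once and the resulting function is then shared by every batch; under that assumption, state left behind by one batch leaks into the next.

```clojure
;; A stateful transducer keeps its state inside the function it returns.
(def shared-rf ((take 2) conj))

;; The first batch uses up the "take 2" budget...
(def first-batch (reduce shared-rf [] [1 2 3]))

;; ...so a second batch reduced with the same function sees stale state.
(def second-batch (reduce shared-rf [] [4 5 6]))

;; A stateless transducer such as filter has no such coupling.
(def even-rf ((filter even?) conj))
(def filtered [(reduce even-rf [] [1 2 3 4])
               (reduce even-rf [] [5 6 7 8])])

(println first-batch second-batch filtered)
```

Here the second batch comes back empty even though it contains three elements, while the filter-based function gives the same answer for each batch regardless of order.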

reducible-merge

Parallel reduction merge function that expects both sides to be instances of Reducible.