charred.bulk

Helpers for bulk operations such as concatenating a large sequence of csv files.

batch-csv-rows

(batch-csv-rows batch-size row-seq)
(batch-csv-rows batch-size options row-seq)

Given a potentially very large sequence of rows, lazily return batches of rows. The returned object has efficient iterator and IReduceInit (3-arg reduce) implementations, plus an efficient seq implementation, which is the safest way to consume it. When using the raw iterator, each batch must be read completely before the iterator's .hasNext function will return an accurate result.

If :csv-load-thread-name is provided, loading happens in a separate thread and an auto-closeable seq is returned that is safe to use in pmap or similar operations. This lets the CSV system run at full speed while potentially CPU-intensive processing happens in other threads. Recommended batch sizes for this pathway are around 128000. Loading can be stopped early by closing the returned value and reading the entirety of the sequence.

Options:

  • :header? - When true, the header row will be returned as the first row of each batch. Defaults to true.
  • :csv-load-thread-name - When not nil, perform the csv parsing in a separate thread named :csv-load-thread-name.
  • :csv-load-queue-depth - Queue depth used for each batch of rows. Defaults to 4.
  • :csv-load-queue-timeout - Timeout in milliseconds used for .put calls. Defaults to 5000 (5 seconds).
  • :csv-load-log-level - Defaults to :debug. Options are :debug or :info.

Examples:

;; HANGS - vec drives the raw iterator and calls .next before the previous batch
;; has been read entirely, so the iterator reports .hasNext forever.
user> (->> (charred/read-csv-supplier (java.io.File. "stocks.csv"))
           (bulk/batch-csv-rows 100)
           (vec))
;; Works - the seq implementation is always safe
user> (->> (charred/read-csv-supplier (java.io.File. "stocks.csv"))
           (bulk/batch-csv-rows 100)
           (seq)
           (vec))
;; Works - map consumes the batches through the seq implementation
user> (->> (charred/read-csv-supplier (java.io.File. "stocks.csv"))
           (bulk/batch-csv-rows 100)
           (map vec))

;; Very memory efficient -
user> (->> (charred/read-csv-supplier (java.io.File. "stocks.csv"))
           (bulk/batch-csv-rows 100)
           (reduce (fn [rc batch] (+ rc (count (vec batch))))
                   0))
;; Threaded example - note the batch is already a vector
user> (->> (charred/read-csv-supplier (java.io.File. "stocks.csv"))
           (bulk/batch-csv-rows 100 {:csv-load-thread-name "CSV loader" :csv-load-log-level :info})
           (reduce (fn [rc batch] (+ rc (count batch)))
                   0))
Aug 19, 2023 9:19:43 AM charred.bulk$batch_csv_rows$fn__6468 invoke
INFO: CSV load thread finished - 566 rows read
Aug 19, 2023 9:19:43 AM charred.bulk$batch_csv_rows$reify__6472 close
INFO: CSV load thread joined
566
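
The count of 566 in the threaded example can be reproduced with a small model in plain Clojure. This is only a sketch of the batching semantics described above, not charred's implementation. `batch-with-header` is a hypothetical helper, the column names are placeholders, and two assumptions are made: that batch-size counts data rows, and that stocks.csv holds one header row plus 560 data rows (the 561 rows per file implied by the concatenate-csv example later in this page).

```clojure
;; Hypothetical model of batching with :header? true; not the library's code.
(defn batch-with-header
  "Split rows (header first) into batches of at most batch-size data rows,
  repeating the header as the first row of every batch."
  [batch-size rows]
  (let [[header & data] rows]
    (map #(into [header] %) (partition-all batch-size data))))

;; Assumed shape of the file: 1 header row followed by 560 data rows.
(def rows (cons ["symbol" "date" "price"]
                (map (fn [i] ["MSFT" (str i) "0.0"]) (range 560))))

(def batches (batch-with-header 100 rows))

(count batches)                 ;; 6 batches: five of 100 data rows, one of 60
(reduce + (map count batches))  ;; 560 data rows + 6 header rows = 566
```

If batch-size instead counted the header row, this model would still produce six batches and the same total of 566, so the logged figure does not distinguish the two readings.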

cat-csv-inputs

(cat-csv-inputs)
(cat-csv-inputs options)

Stateful transducer that, given a sequence of inputs, produces a single sequence of parsed csv rows. When :header? is true, this transducer drops the header row of every input after the first.

Options:

  • :header? - defaults to true - assume first row of each file is a header row.

Options are passed through to read-csv-supplier.

Example:

(transduce (comp (bulk/cat-csv-inputs options) (map tfn)) (charred/write-csv-rf options) fseq)
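
To make the header handling concrete, here is a minimal sketch of a stateful transducer with the behaviour described above, written in plain Clojure. It is an illustration under stated assumptions, not charred's implementation: `cat-rows` is a hypothetical name, and its inputs are already-parsed row sequences rather than files passed through read-csv-supplier.

```clojure
;; Hypothetical sketch of a header-dropping, concatenating transducer.
;; Inputs are already-parsed row sequences, not files.
(defn cat-rows
  [{:keys [header?] :or {header? true}}]
  (fn [rf]
    (let [first-input? (volatile! true)
          ;; Re-wrap a reduced value so the outer reduction also stops.
          inner-rf (fn [acc row]
                     (let [ret (rf acc row)]
                       (if (reduced? ret) (reduced ret) ret)))]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc rows]
         ;; Only inputs after the first lose their header row.
         (let [drop-header? (and header? (not @first-input?))]
           (vreset! first-input? false)
           (reduce inner-rf acc (if drop-header? (rest rows) rows))))))))

(def input-a [["id" "v"] ["1" "a"] ["2" "b"]])
(def input-b [["id" "v"] ["3" "c"]])

(into [] (cat-rows {:header? true}) [input-a input-b])
;; => [["id" "v"] ["1" "a"] ["2" "b"] ["3" "c"]]

(into [] (cat-rows {:header? false}) [input-a input-b])
;; => [["id" "v"] ["1" "a"] ["2" "b"] ["id" "v"] ["3" "c"]]
```

The volatile is what makes the transducer stateful: it records whether the first input has been seen, so a fresh transducer instance is needed for each transduction.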

concatenate-csv

(concatenate-csv output fseq)
(concatenate-csv output options fseq)

Given a sequence of csv files, concatenate them into a single csv file.

  • fseq - a sequence of java.io.File objects or other inputs to read-csv-supplier.
  • output - an output stream or other closeable stream.

Returns the number of rows written.

Options:

  • :header? - defaults to true - assume first row of each file is a header row.
  • :tfn - function from row->row that receives all output rows (header rows other than the first are elided before it is called). If this function returns nil, that row is elided from the output.

Example:

user> (->> (repeat 10 (java.io.File. "/home/chrisn/dev/tech.all/tech.ml.dataset/test/data/stocks.csv"))
           (bulk/concatenate-csv "test/data/big-stocks.csv" {:header? false}))
5610
user> (->> (repeat 10 (java.io.File. "/home/chrisn/dev/tech.all/tech.ml.dataset/test/data/stocks.csv"))
           (bulk/concatenate-csv "test/data/big-stocks.csv" {:header? true}))
5601
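
The two results are consistent with each other if stocks.csv holds 561 rows, one of which is the header. That figure is inferred from the output above rather than stated anywhere in the source; the arithmetic, as a small sketch:

```clojure
;; Inferred from the results above: 5610 rows / 10 copies = 561 rows per file.
(def rows-per-file 561)
(def n-files 10)

;; :header? false - every row of every file is written as-is.
(* n-files rows-per-file)
;; => 5610

;; :header? true - the first header is kept, the other nine are dropped.
(- (* n-files rows-per-file) (dec n-files))
;; => 5601
```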