The risk calculation pipeline processed end-of-day positions: take all the day’s trades, aggregate them to net positions, apply mark-to-market prices, and compute risk metrics. The input was ~800,000 trade records; the output was ~12,000 position records with P&L and Greeks.
The initial implementation used standard Clojure sequence operations:
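The original snippet is not preserved; a minimal sketch of what such a sequence pipeline might have looked like, with stub helpers standing in for the production code (eligible?, enrich, mark, risk, and position-key are illustrative names, not the real functions):

```clojure
;; Illustrative stand-ins for the production helpers.
(defn eligible? [t] (pos? (:qty t)))
(defn enrich [t] (assoc t :book "EQ-DESK"))          ; ref-data lookup stub
(defn mark [t] (assoc t :mtm (* (:qty t) 101.5)))    ; market-data stub
(defn risk [t] (assoc t :delta (:qty t)))            ; risk-model stub
(defn position-key [t] [(:book t) (:instrument t)])

(defn eod-positions [trades]
  (->> trades
       (filter eligible?)        ; intermediate collection #1
       (map enrich)              ; intermediate collection #2
       (map mark)                ; intermediate collection #3
       (map risk)                ; intermediate collection #4
       (group-by position-key))) ; final aggregation to positions
```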
Clean. Readable. And it created four intermediate collections of 800,000 elements each before producing the final output. That’s 3.2M intermediate objects for a 12K result.
Transducers changed this.
What Transducers Are
A transducer is a function that transforms a reducing function. That’s the shortest accurate definition, and it’s not helpful without an example.
Consider map:
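The original example is elided; a hand-rolled equivalent of map's transducer arity makes the definition concrete (mapping is an illustrative name — clojure.core's (map f) does this for you):

```clojure
;; A hand-rolled (map f): takes a reducing function rf and returns a
;; new reducing function that applies f to each input before rf.
(defn mapping [f]
  (fn [rf]
    (fn
      ([] (rf))                                 ; init arity
      ([result] (rf result))                    ; completion arity
      ([result input] (rf result (f input)))))) ; step arity

;; ((mapping inc) conj) is "conj, but increment first":
(reduce ((mapping inc) conj) [] [1 2 3]) ; => [2 3 4]
```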
(map f) returns a transducer — a function that can be applied to any reducing context, whether that context is building a vector, summing numbers, or sending to a channel.
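The elided snippet presumably demonstrated one transducer reused across contexts; a sketch:

```clojure
;; One composed transducer, applied in different reducing contexts.
(def xf (comp (filter odd?) (map inc)))

(into [] xf (range 10))        ; => [2 4 6 8 10]  building a vector
(transduce xf + 0 (range 10))  ; => 30            summing numbers
;; (clojure.core.async/chan 8 xf) would apply the same xf to a channel
```

Note that transducer comp reads left to right, like ->>, rather than right to left as with ordinary function composition.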
The key property: the composition produces a single pass over the input. No intermediate collections. Each element flows through the entire pipeline before the next element starts.
No Intermediate Collections
The standard sequence pipeline:
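In place of the missing snippet, a generic stand-in using core functions shows the shape:

```clojure
(->> (range 10)            ; stands in for the trade records
     (filter even?)        ; allocates an intermediate lazy seq
     (map inc)             ; allocates another
     (into []))            ; => [1 3 5 7 9]
```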
With lazy sequences, the intermediate sequences aren’t fully realised, but each step still allocates a cons cell (2 pointer fields) per element. For 600K trades, that’s ~2.4M pointer allocations in the intermediate steps.
The transducer pipeline:
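Again a stand-in with the same steps, this time fused into one pass; the two calls correspond to the into [] and reduce variants in the benchmark below:

```clojure
(def xf (comp (filter even?) (map inc)))

;; into []: one pass, one allocation per result element
(into [] xf (range 10))        ; => [1 3 5 7 9]

;; transduce: one pass, no result vector at all
(transduce xf + 0 (range 10))  ; => 25
```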
Single pass, one allocation per result element. The filter and map are fused.
Benchmark on 800K trades:
Approach                 Time    Allocations
────────────────────────────────────────────
Standard ->> pipeline    4.2s    ~380MB
Transducer (into [])     1.8s    ~85MB
Transducer (reduce)      1.3s    ~22MB
The “reduce” variant skips the final vector allocation by reducing directly into the result data structure.
The Production Pipeline
The full risk pipeline rewritten with transducers:
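The production code is elided; a sketch in its spirit, with stub helpers standing in for the real enrichment and risk functions (all names here are illustrative, and the x/by-key and x/reduce usage follows the net.cgrand/xforms API):

```clojure
(require '[net.cgrand.xforms :as x]) ; net.cgrand/xforms dependency

;; Illustrative stand-ins; the real lookups and risk model are elided.
(defn eligible? [t] (pos? (:qty t)))
(defn enrich [ref-data t] (assoc t :book (ref-data (:instrument t))))
(defn compute-risk [market-data t]
  (assoc t :mtm (* (:qty t) (market-data (:instrument t)))))

(defn eod-risk [trades market-data ref-data]
  (into {}
        (comp (filter eligible?)
              ;; market-data and ref-data are closure-captured here
              (map #(enrich ref-data %))
              (map #(compute-risk market-data %))
              ;; group by position key and sum :mtm per key, in one pass
              (x/by-key (juxt :book :instrument) :mtm (x/reduce +)))
        trades))
```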
x/by-key and x/reduce come from the net.cgrand/xforms library, which provides additional transducer-compatible combinators, including stateful ones such as grouping.
What this achieves:
- All filtering, enrichment, risk computation, and grouping in one pass
- No intermediate collection between pipeline stages
- The market-data and ref-data lookups are closure-captured, not threaded through each stage
Stateful Transducers
Some pipeline steps require state — running totals, windowed aggregations, deduplication. Transducers support this through the three-arity form:
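The original stateful example is elided; a sketch of one that emits the delta from the previous value (deltas is an illustrative name):

```clojure
;; A stateful transducer: the previous value lives in a volatile
;; inside the closure, not in the collection being processed.
(defn deltas []
  (fn [rf]
    (let [prev (volatile! ::none)]
      (fn
        ([] (rf))               ; init arity
        ([result] (rf result))  ; completion arity
        ([result input]         ; step arity: reads and updates state
         (let [p @prev]
           (vreset! prev input)
           (if (= p ::none)
             result                        ; first element: no delta yet
             (rf result (- input p)))))))))

(into [] (deltas) [100 104 101 110]) ; => [4 -3 9]
```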
The state (previous value) lives inside the transducer closure, not in the collection being processed. This is safe for sequential processing; for parallel use, you’d need separate transducer instances per partition.
Transducers Into Core.async
The most useful property of transducers — at least for our pipeline work — was that they compose with core.async channels:
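The elided snippet presumably attached the transducer at channel construction; a sketch with an illustrative buffer size and steps:

```clojure
(require '[clojure.core.async :as a])

;; The transducer runs inside the channel's buffering machinery:
;; values are transformed as they are put, with no extra go block.
(def c (a/chan 16 (comp (filter even?) (map inc))))

(doseq [n [1 2 3 4 5 6]] (a/>!! c n))
(a/close! c)
(a/<!! (a/into [] c)) ; => [3 5 7]
```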
The chan constructor accepts an optional transducer. Items flowing through the channel are transformed by the transducer before being made available to takers. This composes the I/O boundary (the channel) with the data transformation (the transducer) without spawning a separate go block for each step.
When Transducers Are Worth the Complexity
Not every pipeline benefits from transducers. The value is proportional to:
Data volume: for 10-element collections, the intermediate allocation is irrelevant. For 800K-element collections processed repeatedly, the allocation reduction matters.
Pipeline depth: a two-step pipeline doesn’t benefit much from fusion. A six-step pipeline where each step filters or transforms significantly benefits from a single pass.
Reuse across contexts: if the same pipeline logic needs to run over a vector, a lazy sequence, and a core.async channel, transducers provide that reuse. If the pipeline has a single context (always a vector), standard seq operations are simpler.
For the risk pipeline — large batches, multiple steps, running every end-of-day — transducers were the right call. For utility functions over small collections, they added complexity with no measurable benefit. The discipline is knowing which is which.