The risk calculation pipeline processed end-of-day positions: take all the day’s trades, aggregate them to net positions, apply mark-to-market prices, and compute risk metrics. The input was ~800,000 trade records; the output was ~12,000 position records with P&L and Greeks.

The initial implementation used standard Clojure sequence operations:

(->> trades
     (filter open-trade?)
     (map enrich-with-market-data)
     (group-by :currency-pair)
     (map-vals aggregate-position)     ; map-vals from a utility lib, e.g. medley.core
     (map-vals compute-risk-metrics))

Clean. Readable. And it created an intermediate collection at every one of the four pipeline stages — on the order of 600–800K elements in the early steps — millions of intermediate objects on the way to a 12K result.

Transducers changed this.

What Transducers Are

A transducer is a function that transforms a reducing function. That’s the shortest accurate definition, and it’s not helpful without an example.

Consider map:

;; Standard map — returns a lazy sequence:
(map f coll)

;; The same transformation as a transducer:
(map f)  ;; called without collection — returns a transducer

(map f) returns a transducer — a function that can be applied to any reducing context, whether that context is building a vector, summing numbers, or sending to a channel.

;; Apply the transducer to reduce into a vector:
(transduce (map f) conj [] coll)

;; Apply the transducer to reduce by summing:
(transduce (map f) + 0 coll)

;; Compose multiple transducers (processed in order, left to right):
(def xf (comp (filter open-trade?)
              (map enrich-with-market-data)))

(transduce xf conj [] trades)

The key property: the composition produces a single pass over the input. No intermediate collections. Each element flows through the entire pipeline before the next element starts.
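You can watch this element-by-element flow directly. A small sketch with a hypothetical `trace` step that prints each element as it passes:

```clojure
;; trace is a hypothetical debugging step: prints each element, passes it on.
(defn trace [label]
  (map (fn [x] (println label x) x)))

(def xf
  (comp (trace "in: ")
        (filter odd?)
        (map inc)
        (trace "out:")))

(into [] xf [1 2 3])
;; Each element prints "in: " and — if it survives the filter — "out:"
;; before the next element starts: interleaved, not batched.
;; => [2 4]
```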

No Intermediate Collections

The standard sequence pipeline:

(->> trades
     (filter open-trade?)         ; creates seq of ~600K
     (map enrich-with-market-data)) ; creates seq of ~600K

With lazy sequences, the intermediate sequences aren’t fully realised, but each step still allocates a cons cell (2 pointer fields) per element. For 600K trades, that’s ~2.4M pointer allocations in the intermediate steps.

The transducer pipeline:

(def xf (comp (filter open-trade?)
              (map enrich-with-market-data)))

(into [] xf trades)

Single pass, one allocation per result element. The filter and map are fused.

Benchmark on 800K trades:

Approach                   Time      Allocations
────────────────────────────────────────────────
Standard ->> pipeline       4.2s      ~380MB
Transducer (into [])        1.8s       ~85MB
Transducer (reduce)         1.3s       ~22MB

The “reduce” variant skips the final vector allocation by reducing directly into the result data structure.
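For instance, a running total over the filtered trades needs no output collection at all. A minimal sketch with hypothetical `:status` and `:notional` fields:

```clojure
;; Reducing directly to a scalar — no output vector is ever allocated.
(def trades
  [{:status :open   :notional 100.0}
   {:status :closed :notional  50.0}
   {:status :open   :notional 250.0}])

(defn open-trade? [t] (= :open (:status t)))

(transduce (comp (filter open-trade?)
                 (map :notional))
           + 0.0 trades)
;; => 350.0
```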

The Production Pipeline

The full risk pipeline rewritten with transducers:

(defn risk-xf
  "Transducer pipeline: filter → enrich → validate → format"
  [market-data ref-data]
  (comp
    (filter open-trade?)
    (map #(enrich-trade % market-data))
    (remove #(< (:notional %) MIN-NOTIONAL))    ; filter after enrichment
    (map #(compute-risk % ref-data))
    (filter #(nil? (:error %)))))               ; only successful computations

;; x/by-key and x/reduce require net.cgrand.xforms:
(require '[net.cgrand.xforms :as x])

(defn process-trades [trades market-data ref-data]
  (let [xf (risk-xf market-data ref-data)]
    (into {}
          (comp xf
                (x/by-key :currency-pair            ; group and aggregate in one pass
                          (x/reduce aggregate-position)))
          trades)))

The x/by-key and x/reduce are from net.cgrand/xforms, which provides additional transducer-compatible combinators including stateful ones like grouping.

What this achieves:

  • All filtering, enrichment, risk computation, and grouping in one pass
  • No intermediate collection between pipeline stages
  • The market-data and ref-data lookups are closure-captured, not threaded through
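For intuition, here is roughly what the grouping-plus-aggregation step accomplishes, spelled out as a plain single-pass reduce over hypothetical `:ccy`/`:amt` records — the per-key state that xforms manages for you:

```clojure
;; Group-and-aggregate in one pass, with no intermediate group collections.
(defn sum-by-key [kfn vfn coll]
  (reduce (fn [acc item]
            (update acc (kfn item) (fnil + 0.0) (vfn item)))
          {} coll))

(sum-by-key :ccy :amt
            [{:ccy "EUR" :amt 1.0}
             {:ccy "USD" :amt 2.0}
             {:ccy "EUR" :amt 3.0}])
;; => {"EUR" 4.0, "USD" 2.0}
```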

Stateful Transducers

Some pipeline steps require state — running totals, windowed aggregations, deduplication. Transducers support this through the three-arity form:

;; A stateful transducer that deduplicates consecutive identical values:
(defn dedupe-xf []
  (fn [rf]
    (let [prev (volatile! ::none)]   ; mutable state inside the transducer
      (fn
        ([] (rf))                    ; zero-arity: init
        ([result] (rf result))       ; one-arity: completion
        ([result input]              ; two-arity: step
         (let [prior @prev]
           (vreset! prev input)
           (if (= prior input)
             result                  ; skip duplicate
             (rf result input))))))))

;; Use it:
(transduce (dedupe-xf) conj [] [1 1 2 2 3 3 1])
;; => [1 2 3 1]

The state (previous value) lives inside the transducer closure, not in the collection being processed. This is safe for sequential processing; for parallel use, you’d need separate transducer instances per partition.
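Several of Clojure's built-in transducers are stateful in exactly this way — `dedupe` (equivalent to the sketch above), `distinct`, `partition-all`, `take`:

```clojure
;; Built-in stateful transducers, same volatile-state pattern under the hood:
(into [] (dedupe) [1 1 2 2 3 3 1])          ;; => [1 2 3 1]
(into [] (partition-all 3) (range 7))       ;; => [[0 1 2] [3 4 5] [6]]
(into [] (comp (map inc) (take 3)) (range)) ;; => [1 2 3] — stops early via `reduced`
```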

Transducers Into Core.async

The most useful property of transducers — at least for our pipeline work — was that they compose with core.async channels:

(require '[clojure.core.async :as a])

(def trade-ch  (a/chan 1000))
(def result-ch (a/chan 100 (risk-xf market-data ref-data)))

;; trades put onto trade-ch flow through the transducer pipeline
;; and arrive at result-ch already processed
(a/pipe trade-ch result-ch)

The chan constructor accepts an optional transducer. Items flowing through the channel are transformed by the transducer before being made available to readers. This composes the I/O boundary (channel) with the data transformation (transducer) without creating a separate processing go block for each step.
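A minimal runnable sketch of the same idea, assuming a trivial odd?/inc transducer in place of the risk pipeline:

```clojure
(require '[clojure.core.async :as a])

;; A channel that filters and transforms values as they pass through:
(def result
  (let [ch (a/chan 10 (comp (filter odd?) (map inc)))]
    (a/onto-chan!! ch [1 2 3 4 5])   ; put values, then close the channel
    (a/<!! (a/into [] ch))))         ; collect what readers would see
;; result => [2 4 6]
```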

When Transducers Are Worth the Complexity

Not every pipeline benefits from transducers. The value is proportional to:

Data volume: for 10-element collections, the intermediate allocation is irrelevant. For 800K-element collections processed repeatedly, the allocation reduction matters.

Pipeline depth: a two-step pipeline doesn’t benefit much from fusion. A six-step pipeline in which each step filters or transforms gains significantly from a single pass.

Reuse across contexts: if the same pipeline logic needs to run over a vector, a lazy sequence, and a core.async channel, transducers provide that reuse. If the pipeline has a single context (always a vector), standard seq operations are simpler.
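A small illustration of that reuse, with an assumed trivial transducer:

```clojure
;; One transducer, three reducing contexts:
(def xf (comp (filter even?) (map #(* % %))))

(into [] xf (range 10))        ;; eager vector:  [0 4 16 36 64]
(sequence xf (range 10))       ;; lazy sequence: (0 4 16 36 64)
(transduce xf + 0 (range 10))  ;; direct sum:    120
;; ...and (a/chan 10 xf) would apply the same logic at a channel boundary.
```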

For the risk pipeline — large batches, multiple steps, running every end-of-day — transducers were the right call. For utility functions over small collections, they added complexity with no measurable benefit. The discipline is knowing which is which.