Threads & channels
The dispatch loop from last lesson doesn’t sit idle waiting for a source to hand it the next
record. Clinker runs each source on its own OS thread, reading and decoding in parallel,
pushing records through a bounded channel into the dispatcher. That overlaps slow IO with
compute — and, because the channel is bounded, it throttles a fast reader when the consumer
falls behind, keeping memory in check. The thing that makes this concurrency safe rather than
terrifying is Rust’s ownership model: Send and Sync are checked by the compiler, so a data
race is a build error.
You’ll be able to: describe the one-thread-per-source ingest model, explain how a bounded
channel produces backpressure, and say what the Send bound on RecordSource guarantees.
One thread per source
Every declared source gets a dedicated, named thread that drives its reader and pushes records:
clinker-exec ·ingest.rs ·ingest_source fn @47d2e12
    // crates/clinker-exec/src/executor/mod.rs -- one OS thread per Source
    let handle = std::thread::Builder::new()
        .name(format!("clinker-ingest-{}", src_cfg.name))
        .spawn(move || ingest_source(src_cfg_owned, source_input, config_clone, stream, shutdown))?;

The producer thread calls next_record() in a loop and pushes each record into the channel; the
dispatch loop (the consumer) drains it. Producers and consumer run concurrently: while source A’s
thread is blocked decoding a slow file, the dispatcher is busy running operators on records
already delivered.
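The shape of that model can be sketched with the standard library alone. This is a minimal illustration, not Clinker's code: `spawn_ingest`, the `Row` alias, and the `ingest-` thread-name prefix are hypothetical, `std::sync::mpsc::sync_channel` stands in for the crossbeam channel, and all producers share one channel here to keep the sketch short.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for a decoded record: (producing thread's name, row value).
type Row = (String, u32);

/// Spawn one named thread per source; each pushes its rows into a bounded channel.
fn spawn_ingest(
    sources: Vec<(String, Vec<u32>)>,
    capacity: usize,
) -> (Receiver<Row>, Vec<thread::JoinHandle<()>>) {
    let (tx, rx) = sync_channel::<Row>(capacity);
    let mut handles = Vec::new();
    for (name, rows) in sources {
        let tx = tx.clone();
        let handle = thread::Builder::new()
            .name(format!("ingest-{name}"))
            .spawn(move || {
                // Read back the name this thread was given by the builder.
                let me = thread::current().name().unwrap_or("unnamed").to_string();
                for row in rows {
                    // `send` blocks while the channel is full and fails only
                    // if the receiver has been dropped.
                    if tx.send((me.clone(), row)).is_err() {
                        break;
                    }
                }
            })
            .expect("failed to spawn ingest thread");
        handles.push(handle);
    }
    // Drop the original sender so the channel closes once every producer finishes.
    drop(tx);
    (rx, handles)
}

fn main() {
    let sources = vec![
        ("orders".to_string(), vec![1, 2, 3]),
        ("customers".to_string(), vec![10, 20]),
    ];
    let (rx, handles) = spawn_ingest(sources, 4);
    // The consumer (standing in for the dispatch loop) drains until all senders are gone.
    for (thread_name, row) in rx {
        println!("{thread_name} delivered {row}");
    }
    for handle in handles {
        handle.join().expect("ingest thread panicked");
    }
}
```

Naming the threads costs nothing and pays off in debuggers and panic messages, where the name shows up instead of an anonymous thread id. Rows from different sources may interleave in any order; rows from a single source keep their order.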
The bounded channel is the backpressure
The channel connecting a reader thread to the dispatcher is a bounded crossbeam_channel.
Its element is a StreamEvent — either a body record or a document-boundary marker:
clinker-exec ·stream_event.rs ·StreamEvent type @47d2e12
    pub enum StreamEvent {
        Record(Record, u64),      // a row and its source row number
        Punctuation(Punctuation), // a document-boundary marker (open / close)
    }

    let (tx, rx) = crossbeam_channel::bounded(capacity); // DEFAULT_CAPACITY = 1024
    // ...
    self.tx.send(StreamEvent::record(record, row_num))?; // BLOCKS when the channel is full

Read what “bounded” buys you. The channel holds at most capacity events. When it’s full, the
producer’s send blocks the reader thread until the consumer drains one. So a fast reader
cannot race ahead of a slow pipeline and pile a million records into memory — it’s paced to the
consumer. That is backpressure, and it falls out of the channel’s bound for free, with no
explicit throttling logic:
    source A ──[reader thread]──▶ bounded channel (cap 1024) ──┐
    source B ──[reader thread]──▶ bounded channel (cap 1024) ──┼──▶ dispatch loop ──▶ operators
    source C ──[reader thread]──▶ bounded channel (cap 1024) ──┘
                   ▲                                              a full channel blocks
                   └──────────── backpressure ─────────────────   its reader thread

Send and Sync: the compiler proves it safe
Moving a reader onto another thread is only sound if the reader can cross threads. Rust spells
that requirement out in the trait bound: a RecordSource must be Send:
clinker-exec ·mod.rs ·RecordSource trait @47d2e12
    /// Must be `Send` for the per-Source `std::thread` to own it; not `Sync`
    /// -- each source is single-threaded streaming.
    pub trait RecordSource: Send { /* schema(), next_record(), ... */ }

Two auto-traits carry the whole thread-safety story:
Send = “ownership of this value may move to another thread.” The reader is Send, so the spawn can take it by move. A non-Send type (say, one holding an Rc) would make that spawn a compile error, caught before it ever runs.
Sync = “a &T may be shared across threads.” The reader is deliberately not Sync: one thread owns it, no sharing. But data that is shared (the schema, the document context) is held behind Arc and is Sync, so many threads can read it at once. (That’s the Phase 2 Arc story, now load-bearing for concurrency.)
You don’t sprinkle locks and hope. The bounds are checked at compile time: if the types line up,
the threading is race-free by construction. Heavy CPU operators (sorts, joins) take a second
path — a shared Rayon thread pool the executor installs work onto — but the same Send/
Sync discipline governs what may cross into it.
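To see the two bounds at work, here is a small self-contained sketch. Everything in it is hypothetical and much simpler than the real trait: `VecSource`, `drain_on_thread`, and the one-method `RecordSource` exist only for illustration. What it shows is ownership of a boxed reader moving onto a thread while a read-only schema stays shared through `Arc`.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical, simplified trait: the `Send` supertrait is the point of the sketch.
trait RecordSource: Send {
    fn next_record(&mut self) -> Option<String>;
}

/// Hypothetical reader that owns its rows and shares a read-only schema via `Arc`.
struct VecSource {
    rows: std::vec::IntoIter<String>,
    schema: Arc<Vec<String>>,
}

impl RecordSource for VecSource {
    fn next_record(&mut self) -> Option<String> {
        let row = self.rows.next()?;
        Some(format!("{}={}", self.schema[0], row))
    }
}

/// Move the boxed reader onto its own thread and drain it there.
fn drain_on_thread(mut source: Box<dyn RecordSource>) -> Vec<String> {
    // `move` transfers ownership of `source` into the closure. This compiles
    // because `dyn RecordSource` is `Send` through the supertrait bound.
    let handle = thread::spawn(move || {
        let mut out = Vec::new();
        while let Some(record) = source.next_record() {
            out.push(record);
        }
        out
    });
    handle.join().expect("reader thread panicked")
}

fn main() {
    let schema = Arc::new(vec!["id".to_string()]);
    let source = VecSource {
        rows: vec!["1".to_string(), "2".to_string()].into_iter(),
        schema: Arc::clone(&schema),
    };
    let records = drain_on_thread(Box::new(source));
    println!("{records:?}");
    // The main thread kept its own handle to the shared schema.
    println!("schema still readable here: {schema:?}");

    // Swapping `Arc` for `std::rc::Rc` in `VecSource` would make
    // `impl RecordSource for VecSource` fail to compile: `Rc` is not `Send`.
}
```

Note where the failure would land if the struct held an `Rc`: at the `impl` line, long before any thread is spawned. The bound on the trait pushes the check to the earliest possible point.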
Feel the backpressure
std has a bounded channel too: sync_channel. Here one producer thread feeds a deliberately
slow consumer through a tiny buffer; watch the producer stall when the buffer fills:
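A minimal sketch of such a demo, assuming a channel capacity of 2 and a hypothetical `run_pipeline` helper (neither comes from Clinker itself). It measures how far the producer gets ahead: the number of completed sends minus the number of rows the consumer has finished handling.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Run one producer against a slow consumer through a bounded channel.
/// Returns the rows in arrival order and the largest lead the producer observed
/// (completed sends minus rows the consumer has finished handling).
fn run_pipeline(capacity: usize, rows: usize, consumer_delay: Duration) -> (Vec<usize>, usize) {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let handled = Arc::new(AtomicUsize::new(0));
    let handled_by_consumer = Arc::clone(&handled);

    let producer = thread::spawn(move || {
        let mut max_lead: usize = 0;
        for row in 0..rows {
            // Blocks while the buffer already holds `capacity` rows.
            tx.send(row).expect("consumer hung up");
            let lead = (row + 1).saturating_sub(handled.load(Ordering::SeqCst));
            max_lead = max_lead.max(lead);
        }
        // `tx` is dropped when the closure returns, which closes the channel.
        max_lead
    });

    let mut received = Vec::new();
    for row in rx {
        thread::sleep(consumer_delay); // the deliberately slow consumer
        received.push(row);
        handled_by_consumer.fetch_add(1, Ordering::SeqCst);
    }
    let max_lead = producer.join().expect("producer panicked");
    (received, max_lead)
}

fn main() {
    let (received, max_lead) = run_pipeline(2, 8, Duration::from_millis(20));
    println!("received {received:?}");
    println!("producer was never more than {max_lead} rows ahead");
}
```

Measured this way, the lead can never exceed `capacity + 1`: up to `capacity` rows waiting in the buffer, plus the one row the consumer has taken but not finished. Pass a larger `capacity` and the permitted lead grows with it.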
The producer can get at most two rows ahead before send blocks; it then advances only as the
consumer drains. Shrink the slow loop to nothing and the producer races to “done” immediately —
the bound is the only thing pacing it. Grow the channel and you trade memory for slack. That dial
is exactly what the executor’s DEFAULT_CAPACITY sets.
// quick check
How does a bounded channel create backpressure between a source reader and the dispatch loop?
Backpressure is a side effect of the bound: a full channel blocks send, stalling the reader until space frees up. No records are dropped and no explicit throttle is needed — the capacity is the throttle.
Inspect the concurrency
You’ve seen records flow concurrently from sources into the dispatcher. But a blocking operator (a sort, a big aggregation) has to hold records while it works, and there may be more than fit in memory. Next: the buffer that can live in RAM, on disk, or both.