Interior mutability: the arbitrator
A buffer spills when memory is tight (last lesson) — but “memory” is a single budget shared by
every operator running at once, each on its own thread, each charging and releasing bytes.
They need one shared object that tracks the total and decides who spills. Here Rust’s ownership
model seems to fight you: you share a value across threads with Arc, and Arc gives only
shared (&) access — no &mut. So how does anyone update the counters? The answer is
interior mutability: a type built to mutate through &self, using atomics, a copy-on-write
snapshot, and a condition variable. The MemoryArbitrator is the engine’s masterclass in it.
You’ll be able to: explain why a shared-across-threads type must mutate through &self, read
the arbitrator’s atomic counters, and describe the Condvar-based pause signal.
The &self problem
Share a value across threads and you reach for Arc<T>. But a shared Arc<T> only ever hands out &T,
never &mut T, because two threads holding &mut to the same value is precisely the data race
the borrow checker forbids. So a shared budget tracker can only offer &self methods. To mutate
through them, its fields must be interior-mutable types — ones whose own API turns a & into
a safe write. That’s the entire trick behind Atomic*, Mutex, RwLock, and ArcSwap.
The arbitrator is Arc-shared across every operator thread, and its fields are exactly these:
clinker-exec ·memory.rs ·MemoryArbitrator type @47d2e12

    pub struct MemoryArbitrator {
        limit: AtomicU64,                  // the hard limit, in bytes
        peak_rss: AtomicU64,               // highest process RSS observed
        cumulative_spill_bytes: AtomicU64, // total bytes spilled to disk
        // copy-on-write registry of operators to poll / pause / spill:
        consumers: ArcSwap<Vec<(ConsumerId, Arc<dyn MemoryConsumer>)>>,
        policy: Box<dyn ArbitrationPolicy>,
        // ... more atomic counters
    }

Every counter is an AtomicU64; the operator registry is an ArcSwap (a lock-free,
copy-on-write cell). None of these needs &mut to change — and that’s what lets one
Arc<MemoryArbitrator> be read and updated by all the threads at once.
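A minimal sketch of the same shape, using only std. MiniBudget, its two fields, and charge are hypothetical names invented for illustration, not the engine's API; the point is only that a struct whose fields are all atomics can be updated through &self from several threads at once.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the arbitrator: every field is interior-mutable.
struct MiniBudget {
    used: AtomicU64, // bytes currently charged
    peak: AtomicU64, // highest value of `used` seen so far
}

impl MiniBudget {
    // Takes &self, yet changes state: the atomics perform the write.
    fn charge(&self, n: u64) {
        let now = self.used.fetch_add(n, Ordering::Relaxed) + n;
        self.peak.fetch_max(now, Ordering::Relaxed); // raise the high-water mark
    }
}

fn run() -> (u64, u64) {
    let budget = Arc::new(MiniBudget {
        used: AtomicU64::new(0),
        peak: AtomicU64::new(0),
    });
    // Three threads, each holding its own Arc clone, i.e. only shared access.
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let b = Arc::clone(&budget);
            thread::spawn(move || b.charge(100))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    (
        budget.used.load(Ordering::Relaxed),
        budget.peak.load(Ordering::Relaxed),
    )
}

fn main() {
    let (used, peak) = run();
    println!("used = {used}, peak = {peak}");
}
```

No thread ever holds &mut MiniBudget, and no lock is taken; each update is an atomic read-modify-write on a single field.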

    thread (sort op) ─┐
    thread (agg op)  ─┼──▶ Arc<MemoryArbitrator>
    thread (join op) ─┤      limit, peak_rss, cumulative_spill : AtomicU64
    thread (source)  ─┘      consumers : ArcSwap<Vec<…>>  (lock-free snapshot)

    every thread holds &self; updates go through the atomics, with no &mut and no global lock

Mutating through &self
Watch the signatures: these methods change the arbitrator’s state, yet every one takes &self.
The mutation rides an atomic operation:

    pub fn observe(&self) {
        if let Some(rss) = rss_bytes() {
            self.peak_rss.fetch_max(rss, Ordering::Relaxed); // raise the high-water mark
        }
    }

The per-operator byte counter lives on a ConsumerHandle (an Arc-shared handle each operator
holds) and charges/releases bytes the same lock-free way:
clinker-exec ·memory.rs ·ConsumerHandle type @47d2e12

    pub fn add_bytes(&self, n: u64) { // &self: charge memory from any thread
        let _ = self.bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed,
            |cur| Some(cur.saturating_add(n)));
    }
    pub fn sub_bytes(&self, n: u64) { // &self: release it again
        let _ = self.bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed,
            |cur| Some(cur.saturating_sub(n)));
    }

Why atomics rather than wrapping the whole thing in a Mutex? Because charging bytes happens on
the hot path, per batch, from many threads. A fetch_add is typically a single lock-free CPU
instruction, and the fetch_update shown here is a short compare-and-swap retry loop (needed for
the saturating arithmetic); neither ever blocks a thread, whereas a Mutex would serialize every
operator through one lock. And the operator
registry uses ArcSwap for the same reason: the frequent readers (which operator is using
what?) load an immutable snapshot with no lock, while the rare register/unregister clones the Vec
and atomically swaps it in. Hot path lock-free; rare path copy-on-write.
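The copy-on-write half of that can be sketched with std alone. This is a stand-in, not ArcSwap: it publishes snapshots through an RwLock<Arc<Vec<…>>>, so readers take a brief read lock to clone the Arc, whereas ArcSwap avoids the lock entirely. Registry, load, register, and the (u32, String) entry type are hypothetical names chosen for the example.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical registry: a std-only stand-in for ArcSwap<Vec<...>>.
struct Registry {
    snapshot: RwLock<Arc<Vec<(u32, String)>>>,
}

impl Registry {
    fn new() -> Self {
        Registry { snapshot: RwLock::new(Arc::new(Vec::new())) }
    }

    // Frequent path: hand out the current immutable snapshot.
    // Cloning the Arc only bumps a reference count; the Vec is not copied.
    fn load(&self) -> Arc<Vec<(u32, String)>> {
        let guard = self.snapshot.read().unwrap();
        Arc::clone(&*guard)
    }

    // Rare path: copy the Vec, modify the copy, publish it as the new snapshot.
    fn register(&self, id: u32, name: &str) {
        let mut slot = self.snapshot.write().unwrap();
        let mut next = (**slot).clone();
        next.push((id, name.to_string()));
        *slot = Arc::new(next);
    }
}

fn demo() -> (usize, usize) {
    let reg = Registry::new();
    reg.register(1, "sort");
    let old = reg.load(); // snapshot taken before the next registration
    reg.register(2, "agg");
    let new = reg.load();
    (old.len(), new.len()) // the old snapshot is untouched by the later write
}

fn main() {
    let (old_len, new_len) = demo();
    println!("old snapshot: {old_len} entries, new snapshot: {new_len} entries");
}
```

A reader that loaded a snapshot keeps iterating over a consistent Vec even while a registration replaces it; the old Vec is freed once the last reader drops its Arc.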
The pause signal: a Condvar in miniature
Not all coordination is a counter. When the arbitrator decides to pause a source for
backpressure, the source’s thread has to actually block until resumed — and busy-spinning
would burn a core. That’s what a condition variable is for. The PauseSignal pairs an
AtomicBool flag with a Mutex + Condvar:
clinker-exec ·memory.rs ·PauseSignal type @47d2e12

    pub struct PauseSignal { paused: AtomicBool, mu: Mutex<()>, cv: Condvar }

    impl PauseSignal {
        pub fn resume(&self) {
            self.paused.store(false, Ordering::Release);
            self.cv.notify_all(); // wake every parked waiter
        }
        pub fn wait_while_paused(&self) {
            if !self.is_paused() { return; } // lock-free fast path when not paused
            let mut g = self.mu.lock().unwrap();
            while self.is_paused() {
                g = self.cv.wait(g).unwrap(); // park the thread, no CPU spin
            }
        }
    }

A paused source calls wait_while_paused() and the OS puts its thread to sleep; resume() wakes
it with notify_all. The AtomicBool fast path means the common “not paused” case costs only an
atomic load, with no lock at all. Three interior-mutable primitives, each chosen for its access pattern: atomics for
hot counters, ArcSwap for a mostly-read registry, Condvar for genuine blocking.
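A runnable sketch of the pause/resume handshake, using only std. The constructor, the body of is_paused, and the demo function are assumptions added to make the example self-contained. One further choice made here, which the excerpt above does not show: resume takes the mutex before clearing the flag and notifying, a common way to rule out a waiter checking the flag and then parking just after the notification has already fired.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct PauseSignal {
    paused: AtomicBool,
    mu: Mutex<()>,
    cv: Condvar,
}

impl PauseSignal {
    // Assumed constructor; not part of the excerpt.
    fn new(paused: bool) -> Self {
        PauseSignal { paused: AtomicBool::new(paused), mu: Mutex::new(()), cv: Condvar::new() }
    }

    // Assumed body: a plain atomic load of the flag.
    fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Acquire)
    }

    fn resume(&self) {
        // Sketch-only choice: hold the mutex so the flag cannot change between
        // a waiter's check and its call to wait().
        let _g = self.mu.lock().unwrap();
        self.paused.store(false, Ordering::Release);
        self.cv.notify_all();
    }

    fn wait_while_paused(&self) {
        if !self.is_paused() {
            return; // fast path: one atomic load, no lock
        }
        let mut g = self.mu.lock().unwrap();
        while self.is_paused() {
            g = self.cv.wait(g).unwrap(); // thread sleeps until notified
        }
    }
}

// Returns (records produced while paused, records produced after resume).
fn demo() -> (u64, u64) {
    let signal = Arc::new(PauseSignal::new(true));
    let produced = Arc::new(AtomicU64::new(0));
    let source = {
        let (signal, produced) = (Arc::clone(&signal), Arc::clone(&produced));
        thread::spawn(move || {
            signal.wait_while_paused(); // blocks: the signal starts paused
            produced.fetch_add(1, Ordering::Relaxed);
        })
    };
    thread::sleep(Duration::from_millis(20));
    let before = produced.load(Ordering::Relaxed);
    signal.resume();
    source.join().unwrap();
    (before, produced.load(Ordering::Relaxed))
}

fn main() {
    let (before, after) = demo();
    println!("while paused: {before}, after resume: {after}");
}
```

The while loop around wait matters as much as the notification: a condition variable may wake spuriously, so the waiter re-checks the flag every time it wakes.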
Share a counter across threads
This is the whole idea in std: an Arc<AtomicU64> mutated by four threads through &self, no
&mut and no lock anywhere:
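A minimal sketch of that program. The per-thread count is an assumption: four threads each doing 2,500 increments of 1, chosen so the total matches the 10000 discussed here. The run helper is a hypothetical name.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn run() -> u64 {
    let counter = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            // Each thread gets its own Arc clone: shared (&) access only.
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..2500 {
                    c.fetch_add(1, Ordering::Relaxed); // atomic increment through &
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("total = {}", run());
}
```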
Four threads, one counter, each charging through a shared &. The total is always 10000 — atomic
fetch_add makes the increments race-free without any lock. Try swapping the AtomicU64 for a
plain u64 behind the Arc and the code won’t compile: a bare u64 isn’t interior-mutable, so
Arc gives you no way to write it. That compile error is Rust steering you to the right tool.
// quick check
Why are the MemoryArbitrator's counters AtomicU64 with &self methods, rather than plain u64 fields mutated through &mut self?
Arc only yields &T. To mutate shared state you need interior mutability; AtomicU64 turns a &self into a lock-free write, so per-batch charging from many threads doesn't serialize through a lock.
Read the arbitrator
You’ve seen how the arbitrator stores shared state safely. Next: what it decides with it (spill, pause, or abort) and how it refuses an impossible budget before a single record moves.