
Interior mutability: the arbitrator

A buffer spills when memory is tight (last lesson) — but “memory” is a single budget shared by every operator running at once, each on its own thread, each charging and releasing bytes. They need one shared object that tracks the total and decides who spills. Here Rust’s ownership model seems to fight you: you share a value across threads with Arc, and Arc gives only shared (&) access — no &mut. So how does anyone update the counters? The answer is interior mutability: a type built to mutate through &self, using atomics, a copy-on-write snapshot, and a condition variable. The MemoryArbitrator is the engine’s masterclass in it.

You’ll be able to: explain why a shared-across-threads type must mutate through &self, read the arbitrator’s atomic counters, and describe the Condvar-based pause signal.

Share a value across threads and you reach for Arc<T>. But a shared Arc<T> hands out only &T, never &mut T (Arc::get_mut succeeds only while there is exactly one owner), because two threads holding &mut to the same value is precisely the aliased mutation the borrow checker forbids: it is what makes a data race possible. So a shared budget tracker can only offer &self methods. To mutate through them, its fields must be interior-mutable types, ones whose own API turns a & into a safe write. That’s the entire trick behind Atomic*, Mutex, RwLock, and ArcSwap.
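To make that concrete, here is a minimal sketch using only std. The Tracker type and its charge method are hypothetical names invented for illustration, not the engine's API. It shows Arc declining to give out &mut once a second owner exists, while an &self method still updates the counter.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical tracker, for illustration only (not the engine's type).
struct Tracker {
    used: AtomicU64,
}

impl Tracker {
    /// Mutates through `&self`: the atomic's own API performs the write.
    /// Returns the counter value after this charge.
    fn charge(&self, n: u64) -> u64 {
        self.used.fetch_add(n, Ordering::Relaxed) + n
    }
}

/// Returns (whether `&mut` was obtainable while shared, final counter value).
fn demo() -> (bool, u64) {
    let mut a = Arc::new(Tracker { used: AtomicU64::new(0) });
    let b = Arc::clone(&a);

    // With two owners alive, Arc refuses to hand out `&mut Tracker`.
    let got_mut = Arc::get_mut(&mut a).is_some();

    // Both handles can still update the counter through `&Tracker`.
    a.charge(10);
    b.charge(5);
    (got_mut, a.used.load(Ordering::Relaxed))
}

fn main() {
    let (got_mut, total) = demo();
    println!("got &mut while shared: {got_mut}, total: {total}");
}
```

The write happens inside fetch_add, which takes &self, so the struct never needs a &mut receiver.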

The arbitrator is Arc-shared across every operator thread, and its fields are exactly these:

clinker-exec ·memory.rs ·MemoryArbitrator type @47d2e12
pub struct MemoryArbitrator {
    limit: AtomicU64,                  // the hard limit, in bytes
    peak_rss: AtomicU64,               // highest process RSS observed
    cumulative_spill_bytes: AtomicU64, // total bytes spilled to disk
    // copy-on-write registry of operators to poll / pause / spill:
    consumers: ArcSwap<Vec<(ConsumerId, Arc<dyn MemoryConsumer>)>>,
    policy: Box<dyn ArbitrationPolicy>,
    // ... more atomic counters
}

Every counter is an AtomicU64; the operator registry is an ArcSwap (a lock-free, copy-on-write cell). None of these needs &mut to change — and that’s what lets one Arc<MemoryArbitrator> be read and updated by all the threads at once.

thread (sort op) ─┐
thread (agg op)  ─┼──▶ Arc<MemoryArbitrator>
thread (join op) ─┤      limit, peak_rss, cumulative_spill : AtomicU64
thread (source)  ─┘      consumers : ArcSwap<Vec<…>> (lock-free snapshot)

every thread holds &self; updates go through the atomics: no &mut, no global lock

Watch the signatures: these methods change the arbitrator’s state, yet every one takes &self. The mutation rides an atomic operation:

crates/clinker-exec/src/pipeline/memory.rs
pub fn observe(&self) {
    if let Some(rss) = rss_bytes() {
        self.peak_rss.fetch_max(rss, Ordering::Relaxed); // raise the high-water mark
    }
}
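The same high-water-mark pattern can be tried in isolation. This is a sketch under assumptions: the high_water helper and the sample values are made up for illustration, and each sample is reported from its own scoped thread to mimic several operators observing at once.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Hypothetical helper: feeds every sample into one shared high-water mark,
/// each from its own thread, and returns the mark afterwards.
fn high_water(samples: &[u64]) -> u64 {
    let peak = AtomicU64::new(0);
    thread::scope(|s| {
        for &sample in samples {
            let peak = &peak; // shared reference only; no &mut anywhere
            s.spawn(move || {
                // Atomically stores max(current, sample).
                peak.fetch_max(sample, Ordering::Relaxed);
            });
        }
    }); // all scoped threads are joined here
    peak.load(Ordering::Relaxed)
}

fn main() {
    println!("peak = {}", high_water(&[100, 250, 180]));
}
```

Whatever order the threads run in, the mark ends at the largest sample, because fetch_max never lowers the stored value.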

The per-operator byte counter lives on a ConsumerHandle — an Arc-shared handle each operator holds — and charges/releases bytes the same lock-free way:

clinker-exec ·memory.rs ·ConsumerHandle type @47d2e12
pub fn add_bytes(&self, n: u64) { // &self: charge memory from any thread
    let _ = self.bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed,
        |cur| Some(cur.saturating_add(n)));
}
pub fn sub_bytes(&self, n: u64) { // &self: release it again
    let _ = self.bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed,
        |cur| Some(cur.saturating_sub(n)));
}
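One plausible reason to pair fetch_update with saturating arithmetic is that the counter then clamps instead of wrapping. The sketch below compares the two behaviours on a standalone AtomicU64; the function names are hypothetical and the motivation is an inference, not something the excerpt states.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Releases `n` bytes the way the excerpt does: clamp at zero.
fn saturating_release(start: u64, n: u64) -> u64 {
    let bytes = AtomicU64::new(start);
    // The closure may run more than once if another thread races the update.
    let _ = bytes.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
        Some(cur.saturating_sub(n))
    });
    bytes.load(Ordering::Relaxed)
}

/// Releases `n` bytes with a plain fetch_sub, which wraps on underflow.
fn wrapping_release(start: u64, n: u64) -> u64 {
    let bytes = AtomicU64::new(start);
    bytes.fetch_sub(n, Ordering::Relaxed);
    bytes.load(Ordering::Relaxed)
}

fn main() {
    println!("saturating: {}", saturating_release(10, 25));
    println!("wrapping:   {}", wrapping_release(10, 25));
}
```

Releasing more than was charged leaves the saturating counter at zero, while the wrapping one jumps to a value near u64::MAX, which a budget check would read as enormous usage.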

Why atomics rather than wrapping the whole thing in a Mutex? Because charging bytes happens on the hot path, per batch, from many threads. The fetch_update calls above are short compare-and-swap retry loops, and a plain fetch_add is typically a single CPU instruction; neither ever blocks a thread, whereas a Mutex would serialize every operator through one lock. The operator registry uses ArcSwap for the same reason: the frequent readers (which operator is using what?) load an immutable snapshot with no lock, while the rare register/unregister clones the Vec and atomically swaps it in. Hot path lock-free; rare path copy-on-write.
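The copy-on-write half can be sketched with std alone. ArcSwap comes from an external crate, so this example substitutes RwLock<Arc<Vec<_>>>: readers hold a read lock only long enough to clone the Arc, which is close in spirit but not lock-free. The Registry type, its methods, and the String entries are all hypothetical stand-ins.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical std-only stand-in for an ArcSwap-style cell.
struct Registry {
    current: RwLock<Arc<Vec<String>>>,
}

impl Registry {
    fn new() -> Self {
        Registry { current: RwLock::new(Arc::new(Vec::new())) }
    }

    /// Readers clone the Arc (a refcount bump) and release the lock at once;
    /// they then iterate over an immutable snapshot.
    fn snapshot(&self) -> Arc<Vec<String>> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Writers copy the Vec, change the copy, and swap the new Arc in.
    fn register(&self, name: &str) {
        let mut slot = self.current.write().unwrap();
        let mut next: Vec<String> = (**slot).clone();
        next.push(name.to_string());
        *slot = Arc::new(next);
    }
}

/// Returns the lengths of a snapshot taken before and after a registration.
fn demo() -> (usize, usize) {
    let reg = Registry::new();
    reg.register("sort");
    let before = reg.snapshot();
    reg.register("join");
    let after = reg.snapshot();
    (before.len(), after.len())
}

fn main() {
    let (before, after) = demo();
    println!("old snapshot: {before} entries, new snapshot: {after} entries");
}
```

The old snapshot keeps its single entry after the second registration: a reader that already holds a snapshot is never disturbed by a writer.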

Not all coordination is a counter. When the arbitrator decides to pause a source for backpressure, the source’s thread has to actually block until resumed — and busy-spinning would burn a core. That’s what a condition variable is for. The PauseSignal pairs an AtomicBool flag with a Mutex + Condvar:

clinker-exec ·memory.rs ·PauseSignal type @47d2e12
pub struct PauseSignal { paused: AtomicBool, mu: Mutex<()>, cv: Condvar }
impl PauseSignal {
    pub fn resume(&self) {
        self.paused.store(false, Ordering::Release);
        self.cv.notify_all(); // wake every parked waiter
    }
    pub fn wait_while_paused(&self) {
        if !self.is_paused() { return; } // lock-free fast path when not paused
        let mut g = self.mu.lock().unwrap();
        while self.is_paused() {
            g = self.cv.wait(g).unwrap(); // park the thread, no CPU spin
        }
    }
}

A paused source calls wait_while_paused() and the OS puts its thread to sleep; resume() wakes it with notify_all. The AtomicBool fast path means the common “not paused” case costs a single atomic load and takes no lock at all. Three interior-mutable primitives, each chosen for its access pattern: atomics for hot counters, ArcSwap for a mostly-read registry, Condvar for genuine blocking.
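To watch the park-and-wake cycle run, here is a standalone sketch of the same shape. The constructor, the body of is_paused, and the demo function are assumptions added so the example compiles on its own. Unlike the excerpt, this sketch takes the mutex inside resume, so the store and notification cannot land between a waiter's check and its wait; the excerpt does not show whether the engine does the same.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Standalone copy of the shape shown above; `new` and `is_paused` are assumed.
struct PauseSignal {
    paused: AtomicBool,
    mu: Mutex<()>,
    cv: Condvar,
}

impl PauseSignal {
    fn new(paused: bool) -> Self {
        PauseSignal { paused: AtomicBool::new(paused), mu: Mutex::new(()), cv: Condvar::new() }
    }

    // Assumed body: a plain atomic load of the flag.
    fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Acquire)
    }

    fn resume(&self) {
        // Sketch-specific choice: hold the mutex while clearing the flag.
        let _g = self.mu.lock().unwrap();
        self.paused.store(false, Ordering::Release);
        self.cv.notify_all(); // wake every parked waiter
    }

    fn wait_while_paused(&self) {
        if !self.is_paused() {
            return; // fast path: one atomic load, no lock
        }
        let mut g = self.mu.lock().unwrap();
        while self.is_paused() {
            g = self.cv.wait(g).unwrap(); // releases the mutex while parked
        }
    }
}

/// Parks a worker on a paused signal, resumes it, and reports whether it woke.
fn demo() -> bool {
    let sig = Arc::new(PauseSignal::new(true));
    let worker = {
        let sig = Arc::clone(&sig);
        thread::spawn(move || {
            sig.wait_while_paused(); // blocks until resume() runs
            !sig.is_paused()
        })
    };
    thread::sleep(Duration::from_millis(20)); // give the worker time to park
    sig.resume();
    worker.join().unwrap()
}

fn main() {
    println!("worker woke after resume: {}", demo());
}
```

The while loop around cv.wait matters as well: condition variables may wake spuriously, so the waiter rechecks the flag every time it wakes.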

This is the whole idea in std: an Arc<AtomicU64> mutated by four threads through &self, no &mut and no lock anywhere:
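A minimal sketch of that program, assuming each of the four threads adds 1 to the counter 2,500 times (the per-thread count and the run function name are assumptions, chosen so the total comes to 10000):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns four threads that all increment one shared counter.
fn run() -> u64 {
    let counter = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter); // each thread gets its own Arc handle
            thread::spawn(move || {
                for _ in 0..2500 {
                    // Takes &self: no &mut, no lock.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap(); // wait for every thread to finish
    }
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("total = {}", run());
}
```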


Four threads, one counter, each charging through a shared &. The total is always 10000 — atomic fetch_add makes the increments race-free without any lock. Try swapping the AtomicU64 for a plain u64 behind the Arc and the code won’t compile: a bare u64 isn’t interior-mutable, so Arc gives you no way to write it. That compile error is Rust steering you to the right tool.

// quick check

Why are the MemoryArbitrator's counters AtomicU64 with &self methods, rather than plain u64 fields mutated through &mut self?

You’ve seen how the arbitrator stores shared state safely. Next: what it decides with it — spill, pause, or abort — and how it refuses an impossible budget before a single record moves.