Add a reader/writer/format
In lesson 3.1 you met the IO seam from the outside: formats are an open, runtime-chosen
plug-in surface, so the engine reaches them through Box<dyn FormatReader>. Now you’re on
the inside, adding a format. This lesson shows the full change-set — and it pulls together
two earlier ideas: the dyn seam (3.1) is where your code plugs in, and the closed
exhaustive match (4.1) is what forces every dispatch site to acknowledge your new
format.
You’ll be able to: implement the FormatReader/FormatWriter traits, explain the
two-stage path from a YAML type: string to a concrete reader, and list the sites a new
format touches — and which one the compiler makes mandatory.
Two traits, two required methods each
Section titled “Two traits, two required methods each”A format is whatever can stream Records in and out. The contracts are tiny:
clinker-format ·traits.rs ·FormatReader trait @47d2e12
/// Streaming record reader. Yields records one at a time.////// `&mut self` on `schema()` because some formats (e.g. CSV) must read/// the first row to discover column names. Must be `Send` for executor/// ownership transfer; not `Sync` — single-threaded streaming.pub trait FormatReader: Send { fn schema(&mut self) -> Result<Arc<Schema>, FormatError>; fn next_record(&mut self) -> Result<Option<Record>, FormatError>; // ...plus a few defaulted methods for envelopes / multi-file readers} clinker-format ·traits.rs ·FormatWriter trait @47d2e12
/// Streaming record writer. Consumes records one at a time.pub trait FormatWriter: Send { fn write_record(&mut self, record: &Record) -> Result<(), FormatError>; fn flush(&mut self) -> Result<(), FormatError>; // ...plus defaulted begin_document / end_document for envelope framing}Two required methods on each, both fallible, both streaming. next_record returning
Ok(None) means end-of-stream. The : Send bound is the same one lesson 4.2 explained:
the executor moves readers onto worker threads, so a reader must be Send. A minimal new
format implements just those two-each methods; the defaulted ones (envelopes, multi-file)
are opt-in.
A concrete reader: CSV
Section titled “A concrete reader: CSV”Here’s the shape of a real implementation — CsvReader, the simplest complete one:
clinker-format ·reader.rs ·CsvReader type @47d2e12
pub struct CsvReader<R: Read> { inner: csv::Reader<SkipBom<R>>, schema: Option<Arc<Schema>>, // discovered from the header, cached config: CsvReaderConfig, row_count: u64, record_buf: csv::StringRecord,}
impl<R: Read + Send> FormatReader for CsvReader<R> { fn schema(&mut self) -> Result<Arc<Schema>, FormatError> { self.ensure_schema() // reads the header row once } fn next_record(&mut self) -> Result<Option<Record>, FormatError> { let schema = self.ensure_schema()?; if !self.inner.read_record(&mut self.record_buf)? { return Ok(None); // end of stream } let values = self.record_buf.iter().map(|f| Value::String(f.into())).collect(); Ok(Some(Record::new(schema, values))) }}That’s the whole pattern: hold the underlying byte source plus a cached Arc<Schema>,
and have next_record pull one row and emit Ok(Some(Record)) until the source is dry.
The impl is generic over R: Read + Send, not tied to files.
From a YAML string to your reader, in two stages
Section titled “From a YAML string to your reader, in two stages”A pipeline says type: csv in YAML. How does that string reach CsvReader? Not by a
string-match you write — it’s a two-stage hop, and understanding it tells you exactly what
to edit.
Stage 1 — string to enum variant, by serde. The config layer has a closed enum of
known formats, adjacently tagged so serde maps type: csv to a variant automatically:
clinker-plan ·format.rs ·InputFormat type @47d2e12
#[derive(Debug, Clone, Serialize, Deserialize)]#[serde(tag = "type", content = "options", rename_all = "snake_case")]pub enum InputFormat { Csv(Option<CsvInputOptions>), Json(Option<JsonInputOptions>), Xml(Option<XmlInputOptions>), FixedWidth(Option<FixedWidthInputOptions>), Edifact(Option<EdifactInputOptions>), X12(Option<X12InputOptions>), Hl7(Option<Hl7InputOptions>), Swift(Option<SwiftInputOptions>),}The #[serde(tag = "type", rename_all = "snake_case")] is what turns the string "csv"
into the variant InputFormat::Csv. You never write name-matching code.
Stage 2 — enum variant to boxed reader, by an exhaustive match. In the executor, one function matches the variant and returns the boxed trait object:
clinker-exec ·ingest.rs ·build_format_reader fn @47d2e12
fn build_format_reader( input: &clinker_plan::config::SourceConfig, source: ReopenableSource,) -> Result<Box<dyn FormatReader>, PipelineError> { match &input.format { InputFormat::Csv(opts) => Ok(Box::new(CsvReader::from_reader(/* ... */))), InputFormat::Json(opts) => Ok(Box::new(JsonReader::from_source(/* ... */)?)), InputFormat::Xml(opts) => { /* ... */ } // ...one arm per variant, NO `_ =>` catch-all }}This is the lesson-4.1 payoff in action: the match is wildcard-free, so the moment you
add a variant to InputFormat, this function stops compiling until you add its arm. The
open dyn seam (return type Box<dyn FormatReader>) and the closed enum dispatch
(match with no wildcard) cooperate — dyn lets your reader plug in; the exhaustive enum
makes the compiler hand you the list of sites to wire.
The exhaustive seam, in miniature
Section titled “The exhaustive seam, in miniature”Here’s the enum-match-to-trait-object pattern on its own. Add a Tsv variant to Format
and the build_reader match below stops compiling — the compiler names the exact site:
> output appears here — press Run
The return type Box<dyn Reader> is the open seam; the wildcard-free match is the
closed dispatch. That’s the same division of labor as the real build_format_reader.
The change-set, and the round-trip test
Section titled “The change-set, and the round-trip test”To add a format end-to-end:
- Add a variant (and its options struct) to
InputFormatand/orOutputFormatincrates/clinker-plan/src/config/format.rs, and add its lowercase name to theformat_name()match (also wildcard-free — the compiler reminds you). - Implement
FormatReader(and/orFormatWriter) in a new module undercrates/clinker-format/src/<fmt>/. - Add the dispatch arm in
build_format_reader(executor/ingest.rs) and/or the writer dispatch inexecutor/registry.rs. These are the compiler-mandatory edits.
Prove it with a round-trip: read a sample, write it back, read again, assert every field survived — exactly what the CSV test does:
clinker-format ·writer.rs ·test_csv_roundtrip_lossless test @47d2e12
#[test]fn test_csv_roundtrip_lossless() { let input = "name,age,active\nAlice,30,true\nBob,25,false\n"; // read -> records, write -> string, read again -> assert schemas + fields match}// quick check
You add an InputFormat::Tsv variant but forget to add its arm to build_format_reader. What does the compiler do?
The dispatch match has no _ => arm, so adding a variant breaks compilation until every dispatch site handles it. That's the closed-enum guarantee from lesson 4.1 doing your change-management for you — paired with the open Box return that lets your new reader plug in.
Wire one yourself
Section titled “Wire one yourself”You’ve extended the engine’s edges. Next: extend its middle — add a new operator to the execution DAG, the change that touches the most places.