
Add a reader/writer/format

In lesson 3.1 you met the IO seam from the outside: formats are an open, runtime-chosen plug-in surface, so the engine reaches them through Box<dyn FormatReader>. Now you’re on the inside, adding a format. This lesson shows the full change-set — and it pulls together two earlier ideas: the dyn seam (3.1) is where your code plugs in, and the closed exhaustive match (4.1) is what forces every dispatch site to acknowledge your new format.

You’ll be able to: implement the FormatReader/FormatWriter traits, explain the two-stage path from a YAML type: string to a concrete reader, and list the sites a new format touches, including which of them the compiler makes mandatory.

A format is whatever can stream Records in and out. The contracts are tiny:

clinker-format ·traits.rs ·FormatReader trait @47d2e12
/// Streaming record reader. Yields records one at a time.
///
/// `&mut self` on `schema()` because some formats (e.g. CSV) must read
/// the first row to discover column names. Must be `Send` for executor
/// ownership transfer; not `Sync` — single-threaded streaming.
pub trait FormatReader: Send {
    fn schema(&mut self) -> Result<Arc<Schema>, FormatError>;
    fn next_record(&mut self) -> Result<Option<Record>, FormatError>;
    // ...plus a few defaulted methods for envelopes / multi-file readers
}
clinker-format ·traits.rs ·FormatWriter trait @47d2e12
/// Streaming record writer. Consumes records one at a time.
pub trait FormatWriter: Send {
    fn write_record(&mut self, record: &Record) -> Result<(), FormatError>;
    fn flush(&mut self) -> Result<(), FormatError>;
    // ...plus defaulted begin_document / end_document for envelope framing
}

Each trait has two required methods, both fallible and both streaming. next_record returning Ok(None) signals end-of-stream. The : Send bound is the same one lesson 4.2 explained: the executor moves readers onto worker threads, so a reader must be Send. A minimal new format implements just the two required methods of each trait it supports; the defaulted ones (envelopes, multi-file) are opt-in.
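To see the contract in isolation, here is a minimal sketch against simplified stand-ins for the two traits. MiniReader, MiniWriter, VecReader, VecWriter and copy_all are hypothetical names, Record is reduced to a Vec<String>, and the schema is just a list of column names; none of this is clinker-format's actual API. It shows the three rules above: fallible methods, Ok(None) as end-of-stream, and a Send bound the compiler checks.

```rust
/// Hypothetical stand-ins for clinker's `Record` and `FormatError`.
type Record = Vec<String>;

#[derive(Debug)]
struct FormatError(String);

/// Simplified mirror of `FormatReader`: two required methods, both fallible.
trait MiniReader: Send {
    /// Column names. `&mut self` so an implementation may consume input to learn them.
    fn schema(&mut self) -> Result<Vec<String>, FormatError>;
    /// `Ok(Some(record))` per record, `Ok(None)` once the stream is exhausted.
    fn next_record(&mut self) -> Result<Option<Record>, FormatError>;
}

/// Simplified mirror of `FormatWriter`.
trait MiniWriter: Send {
    fn write_record(&mut self, record: &Record) -> Result<(), FormatError>;
    fn flush(&mut self) -> Result<(), FormatError>;
}

/// Reader over records already held in memory.
struct VecReader {
    columns: Vec<String>,
    rows: std::vec::IntoIter<Record>,
}

impl MiniReader for VecReader {
    fn schema(&mut self) -> Result<Vec<String>, FormatError> {
        Ok(self.columns.clone())
    }
    fn next_record(&mut self) -> Result<Option<Record>, FormatError> {
        Ok(self.rows.next()) // `None` after the last row: end of stream
    }
}

/// Writer that buffers records and moves them to `written` on flush.
#[derive(Default)]
struct VecWriter {
    pending: Vec<Record>,
    written: Vec<Record>,
}

impl MiniWriter for VecWriter {
    fn write_record(&mut self, record: &Record) -> Result<(), FormatError> {
        if record.is_empty() {
            return Err(FormatError("refusing to write an empty record".into()));
        }
        self.pending.push(record.clone());
        Ok(())
    }
    fn flush(&mut self) -> Result<(), FormatError> {
        self.written.append(&mut self.pending);
        Ok(())
    }
}

/// Compiles only if `T: Send`, the same bound the executor relies on.
fn assert_send<T: Send>() {}

/// Streams any reader into any writer through trait objects.
fn copy_all(reader: &mut dyn MiniReader, writer: &mut dyn MiniWriter) -> Result<usize, FormatError> {
    let mut count = 0;
    while let Some(record) = reader.next_record()? {
        writer.write_record(&record)?;
        count += 1;
    }
    writer.flush()?;
    Ok(count)
}

fn main() {
    assert_send::<VecReader>();
    assert_send::<VecWriter>();
    let mut reader = VecReader {
        columns: vec!["name".into(), "age".into()],
        rows: vec![vec!["Alice".into(), "30".into()], vec!["Bob".into(), "25".into()]].into_iter(),
    };
    let mut writer = VecWriter::default();
    let copied = copy_all(&mut reader, &mut writer).expect("copy failed");
    println!("copied {copied} records with columns {:?}", reader.schema().unwrap());
}
```

The loop in copy_all is the whole consumer side of the contract: keep calling next_record until it yields Ok(None), propagating any error with ?.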

Here’s the shape of a real implementation — CsvReader, the simplest complete one:

clinker-format ·reader.rs ·CsvReader type @47d2e12
pub struct CsvReader<R: Read> {
    inner: csv::Reader<SkipBom<R>>,
    schema: Option<Arc<Schema>>, // discovered from the header, cached
    config: CsvReaderConfig,
    row_count: u64,
    record_buf: csv::StringRecord,
}
impl<R: Read + Send> FormatReader for CsvReader<R> {
    fn schema(&mut self) -> Result<Arc<Schema>, FormatError> {
        self.ensure_schema() // reads the header row once
    }
    fn next_record(&mut self) -> Result<Option<Record>, FormatError> {
        let schema = self.ensure_schema()?;
        if !self.inner.read_record(&mut self.record_buf)? {
            return Ok(None); // end of stream
        }
        let values = self.record_buf.iter().map(|f| Value::String(f.into())).collect();
        Ok(Some(Record::new(schema, values)))
    }
}

That’s the whole pattern: hold the underlying byte source plus a cached Arc<Schema>, and have next_record pull one row and emit Ok(Some(Record)) until the source is dry, then Ok(None). Because the impl is generic over R: Read + Send rather than tied to files, the same reader works over a file or an in-memory buffer.
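A stripped-down sketch of the same shape, using only std: the schema is read from the header on first use and cached in an Arc, one line buffer is reused per row, and the type is generic over any R: Read. LineCsvReader and its methods are hypothetical; it does no quoting or BOM handling, and it skips the FormatReader trait (and its Send bound) so it compiles on its own.

```rust
use std::io::{BufRead, BufReader, Cursor, Read};
use std::sync::Arc;

/// Hypothetical stand-in for the CsvReader shape: any byte source `R`,
/// a lazily discovered and cached schema, and one reused line buffer.
struct LineCsvReader<R: Read> {
    inner: BufReader<R>,
    schema: Option<Arc<Vec<String>>>, // column names, filled from the header once
    line: String,                     // reused per row, like `record_buf`
}

impl<R: Read> LineCsvReader<R> {
    fn new(source: R) -> Self {
        Self { inner: BufReader::new(source), schema: None, line: String::new() }
    }

    /// Reads the next line into `self.line` without its line ending.
    /// Returns `Ok(false)` at end of input.
    fn next_line(&mut self) -> std::io::Result<bool> {
        self.line.clear();
        if self.inner.read_line(&mut self.line)? == 0 {
            return Ok(false);
        }
        let len = self.line.trim_end_matches(|c: char| c == '\r' || c == '\n').len();
        self.line.truncate(len);
        Ok(true)
    }

    /// Reads the header on the first call; later calls return the cached Arc.
    fn ensure_schema(&mut self) -> std::io::Result<Arc<Vec<String>>> {
        if let Some(schema) = &self.schema {
            return Ok(Arc::clone(schema));
        }
        let columns: Vec<String> = if self.next_line()? {
            self.line.split(',').map(str::to_string).collect()
        } else {
            Vec::new() // empty input: no columns
        };
        let schema = Arc::new(columns);
        self.schema = Some(Arc::clone(&schema));
        Ok(schema)
    }

    /// One data row per call, paired with the shared schema; `Ok(None)` at end of stream.
    fn next_record(&mut self) -> std::io::Result<Option<(Arc<Vec<String>>, Vec<String>)>> {
        let schema = self.ensure_schema()?;
        if !self.next_line()? {
            return Ok(None);
        }
        let fields: Vec<String> = self.line.split(',').map(str::to_string).collect();
        Ok(Some((schema, fields)))
    }
}

fn main() -> std::io::Result<()> {
    // The same generic type over two different byte sources.
    let mut from_vec = LineCsvReader::new(Cursor::new(b"name,age\nAlice,30\n".to_vec()));
    let mut from_slice = LineCsvReader::new(&b"name,age\nBob,25\n"[..]);
    while let Some((schema, fields)) = from_vec.next_record()? {
        println!("{schema:?} -> {fields:?}");
    }
    while let Some((schema, fields)) = from_slice.next_record()? {
        println!("{schema:?} -> {fields:?}");
    }
    Ok(())
}
```

Every record shares one Arc of the schema, so the header is parsed once and each row only pays for a reference-count bump.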

From a YAML string to your reader, in two stages


A pipeline says type: csv in YAML. How does that string reach CsvReader? Not by a string-match you write — it’s a two-stage hop, and understanding it tells you exactly what to edit.

Stage 1 — string to enum variant, by serde. The config layer has a closed enum of known formats, adjacently tagged so serde maps type: csv to a variant automatically:

clinker-plan ·format.rs ·InputFormat type @47d2e12
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "options", rename_all = "snake_case")]
pub enum InputFormat {
    Csv(Option<CsvInputOptions>),
    Json(Option<JsonInputOptions>),
    Xml(Option<XmlInputOptions>),
    FixedWidth(Option<FixedWidthInputOptions>),
    Edifact(Option<EdifactInputOptions>),
    X12(Option<X12InputOptions>),
    Hl7(Option<Hl7InputOptions>),
    Swift(Option<SwiftInputOptions>),
}

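The #[serde(tag = "type", content = "options", rename_all = "snake_case")] attribute is what turns the string "csv" into the variant InputFormat::Csv: tag names the key that selects the variant, content names the sibling key that carries that variant's options, and rename_all derives each tag string from the variant name (so FixedWidth is spelled fixed_width). You never write name-matching code.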
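For intuition, here is a hand-written approximation of that tag-to-variant step, i.e. the name matching the derive spares you. It is a stdlib-only sketch, not serde's generated code: from_tagged is a hypothetical function, only three variants are mirrored, and each variant's options are simplified to an optional string.

```rust
/// Hypothetical, trimmed mirror of `InputFormat`; options are simplified
/// to an optional raw string instead of per-format option structs.
#[derive(Debug, PartialEq)]
enum InputFormat {
    Csv(Option<String>),
    FixedWidth(Option<String>),
    Hl7(Option<String>),
}

/// Approximates what `tag = "type", content = "options", rename_all = "snake_case"`
/// gives you: the `type` value picks the variant, the `options` value becomes its payload.
fn from_tagged(type_tag: &str, options: Option<&str>) -> Result<InputFormat, String> {
    let options = options.map(str::to_string);
    match type_tag {
        "csv" => Ok(InputFormat::Csv(options)),
        "fixed_width" => Ok(InputFormat::FixedWidth(options)), // FixedWidth -> fixed_width
        "hl7" => Ok(InputFormat::Hl7(options)),
        other => Err(format!("unknown format type `{other}`")),
    }
}

fn main() {
    println!("{:?}", from_tagged("fixed_width", None));
    println!("{:?}", from_tagged("tsv", None));
}
```

Note the other => arm: a match on arbitrary strings can never be exhaustive, so some fallback is unavoidable there. Converting to the enum once, at the config boundary, is what lets every later match drop the wildcard.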

Stage 2 — enum variant to boxed reader, by an exhaustive match. In the executor, one function matches the variant and returns the boxed trait object:

clinker-exec ·ingest.rs ·build_format_reader fn @47d2e12
fn build_format_reader(
    input: &clinker_plan::config::SourceConfig,
    source: ReopenableSource,
) -> Result<Box<dyn FormatReader>, PipelineError> {
    match &input.format {
        InputFormat::Csv(opts) => Ok(Box::new(CsvReader::from_reader(/* ... */))),
        InputFormat::Json(opts) => Ok(Box::new(JsonReader::from_source(/* ... */)?)),
        InputFormat::Xml(opts) => { /* ... */ }
        // ...one arm per variant, NO `_ =>` catch-all
    }
}

This is the lesson-4.1 payoff in action: the match is wildcard-free, so the moment you add a variant to InputFormat, this function stops compiling until you add its arm. The open dyn seam (return type Box<dyn FormatReader>) and the closed enum dispatch (match with no wildcard) cooperate — dyn lets your reader plug in; the exhaustive enum makes the compiler hand you the list of sites to wire.

Here’s the enum-match-to-trait-object pattern on its own. Add a Tsv variant to Format and the build_reader match below stops compiling — the compiler names the exact site:
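A minimal sketch of that pattern. Format, Reader and build_reader are the names used above; CsvReader, FixedWidthReader, next_row and owned_lines are hypothetical stand-ins here, and the readers work on an in-memory string rather than a byte source.

```rust
/// The open seam: anything that can yield rows plugs in behind this trait.
trait Reader {
    /// Next row of fields, or `None` at end of input.
    fn next_row(&mut self) -> Option<Vec<String>>;
}

/// The closed set of formats; adding a variant forces `build_reader` to change.
#[derive(Debug, Clone, Copy)]
enum Format {
    Csv,
    FixedWidth(usize), // column width in characters
}

struct CsvReader {
    lines: std::vec::IntoIter<String>,
}

impl Reader for CsvReader {
    fn next_row(&mut self) -> Option<Vec<String>> {
        let line = self.lines.next()?;
        Some(line.split(',').map(str::to_string).collect())
    }
}

struct FixedWidthReader {
    lines: std::vec::IntoIter<String>,
    width: usize,
}

impl Reader for FixedWidthReader {
    fn next_row(&mut self) -> Option<Vec<String>> {
        let chars: Vec<char> = self.lines.next()?.chars().collect();
        // Cut the line into fixed-size columns and drop right-padding.
        Some(
            chars
                .chunks(self.width)
                .map(|col| col.iter().collect::<String>().trim_end().to_string())
                .collect(),
        )
    }
}

fn owned_lines(input: &str) -> std::vec::IntoIter<String> {
    input.lines().map(str::to_string).collect::<Vec<_>>().into_iter()
}

/// Closed dispatch to an open seam: one arm per variant, no `_ =>`.
fn build_reader(format: Format, input: &str) -> Box<dyn Reader> {
    match format {
        Format::Csv => Box::new(CsvReader { lines: owned_lines(input) }),
        Format::FixedWidth(width) => Box::new(FixedWidthReader {
            lines: owned_lines(input),
            width: width.max(1), // `chunks(0)` would panic
        }),
    }
}

fn main() {
    for (format, input) in [(Format::Csv, "a,b\n1,2"), (Format::FixedWidth(3), "ab cd ")] {
        let mut reader = build_reader(format, input);
        while let Some(row) = reader.next_row() {
            println!("{format:?}: {row:?}");
        }
    }
}
```

To watch the compiler enforce the closed side, add a Tsv variant to Format: rustc rejects build_reader with an E0004 non-exhaustive-patterns error naming Format::Tsv until you give it its own arm.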


The return type Box<dyn Reader> is the open seam; the wildcard-free match is the closed dispatch. That’s the same division of labor as the real build_format_reader.

To add a format end-to-end:

  1. Add a variant (and its options struct) to InputFormat and/or OutputFormat in crates/clinker-plan/src/config/format.rs, and add its lowercase name to the format_name() match (also wildcard-free — the compiler reminds you).
  2. Implement FormatReader (and/or FormatWriter) in a new module under crates/clinker-format/src/<fmt>/.
  3. Add the dispatch arm in build_format_reader (executor/ingest.rs) and/or the writer dispatch in executor/registry.rs. These are the compiler-mandatory edits.

Prove it with a round-trip: read a sample, write it back, read again, assert every field survived — exactly what the CSV test does:

clinker-format ·writer.rs ·test_csv_roundtrip_lossless test @47d2e12
#[test]
fn test_csv_roundtrip_lossless() {
    let input = "name,age,active\nAlice,30,true\nBob,25,false\n";
    // read -> records, write -> string, read again -> assert schemas + fields match
}
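The body of that test is elided above. As a self-contained illustration of the same read, write, read-again pattern, here is a sketch with a toy comma-splitting codec; Table, read_csv and write_csv are hypothetical helpers, not clinker-format's reader and writer, and they do no quoting, so fields must not contain commas or newlines.

```rust
/// Hypothetical toy codec standing in for a real reader/writer pair.
#[derive(Debug, PartialEq)]
struct Table {
    columns: Vec<String>,
    rows: Vec<Vec<String>>,
}

fn read_csv(input: &str) -> Table {
    let split = |line: &str| -> Vec<String> { line.split(',').map(str::to_string).collect() };
    let mut lines = input.lines();
    let columns = lines.next().map(split).unwrap_or_default(); // header row = schema
    let rows = lines.map(split).collect();
    Table { columns, rows }
}

fn write_csv(table: &Table) -> String {
    let mut out = table.columns.join(",");
    out.push('\n');
    for row in &table.rows {
        out.push_str(&row.join(","));
        out.push('\n');
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn roundtrip_is_lossless() {
        let input = "name,age,active\nAlice,30,true\nBob,25,false\n";
        let first = read_csv(input);         // read -> records
        let written = write_csv(&first);     // write -> string
        let second = read_csv(&written);     // read again
        assert_eq!(first.columns, second.columns); // schemas match
        assert_eq!(first.rows, second.rows);       // every field survived
    }
}

fn main() {
    let input = "name,age,active\nAlice,30,true\nBob,25,false\n";
    let first = read_csv(input);
    let second = read_csv(&write_csv(&first));
    assert_eq!(first, second);
    println!("round-trip ok: {} rows", second.rows.len());
}
```

Comparing the re-read records rather than raw bytes keeps the test meaningful for formats whose writer normalizes whitespace or quoting.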

// quick check

You add an InputFormat::Tsv variant but forget to add its arm to build_format_reader. What does the compiler do?

You’ve extended the engine’s edges. Next: extend its middle — add a new operator to the execution DAG, the change that touches the most places.