Add a transform/operator
A format plugs into an open dyn seam (lesson 5.3). An operator is the opposite: node
kinds are a closed, engine-owned set (lesson 4.1), so adding one isn’t plugging into a
seam — it’s editing the closed set itself. That makes this the change that touches the most
places. The good news is that the engine’s two wildcard-free matches turn most of those
places into compile errors that hand you the to-do list.
You’ll be able to: trace the path from an authored YAML step to a running operator, name the four edit sites and which two the compiler forces, and explain why the engine keeps two mirrored node enums instead of one.
Two mirrored enums, one boundary
The engine represents a pipeline node twice, on two sides of the plan/runtime boundary you met in Phase 3:
- PipelineNode: the authored node, as it appears in YAML (type: transform, type: route, …). It lives in the config layer (clinker-plan), carries spans, and is deserialized with strict per-variant checks.
- PlanNode: the lowered node, the executor's vocabulary (the closed enum from lesson 4.1). It carries resolved schemas, parallelism class, and other compile-time enrichment.
clinker-plan ·pipeline_node.rs ·PipelineNode type @47d2e12
```rust
// The authored side: one variant per YAML node `type:`.
pub enum PipelineNode {
    Source { header: NodeHeader, config: SourceBody },
    Transform { header: NodeHeader, config: TransformBody },
    Aggregate { header: NodeHeader, config: AggregateBody },
    Route { header: NodeHeader, config: RouteBody },
    Merge { /* ... */ },
    // ...11 authored node kinds
}
```

clinker-plan ·mod.rs ·PlanNode type @47d2e12
```rust
// The executor side: the closed set the DAG runs on.
pub enum PlanNode {
    Source { name: String, /* ... */ },
    Transform { name: String, /* resolved payload, parallelism, schema */ },
    Sort { name: String, span: Span, sort_fields: Vec<SortField> },
    Aggregation { /* ... */ },
    // ...13 variants, a superset of PipelineNode
}
```

They are deliberately not identical. PipelineNode::Aggregate lowers to
PlanNode::Aggregation (the names differ), and PlanNode has variants like Sort and
CorrelationCommit that are planner-synthesized — injected during compilation, never
authored in YAML, so they have no PipelineNode counterpart. Teaching tip: make your
first operator one that round-trips from YAML (Transform, Route, Reshape, and Cull all do)
so you exercise the whole path.
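Here is a minimal sketch of that asymmetry. It uses hypothetical stand-ins: `AuthoredNode` and `LoweredNode` play the roles of `PipelineNode` and `PlanNode`, each with only a few variants. The `inject_sort_before_aggregations` pass is invented. Its rule (a `Sort` before every `Aggregation`) exists only to show where a planner-synthesized variant comes from. It does not describe when clinker's planner actually injects `Sort`.

```rust
// Hypothetical miniature of the two mirrored enums; not clinker's real types.

/// Authored side: one variant per YAML `type:`.
#[derive(Debug, Clone, PartialEq)]
pub enum AuthoredNode {
    Source { name: String },
    Aggregate { name: String, group_by: String },
}

/// Lowered side: a superset that also holds planner-synthesized variants.
#[derive(Debug, Clone, PartialEq)]
pub enum LoweredNode {
    Source { name: String },
    // The authored `Aggregate` lowers to a differently named variant.
    Aggregation { name: String, group_by: String },
    // Planner-synthesized: no authored counterpart, so `lower` never emits it.
    Sort { name: String, key: String },
}

/// Lowering: one arm per authored variant, no `_ =>`.
pub fn lower(node: &AuthoredNode) -> LoweredNode {
    match node {
        AuthoredNode::Source { name } => LoweredNode::Source { name: name.clone() },
        AuthoredNode::Aggregate { name, group_by } => LoweredNode::Aggregation {
            name: name.clone(),
            group_by: group_by.clone(),
        },
    }
}

/// Hypothetical planner pass: inserts a `Sort` on the grouping key in front of
/// every `Aggregation`. The rule is illustrative only.
pub fn inject_sort_before_aggregations(plan: Vec<LoweredNode>) -> Vec<LoweredNode> {
    let mut out = Vec::with_capacity(plan.len());
    for node in plan {
        if let LoweredNode::Aggregation { name, group_by } = &node {
            out.push(LoweredNode::Sort {
                name: format!("{name}__sort"),
                key: group_by.clone(),
            });
        }
        out.push(node);
    }
    out
}
```

Lowering alone never produces a `Sort`. Only a compile-time pass adds it, which is why that variant has nothing to mirror on the authored side.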
The path from YAML to a running operator
```text
YAML ──serde/strict──▶ PipelineNode::Route   (authored, in clinker-plan/config)
                          │
                          │ lower_node_to_plan_node   (the lowering match)
                          ▼
                       PlanNode::Route       (lowered, goes into the DAG)
                          │
                          │ graph.add_node(...) → ExecutionPlanDag → CompiledPlan
                          ▼
                       dispatch_plan_node    (the executor's exhaustive match)
                          │
                          ▼
                       dispatch_route → the operator body (clinker-exec)
```

The lowering match is one free function (config variant in, plan variant out) with no intermediate IR:
clinker-plan ·pipeline.rs ·lower_node_to_plan_node fn @47d2e12
```rust
pub(crate) fn lower_node_to_plan_node(
    node: &PipelineNode,
    name: &str,
    span: Span,
    artifacts: &CompileArtifacts,
    ctx: &LoweringCtx<'_>,
    diags: &mut Vec<Diagnostic>,
) -> Option<PlanNode> {
    match node {
        PipelineNode::Route { config, .. } => Some(PlanNode::Route {
            name: name.to_string(),
            span,
            mode: config.mode,
            branches: config.conditions.keys().cloned().collect(),
            default: config.default.clone(),
        }),
        PipelineNode::Aggregate { config, .. } => Some(PlanNode::Aggregation { /* ... */ }),
        // ...one arm per authored variant, no `_ =>`. Returns None when an
        // earlier stage already errored on this node.
    }
}
```

This runs inside the compile step (Stage 5), once per authored node, building the DAG:
```rust
// PipelineConfig::compile_with_diagnostics, Phase 1: one PlanNode per node.
for spanned in &self.nodes {
    // (binding `node`, `name`, and `span` for this entry is elided here)
    let plan_node =
        lower_node_to_plan_node(node, &name, span, &artifacts, &lowering_ctx, &mut diags);
    if let Some(pn) = plan_node {
        let idx = graph.add_node(pn); // into DiGraph<PlanNode, PlanEdge>
        name_to_idx.insert(name, idx);
    }
}
```

The resulting DiGraph<PlanNode, _> becomes the ExecutionPlanDag inside CompiledPlan
— the typed handle from lesson 3.5. At run time the executor walks it and dispatches each
node through the wildcard-free match you saw in lesson 4.1:
clinker-exec ·dispatch.rs ·dispatch_plan_node fn @47d2e12
```rust
match node {
    PlanNode::Source { .. } => dispatch_source(ctx, current_dag, node_idx, &node)?,
    PlanNode::Transform { .. } => dispatch_transform(ctx, current_dag, node_idx, &node)?,
    PlanNode::Route { .. } => dispatch_route(ctx, current_dag, node_idx, &node)?,
    // ...one arm per PlanNode variant, no `_ =>`
}
```

The four edit sites
Adding an operator means a coordinated change across four places, and, crucially, two of them are wildcard-free matches that will not compile until you add the arm:
| # | Site | File | Compiler-forced? |
|---|---|---|---|
| 1 | PipelineNode variant (+ Body struct, strict deserialize) | clinker-plan/src/config/pipeline_node.rs | no (you author it) |
| 2 | PlanNode variant | clinker-plan/src/plan/execution/mod.rs | no (you author it) |
| 3 | lower_node_to_plan_node arm + Phase-2 edge wiring | clinker-plan/src/config/pipeline.rs | yes (exhaustive match) |
| 4 | dispatch_plan_node arm → operator body | clinker-exec/src/executor/dispatch.rs (+ a *_dispatch.rs module) | yes (exhaustive match) |
Sites 1–3 are all in clinker-plan (config, plan, lowering); site 4 is in
clinker-exec (dispatch + the operator’s actual record logic). That split is the
plan/runtime boundary again: planning lowers and validates; execution runs. The operator’s
behavior (the code that actually transforms records) lives on the exec side, separate
from the dispatch plumbing. For the Transform operator, that code is evaluate_single_transform.
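Here is a small sketch of that split, with hypothetical names throughout. `dispatch` plays the role of the dispatch plumbing: it unpacks the node and moves records. `evaluate_uppercase` plays the role of an operator body like evaluate_single_transform: pure record logic that never sees the DAG. Neither signature is clinker's; only the division of labor is the point.

```rust
use std::collections::BTreeMap;

// Hypothetical record type: field name -> value.
pub type Record = BTreeMap<String, String>;

// Hypothetical lowered node for an operator that upper-cases one field.
pub enum ExecNode {
    Uppercase { name: String, field: String },
}

/// Operator body: pure record logic, with no knowledge of the DAG or of dispatch.
pub fn evaluate_uppercase(record: &Record, field: &str) -> Record {
    let mut out = record.clone();
    if let Some(value) = out.get_mut(field) {
        *value = value.to_uppercase();
    }
    out
}

/// Dispatch plumbing: unpack the node's payload, then hand each record to the body.
pub fn dispatch(node: &ExecNode, input: &[Record]) -> Vec<Record> {
    match node {
        ExecNode::Uppercase { field, .. } => {
            input.iter().map(|r| evaluate_uppercase(r, field)).collect()
        }
    }
}
```

Keeping the body free of dispatch concerns means it can be unit-tested on a single record, while the plumbing stays a thin match arm.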
Once you add the variants at sites 1 and 2, the two exhaustive matches break the build and point you at
sites 3 and 4. That's the closed-enum payoff doing your change management, and it's exactly the
guarantee a dyn Operator design would give up here: a forgotten case there would be
a silent runtime no-op, not a compile error.
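To see that failure mode, here is a sketch of the open alternative. It assumes a hypothetical design where operators are `Box<dyn Operator>` values looked up by `type:` name, and where an unknown name falls through as a pass-through. None of this is clinker code. `Dedup` is implemented but never registered, and nothing fails to compile.

```rust
use std::collections::HashMap;

// Hypothetical open design: each operator is a trait object found by name.
pub trait Operator {
    fn apply(&self, records: Vec<i64>) -> Vec<i64>;
}

pub struct Double;
impl Operator for Double {
    fn apply(&self, records: Vec<i64>) -> Vec<i64> {
        records.into_iter().map(|r| r * 2).collect()
    }
}

pub struct Dedup;
impl Operator for Dedup {
    fn apply(&self, mut records: Vec<i64>) -> Vec<i64> {
        records.dedup(); // removes consecutive duplicates
        records
    }
}

pub type Registry = HashMap<&'static str, Box<dyn Operator>>;

pub fn build_registry() -> Registry {
    let mut registry = Registry::new();
    registry.insert("double", Box::new(Double));
    // Bug: Dedup is implemented but never registered, and the build still succeeds.
    registry
}

/// Looks up an operator by its `type:` name. An unknown name is passed through
/// unchanged, so the missing registration becomes a silent runtime no-op.
pub fn run(registry: &Registry, kind: &str, records: Vec<i64>) -> Vec<i64> {
    match registry.get(kind) {
        Some(op) => op.apply(records),
        None => records,
    }
}
```

A registry could return an error instead of passing records through, but either way the mistake only shows up when the pipeline runs. The closed enum moves it to `cargo check`.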
Two enums, two matches, in miniature
Here's the whole shape: an authored enum, a lowered enum, a lowering match, and a dispatch
match. Add a Dedup operator and both matches stop compiling until you add the arms.
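Here is a hedged sketch of that shape. `ConfigNode`, `PlanNode`, `lower`, and `dispatch` are the names used just below, but the variants and payloads are hypothetical stand-ins, not clinker's. Both matches are written without a `_ =>` arm. Adding a `Dedup` variant to either enum therefore makes the corresponding match non-exhaustive (rustc error E0004) until the new arm is written.

```rust
// Authored side: a hypothetical miniature of the config enum.
#[derive(Debug, Clone, PartialEq)]
pub enum ConfigNode {
    Filter { keep_over: i64 },
    Scale { factor: i64 },
    // Add `Dedup` here and `lower` stops compiling until it gets an arm.
}

// Lowered side: a hypothetical miniature of the executor's enum.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanNode {
    Filter { keep_over: i64 },
    Scale { factor: i64 },
    // Add `Dedup` here and `dispatch` stops compiling until it gets an arm.
}

/// Lowering match: config variant in, plan variant out, no `_ =>` arm.
pub fn lower(node: &ConfigNode) -> PlanNode {
    match node {
        ConfigNode::Filter { keep_over } => PlanNode::Filter { keep_over: *keep_over },
        ConfigNode::Scale { factor } => PlanNode::Scale { factor: *factor },
    }
}

/// Dispatch match: one arm per plan variant, no `_ =>` arm.
pub fn dispatch(node: &PlanNode, records: &[i64]) -> Vec<i64> {
    match node {
        PlanNode::Filter { keep_over } => {
            records.iter().copied().filter(|r| r > keep_over).collect()
        }
        PlanNode::Scale { factor } => records.iter().map(|r| r * factor).collect(),
    }
}
```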
Add Dedup to ConfigNode and PlanNode, and you get two compile errors — one at
lower, one at dispatch — naming exactly the two arms you still owe. That’s the
four-place change made safe.
Prove it end to end
clinker-exec ·integration_tests.rs ·test_end_to_end_csv_transform test @47d2e12
```rust
#[test]
fn test_end_to_end_csv_transform() {
    // Inline YAML: source -> transform -> transform -> output, CSV in.
    // (the `yaml` and `csv` literals are elided here)
    let (counters, dlq, output) = run_pipeline(yaml, csv).unwrap();
    assert_eq!(counters.ok_count, 3);
    assert_eq!(counters.dlq_count, 0);
    // ...assert on the transformed output bytes
}
```

A new operator's verify step is a test exactly like this: author a small YAML pipeline
that uses your new operator's type, feed it input, and assert on the output bytes. That is
testing to the boundary (lesson 5.1).
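Here is a toy runner exercised the same way, to make the pattern concrete without clinker itself. A pipeline description and CSV text go in; counters, dead-lettered rows, and output bytes come out. Everything in it is hypothetical. This `run_pipeline` is not clinker's function, the line-per-step `type:` format stands in for YAML, and `dedup` is an illustrative operator. The strict parse mirrors the serde/strict step, so an unknown `type:` is an error rather than a silently skipped node.

```rust
use std::collections::HashSet;

/// Hypothetical run counters, echoing `ok_count` / `dlq_count` in the real test.
#[derive(Debug, Default, PartialEq)]
pub struct Counters {
    pub ok_count: usize,
    pub dlq_count: usize,
}

// Hypothetical authored step kinds for this toy runner.
enum Step {
    Dedup,
    Uppercase,
}

/// Strict parse: every non-blank line must be `type: <known kind>`.
fn parse_steps(spec: &str) -> Result<Vec<Step>, String> {
    spec.lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .map(|line| match line.strip_prefix("type:").map(str::trim) {
            Some("dedup") => Ok(Step::Dedup),
            Some("uppercase") => Ok(Step::Uppercase),
            _ => Err(format!("unknown step: {line}")),
        })
        .collect()
}

/// Toy runner: CSV text in (header first); counters, DLQ rows, and output out.
/// Rows whose field count differs from the header's go to the DLQ.
pub fn run_pipeline(spec: &str, csv: &str) -> Result<(Counters, Vec<String>, String), String> {
    let steps = parse_steps(spec)?;
    let mut lines = csv.lines();
    let header = lines.next().ok_or("empty input")?;
    let width = header.split(',').count();

    let (mut rows, mut dlq) = (Vec::new(), Vec::new());
    for line in lines {
        if line.split(',').count() == width {
            rows.push(line.to_string());
        } else {
            dlq.push(line.to_string());
        }
    }

    for step in &steps {
        rows = match step {
            Step::Dedup => {
                // Keep the first occurrence of each row, drop later repeats.
                let mut seen = HashSet::new();
                rows.into_iter().filter(|row| seen.insert(row.clone())).collect()
            }
            Step::Uppercase => rows.into_iter().map(|row| row.to_uppercase()).collect(),
        };
    }

    let counters = Counters { ok_count: rows.len(), dlq_count: dlq.len() };
    let mut output = format!("{header}\n");
    for row in &rows {
        output.push_str(row);
        output.push('\n');
    }
    Ok((counters, dlq, output))
}
```

In a real crate, these assertions would live in a `#[test]` function shaped like test_end_to_end_csv_transform.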
Quick check
After adding a PipelineNode::Dedup and a PlanNode::Dedup variant, you run cargo check. What does the compiler report, and why is that the design working as intended?
Answer: Both the lowering match (config→plan) and the dispatch match (plan→operator) have no _ => arm, so a new variant makes each non-exhaustive. The compiler names both sites, turning 'did I wire the operator everywhere?' into a build error instead of a runtime surprise. That's why node kinds are a closed enum, not a dyn seam.
Trace an operator
You can now extend all three of the engine's extension points: expressions, formats, and operators. The last two lessons turn from making a change to landing one: the review gauntlet, then planning a change that respects the engine's boundaries.