
Add a transform/operator

A format plugs into an open dyn seam (lesson 5.3). An operator is the opposite: node kinds are a closed, engine-owned set (lesson 4.1), so adding one isn’t plugging into a seam; it’s editing the closed set itself. That makes this the change that touches the most places. The good news is that the engine’s two wildcard-free matches turn two of those places into compile errors that hand you the to-do list.

You’ll be able to: trace the path from an authored YAML step to a running operator, name the four edit sites and which two the compiler forces, and explain why the engine keeps two mirrored node enums instead of one.

The engine represents a pipeline node twice, on two sides of the plan/runtime boundary you met in Phase 3:

  • PipelineNode — the authored node, as it appears in YAML (type: transform, type: route, …). It lives in the config layer (clinker-plan), carries spans, and is deserialized with strict per-variant checks.
  • PlanNode — the lowered node, the executor’s vocabulary (the closed enum from lesson 4.1). It carries resolved schemas, parallelism class, and other compile-time enrichment.
clinker-plan · pipeline_node.rs · PipelineNode type @47d2e12
// The authored side: one variant per YAML node `type:`.
pub enum PipelineNode {
    Source { header: NodeHeader, config: SourceBody },
    Transform { header: NodeHeader, config: TransformBody },
    Aggregate { header: NodeHeader, config: AggregateBody },
    Route { header: NodeHeader, config: RouteBody },
    Merge { /* ... */ },
    // ...11 authored node kinds
}

clinker-plan · mod.rs · PlanNode type @47d2e12
// The executor side: the closed set the DAG runs on.
pub enum PlanNode {
    Source { name: String, /* ... */ },
    Transform { name: String, /* resolved payload, parallelism, schema */ },
    Sort { name: String, span: Span, sort_fields: Vec<SortField> },
    Aggregation { /* ... */ },
    // ...13 variants, a superset of PipelineNode
}

They are deliberately not identical. PipelineNode::Aggregate lowers to PlanNode::Aggregation (the names differ), and PlanNode has variants like Sort and CorrelationCommit that are planner-synthesized: injected during compilation, never authored in YAML, so they have no PipelineNode counterpart. Teaching tip: make your first operator one that does round-trip from YAML, as Transform, Route, Reshape, and Cull do, so you exercise the whole path.

YAML ──serde/strict──▶ PipelineNode::Route   (authored, in clinker-plan/config)
                            │  lower_node_to_plan_node (the lowering match)
                            ▼
                       PlanNode::Route       (lowered, goes into the DAG)
                            │  graph.add_node(...) → ExecutionPlanDag → CompiledPlan
                            ▼
                       dispatch_plan_node    (the executor's exhaustive match)
                            │
                            ▼
                       dispatch_route → the operator body (clinker-exec)

The lowering match is one free function — config variant in, plan variant out — with no intermediate IR:

clinker-plan · pipeline.rs · lower_node_to_plan_node fn @47d2e12
pub(crate) fn lower_node_to_plan_node(
    node: &PipelineNode, name: &str, span: Span,
    artifacts: &CompileArtifacts, ctx: &LoweringCtx<'_>,
    diags: &mut Vec<Diagnostic>,
) -> Option<PlanNode> {
    match node {
        PipelineNode::Route { config, .. } => Some(PlanNode::Route {
            name: name.to_string(),
            span,
            mode: config.mode,
            branches: config.conditions.keys().cloned().collect(),
            default: config.default.clone(),
        }),
        PipelineNode::Aggregate { config, .. } => Some(PlanNode::Aggregation { /* ... */ }),
        // ...one arm per authored variant, no `_ =>`. Returns None when an
        // earlier stage already errored on this node.
    }
}

This runs inside the compile step (Stage 5), once per authored node, building the DAG:

// PipelineConfig::compile_with_diagnostics, Phase 1: one PlanNode per node.
for spanned in &self.nodes {
    // ...`node`, `name`, and `span` are derived from `spanned` (elided in this excerpt)
    let plan_node = lower_node_to_plan_node(node, &name, span, &artifacts, &lowering_ctx, &mut diags);
    if let Some(pn) = plan_node {
        let idx = graph.add_node(pn); // into DiGraph<PlanNode, PlanEdge>
        name_to_idx.insert(name, idx);
    }
}

The resulting DiGraph<PlanNode, _> becomes the ExecutionPlanDag inside CompiledPlan — the typed handle from lesson 3.5. At run time the executor walks it and dispatches each node through the wildcard-free match you saw in lesson 4.1:

clinker-exec · dispatch.rs · dispatch_plan_node fn @47d2e12
match node {
    PlanNode::Source { .. } => dispatch_source(ctx, current_dag, node_idx, &node)?,
    PlanNode::Transform { .. } => dispatch_transform(ctx, current_dag, node_idx, &node)?,
    PlanNode::Route { .. } => dispatch_route(ctx, current_dag, node_idx, &node)?,
    // ...one arm per PlanNode variant, no `_ =>`
}

Adding an operator means a coordinated change across four places — and crucially, two of them are wildcard-free matches that will not compile until you add the arm:

| # | Site | File | Compiler-forced? |
|---|------|------|------------------|
| 1 | PipelineNode variant (+ Body struct, strict deserialize) | clinker-plan/src/config/pipeline_node.rs | no (you author it) |
| 2 | PlanNode variant | clinker-plan/src/plan/execution/mod.rs | no (you author it) |
| 3 | lower_node_to_plan_node arm + Phase-2 edge wiring | clinker-plan/src/config/pipeline.rs | yes (exhaustive match) |
| 4 | dispatch_plan_node arm → operator body | clinker-exec/src/executor/dispatch.rs (+ a *_dispatch.rs module) | yes (exhaustive match) |

Sites 1–3 are all in clinker-plan (config, plan, lowering); site 4 is in clinker-exec (dispatch plus the operator’s actual record logic). That split is the plan/runtime boundary again: planning lowers and validates; execution runs. The operator’s behavior, meaning the code that actually transforms records, lives on the exec side, separate from the dispatch plumbing. For the Transform operator, that code is evaluate_single_transform.

Once you add variants 1 and 2, the two exhaustive matches break the build and point you at sites 3 and 4. That’s the closed-enum payoff doing your change-management. It is also why a dyn Operator design would be worse here: it gives up this guarantee, so a forgotten case would be a silent runtime no-op rather than a compile error.
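To make the contrast concrete, here is a minimal sketch of what a trait-object design might look like. Everything in it (the Operator trait, Uppercase, Registry, run_step) is hypothetical and not taken from the engine; it only illustrates how an open registry moves the "forgot a case" failure from compile time to run time.

```rust
use std::collections::HashMap;

// Hypothetical trait-object design, for contrast only.
// None of these names come from the engine.
trait Operator {
    fn run(&self, records: Vec<String>) -> Vec<String>;
}

struct Uppercase;

impl Operator for Uppercase {
    fn run(&self, records: Vec<String>) -> Vec<String> {
        records.into_iter().map(|r| r.to_uppercase()).collect()
    }
}

/// Open registry keyed by node kind.
struct Registry {
    ops: HashMap<&'static str, Box<dyn Operator>>,
}

impl Registry {
    fn new() -> Self {
        let mut ops: HashMap<&'static str, Box<dyn Operator>> = HashMap::new();
        ops.insert("uppercase", Box::new(Uppercase));
        // Nobody registered "dedup". The compiler has no way to notice.
        Registry { ops }
    }

    /// A lenient lookup: an unknown kind passes records through untouched.
    fn run_step(&self, kind: &str, records: Vec<String>) -> Vec<String> {
        match self.ops.get(kind) {
            Some(op) => op.run(records),
            None => records, // the silent no-op
        }
    }
}

fn main() {
    let registry = Registry::new();
    let input = vec!["a".to_string(), "a".to_string()];
    // The "dedup" step runs without error and changes nothing.
    let out = registry.run_step("dedup", input.clone());
    assert_eq!(out, input);
    println!("{:?}", out);
}
```

A stricter registry could return an error for the unknown kind instead, but that is still a run-time outcome; only the closed enum with wildcard-free matches reports the missing case before the program runs.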

Here’s the whole shape: an authored enum, a lowered enum, a lowering match, and a dispatch match. Add a Dedup operator and both matches stop compiling until you add the arm:
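A minimal, self-contained sketch of that shape follows. The types are simplified stand-ins (ConfigNode, PlanNode, lower, and dispatch are illustrative, and their fields are assumptions), not the engine’s real definitions, which carry spans, schemas, and diagnostics.

```rust
// Stand-in types for illustration only.

/// Authored side: one variant per YAML `type:`.
#[derive(Debug)]
enum ConfigNode {
    Transform { expr: String },
    Route { branches: Vec<String> },
    // Dedup { key: String },                 // site 1: authored variant
}

/// Lowered side: the closed set the executor runs on.
#[derive(Debug, PartialEq)]
enum PlanNode {
    Transform { name: String, expr: String },
    Route { name: String, branches: Vec<String> },
    // Dedup { name: String, key: String },   // site 2: lowered variant
}

/// Lowering match: config variant in, plan variant out, no `_ =>` arm.
fn lower(node: &ConfigNode, name: &str) -> PlanNode {
    match node {
        ConfigNode::Transform { expr } => PlanNode::Transform {
            name: name.to_string(),
            expr: expr.clone(),
        },
        ConfigNode::Route { branches } => PlanNode::Route {
            name: name.to_string(),
            branches: branches.clone(),
        },
        // site 3: ConfigNode::Dedup { key } => PlanNode::Dedup { .. }
    }
}

/// Dispatch match: one arm per PlanNode variant, no `_ =>` arm.
fn dispatch(node: &PlanNode) -> String {
    match node {
        PlanNode::Transform { name, expr } => format!("transform {}: {}", name, expr),
        PlanNode::Route { name, branches } => {
            format!("route {}: {} branches", name, branches.len())
        }
        // site 4: PlanNode::Dedup { .. } => the operator body
    }
}

fn main() {
    let nodes = vec![
        ("clean", ConfigNode::Transform { expr: "trim(x)".to_string() }),
        ("split", ConfigNode::Route { branches: vec!["a".to_string(), "b".to_string()] }),
    ];
    for (name, node) in &nodes {
        println!("{}", dispatch(&lower(node, name)));
    }
}
```

Uncommenting the two Dedup variants without adding the arms at sites 3 and 4 makes both matches fail with E0004 (non-exhaustive patterns), one error at lower and one at dispatch.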


Add Dedup to ConfigNode and PlanNode, and you get two compile errors — one at lower, one at dispatch — naming exactly the two arms you still owe. That’s the four-place change made safe.

clinker-exec · integration_tests.rs · test_end_to_end_csv_transform test @47d2e12
#[test]
fn test_end_to_end_csv_transform() {
    // Inline YAML: source -> transform -> transform -> output, CSV in.
    let (counters, dlq, output) = run_pipeline(yaml, csv).unwrap();
    assert_eq!(counters.ok_count, 3);
    assert_eq!(counters.dlq_count, 0);
    // ...assert on the transformed output bytes
}

A new operator’s verify step is a test exactly like this: author a small YAML pipeline that uses your type:, feed input, and assert on the output bytes — testing to the boundary (lesson 5.1).

// quick check

After adding a PipelineNode::Dedup and a PlanNode::Dedup variant, you run cargo check. What does the compiler report, and why is that the design working as intended?

You can now extend all three of the engine’s extension points — expressions, formats, and operators. The last two lessons turn from making a change to landing one: the review gauntlet, then planning a change that respects the engine’s boundaries.