Chapter 9b: Knowledge-Gathering Pipelines

What You’ll Learn

  • Growing context — thread accumulated knowledge forward as an ever-richer record, so the type between pipeline steps is a compiler-checked proof of progress
  • The mapWith family (mapWith / flatMapWith / ensureWith, Pragmatica Core 1.0.0-rc1) — turn each stage into one lambda-free line
  • Gating vs. evidence — the one place a fallible check may discard its result, and when it must accrete it instead

Prerequisites: Chapter 9 (Advanced Patterns) and Chapter 5 (Parse, Don’t Validate)


Chapter 2 framed every backend process as knowledge gathering: each step acquires a piece of knowledge, and the process ends — successfully or not — when enough has accumulated to answer the caller. That chapter sketched a process’s shape as a data dependency graph. It left one question open, the question every pipeline design must eventually answer:

What concrete data structure flows between the steps?

Chapter 5 supplies half the answer: parse, don’t validate, meaning input is wrapped at the boundary into types whose existence proves validity. This chapter develops the other half to the same depth: how to design the records that travel between steps so each carries exactly the knowledge gathered so far, and how the context-preserving combinators added in Pragmatica Core 1.0.0-rc1 (mapWith, flatMapWith, ensureWith) reduce each stage built on those records to one line.

The discipline this produces is what the methodology calls growing context: each step receives the prior context, adds what it learned, and passes the enriched context forward.

The Problem: What Flows Between Steps?

A use case rarely answers from its input alone. A typical flow validates the request, loads a profile, checks an entitlement, fetches related data, and only then builds the response. Any of those steps may need what an earlier one learned, so something has to carry that knowledge forward.

The obvious answers share one defect. A mutable context bag, passing only what each step needs, and a god-record of Option fields all leave pipeline progress out of the types: nothing says which knowledge is available at which point. Reordering or skipping a step then fails at runtime instead of at compile time, or an early fact is gone by the time a late step needs it. (The section “Why not the obvious alternatives?”, near the end, dissects each; what matters here is what they have in common.)

The requirement they miss: at every point in the pipeline, the type should express exactly the knowledge gathered so far — no more, no less. This is “make invalid states unrepresentable” applied to pipeline progress.

The Data Structure: Growing Context

Growing context is built from stage records — small records that each pair the previous stage with the knowledge just gained, so the composed type is the running proof of how far the pipeline has gotten.

First attempt: concrete chaining

The direct translation of “each stage = previous stage + new knowledge”:

record ValidRequest(UserId userId) {}
record WithProfile(ValidRequest request, Profile profile) {}
record WithArticles(WithProfile request, List<Article> articles) {}

This already delivers the core guarantee — holding a WithArticles proves the profile and articles were gathered, in that order. But WithArticles is welded to one pipeline shape: it can only ever follow WithProfile. A stage that gathers articles after a different predecessor needs a new record.

The refinement: the previous stage as a type parameter

record ValidRequest(UserId userId) {}
record UserProfile<T>(T request, Profile profile) {}
record UserArticles<T>(T request, List<Article> articles) {}

Two things change, both significant.

First, stage records become pipeline-agnostic. UserProfile<T> says “whatever was known before, plus a profile” — it composes after any predecessor, so the same stage record serves every flow that gathers a profile.

Second, the composed type is the progress proof. A value of type

UserArticles<UserProfile<ValidRequest>>

is a machine-checked statement: the request was validated, then a profile was gathered, then articles. Reordering stages, skipping one, or consuming knowledge before it exists is a compile error, not a runtime surprise. The compiler tracks pipeline progress for you.
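
The progress-proof claim can be tried out in plain Java with no library at all. The sketch below is illustrative only: UserId, Profile, and Article are placeholder types assumed here, and ProgressProofDemo with its summarize and sample methods is a hypothetical name, not part of any API.

```java
import java.util.List;

// Placeholder domain types, assumed for illustration only.
record UserId(String value) {}
record Profile(String displayName) {}
record Article(String title) {}

// Stage records: the previous stage as a type parameter, plus the knowledge just gained.
record ValidRequest(UserId userId) {}
record UserProfile<T>(T request, Profile profile) {}
record UserArticles<T>(T request, List<Article> articles) {}

class ProgressProofDemo {
    // The parameter type states which knowledge must already exist:
    // a validated request, then a profile, then articles.
    static String summarize(UserArticles<UserProfile<ValidRequest>> stage) {
        UserId id = stage.request().request().userId();   // two hops back
        Profile profile = stage.request().profile();      // one hop back
        return profile.displayName() + " (" + id.value() + "): "
                + stage.articles().size() + " article(s)";
    }

    // Builds the stages in pipeline order; each constructor call grows the type.
    static UserArticles<UserProfile<ValidRequest>> sample() {
        var valid = new ValidRequest(new UserId("u-1"));
        var withProfile = new UserProfile<>(valid, new Profile("Ada"));
        return new UserArticles<>(withProfile, List.of(new Article("On Types")));
    }

    public static void main(String[] args) {
        System.out.println(summarize(sample()));
        // summarize(new UserProfile<>(valid, profile)) would be rejected by the compiler:
        // a UserProfile<ValidRequest> is not a UserArticles<UserProfile<ValidRequest>>.
    }
}
```

Passing a stage that has not yet gathered articles to summarize is a type mismatch, which is the compile-time rejection the paragraph above describes.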

Conventions

  • Component name request for the previous-stage reference, uniformly. Access chains then read mechanically: a.request().request().userId().
  • The canonical constructor is the stage factory. Its shape — (T previous, Knowledge gathered) — is not an accident; it is the exact shape the pipeline combinators expect (next section). Keep it.
  • A validating stage adds a static factory returning Result, following the value-object convention (QuotaScoped.quotaScoped(...) below).

Reaching back: access patterns

For shallow pipelines (the common case — three to five stages), request() chains are fine and self-documenting.

When an operation needs several pieces of accumulated knowledge and should remain reusable across pipelines, bound the container with capability interfaces instead of naming a concrete stage type:

interface HasUserId { UserId userId(); }

record UserProfile<T extends HasUserId>(T request, Profile profile) implements HasUserId {
    @Override
    public UserId userId() { return request.userId(); }
}

The forwarding implementation makes the fact reachable in one call at any depth, and an operation declared as <T extends HasUserId> Promise<Enriched<T>> enrich(T container) composes into any pipeline that has gathered a user id — without knowing anything else about it.
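
As a rough illustration of the capability bound, the sketch below writes one operation against HasUserId and feeds it containers built by two different routes. AuthenticatedSession, CapabilityDemo, and greetingFor are hypothetical names introduced for the example, and the operation returns a plain String rather than a Promise to keep the sketch free of library types.

```java
// Placeholder domain types, assumed for illustration only.
record UserId(String value) {}
record Profile(String displayName) {}

interface HasUserId { UserId userId(); }

// Two different starting points that both know a user id.
record ValidRequest(UserId userId) implements HasUserId {}
record AuthenticatedSession(String token, UserId userId) implements HasUserId {}   // hypothetical

record UserProfile<T extends HasUserId>(T request, Profile profile) implements HasUserId {
    @Override
    public UserId userId() { return request.userId(); }   // forward to the previous stage
}

class CapabilityDemo {
    // Written once against the capability, not against a concrete stage type.
    static <T extends HasUserId> String greetingFor(T container) {
        return "hello, " + container.userId().value();
    }

    public static void main(String[] args) {
        var viaRequest = new UserProfile<>(new ValidRequest(new UserId("u-1")), new Profile("Ada"));
        var viaSession = new UserProfile<>(
                new AuthenticatedSession("token", new UserId("u-2")), new Profile("Bob"));
        // Nesting deeper still answers userId() in one call, thanks to forwarding.
        var nested = new UserProfile<>(viaRequest, new Profile("Ada, again"));

        System.out.println(greetingFor(viaRequest));
        System.out.println(greetingFor(viaSession));
        System.out.println(greetingFor(nested));
    }
}
```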

Fitting the Pipeline: the mapWith Family

The stage shape, and what it used to cost

Every knowledge-gathering stage has the same three-part shape: take some of the accumulated knowledge, run an operation on it, combine the operation’s output with the original container into the next stage. Before 1.0.0-rc1, that shape forced a nested lambda with a captured binding:

// BEFORE: nesting + capture, just to keep the container alive
validRequest.flatMap(valid -> profiles.fetch(valid.userId())
                                      .map(profile -> new UserProfile<>(valid, profile)));

// AFTER: source accessor + operation + next-stage constructor
validRequest.mapWith(ValidRequest::userId, profiles::fetch, UserProfile::new);

The six combinators

Each of Result, Option, and Promise provides the same six methods:

// whole-object forms — operation sees the full container
mapWith(operation, factory)       // op: T -> M<B>,  factory: (T, B) -> U   (pure)
flatMapWith(operation, factory)   // op: T -> M<B>,  factory: (T, B) -> M<U> (may fail)
ensureWith(operation)             // op: T -> M<B>;  B discarded, T continues

// field-scoped forms — getter projects, operation sees only the projection
mapWith(getter, operation, factory)
flatMapWith(getter, operation, factory)
ensureWith(getter, operation)

| Combinator | Operation result | Factory | On operation failure/empty |
|---|---|---|---|
| mapWith | passed to factory together with the original container | pure | propagates; factory not invoked |
| flatMapWith | same as mapWith | may fail; its failure propagates | propagates; factory not invoked |
| ensureWith | discarded; the chain continues with the original container | (none) | propagates |

The semantics carry across the three monads mechanically: on Option, “failure” means empty and propagates as empty; on Promise, the chain genuinely awaits the operation before proceeding.
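
To make the semantics above concrete, here is a minimal sketch of the three field-scoped forms on a toy carrier. Outcome is a hypothetical stand-in written for this example, not Pragmatica Core's Result, and the real signatures may differ (for instance in how errors are typed); only the behavior described above is modeled. Request, WithName, lookupName, and rateLimit are likewise invented for the demonstration.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for a Result-like carrier; NOT the library's implementation.
class Outcome<T> {
    private final T value;       // set on success
    private final String error;  // set on failure

    private Outcome(T value, String error) { this.value = value; this.error = error; }
    static <T> Outcome<T> ok(T value) { return new Outcome<>(value, null); }
    static <T> Outcome<T> fail(String error) { return new Outcome<>(null, error); }

    boolean isOk() { return error == null; }
    T value() { return value; }
    String error() { return error; }

    <U> Outcome<U> map(Function<T, U> f) { return isOk() ? ok(f.apply(value)) : fail(error); }
    <U> Outcome<U> flatMap(Function<T, Outcome<U>> f) { return isOk() ? f.apply(value) : fail(error); }

    // Operation result goes to a pure factory together with the original container.
    <A, B, U> Outcome<U> mapWith(Function<T, A> getter, Function<A, Outcome<B>> op,
                                 BiFunction<T, B, U> factory) {
        return flatMap(t -> op.apply(getter.apply(t)).map(b -> factory.apply(t, b)));
    }

    // Same, but the factory may itself fail, and its failure propagates.
    <A, B, U> Outcome<U> flatMapWith(Function<T, A> getter, Function<A, Outcome<B>> op,
                                     BiFunction<T, B, Outcome<U>> factory) {
        return flatMap(t -> op.apply(getter.apply(t)).flatMap(b -> factory.apply(t, b)));
    }

    // Operation must succeed; its result is discarded and the container continues.
    <A, B> Outcome<T> ensureWith(Function<T, A> getter, Function<A, Outcome<B>> op) {
        return flatMap(t -> op.apply(getter.apply(t)).map(ignored -> t));
    }
}

record Request(String userId) {}
record WithName(Request request, String name) {}

class CombinatorDemo {
    static Outcome<String> lookupName(String userId) {
        return userId.equals("u-1") ? Outcome.ok("Ada") : Outcome.fail("unknown user");
    }

    static Outcome<Boolean> rateLimit(String userId) {
        return userId.startsWith("u-") ? Outcome.ok(true) : Outcome.fail("rate limited");
    }

    static Outcome<WithName> run(String userId) {
        return Outcome.ok(new Request(userId))
                .ensureWith(Request::userId, CombinatorDemo::rateLimit)                // gate
                .mapWith(Request::userId, CombinatorDemo::lookupName, WithName::new);  // accrete
    }
}
```

In this sketch each combinator is just the nested flatMap/map pair written once, which is the shape the "before" snippet earlier in the chapter spells out by hand.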

Why the factory slot fits the data structure

Look at the factory’s shape: (T, B) -> U. That is exactly the canonical constructor of a stage record — (previous container, new knowledge) -> next stage. The data structure and the combinator family are two halves of one design: a well-shaped stage is a single line of method references, with no lambda bodies at all:

.mapWith(ValidRequest::userId, profiles::fetch, UserProfile::new)
//        ^ which knowledge      ^ what to do     ^ what the next stage looks like

A complete pipeline

Promise<Response> handle(Request raw) {
    return ValidRequest.validRequest(raw)                                  // Result<ValidRequest>
        .async()                                                           // → Promise<ValidRequest>
        .mapWith(ValidRequest::userId, profiles::fetch, UserProfile::new)  // Promise<UserProfile<ValidRequest>>
        .ensureWith(p -> p.request().userId(), rateLimiter::check)         // transient gate; container unchanged
        .mapWith(p -> p.request().userId(), articles::byAuthor, UserArticles::new)
                                              // Promise<UserArticles<UserProfile<ValidRequest>>>
        .map(Response::from);                                              // knowledge → answer
}

Note where the coupling lives. profiles.fetch(UserId) and articles.byAuthor(UserId) are written against their narrow natural inputs and know nothing about containers or pipelines; the projection happens in the getter slot, one expression per stage, at the wiring site. The same operation composes into any pipeline whose container can produce a UserId.

Gating stages: ensureWith

Some stages have a success/failure outcome that must gate the pipeline but produce nothing a later stage reads. A rate-limit check, a fire-and-forget audit write, a notification: the outcome is knowledge — “the request may proceed” — but it is transient, consumed by the act of continuing rather than carried forward. No later step will ask for proof, so there is nothing to accrete. That is ensureWith:

.ensureWith(p -> p.request().userId(), rateLimiter::check)

The operation must succeed (failure fails the chain), its result is discarded, and the container flows through untouched. Contrast this with onSuccess, which takes a Consumer that cannot fail and, on Promise, does not gate the chain. ensureWith is the fallible, gating counterpart that was previously missing; before it, these stages either lost the container or ended in the .map(ignored -> original) shuffle.

ensureWith is where parse-don’t-validate runs out: the one place a fallible check legitimately yields no type, precisely because nothing downstream will ask for proof. The moment a later step does need the fact, that is the signal the operation should return evidence rather than pass/fail. An audit write whose confirmation is consumed downstream is then a mapWith, not an ensureWith:

.mapWith(p -> p.request().userId(), audit::write, Audited::new)   // record Audited<T>(T request, AuditId id)

The same audit operation lands in two homes, decided by whether anyone reads the id. A load-bearing outcome always carries a value to accrete; a pass/fail with no value is, by that token, transient — a load-bearing check that returns only a boolean is itself the parse-don’t-validate anti-pattern.
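
The two homes can be put side by side in a small sketch. It uses java.util.Optional as a stand-in carrier (empty plays the role of failure) with hypothetical static helpers named after the combinators; these helpers are assumptions made for the example and are not the library's methods. writeAudit and the fixed AuditId are likewise invented.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Placeholder domain types, assumed for illustration only.
record UserId(String value) {}
record AuditId(long value) {}
record ValidRequest(UserId userId) {}
record Audited<T>(T request, AuditId id) {}

class GateOrEvidenceDemo {
    // Hypothetical stand-ins for the field-scoped combinators, over Optional.
    static <T, A, B> Optional<T> ensureWith(Optional<T> container, Function<T, A> getter,
                                            Function<A, Optional<B>> op) {
        return container.flatMap(t -> op.apply(getter.apply(t)).map(ignored -> t));
    }

    static <T, A, B, U> Optional<U> mapWith(Optional<T> container, Function<T, A> getter,
                                            Function<A, Optional<B>> op,
                                            BiFunction<T, B, U> factory) {
        return container.flatMap(t -> op.apply(getter.apply(t)).map(b -> factory.apply(t, b)));
    }

    // One audit operation (hypothetical): empty means the write failed.
    static Optional<AuditId> writeAudit(UserId id) {
        return id.value().isBlank() ? Optional.empty() : Optional.of(new AuditId(42L));
    }

    // Home 1: nobody reads the id, so the audit is a gate and the type does not change.
    static Optional<ValidRequest> gated(ValidRequest request) {
        return ensureWith(Optional.of(request), ValidRequest::userId,
                          GateOrEvidenceDemo::writeAudit);
    }

    // Home 2: a later step reads the id, so the audit is evidence and the type grows.
    static Optional<Audited<ValidRequest>> accreted(ValidRequest request) {
        return mapWith(Optional.of(request), ValidRequest::userId,
                       GateOrEvidenceDemo::writeAudit, Audited::new);
    }
}
```

The operation is identical in both methods; only the return type of the stage differs, and that difference is what a later step can or cannot rely on.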

Validating stage constructors: flatMapWith

Parse-don’t-validate applies at stage boundaries too. When entering the next stage is itself conditional on the gathered knowledge, give the stage record a validating factory and use flatMapWith:

record QuotaScoped<T>(T request, Quota quota) {
    static <T> Result<QuotaScoped<T>> quotaScoped(T request, Quota quota) {
        return Verify.ensure(quota, q -> q.remaining() > 0, QUOTA_EXHAUSTED)
                     .map(q -> new QuotaScoped<>(request, q));
    }
}

validRequest.flatMapWith(ValidRequest::userId, quotas::lookup, QuotaScoped::quotaScoped);

Operation failure and constructor rejection both propagate as the chain’s failure. The result is a pipeline in which holding a QuotaScoped<...> proves quota was available — the progress-proof property extends to business conditions, not just data presence.
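
Both failure paths can be exercised in a self-contained sketch. As before, Outcome is a hypothetical toy carrier rather than the library's Result, the validating factory uses a plain if in place of Verify.ensure, and the lookup table inside QuotaDemo is invented for the demonstration.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for a Result-like carrier; NOT the library's implementation.
class Outcome<T> {
    private final T value;
    private final String error;

    private Outcome(T value, String error) { this.value = value; this.error = error; }
    static <T> Outcome<T> ok(T value) { return new Outcome<>(value, null); }
    static <T> Outcome<T> fail(String error) { return new Outcome<>(null, error); }

    boolean isOk() { return error == null; }
    T value() { return value; }
    String error() { return error; }

    <U> Outcome<U> flatMap(Function<T, Outcome<U>> f) { return isOk() ? f.apply(value) : fail(error); }

    // Field-scoped form with a fallible factory.
    <A, B, U> Outcome<U> flatMapWith(Function<T, A> getter, Function<A, Outcome<B>> op,
                                     BiFunction<T, B, Outcome<U>> factory) {
        return flatMap(t -> op.apply(getter.apply(t)).flatMap(b -> factory.apply(t, b)));
    }
}

record UserId(String value) {}
record Quota(int remaining) {}
record ValidRequest(UserId userId) {}

record QuotaScoped<T>(T request, Quota quota) {
    // Validating stage factory: entering the stage requires remaining quota.
    static <T> Outcome<QuotaScoped<T>> quotaScoped(T request, Quota quota) {
        if (quota.remaining() <= 0) {
            return Outcome.fail("QUOTA_EXHAUSTED");
        }
        return Outcome.ok(new QuotaScoped<>(request, quota));
    }
}

class QuotaDemo {
    // Hypothetical lookup: u-1 has quota, u-2 has none left, anything else is unknown.
    static Outcome<Quota> lookup(UserId id) {
        if (id.value().equals("u-1")) return Outcome.ok(new Quota(3));
        if (id.value().equals("u-2")) return Outcome.ok(new Quota(0));
        return Outcome.fail("NO_QUOTA_RECORD");
    }

    static Outcome<QuotaScoped<ValidRequest>> scope(String rawId) {
        return Outcome.ok(new ValidRequest(new UserId(rawId)))
                .flatMapWith(ValidRequest::userId, QuotaDemo::lookup, QuotaScoped::quotaScoped);
    }
}
```

Here scope("u-2") fails in the factory and scope("u-3") fails in the operation, yet the caller sees a single failure channel in both cases.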

Stages that need several facts: the whole-object forms

When one projection is not enough, drop the getter and let the operation reach through the container:

.mapWith(p -> recommender.suggest(p.request().userId(), p.profile().interests()),
         Recommended::new)

For cross-pipeline reuse of such operations, prefer capability bounds (HasUserId, HasProfile) over concrete container types, as shown earlier.
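
A sketch of the whole-object form, again over java.util.Optional as a stand-in carrier. The static mapWith helper, the suggest operation, and the shape of Profile are assumptions made for this example, not the library's API.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Placeholder domain types, assumed for illustration only.
record UserId(String value) {}
record Profile(List<String> interests) {}
record ValidRequest(UserId userId) {}
record UserProfile<T>(T request, Profile profile) {}
record Recommended<T>(T request, List<String> suggestions) {}

class WholeObjectDemo {
    // Hypothetical stand-in for the whole-object mapWith: the operation sees the full container.
    static <T, B, U> Optional<U> mapWith(Optional<T> container, Function<T, Optional<B>> op,
                                         BiFunction<T, B, U> factory) {
        return container.flatMap(t -> op.apply(t).map(b -> factory.apply(t, b)));
    }

    // Hypothetical recommender: needs both the user id and the interests.
    static Optional<List<String>> suggest(UserId id, List<String> interests) {
        if (interests.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(interests.stream().map(i -> i + " for " + id.value()).toList());
    }

    static Optional<Recommended<UserProfile<ValidRequest>>> recommend(
            UserProfile<ValidRequest> stage) {
        return mapWith(Optional.of(stage),
                       p -> suggest(p.request().userId(), p.profile().interests()),
                       Recommended::new);
    }
}
```

The lambda is the price of needing two facts: with no single getter to project through, the operation reaches into the container itself, while the factory slot stays a constructor reference.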

What mapWith is NOT for

Multi-projection decomposition stays with all(...). There are deliberately no mapWith2(getter1, getter2, ...) arities. When a stage decomposes one value into several fallible projections, that is the job of the instance all(...) form and the MapperN it produces.

Independent parallel operations stay with Fork-Join. mapWith runs one operation. When a stage gathers several independent pieces concurrently, fork-join them and accrete the results in one step — the identity projection keeps the container in play:

.all(Promise::success,                              // keep the container itself
     v -> profiles.fetch(v.userId()),               // independent fetch 1
     v -> preferences.fetch(v.userId()))            // independent fetch 2
.map(Enriched::new)                                 // (ValidRequest, Profile, Prefs) -> Enriched<...>

with record Enriched<T>(T request, Profile profile, Prefs prefs) — the canonical-constructor convention extends naturally to stages that gather more than one fact at once.
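
The same "gather concurrently, accrete once" shape can be sketched with the JDK alone. Note the substitution: this uses CompletableFuture.thenCombine in place of the library's all(...), so it illustrates the idea rather than the Pragmatica API. fetchProfile and fetchPrefs are hypothetical operations returning canned values.

```java
import java.util.concurrent.CompletableFuture;

// Placeholder domain types, assumed for illustration only.
record UserId(String value) {}
record Profile(String displayName) {}
record Prefs(String theme) {}
record ValidRequest(UserId userId) {}
record Enriched<T>(T request, Profile profile, Prefs prefs) {}

class ForkJoinDemo {
    // Hypothetical independent fetches; neither reads the other's result.
    static CompletableFuture<Profile> fetchProfile(UserId id) {
        return CompletableFuture.supplyAsync(() -> new Profile("profile-of-" + id.value()));
    }

    static CompletableFuture<Prefs> fetchPrefs(UserId id) {
        return CompletableFuture.supplyAsync(() -> new Prefs("dark"));
    }

    // Start both fetches, then accrete both results onto the container in one step.
    static CompletableFuture<Enriched<ValidRequest>> enrich(ValidRequest valid) {
        CompletableFuture<Profile> profile = fetchProfile(valid.userId());
        CompletableFuture<Prefs> prefs = fetchPrefs(valid.userId());
        return profile.thenCombine(prefs, (p, s) -> new Enriched<>(valid, p, s));
    }

    public static void main(String[] args) {
        Enriched<ValidRequest> enriched = enrich(new ValidRequest(new UserId("u-1"))).join();
        System.out.println(enriched);
    }
}
```

Because the stage record is immutable and only holds references, both tasks can read the container without coordination.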

Design notes

  • The operation is always effectful (returns the carrier). A pure operation needs no combinator: map(t -> factory(t, f(t.field()))) already covers it.
  • Purity lives in the method name, not in overloads. Fn1<B, A> and Fn1<Result<B>, A> erase identically, so same-name pure/effectful overloads would be ambiguous for implicitly-typed lambdas. Hence mapWith (pure factory) vs flatMapWith (fallible factory); only arity (whole-object vs field-scoped) overloads safely.

Why not the obvious alternatives?

Three designs reach for the space between steps before the accreting record does. Each fails the “express exactly the knowledge gathered so far” requirement in its own way:

| Design | What it is | Why it fails |
|---|---|---|
| Mutable context bag | a ProcessingContext (or Map<String, Object>) whose fields steps populate as they go | the type is blind to progress: reordering steps compiles and fails at runtime; mutability poisons Fork-Join thread safety; every reader must reconstruct the temporal protocol by hand |
| Pass only what’s needed | each step takes exactly its input, returns exactly its output | clean in isolation, but earlier knowledge is gone: a late step needing an early value must re-fetch it, thread it through every intermediate signature, or smuggle it in a field |
| God-record of options | one record with Option<Profile>, Option<List<Article>>, … | invalid states are representable again: a consumer cannot tell “not gathered yet” from “none exists”; this is the context bag’s temporal coupling, with extra ceremony |

The accreting stage record removes the defect at its root: the type itself advances with the pipeline, so “knowledge not yet gathered” is not a value a later step can hold — it is a state that does not compile.

Boundaries and Trade-offs

Type-parameter growth. Five stages of accretion produce E<D<C<B<A>>>>-shaped types. Within a single use case this is rarely written out — local inference (var) and the final .map(Response::from) absorb it. When a composed type does need naming (a step interface boundary, a sub-pipeline seam), that is the signal to flatten into a named milestone record:

record EnrichedOrder(ValidRequest request, Profile profile, List<Article> articles) {}

Rule of thumb: accrete generically within a use case; flatten to a named record at boundaries other code must speak about.
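
One way to do the flattening is a single factory on the milestone record that unwraps the nested stages at the boundary. The from method and the MilestoneDemo helper below are hypothetical additions for illustration; the domain types are placeholders.

```java
import java.util.List;

// Placeholder domain types, assumed for illustration only.
record UserId(String value) {}
record Profile(String displayName) {}
record Article(String title) {}

record ValidRequest(UserId userId) {}
record UserProfile<T>(T request, Profile profile) {}
record UserArticles<T>(T request, List<Article> articles) {}

// Named milestone: flat, with no type parameters for other code to spell out.
record EnrichedOrder(ValidRequest request, Profile profile, List<Article> articles) {
    // Hypothetical flattening factory: the only place that knows the nesting order.
    static EnrichedOrder from(UserArticles<UserProfile<ValidRequest>> stage) {
        return new EnrichedOrder(stage.request().request(),
                                 stage.request().profile(),
                                 stage.articles());
    }
}

class MilestoneDemo {
    static UserArticles<UserProfile<ValidRequest>> sample() {
        var valid = new ValidRequest(new UserId("u-1"));
        var withProfile = new UserProfile<>(valid, new Profile("Ada"));
        return new UserArticles<>(withProfile, List.of(new Article("On Types")));
    }

    public static void main(String[] args) {
        System.out.println(EnrichedOrder.from(sample()));
    }
}
```

Code past the boundary then depends on EnrichedOrder alone, so reordering the stages inside the use case touches only the from factory.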

Deep request() chains. Two hops read fine; three or more is the same flattening signal — or a capability interface if the depth comes from operation reuse rather than pipeline length.

When not to use knowledge gathering at all. A two-step pipeline where each step needs only the previous step’s output is a plain Sequencer — flatMap is the right tool, and a stage record would be ceremony. Reach for accretion when a later step needs earlier knowledge; that is the defining trigger.

Cost. Stage records are shallow wrappers holding references — no data is copied, and the per-stage allocation is one small object. The structure is also immutable by construction, so accreted containers cross Fork-Join boundaries safely.

Reordering. Changing stage order changes the composed type and breaks exactly the expressions that depended on the old order. This is the design working as intended: the compiler, not the test suite, reports pipeline-shape mistakes.

Testing

Stage records with canonical constructors need no dedicated tests — there is nothing to get wrong. Validating stage factories are tested like value objects: success plus one test per rejection rule. Pipelines are tested per the evolutionary strategy (Chapters 11–12): stub the step operations, assert the happy path, then one test per failing step — including the ensureWith stages, whose failure must fail the chain, and whose success must leave the container untouched (assert the same instance flows through).

Key Takeaways

  • Growing context: each pipeline stage’s container = previous container as a type parameter + new knowledge, so the composed type is a compiler-checked proof of pipeline progress.
  • The canonical constructor (T, Knowledge) of a stage record is exactly the factory shape mapWith/flatMapWith expect — a well-shaped stage is one lambda-free line: getter, operation, constructor.
  • Operations take their narrow natural input; the getter slot couples them to the pipeline at the wiring site only.
  • ensureWith = transient gates whose outcome no later step reads (gate, discard, pass through); flatMapWith = validating stage constructors; whole-object forms + capability bounds = stages needing several facts. A load-bearing outcome returns evidence and is accreted, not gated away.
  • Multi-projection decomposition belongs to all(...); independent parallel gathering belongs to Fork-Join with an accreting combine step.
  • Accrete within a use case; flatten to named milestone records at boundaries; a deep request() chain is the flattening signal.

Exercises

  1. Reorder and watch it break. Take the three-stage pipeline (ValidRequest → profile → articles). Swap the profile and articles stages. Before compiling, predict which expression the compiler will reject and why. Confirm.
  2. Gate or evidence? For each check — a rate limiter, an entitlement lookup whose tier a later step reads, a fire-and-forget audit, an idempotency-key reservation — decide whether it is ensureWith (transient) or mapWith/flatMapWith (accreted), and name the value the accreted ones carry.
  3. Capability bound. Rewrite an operation that reaches p.request().userId() so it accepts <T extends HasUserId> instead of a concrete stage type, and show it composing into a second pipeline that gathered the user id by a different route.

What’s Next

Chapter 10 consolidates the thread-safety rules across JBCT patterns — including why the immutable, reference-only stage records of this chapter cross Fork-Join boundaries safely.