
6. Full Pipeline

The previous walkthroughs showed each NeoGraph feature in isolation. Real pipelines combine several at once. This walkthrough shows a requirement verification system that uses Oracle ensemble, sub-construct isolation, Each fan-out, Operator interrupt, and mixed modes — all wired together.

The pipeline:

  1. decompose — LLM breaks a requirement into claims (produce mode, Oracle ensemble x3)
  2. enrich — sub-pipeline that looks up context and scores each claim (sub-construct)
  3. cluster — group claims by theme
  4. verify — verify each cluster in parallel (Each fan-out)
  5. check-results — pause if any cluster failed (Operator interrupt)
  6. report — format the final output

This example uses @node for the top-level decompose (with Oracle kwargs) but drops to declarative Node + Construct for the rest. Why?

The sub-construct (enrich) needs an explicit I/O boundary: input=Claims, output=ScoredClaims. construct_from_module walks a whole module into one flat Construct — it can’t inline a sub-pipeline with its own isolated state. That’s a legitimate boundary case where the declarative form is the right tool.

For the other scripted nodes, declarative stays for consistency with the sub-construct. In a pipeline without sub-constructs, every node here could be @node-decorated.

from pydantic import BaseModel

class Claims(BaseModel, frozen=True):
    items: list[str]

class Context(BaseModel, frozen=True):
    references: list[str]

class ScoredClaims(BaseModel, frozen=True):
    scored: list[dict[str, str]]

class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]

class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]

class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    passed: bool
    gaps: list[str]

class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]

class Report(BaseModel, frozen=True):
    text: str
from neograph import (
    Construct, Each, Node, Operator,
    compile, node, register_condition, register_scripted, run,
)
from langgraph.checkpoint.memory import MemorySaver
# Step 1 — @node with Oracle kwargs for LLM ensemble
@node(output=Claims, prompt="decompose", model="fast",
      llm_config={"temperature": 0.8},
      ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

# Step 2 — sub-construct with isolated state
enrich = Construct(
    "enrich",
    input=Claims,
    output=ScoredClaims,
    nodes=[
        Node.scripted("lookup", fn="lookup_context", input=Claims, output=Context),
        Node.scripted("score", fn="score_claims", input=Claims, output=ScoredClaims),
    ],
)

# Step 3 — scripted clustering
cluster = Node.scripted("cluster", fn="make_clusters", output=Clusters)

# Step 4 — Each fan-out over the clusters
verify = Node.scripted(
    "verify", fn="verify_cluster", input=ClusterGroup, output=VerifyResult
) | Each(over="cluster.groups", key="label")

# Step 5 — Operator pauses if any cluster failed
check_results = Node.scripted(
    "check-results", fn="check_passed", output=ValidationResult
) | Operator(when="needs_review")

# Step 6 — final report
report = Node.scripted("report", fn="build_report", output=Report)

# Assembly
pipeline = Construct(
    "full-verification",
    description="End-to-end requirement verification",
    nodes=[decompose, enrich, cluster, verify, check_results, report],
)
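The Each fan-out semantics can be pictured in plain Python. This is a minimal sketch of what the walkthrough implies `| Each(over="cluster.groups", key="label")` does at runtime; `fan_out` and the inline stub verifier are illustrative helpers, not NeoGraph API:

```python
# Plain-Python sketch of the assumed fan-out semantics: run the wrapped
# node once per element of cluster.groups and key the results by each
# group's 'label' field. `fan_out` is illustrative, not NeoGraph API.
def fan_out(groups, verify_fn):
    return {g["label"]: verify_fn(g) for g in groups}

groups = [
    {"label": "security", "claim_ids": ["c1", "c3"]},
    {"label": "observability", "claim_ids": ["c2"]},
]

# Stub verifier standing in for the real "verify_cluster" function.
out = fan_out(groups, lambda g: {"cluster_label": g["label"], "passed": True, "gaps": []})
print(list(out))  # ['security', 'observability']
```

The keys mirror the `result["verify"]` keys seen later in the walkthrough, which is why `key="label"` matters: downstream nodes address each cluster's result by its label.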

Register the scripted functions and the Operator condition:

register_scripted("merge_claims", lambda variants, config: ...) # Oracle merge
register_scripted("lookup_context", lambda input_data, config: ...)
register_scripted("score_claims", lambda input_data, config: ...)
register_scripted("make_clusters", lambda input_data, config: ...)
register_scripted("verify_cluster", lambda input_data, config: ...)
register_scripted("check_passed", lambda input_data, config: ...)
register_scripted("build_report", lambda input_data, config: ...)
register_condition("needs_review", lambda state: (
    {"issues": state.check_results.issues}
    if state.check_results and not state.check_results.passed
    else None
))
# Checkpointer is required because Operator is in the pipeline
graph = compile(pipeline, checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "full-001"}}
result = run(graph, input={"node_id": "REQ-FULL-001"}, config=config)
print(result["decompose"].items)
# ['shall authenticate', 'shall log', 'shall encrypt']
print(list(result["verify"].keys()))
# ['security', 'observability']
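For flavor, here is one plausible body for the `check_passed` scripted function. It is a sketch under assumptions: dataclasses stand in for the frozen pydantic models so the snippet runs without pydantic, and the fan-out is assumed to hand `check_passed` a dict of VerifyResults keyed by cluster label:

```python
from dataclasses import dataclass

# Dataclass stand-ins for the pipeline's frozen pydantic models, so this
# sketch runs standalone without pydantic installed.
@dataclass(frozen=True)
class VerifyResult:
    cluster_label: str
    passed: bool
    gaps: list

@dataclass(frozen=True)
class ValidationResult:
    passed: bool
    issues: list

def check_passed(input_data, config):
    # Hypothetical body for the "check_passed" scripted function: fold the
    # per-cluster VerifyResults into a single ValidationResult.
    failing = [r for r in input_data.values() if not r.passed]
    return ValidationResult(
        passed=not failing,
        issues=[f"{r.cluster_label} cluster failed" for r in failing],
    )

results = {
    "security": VerifyResult("security", True, []),
    "observability": VerifyResult("observability", False, ["no log claim"]),
}
print(check_passed(results, config={}).issues)
# ['observability cluster failed']
```

An issues list in this shape is exactly what the `needs_review` condition forwards as the interrupt payload below.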

If check_results finds a failing cluster, the graph pauses:

if "__interrupt__" in result:
    print(result["__interrupt__"][0].value)
    # {'issues': ['observability cluster failed']}
    # Human reviews, approves, we resume
    result = run(graph, resume={"approved": True}, config=config)
    print(result["report"].text)
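The condition's pause/continue contract can also be exercised in isolation. A minimal sketch, assuming the contract this walkthrough implies (the registered callable receives the pipeline state and returns a payload dict to pause, or None to continue); `SimpleNamespace` stands in for the real state object:

```python
from types import SimpleNamespace

def needs_review(state):
    # Same logic as the registered "needs_review" lambda: return a payload
    # dict to trigger the Operator interrupt, or None to let the graph run on.
    if state.check_results and not state.check_results.passed:
        return {"issues": state.check_results.issues}
    return None

failing = SimpleNamespace(
    check_results=SimpleNamespace(passed=False, issues=["observability cluster failed"])
)
clean = SimpleNamespace(check_results=SimpleNamespace(passed=True, issues=[]))

print(needs_review(failing))  # {'issues': ['observability cluster failed']}
print(needs_review(clean))    # None
```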
What this walkthrough demonstrated:

  • @node with Oracle kwargs — ensemble_n=3, merge_fn='merge_claims' in one line, no pipe operator needed for the primary path
  • Sub-constructs — Construct(input=X, output=Y) creates an isolated state boundary; the parent treats it as a single typed node
  • Each fan-out — | Each(over='cluster.groups', key='label') runs verify once per cluster in parallel
  • Operator interrupt — | Operator(when='needs_review') pauses the graph when the condition returns a truthy payload
  • Mixed @node + declarative — both forms compose into one Construct(nodes=[...]); the compiler treats them uniformly
  • Checkpointer requirement — any pipeline with Operator must pass a checkpointer to compile()
  • Observability — every node emits structured logs; call_count tracking shows where the LLM/tools were called

This pipeline uses two of the three NeoGraph API surfaces:

  • @node for the LLM ensemble step (the clean, modern path)
  • Node + Construct + | for the sub-construct and scripted modifier chains (the IR-level path)

Both produce the same internal representation and compile to the same LangGraph StateGraph. You can mix them freely within one pipeline. See Runtime Construction for when an LLM or config system builds pipelines programmatically.


Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.