# 6. Full Pipeline
The previous walkthroughs showed each NeoGraph feature in isolation. Real pipelines combine several at once. This walkthrough shows a requirement verification system that uses Oracle ensemble, sub-construct isolation, Each fan-out, Operator interrupt, and mixed modes — all wired together.
The pipeline:
- decompose — LLM breaks a requirement into claims (produce mode, Oracle ensemble x3)
- enrich — sub-pipeline that looks up context and scores each claim (sub-construct)
- cluster — group claims by theme
- verify — verify each cluster in parallel (Each fan-out)
- check-results — pause if any cluster failed (Operator interrupt)
- report — format the final output
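Before looking at the NeoGraph wiring, the data flow between these stages can be sketched as plain Python. Everything below is a toy stand-in — the function bodies are invented for illustration and the check-results pause is omitted — it only shows the shape of what each stage hands to the next:

```python
def decompose(requirement: str) -> list[str]:
    # Stand-in for the LLM ensemble: naively split the requirement on "and"
    return [c.strip() for c in requirement.split(" and ")]

def enrich(claims: list[str]) -> list[dict]:
    # Stand-in for the sub-construct: attach a crude score to each claim
    return [{"claim": c, "score": "high" if "shall" in c else "low"} for c in claims]

def cluster(scored: list[dict]) -> dict[str, list[str]]:
    # Group claims by a crude theme keyword
    groups: dict[str, list[str]] = {}
    for item in scored:
        theme = "security" if "encrypt" in item["claim"] else "general"
        groups.setdefault(theme, []).append(item["claim"])
    return groups

def verify(groups: dict[str, list[str]]) -> dict[str, bool]:
    # Fan-out analogue: verify each cluster independently
    return {label: all(c.startswith("shall") for c in claims)
            for label, claims in groups.items()}

def report(results: dict[str, bool]) -> str:
    return ", ".join(f"{label}: {'pass' if ok else 'FAIL'}" for label, ok in results.items())

req = "shall encrypt data at rest and shall log access"
out = report(verify(cluster(enrich(decompose(req)))))
print(out)
# security: pass, general: pass
```

The real pipeline replaces each stand-in with a registered scripted function or an LLM call, but the handoff shapes stay the same.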
## The mixed API question

This example uses `@node` for the top-level `decompose` (with Oracle kwargs) but drops to declarative `Node` + `Construct` for the rest. Why?

The sub-construct (`enrich`) needs an explicit I/O boundary: `input=Claims, output=ScoredClaims`. `construct_from_module` walks a whole module into one flat `Construct` — it can’t inline a sub-pipeline with its own isolated state. That’s a legitimate boundary case where the declarative form is the right tool.
The other scripted nodes stay declarative for consistency with the sub-construct. In a pipeline without sub-constructs, every node here could be `@node`-decorated.
## The schemas

```python
from pydantic import BaseModel


class Claims(BaseModel, frozen=True):
    items: list[str]


class Context(BaseModel, frozen=True):
    references: list[str]


class ScoredClaims(BaseModel, frozen=True):
    scored: list[dict[str, str]]


class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]


class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]


class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    passed: bool
    gaps: list[str]


class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]


class Report(BaseModel, frozen=True):
    text: str
```

## The pipeline
```python
from neograph import (
    Construct,
    Each,
    Node,
    Operator,
    compile,
    node,
    register_condition,
    register_scripted,
    run,
)
from langgraph.checkpoint.memory import MemorySaver

# Step 1 — @node with Oracle kwargs for LLM ensemble
@node(
    output=Claims,
    prompt="decompose",
    model="fast",
    llm_config={"temperature": 0.8},
    ensemble_n=3,
    merge_fn="merge_claims",
)
def decompose() -> Claims: ...

# Step 2 — sub-construct with isolated state
enrich = Construct(
    "enrich",
    input=Claims,
    output=ScoredClaims,
    nodes=[
        Node.scripted("lookup", fn="lookup_context", input=Claims, output=Context),
        Node.scripted("score", fn="score_claims", input=Claims, output=ScoredClaims),
    ],
)

# Step 3 — scripted clustering
cluster = Node.scripted("cluster", fn="make_clusters", output=Clusters)

# Step 4 — Each fan-out over the clusters
verify = Node.scripted(
    "verify", fn="verify_cluster", input=ClusterGroup, output=VerifyResult
) | Each(over="cluster.groups", key="label")

# Step 5 — Operator pauses if any cluster failed
check_results = Node.scripted(
    "check-results", fn="check_passed", output=ValidationResult
) | Operator(when="needs_review")

# Step 6 — final report
report = Node.scripted("report", fn="build_report", output=Report)

# Assembly
pipeline = Construct(
    "full-verification",
    description="End-to-end requirement verification",
    nodes=[decompose, enrich, cluster, verify, check_results, report],
)
```

Register the scripted functions and the Operator condition:
```python
register_scripted("merge_claims", lambda variants, config: ...)  # Oracle merge
register_scripted("lookup_context", lambda input_data, config: ...)
register_scripted("score_claims", lambda input_data, config: ...)
register_scripted("make_clusters", lambda input_data, config: ...)
register_scripted("verify_cluster", lambda input_data, config: ...)
register_scripted("check_passed", lambda input_data, config: ...)
register_scripted("build_report", lambda input_data, config: ...)
```
```python
register_condition("needs_review", lambda state: (
    {"issues": state.check_results.issues}
    if state.check_results and not state.check_results.passed
    else None
))
```

## Running the pipeline
```python
# Checkpointer is required because Operator is in the pipeline
graph = compile(pipeline, checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "full-001"}}

result = run(graph, input={"node_id": "REQ-FULL-001"}, config=config)

print(result["decompose"].items)
# ['shall authenticate', 'shall log', 'shall encrypt']
```
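Conceptually, the `verify` fan-out maps `verify_cluster` over `cluster.groups` and keys the results by `label`. The sketch below is a sequential pure-Python analogy, not NeoGraph internals (the real fan-out runs in parallel), and the `verify_cluster` body is invented for illustration:

```python
from pydantic import BaseModel


class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]


class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    passed: bool
    gaps: list[str]


def verify_cluster(group: ClusterGroup) -> VerifyResult:
    # Invented check: a cluster passes if it contains at least one claim
    return VerifyResult(cluster_label=group.label, passed=bool(group.claim_ids), gaps=[])


def each(items: list[ClusterGroup], key: str, fn) -> dict[str, VerifyResult]:
    # Fan fn out over items; collect results keyed by the `key` attribute
    return {getattr(item, key): fn(item) for item in items}


groups = [
    ClusterGroup(label="security", claim_ids=["C1", "C3"]),
    ClusterGroup(label="observability", claim_ids=["C2"]),
]
print(list(each(groups, key="label", fn=verify_cluster).keys()))
# ['security', 'observability']
```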
```python
print(list(result["verify"].keys()))
# ['security', 'observability']
```

## Handling the interrupt
If `check_results` finds a failing cluster, the graph pauses:
```python
if "__interrupt__" in result:
    print(result["__interrupt__"][0].value)
    # {'issues': ['observability cluster failed']}

    # Human reviews, approves, we resume
    result = run(graph, resume={"approved": True}, config=config)
    print(result["report"].text)
```

## What this demonstrates
- `@node` with Oracle kwargs — `ensemble_n=3, merge_fn="merge_claims"` in one line, no pipe operator needed for the primary path
- Sub-constructs — `Construct(input=X, output=Y)` creates an isolated state boundary; the parent treats it as a single typed node
- Each fan-out — `| Each(over="cluster.groups", key="label")` runs `verify` once per cluster in parallel
- Operator interrupt — `| Operator(when="needs_review")` pauses the graph when the condition payload is truthy
- Mixed `@node` + declarative — both forms compose into one `Construct(nodes=[...])`; the compiler treats them uniformly
- Checkpointer requirement — any pipeline with `Operator` must pass a checkpointer to `compile()`
- Observability — every node emits structured logs; `call_count` tracking shows where the LLM/tools were called
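The Operator bullet can be checked in isolation: the registered `needs_review` condition returns a truthy payload only for a failing `ValidationResult`. A minimal sketch, using `SimpleNamespace` as a stand-in for pipeline state:

```python
from types import SimpleNamespace

from pydantic import BaseModel


class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]


def needs_review(state):
    # Same logic as the registered "needs_review" condition
    if state.check_results and not state.check_results.passed:
        return {"issues": state.check_results.issues}
    return None


failing = SimpleNamespace(
    check_results=ValidationResult(passed=False, issues=["observability cluster failed"])
)
passing = SimpleNamespace(check_results=ValidationResult(passed=True, issues=[]))

print(needs_review(failing))  # truthy payload -> the Operator pauses the graph
print(needs_review(passing))  # None -> the graph continues
```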
## Three surfaces, one compiler

This pipeline uses two of the three NeoGraph API surfaces:
- `@node` for the LLM ensemble step (the clean, modern path)
- `Node` + `Construct` + `|` for the sub-construct and scripted modifier chains (the IR-level path)
Both produce the same internal representation and compile to the same LangGraph `StateGraph`. You can mix them freely within one pipeline. See Runtime Construction for when an LLM or config system builds pipelines programmatically.
Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.