
Organizing Large Pipelines

Small pipelines fit in a single file. Real systems don’t. NeoGraph provides three composition mechanisms that scale from a handful of nodes to hundreds.

The simplest pattern: one Python module per pipeline. All @node functions live in the module, and construct_from_module assembles them.

```
src/
  pipelines/
    __init__.py
    decompose.py   # @node functions for decomposition
    verify.py      # @node functions for verification
    report.py      # @node functions for reporting
```

pipelines/decompose.py:

```python
import sys

from neograph import node, construct_from_module

@node(output=Claims, prompt='rw/decompose', model='reason')
def decompose(topic: RawText) -> Claims: ...

@node(output=Classified, prompt='rw/classify', model='fast')
def classify(decompose: Claims) -> Classified: ...

pipeline = construct_from_module(sys.modules[__name__], name="decompose")
```

Each module is self-contained. construct_from_module only sees @node functions in its own module — it won’t pick up nodes from imports unless they’re assigned to module-level names.
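NeoGraph's internals aren't shown in these docs, but the discovery rule above can be illustrated with plain Python. This is a sketch only: `fake_node` and its `_is_node` marker are hypothetical stand-ins for the real `@node` decorator, used to show why only module-level names are picked up.

```python
import types

def fake_node(fn):
    """Hypothetical stand-in for @node: tags a function as a node."""
    fn._is_node = True
    return fn

def discover_nodes(module):
    """Collect node-tagged callables bound to module-level names.

    Anything not assigned a name in the module's namespace is
    invisible to this scan -- mirroring the behavior described above.
    """
    return {
        name: obj
        for name, obj in vars(module).items()
        if callable(obj) and getattr(obj, "_is_node", False)
    }

# Build a throwaway module to demonstrate discovery.
mod = types.ModuleType("demo")
mod.decompose = fake_node(lambda topic: topic)
mod.helper = lambda x: x            # not tagged: ignored
print(sorted(discover_nodes(mod)))  # ['decompose']
```

The scan is over `vars(module)`, which is why an imported `@node` function *would* be discovered if you assign it to a module-level name.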

Import @node functions from other modules to build larger pipelines. Since @node returns a Node instance, you can import it and include it in a Construct directly.

pipelines/full.py:

```python
from neograph import Construct, compile

from pipelines.decompose import decompose, classify
from pipelines.verify import verify_clusters
from pipelines.report import build_report

pipeline = Construct("full-analysis", nodes=[
    decompose,
    classify,
    verify_clusters,
    build_report,
])
graph = compile(pipeline)
```

When composing across modules, you assemble the Construct manually rather than using construct_from_module. This gives you explicit control over which nodes are included and their ordering.

When part of your pipeline has internal state that shouldn’t leak into the parent, wrap it in a Construct with declared I/O boundaries.

```python
from neograph import Construct, Node, register_scripted

def lookup_context(input_data, config):
    return Context(references=["auth.py:42", "logger.py:18"])

def score_claims(input_data, config):
    scored = [{"claim": c, "score": "high"} for c in input_data.items]
    return ScoredClaims(scored=scored)

register_scripted("lookup_context", lookup_context)
register_scripted("score_claims", score_claims)

# Sub-construct: Claims in, ScoredClaims out. Internal nodes are hidden.
enrich = Construct(
    "enrich",
    input=Claims,
    output=ScoredClaims,
    nodes=[
        Node.scripted("lookup", fn="lookup_context", input=Claims, output=Context),
        Node.scripted("score", fn="score_claims", input=Claims, output=ScoredClaims),
    ],
)
```

The parent pipeline sees enrich as a single unit with type Claims -> ScoredClaims. The intermediate Context node is invisible:

```python
pipeline = Construct("analysis", nodes=[
    decompose,
    enrich,        # sub-pipeline: Claims -> ScoredClaims
    build_report,
])
graph = compile(pipeline)
result = run(graph, input={"node_id": "REQ-001"})
# result has "decompose" and "build_report" keys
# "lookup" and "score" are NOT in the result — they're internal to enrich
```

Sub-constructs also compose with modifiers. You can ensemble an entire sub-pipeline:

```python
from neograph import Oracle

enrich_oracle = enrich | Oracle(n=3, merge_fn="merge_scored")
```
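The `merge_scored` function registered for the Oracle is not defined in these docs. As a sketch of what such a merge function might do, here is a majority vote across the n ensemble runs; the data shape (each run a list of `{"claim": ..., "score": ...}` dicts covering the same claims in order) is an assumption for illustration, not NeoGraph's contract.

```python
from collections import Counter

def merge_scored(runs):
    """Merge n ensemble runs of scored claims by majority vote.

    Assumes each run is a list of {"claim": ..., "score": ...} dicts,
    all covering the same claims in the same order.
    """
    merged = []
    for per_claim in zip(*runs):
        votes = Counter(item["score"] for item in per_claim)
        score, _ = votes.most_common(1)[0]
        merged.append({"claim": per_claim[0]["claim"], "score": score})
    return merged

runs = [
    [{"claim": "A", "score": "high"}, {"claim": "B", "score": "low"}],
    [{"claim": "A", "score": "high"}, {"claim": "B", "score": "high"}],
    [{"claim": "A", "score": "low"},  {"claim": "B", "score": "high"}],
]
print(merge_scored(runs))
# [{'claim': 'A', 'score': 'high'}, {'claim': 'B', 'score': 'high'}]
```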
| Situation | Approach |
| --- | --- |
| Single pipeline, < 15 nodes | One module + `construct_from_module` |
| Multiple related pipelines sharing nodes | Import `@node` instances across modules, assemble with `Construct(nodes=[...])` |
| Internal state that shouldn't leak | Sub-construct with `input=` / `output=` boundaries |
| Branching logic (if/for) | `ForwardConstruct` — see Control Flow |
| Runtime/dynamic graph construction | `Node \| Modifier` + `Construct(nodes=[...])` |

@node + construct_from_module is the default. Use it when your pipeline is a DAG with no conditional branching. Parameter names wire edges. Topological sort handles ordering. This is the simplest and most common pattern.
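The two rules in that paragraph ("parameter names wire edges", "topological sort handles ordering") can be illustrated without NeoGraph itself: derive edges from function signatures with `inspect`, then order nodes with the standard library's `graphlib`. A sketch of the idea, not the library's implementation:

```python
import inspect
from graphlib import TopologicalSorter

def decompose(topic): ...
def classify(decompose): ...
def build_report(classify, decompose): ...

nodes = {fn.__name__: fn for fn in (decompose, classify, build_report)}

# An edge exists when a parameter name matches another node's name.
deps = {
    name: {p for p in inspect.signature(fn).parameters if p in nodes}
    for name, fn in nodes.items()
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # a valid order, e.g. ['decompose', 'classify', 'build_report']
```

Parameters that don't match a node name (like `topic`) are left as pipeline inputs rather than edges, which matches how `decompose` receives `RawText` in the first example.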

ForwardConstruct adds Python control flow as graph topology. Use it when you need if (conditional edges) or for (fan-out) that depend on intermediate node outputs. See the ForwardConstruct section.

Runtime construction (Node | Modifier + Construct(nodes=[...])) is for programmatic graph building — an LLM emitting a pipeline via tool calls, a config system defining workflows, or any case where the graph shape isn’t known at import time.
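As a library-independent sketch of that last case, here is config-driven assembly with a fail-fast registry check. The config schema and `register_scripted_stub` are assumptions for illustration; the returned `(name, callable)` pairs are what a runtime builder could hand to `Node.scripted`.

```python
REGISTRY = {}

def register_scripted_stub(name, fn):
    """Stand-in for neograph.register_scripted (illustrative only)."""
    REGISTRY[name] = fn

register_scripted_stub("lookup_context", lambda data, cfg: {"refs": []})
register_scripted_stub("score_claims", lambda data, cfg: {"scored": []})

# Hypothetical config, e.g. emitted by an LLM tool call or loaded from YAML.
config = [
    {"name": "lookup", "fn": "lookup_context"},
    {"name": "score", "fn": "score_claims"},
]

def build_specs(config):
    """Validate a config against the registry before graph assembly.

    Unknown fn names fail fast, before any graph is built.
    """
    specs = []
    for entry in config:
        fn = REGISTRY.get(entry["fn"])
        if fn is None:
            raise ValueError(f"unknown scripted fn: {entry['fn']}")
        specs.append((entry["name"], fn))
    return specs

print([name for name, _ in build_specs(config)])  # ['lookup', 'score']
```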

All three compile to the same underlying graph. Mix them freely in the same project.


Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.