Built by Postindustria. We help teams build agentic production systems.

Modifier Keywords

The @node decorator accepts keyword arguments that attach modifiers (Each, Oracle, Operator, and Loop) without touching the programmatic API. Every modifier pattern shown below compiles to the same graph as its Node | Modifier equivalent.

Fan-out with map_over and map_key

Process each item in a collection independently. The map_over= keyword specifies a dotted path to the collection, and map_key= names the item field used to key results in the output dict.

from neograph import node, construct_from_module, compile, run
import sys
from pydantic import BaseModel

class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]

class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]

class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    coverage_pct: int

@node(outputs=Clusters)
def discover_clusters() -> Clusters:
    return Clusters(groups=[
        ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2"]),
        ClusterGroup(label="logging", claim_ids=["REQ-3"]),
    ])

@node(outputs=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
    coverage = {"authentication": 85, "logging": 60}
    return VerifyResult(
        cluster_label=cluster.label,
        coverage_pct=coverage.get(cluster.label, 0),
    )

pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline)
result = run(graph, input={"node_id": "analysis-001"})

# result["verify"] is dict[str, VerifyResult], keyed by label
for label, vr in result["verify"].items():
    print(f"{label}: {vr.coverage_pct}%")

How it works:

  • map_over='discover_clusters.groups' tells the framework to iterate over the groups field of the discover_clusters output.
  • map_key='label' means each result is keyed by cluster.label in the output dict.
  • The cluster parameter does not match any upstream @node — the framework recognizes it as the fan-out item receiver and skips it in topology wiring.
  • Under the hood, @node applies | Each(over=..., key=...) to the created Node.

Both map_over and map_key are required together. Passing one without the other raises ConstructError.
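The following is a pure-Python sketch of the behaviour that map_over= and map_key= describe. It resolves the dotted path against the upstream outputs, calls the function once per item, and keys each result by the item's map_key field. The helper names (resolve_path, fan_out) are hypothetical. The sketch also runs items sequentially and rejects duplicate keys, which are assumptions of this illustration rather than documented neograph behaviour.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterGroup:
    label: str
    claim_ids: tuple[str, ...]


@dataclass(frozen=True)
class Clusters:
    groups: tuple[ClusterGroup, ...]


def resolve_path(outputs: dict, path: str):
    """Hypothetical: 'node.field' -> outputs['node'].field (nested fields allowed)."""
    node_name, *fields = path.split(".")
    value = outputs[node_name]
    for name in fields:
        value = getattr(value, name)
    return value


def fan_out(outputs: dict, over: str, key: str, fn) -> dict:
    """Hypothetical: call fn once per item and key each result by getattr(item, key)."""
    results = {}
    for item in resolve_path(outputs, over):
        item_key = getattr(item, key)
        if item_key in results:
            raise ValueError(f"duplicate map_key value: {item_key!r}")
        results[item_key] = fn(item)
    return results


outputs = {
    "discover_clusters": Clusters(groups=(
        ClusterGroup(label="authentication", claim_ids=("REQ-1", "REQ-2")),
        ClusterGroup(label="logging", claim_ids=("REQ-3",)),
    ))
}
coverage = {"authentication": 85, "logging": 60}
verified = fan_out(outputs, "discover_clusters.groups", "label",
                   lambda cluster: coverage.get(cluster.label, 0))
print(verified)  # {'authentication': 85, 'logging': 60}
```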

Ensemble with ensemble_n, merge_fn, and merge_prompt


Run a node N times in parallel, then merge the results. This is the Oracle pattern — multiple independent generations combined into a consensus.

from neograph import node, register_scripted, construct_from_module, compile, run
import sys
from pydantic import BaseModel

class Claims(BaseModel, frozen=True):
    items: list[str]

def merge_claims(variants, config):
    """Deduplicate claims across all variants, preserving first-seen order."""
    seen = set()
    merged = []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=merged)

register_scripted("merge_claims", merge_claims)

@node(outputs=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
    # Each of the 3 copies runs this function independently
    return Claims(items=["shall authenticate", "shall encrypt"])

pipeline = construct_from_module(sys.modules[__name__], name="oracle-demo")

The merge step can be either scripted or LLM-powered:

| Keyword | What it does |
|---|---|
| merge_fn="name" | Call a registered scripted function. Receives (variants, config). |
| merge_prompt="template" | Call an LLM with the variants as context. Uses the node’s model=. |

Exactly one of merge_fn or merge_prompt is required. Setting both raises ConstructError.

If ensemble_n is omitted and either merge keyword is present, it defaults to 3. Any value must be >= 2.
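The rules above, and the generate-then-merge flow, can be sketched in plain Python. This is a hypothetical illustration, not neograph's code:

  • resolve_ensemble_n mirrors the stated validation: exactly one merge keyword, a default of 3, and a minimum of 2. It raises a local stand-in ConstructError.
  • run_ensemble runs the generator N times on a thread pool, then passes the variants to a merge function with the (variants, config) signature used by merge_claims.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


class ConstructError(Exception):
    """Local stand-in for neograph's ConstructError (hypothetical)."""


@dataclass(frozen=True)
class Claims:
    items: tuple[str, ...]


def resolve_ensemble_n(ensemble_n=None, merge_fn=None, merge_prompt=None) -> int:
    # Exactly one merge strategy must be given (this sketch rejects neither and both).
    if (merge_fn is None) == (merge_prompt is None):
        raise ConstructError("set exactly one of merge_fn or merge_prompt")
    n = 3 if ensemble_n is None else ensemble_n
    if n < 2:
        raise ConstructError("ensemble_n must be >= 2")
    return n


def run_ensemble(generate, merge, n: int, config=None):
    # Run n independent generations concurrently, then merge them.
    with ThreadPoolExecutor(max_workers=n) as pool:
        variants = list(pool.map(lambda _: generate(), range(n)))
    return merge(variants, config)


def merge_claims(variants, config):
    """Deduplicate claims across variants, preserving first-seen order."""
    seen, merged = set(), []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=tuple(merged))


n = resolve_ensemble_n(merge_fn="merge_claims")
result = run_ensemble(lambda: Claims(items=("shall authenticate", "shall encrypt")),
                      merge_claims, n)
print(n, result.items)  # 3 ('shall authenticate', 'shall encrypt')
```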

For LLM-mode nodes, the ensemble runs the same prompt N times (useful with non-zero temperature):

@node(outputs=Claims, prompt='rw/decompose', model='reason',
      llm_config={"temperature": 0.8},
      ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

Run the same prompt on different models and merge. Each generator gets a different model tier:

@node(outputs=Summary, prompt='rw/summarize',
      models=["reason", "fast", "creative"],
      merge_fn="pick_best")
def summarize() -> Summary: ...

When models= is set, ensemble_n is inferred from len(models). You can also set both for redundancy — generators are assigned models round-robin:

# 9 generators across 3 models (3 each)
@node(outputs=Summary, prompt='rw/summarize',
      models=["reason", "fast", "creative"], ensemble_n=9,
      merge_fn="pick_best")
def summarize() -> Summary: ...
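Round-robin assignment reduces to modular indexing. This is a minimal sketch with two assumptions: generator i gets models[i % len(models)], and ensemble_n falls back to len(models) when omitted, as described above. assign_models is a hypothetical helper, not a neograph function.

```python
from collections import Counter
from typing import Optional


def assign_models(models: list, ensemble_n: Optional[int] = None) -> list:
    """Hypothetical: return the model tier used by each generator, round-robin."""
    if not models:
        raise ValueError("models must not be empty")
    n = len(models) if ensemble_n is None else ensemble_n
    return [models[i % len(models)] for i in range(n)]


tiers = ["reason", "fast", "creative"]
print(assign_models(tiers))  # ['reason', 'fast', 'creative']
print(Counter(assign_models(tiers, ensemble_n=9)))  # each tier appears 3 times
```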

When models= is set without merge_fn or merge_prompt, the function body IS the merge function. It receives list[OutputType] at runtime:

@node(outputs=Summary, prompt='rw/summarize',
      models=["reason", "fast", "creative"])
def summarize(data: Claims) -> Summary:
    # data is list[Summary] at runtime (the collected variants)
    return max(data, key=lambda v: v.confidence)

One definition captures what to generate (prompt), where to send it (models), and how to merge (body).

Interrupt with interrupt_when

Pause the graph for human review when a condition is met. This is the Operator pattern: human-in-the-loop checkpoints that require a checkpointer.

The interrupt_when= keyword accepts either a string (registered condition name) or a callable:

from neograph import node, construct_from_module, compile, run
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel
import sys

class Analysis(BaseModel, frozen=True):
    claims: list[str]
    coverage_pct: int

class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]

@node(outputs=Analysis)
def analyze() -> Analysis:
    return Analysis(claims=["auth", "logging"], coverage_pct=55)

@node(
    outputs=ValidationResult,
    interrupt_when=lambda state: (
        {"issues": state.check.issues, "message": "Please review"}
        if state.check and not state.check.passed
        else None
    ),
)
def check(analyze: Analysis) -> ValidationResult:
    if analyze.coverage_pct < 80:
        return ValidationResult(passed=False, issues=["Coverage below threshold"])
    return ValidationResult(passed=True, issues=[])

pipeline = construct_from_module(sys.modules[__name__], name="review")
graph = compile(pipeline, checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "review-001"}}
result = run(graph, input={"node_id": "REQ-001"}, config=config)

if "__interrupt__" in result:
    # Human reviews, then resume on the same thread_id
    result = run(graph, resume={"approved": True}, config=config)

The callable receives the full graph state and returns either a dict (interrupt payload shown to the human) or None (no interrupt, continue). A checkpointer is required — compile() enforces this when an Operator is present.

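For the string form, register the condition under a name with register_condition and pass that name to interrupt_when=:

from neograph import register_condition

def needs_review(state):
    # The state field for a node's output is named after the node ("check")
    val = state.check
    if val and not val.passed:
        return {"issues": val.issues}
    return None

register_condition("needs_review", needs_review)

@node(outputs=ValidationResult, interrupt_when="needs_review")
def check(analyze: Analysis) -> ValidationResult: ...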
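An interrupt condition is an ordinary function of state, so it can be exercised without compiling a graph. The sketch below reuses the logic of the interrupt_when lambda from the first example, with two stand-ins assumed for illustration: types.SimpleNamespace in place of the graph state object, and a dataclass in place of the Pydantic ValidationResult. Only the dict-or-None contract comes from the description above.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace


@dataclass(frozen=True)
class ValidationResult:
    passed: bool
    issues: list = field(default_factory=list)


def check_needs_review(state):
    """Same logic as the interrupt_when lambda: payload dict to pause, None to continue."""
    if state.check and not state.check.passed:
        return {"issues": state.check.issues, "message": "Please review"}
    return None


failed = SimpleNamespace(check=ValidationResult(passed=False, issues=["Coverage below threshold"]))
passed = SimpleNamespace(check=ValidationResult(passed=True))
not_run = SimpleNamespace(check=None)

print(check_needs_review(failed))   # {'issues': ['Coverage below threshold'], 'message': 'Please review'}
print(check_needs_review(passed))   # None
print(check_needs_review(not_run))  # None
```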

Loop with loop_when and max_iterations

Repeat a node until a condition is met. The node’s output feeds back as its own input on the next iteration.

from pydantic import BaseModel
from neograph import node, construct_from_functions, compile, run

class Draft(BaseModel, frozen=True):
    content: str
    score: float = 0.0
    iteration: int = 0

@node(outputs=Draft)
def seed() -> Draft:
    return Draft(content="initial", score=0.0)

@node(outputs=Draft, loop_when=lambda d: d is None or d.score < 0.8, max_iterations=5)
def refine(draft: Draft) -> Draft:
    return Draft(content=f"v{draft.iteration + 1}", iteration=draft.iteration + 1,
                 score=draft.score + 0.3)

pipeline = construct_from_functions("writer", [seed, refine])
graph = compile(pipeline)
result = run(graph, input={"node_id": "loop-demo"})

# result["refine"] is a list (all iterations preserved)
final = result["refine"][-1]
print(f"Final: score={final.score}, iterations={final.iteration}")

How it works:

  • loop_when= receives the node’s latest output. Return True to continue looping, False to exit. The value may be None on the first iteration (before the node has produced output), so the callable must be None-safe — e.g. lambda d: d is None or d.score < 0.8.
  • max_iterations= caps the loop. Default is 10. When exceeded, behavior depends on on_exhaust=:
    • "error" (default): raises ExecutionError
    • "last": exits with the last result
  • The result is an append-list: every iteration’s output is preserved.
  • The parameter name doesn’t need to match the upstream. For self-loops, the framework resolves it by type — if exactly one upstream produces a compatible type, it wires automatically.
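The loop rules above can be modelled in a few lines: a None-safe condition, the max_iterations cap, on_exhaust=, and an append-list result. This is a hypothetical pure-Python sketch, not neograph's executor. LoopExhausted stands in for ExecutionError, and Draft is a dataclass rather than a Pydantic model.

```python
from dataclasses import dataclass


class LoopExhausted(RuntimeError):
    """Stand-in for the ExecutionError raised when on_exhaust='error'."""


@dataclass(frozen=True)
class Draft:
    content: str
    score: float = 0.0
    iteration: int = 0


def run_self_loop(step, when, initial, max_iterations=10, on_exhaust="error"):
    """Run step until when(latest) is False; return every iteration's output."""
    outputs = []
    latest = None              # when() sees None before the first iteration
    current = initial          # the first iteration consumes the upstream value
    while when(latest):
        if len(outputs) >= max_iterations:
            if on_exhaust == "last":
                break
            raise LoopExhausted(f"still looping after {max_iterations} iterations")
        latest = step(current)
        outputs.append(latest)
        current = latest       # the output feeds back as the next input
    return outputs


def refine(draft: Draft) -> Draft:
    return Draft(content=f"v{draft.iteration + 1}",
                 iteration=draft.iteration + 1, score=draft.score + 0.3)


history = run_self_loop(refine, lambda d: d is None or d.score < 0.8,
                        Draft(content="initial"), max_iterations=5)
print(len(history), history[-1].iteration)  # 3 3
```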

For loops involving multiple nodes (e.g., review + revise), wrap the loop body as a sub-construct with Loop:

from neograph import Loop, node, construct_from_functions

@node(outputs=ReviewResult)
def review(draft: Draft) -> ReviewResult: ...

@node(outputs=Draft)
def revise(draft: Draft, review: ReviewResult) -> Draft: ...

# Sub-construct loops as a unit: Draft in, Draft out
refine = construct_from_functions(
    "refine", [review, revise], input=Draft, output=Draft,
) | Loop(when=lambda d: d is None or d.score < 0.8, max_iterations=10)

# seed and finalize are @node functions defined elsewhere in the module
pipeline = construct_from_functions("writer", [seed, refine, finalize])

The parent pipeline sees only Draft -> Draft. The iteration is hidden inside the sub-construct.

In ForwardConstruct, use self.loop() for multi-node loop bodies:

class Writer(ForwardConstruct):
    draft = Node(outputs=Draft, prompt='draft', model='fast')
    review = Node(outputs=ReviewResult, prompt='review', model='reason')
    revise = Node(outputs=Draft, prompt='revise', model='reason')

    def forward(self, topic):
        d = self.draft(topic)
        d = self.loop(
            body=[self.review, self.revise],
            when=lambda r: r.score < 0.8,
            max_iterations=5,
        )(d)
        return d

Python for/while loops in forward() don’t compile to graph cycles — they trace the body once (same limitation as torch.jit.trace). self.loop() is the explicit cycle primitive.

Restrictions:

  • loop_when= and map_over= cannot be combined on the same node. Instead, apply Each to a sub-construct whose internal nodes do the looping.
  • A Loop applied to a single node is always a self-loop. Multi-node loops use construct | Loop(when=...), or self.loop() in ForwardConstruct.

Skip with skip_when and skip_value

Skip the LLM call when a condition is met. This is useful for cost optimization: it avoids expensive model invocations when the input is trivial or already resolved.

from pydantic import BaseModel
from neograph import node, construct_from_functions, compile, run

class Analysis(BaseModel, frozen=True):
    text: str
    complexity: float = 0.0

class Summary(BaseModel, frozen=True):
    content: str

@node(outputs=Analysis)
def analyze() -> Analysis:
    return Analysis(text="simple case", complexity=0.1)

@node(
    outputs=Summary,
    prompt="rw/summarize",
    model="reason",
    skip_when=lambda a: a.complexity < 0.3,
    skip_value=lambda a: Summary(content=f"Trivial: {a.text}"),
)
def summarize(analyze: Analysis) -> Summary: ...

pipeline = construct_from_functions("pipeline", [analyze, summarize])
graph = compile(pipeline)
result = run(graph, input={"node_id": "skip-demo"})

# complexity < 0.3, so the LLM call is skipped entirely
print(result["summarize"].content)  # "Trivial: simple case"

How it works:

  • skip_when= receives the extracted input data (after _extract_input, before rendering). Returns True to skip the LLM call, False to proceed normally. For single-upstream nodes, the callable receives the typed value directly (not a dict wrapper).
  • skip_value= produces the output when skipped. Receives the same input as skip_when. Required inside a Loop — without it, the loop counter still increments but no output is written.
  • When skipped, the node logs node_skipped with reason="skip_when" and the elapsed time.

When to use:

  • Cost optimization: skip expensive LLM calls when the input is trivial or a cached result suffices.
  • Conditional execution: nodes that should only run when a quality threshold is not met.
  • Loop exit: inside a Loop, skip_when + skip_value can short-circuit iterations when the output is already good enough.

skip_when applies to all LLM modes (think, agent, act). It does not apply to scripted nodes — use a conditional inside the function body instead.
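The skip decision described above amounts to a guard in front of the model call. The sketch below relies on these assumptions:

  • call_with_skip is a hypothetical helper.
  • call_llm is a callable standing in for the real model invocation.
  • Analysis and Summary are dataclass stand-ins for the Pydantic models in the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Analysis:
    text: str
    complexity: float = 0.0


@dataclass(frozen=True)
class Summary:
    content: str


def call_with_skip(data, call_llm, skip_when=None, skip_value=None):
    """Hypothetical: skip the model call when skip_when(data) is True."""
    if skip_when is not None and skip_when(data):
        # skip_value receives the same input as skip_when; without it, nothing is produced.
        return skip_value(data) if skip_value is not None else None
    return call_llm(data)


calls = []


def fake_llm(a: Analysis) -> Summary:
    calls.append(a)  # record invocations to show whether the "LLM" ran
    return Summary(content=f"LLM summary of {a.text}")


skip_when = lambda a: a.complexity < 0.3
skip_value = lambda a: Summary(content=f"Trivial: {a.text}")

print(call_with_skip(Analysis("simple case", 0.1), fake_llm, skip_when, skip_value).content)
print(call_with_skip(Analysis("hard case", 0.9), fake_llm, skip_when, skip_value).content)
print(len(calls))  # 1
```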

Context injection with context

Inject verbatim state fields into the LLM prompt alongside typed input. Unlike regular inputs, context values are not type-checked or rendered through the framework’s BAML renderer; they are passed as-is to the prompt compiler.

from pydantic import BaseModel
from neograph import node, construct_from_functions, compile, run

class Catalog(BaseModel, frozen=True):
    content: str

class Claims(BaseModel, frozen=True):
    items: list[str]

class ScoredClaims(BaseModel, frozen=True):
    scored: list[dict]

@node(outputs=Catalog)
def build_catalog() -> Catalog:
    return Catalog(content=(
        "=== Graph Catalog (BFS from UC-001) ===\n"
        "UC-001: User Authentication [auth.py:10-85]\n"
        " BR-001: Password min 12 chars\n"
        " BR-002: Session timeout 30min\n"
    ))

@node(outputs=Claims)
def decompose() -> Claims:
    return Claims(items=["auth uses SSO", "data encrypted at rest"])

@node(
    outputs=ScoredClaims,
    prompt="rw/score",
    model="reason",
    context=["build_catalog"],
)
def score(decompose: Claims) -> ScoredClaims: ...

pipeline = construct_from_functions("pipeline", [build_catalog, decompose, score])

How it works:

  • context= takes a list of state field names (strings). At runtime, the factory reads each named field from state and passes the values as a context dict to the prompt compiler: invoke_structured(..., context={"build_catalog": <value>}).
  • Context values bypass the renderer dispatch chain. The prompt compiler receives them verbatim — no BAML describe_value, no XML/JSON wrapping. This is the right choice for pre-formatted content like graph catalogs, domain briefings, or cached summaries that are already crafted for LLM consumption.
  • Context names are validated at compile time. The state model creates fields for each declared context name, and the validator checks that the names correspond to known upstream producers.

Difference from inputs:

| | inputs | context |
|---|---|---|
| Type-checked | Yes (Pydantic model matching) | No (passed as-is) |
| Rendered | Yes (BAML / renderer chain) | No (verbatim) |
| Topology wiring | Determines DAG edges | Does not affect topology |
| Prompt placement | Main input section | Separate context dict |

Context fields are forwarded into sub-constructs automatically. If a node inside a sub-construct declares context=["build_catalog"], the parent state value is copied into the sub-graph’s state at invocation time. See example 14 (examples/14_context_injection.py) for the full pattern with sub-constructs and Each fan-out.
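The contrast between inputs and context can be sketched with plain dicts. In this hypothetical illustration, json.dumps stands in for the framework's renderer chain (it is not BAML), and build_prompt_payload is an invented helper. Typed inputs are rendered, while context values are copied from state unchanged.

```python
import json


def build_prompt_payload(state: dict, inputs: list, context: list, render=json.dumps):
    """Hypothetical: render declared inputs; pass context fields through verbatim."""
    missing = [name for name in inputs + context if name not in state]
    if missing:
        raise KeyError(f"unknown state fields: {missing}")
    rendered_inputs = {name: render(state[name]) for name in inputs}
    context_values = {name: state[name] for name in context}  # no rendering
    return rendered_inputs, context_values


state = {
    "decompose": {"items": ["auth uses SSO", "data encrypted at rest"]},
    "build_catalog": "=== Graph Catalog (BFS from UC-001) ===\nUC-001: User Authentication\n",
}
rendered, ctx = build_prompt_payload(state, inputs=["decompose"], context=["build_catalog"])
print(rendered["decompose"])  # {"items": ["auth uses SSO", "data encrypted at rest"]}
print(ctx["build_catalog"] is state["build_catalog"])  # True
```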

Loop history with loop_history

Collect every iteration’s output into a separate history field for debugging or audit trails.

from pydantic import BaseModel
from neograph import node, construct_from_functions, compile, run

class Draft(BaseModel, frozen=True):
    content: str
    score: float = 0.0
    iteration: int = 0

@node(outputs=Draft)
def seed() -> Draft:
    return Draft(content="initial", score=0.0)

@node(
    outputs=Draft,
    loop_when=lambda d: d is None or d.score < 0.8,
    max_iterations=5,
    loop_history=True,
)
def refine(draft: Draft) -> Draft:
    return Draft(
        content=f"v{draft.iteration + 1}",
        iteration=draft.iteration + 1,
        score=draft.score + 0.3,
    )

pipeline = construct_from_functions("writer", [seed, refine])
graph = compile(pipeline)
result = run(graph, input={"node_id": "history-demo"})

# The main output is the append-list of all iterations
final = result["refine"][-1]
print(f"Final: score={final.score}")

# The history field collects each iteration independently
history = result.get("neo_loop_history_refine", [])
for i, draft in enumerate(history):
    print(f" Iteration {i}: score={draft.score}")

How it works:

  • loop_history=True (or Loop(history=True) in the programmatic API) creates an additional state field neo_loop_history_{node_name} that collects each iteration’s output into a list.
  • The history field uses a list reducer — each iteration appends to it independently. This is separate from the node’s main output field (which is also an append-list for Loop nodes).
  • The history field is written by _build_state_update in the factory layer alongside the main output and loop counter increment.

When to use:

  • Debugging: inspect how the output evolved across iterations.
  • Audit trail: keep a record of each refinement step for compliance or review.
  • Analytics: measure convergence rate or detect oscillation.
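For the analytics case, the history list can be post-processed directly. This minimal sketch assumes each history entry has a numeric score field, as in the Draft model above. score_deltas and oscillates are hypothetical helpers, and Draft is redeclared as a dataclass so the block runs on its own.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Draft:
    content: str
    score: float = 0.0
    iteration: int = 0


def score_deltas(history, ndigits=6):
    """Change in score between consecutive iterations (rounded to hide float noise)."""
    scores = [d.score for d in history]
    return [round(b - a, ndigits) for a, b in zip(scores, scores[1:])]


def oscillates(history):
    """True if the score direction flips at least once (ignoring flat steps)."""
    deltas = [x for x in score_deltas(history) if x != 0]
    return any((a > 0) != (b > 0) for a, b in zip(deltas, deltas[1:]))


steady = [Draft("v1", 0.3, 1), Draft("v2", 0.6, 2), Draft("v3", 0.9, 3)]
wobbly = [Draft("v1", 0.5, 1), Draft("v2", 0.7, 2), Draft("v3", 0.6, 3), Draft("v4", 0.75, 4)]
print(score_deltas(steady), oscillates(steady))  # [0.3, 0.3] False
print(oscillates(wobbly))  # True
```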

Restrictions:

  • Only supported on Node-level Loops. Applying Loop(history=True) to a Construct raises ConstructError at pipe-composition time. For Construct-level loops, collect history inside the sub-construct’s internal nodes instead.

The programmatic equivalent:

from neograph import Node, Loop

refine = Node("refine", mode="scripted", outputs=Draft) | Loop(
    when=lambda d: d is None or d.score < 0.8,
    max_iterations=5,
    history=True,
)

Fan-out with .map()

The .map() method is sugar over | Each(over=..., key=...). It accepts either a lambda or a string path.

@node(outputs=VerifyResult)
def verify(cluster: ClusterGroup) -> VerifyResult:
    return VerifyResult(cluster_label=cluster.label, coverage_pct=85)

verify = verify.map(lambda s: s.discover_clusters.groups, key="label")

The lambda is introspected once at definition time via a recording proxy. Pyright and Pylance catch typos in the attribute chain — renaming discover_clusters to find_clusters shows as a red squiggle, unlike a string path.

verify = verify.map("discover_clusters.groups", key="label")

Equivalent to verify | Each(over="discover_clusters.groups", key="label"). Use this when the string path is computed at runtime or when you do not need static analysis coverage.

| Form | Advantage |
|---|---|
| map_over= keyword | Inline with @node(...); no separate .map() call |
| .map(lambda ...) | Refactor-safe; static analysers catch typos |
| .map("string") | Escape hatch; works with dynamic paths |
| Each(over=..., key=...) applied with the pipe operator | Programmatic API; used by LLM-driven runtime construction |

All four produce the same Each modifier on the node. The compiler, state builder, and factory see no difference.
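The recording-proxy idea behind .map(lambda ...) can be shown with a small class that logs attribute accesses instead of evaluating them. This is a hypothetical sketch of the technique, not neograph's internal proxy. PathRecorder and lambda_to_path are invented names.

```python
class PathRecorder:
    """Records attribute accesses: PathRecorder().a.b records ('a', 'b')."""

    __slots__ = ("_parts",)

    def __init__(self, parts=()):
        object.__setattr__(self, "_parts", tuple(parts))

    def __getattr__(self, name):
        # Called only for attributes not found normally, i.e. every path segment.
        return PathRecorder(object.__getattribute__(self, "_parts") + (name,))


def lambda_to_path(selector) -> str:
    """Run the selector once against a recorder and return the dotted path."""
    result = selector(PathRecorder())
    if not isinstance(result, PathRecorder):
        raise TypeError("selector may only use attribute access")
    parts = object.__getattribute__(result, "_parts")
    if not parts:
        raise ValueError("selector must access at least one attribute")
    return ".".join(parts)


print(lambda_to_path(lambda s: s.discover_clusters.groups))  # discover_clusters.groups
```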

Programmatic equivalents

Every modifier keyword compiles to the same Node | Modifier that the runtime API uses. If you prefer the pipe syntax (for example, when an LLM is constructing the graph at runtime), the equivalents are:

from neograph import Node, Each, Oracle, Operator, Loop, Construct

# Fan-out
verify = Node("verify", mode="scripted", outputs=VerifyResult) | Each(over="discover_clusters.groups", key="label")

# Ensemble (same model)
decompose = Node("decompose", mode="think", outputs=Claims,
                 prompt="rw/decompose", model="reason") | Oracle(n=3, merge_fn="merge_claims")

# Ensemble (multi-model)
decompose = Node("decompose", mode="think", outputs=Claims,
                 prompt="rw/decompose") | Oracle(models=["reason", "fast", "creative"], merge_fn="merge_claims")

# Interrupt
check = Node("check", mode="scripted", outputs=ValidationResult) | Operator(when="needs_review")

# Loop (self-loop on node); when= must be None-safe
refine = Node("refine", mode="scripted", outputs=Draft) | Loop(when=lambda d: d is None or d.score < 0.8, max_iterations=5)

# Loop (on sub-construct); review and revise are the nodes from the multi-node loop example
body = Construct("refine", input=Draft, output=Draft, nodes=[review, revise])
body = body | Loop(when=lambda d: d is None or d.score < 0.8, max_iterations=10)
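The pipe syntax itself is ordinary operator overloading. The toy sketch below assumes a node is an immutable value and that `|` returns a copy with the modifier appended. ToyNode and ToyEach are hypothetical stand-ins, not neograph's Node and Each. They show how the keyword, .map(), and pipe forms can all end in the same modifier.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyEach:
    over: str
    key: str


@dataclass(frozen=True)
class ToyNode:
    name: str
    modifiers: tuple = ()

    def __or__(self, modifier):
        # Return a new node; the original stays unchanged.
        return replace(self, modifiers=self.modifiers + (modifier,))

    def map(self, over: str, key: str) -> "ToyNode":
        return self | ToyEach(over=over, key=key)


def toy_node(name: str, map_over: Optional[str] = None, map_key: Optional[str] = None) -> ToyNode:
    """Keyword form: attach ToyEach only when both keywords are given."""
    if (map_over is None) != (map_key is None):
        raise ValueError("map_over and map_key must be given together")
    node = ToyNode(name)
    return node.map(map_over, key=map_key) if map_over else node


path = "discover_clusters.groups"
piped = ToyNode("verify") | ToyEach(over=path, key="label")
mapped = ToyNode("verify").map(path, key="label")
keyword = toy_node("verify", map_over=path, map_key="label")
print(piped == mapped == keyword)  # True
```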

Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.