Modifier Keywords
The @node decorator accepts keyword arguments that attach modifiers — Each, Oracle, and Operator — without touching the programmatic API. Each pattern shown below compiles to the same graph as its Node | Modifier equivalent.
Fan-out with map_over and map_key
Process each item in a collection independently. The map_over= keyword specifies a dotted path to the collection. The map_key= keyword names the field used to key results in the output dict.

    from neograph import node, construct_from_module, compile, run
    import sys
    from pydantic import BaseModel

    class ClusterGroup(BaseModel, frozen=True):
        label: str
        claim_ids: list[str]

    class Clusters(BaseModel, frozen=True):
        groups: list[ClusterGroup]

    class VerifyResult(BaseModel, frozen=True):
        cluster_label: str
        coverage_pct: int

    @node(outputs=Clusters)
    def discover_clusters() -> Clusters:
        return Clusters(groups=[
            ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2"]),
            ClusterGroup(label="logging", claim_ids=["REQ-3"]),
        ])

    @node(outputs=VerifyResult, map_over="discover_clusters.groups", map_key="label")
    def verify(cluster: ClusterGroup) -> VerifyResult:
        coverage = {"authentication": 85, "logging": 60}
        return VerifyResult(
            cluster_label=cluster.label,
            coverage_pct=coverage.get(cluster.label, 0),
        )

    pipeline = construct_from_module(sys.modules[__name__])
    graph = compile(pipeline)
    result = run(graph, input={"node_id": "analysis-001"})

    # result["verify"] is dict[str, VerifyResult], keyed by label
    for label, vr in result["verify"].items():
        print(f"{label}: {vr.coverage_pct}%")

How it works:

- map_over='discover_clusters.groups' tells the framework to iterate over the groups field of the discover_clusters output.
- map_key='label' means each result is keyed by cluster.label in the output dict.
- The cluster parameter does not match any upstream @node; the framework recognizes it as the fan-out item receiver and skips it in topology wiring.
- Under the hood, @node applies | Each(over=..., key=...) to the created Node.
Both map_over and map_key are required together. Passing one without the other raises ConstructError.
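To make the fan-out semantics above concrete, here is a minimal pure-Python sketch of what map_over and map_key describe. It does not use neograph: resolve_path and fan_out are hypothetical helpers, the dataclasses stand in for the Pydantic models, and the real framework may resolve paths and schedule items differently.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class ClusterGroup:  # stand-in for the Pydantic model
    label: str
    claim_ids: tuple


@dataclass(frozen=True)
class Clusters:
    groups: tuple


def resolve_path(state: dict, dotted: str):
    """Hypothetical helper: first segment names a state field, the rest are attributes."""
    head, *rest = dotted.split(".")
    return reduce(getattr, rest, state[head])


def fan_out(state: dict, over: str, key: str, fn):
    """Hypothetical helper: call fn once per item and key each result by item.<key>."""
    return {getattr(item, key): fn(item) for item in resolve_path(state, over)}


state = {"discover_clusters": Clusters(groups=(
    ClusterGroup("authentication", ("REQ-1", "REQ-2")),
    ClusterGroup("logging", ("REQ-3",)),
))}

counts = fan_out(state, "discover_clusters.groups", "label", lambda c: len(c.claim_ids))
print(counts)  # {'authentication': 2, 'logging': 1}
```

The result is a dict keyed by label, which mirrors the shape of result["verify"] in the example above.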
Ensemble with ensemble_n, merge_fn, and merge_prompt
Run a node N times in parallel, then merge the results. This is the Oracle pattern: multiple independent generations combined into a consensus.

    from neograph import node, register_scripted, construct_from_module, compile, run
    import sys
    from pydantic import BaseModel

    # Output model for the ensemble: a flat list of claim strings
    class Claims(BaseModel, frozen=True):
        items: list[str]

    def merge_claims(variants, config):
        """Deduplicate claims across all variants."""
        seen = set()
        merged = []
        for variant in variants:
            for claim in variant.items:
                if claim not in seen:
                    seen.add(claim)
                    merged.append(claim)
        return Claims(items=merged)

    register_scripted("merge_claims", merge_claims)

    @node(outputs=Claims, ensemble_n=3, merge_fn="merge_claims")
    def decompose() -> Claims:
        # Each of the 3 copies runs this function independently
        return Claims(items=["shall authenticate", "shall encrypt"])

    pipeline = construct_from_module(sys.modules[__name__], name="oracle-demo")

The merge step can be either scripted or LLM-powered:
| Keyword | What it does |
|---|---|
| merge_fn="name" | Call a registered scripted function. Receives (variants, config). |
| merge_prompt="template" | Call an LLM with the variants as context. Uses the node’s model=. |
Exactly one of merge_fn or merge_prompt is required. Setting both raises ConstructError.
ensemble_n defaults to 3 if omitted when either merge keyword is present. It must be >= 2.
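Because a scripted merge function is a plain callable that receives (variants, config), it can be exercised on its own before being registered. The sketch below assumes nothing about neograph: Claims is a dataclass stand-in for the Pydantic model, and passing config=None is an assumption made only for this isolated check.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Claims:  # stand-in for the Pydantic model
    items: tuple


def merge_claims(variants, config):
    """Deduplicate claims across all variants, keeping first-seen order."""
    seen, merged = set(), []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=tuple(merged))


# Three variants, as an ensemble with ensemble_n=3 would produce
variants = [
    Claims(("shall authenticate", "shall encrypt")),
    Claims(("shall encrypt", "shall log access")),
    Claims(("shall authenticate",)),
]

merged = merge_claims(variants, config=None)
print(merged.items)
# ('shall authenticate', 'shall encrypt', 'shall log access')
```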
For LLM-mode nodes, the ensemble runs the same prompt N times (useful with non-zero temperature):

    @node(outputs=Claims, prompt='rw/decompose', model='reason',
          llm_config={"temperature": 0.8},
          ensemble_n=3, merge_fn="merge_claims")
    def decompose() -> Claims: ...

Multi-model ensemble with models=
Run the same prompt on different models and merge. Each generator gets a different model tier:

    @node(outputs=Summary, prompt='rw/summarize',
          models=["reason", "fast", "creative"],
          merge_fn="pick_best")
    def summarize() -> Summary: ...

When models= is set, ensemble_n is inferred from len(models). You can also set both for redundancy; generators are assigned models round-robin:

    # 9 generators across 3 models (3 each)
    @node(outputs=Summary, prompt='rw/summarize',
          models=["reason", "fast", "creative"],
          ensemble_n=9, merge_fn="pick_best")
    def summarize() -> Summary: ...

Body-as-merge
When models= is set without merge_fn or merge_prompt, the function body IS the merge function. It receives list[OutputType] at runtime:

    @node(outputs=Summary, prompt='rw/summarize',
          models=["reason", "fast", "creative"])
    def summarize(data: Claims) -> Summary:
        # data is list[Summary] at runtime (the collected variants)
        return max(data, key=lambda v: v.confidence)

One definition captures what to generate (prompt), where to send it (models), and how to merge (body).
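The round-robin assignment described for models= combined with ensemble_n= can be pictured with a few lines of plain Python. This is only a sketch of the stated rule: assign_models is a hypothetical helper, not a neograph function, and the framework's actual ordering of generators is not specified here.

```python
from collections import Counter


def assign_models(models, ensemble_n=None):
    """Hypothetical helper: infer n from len(models), else cycle models round-robin."""
    n = len(models) if ensemble_n is None else ensemble_n
    return [models[i % len(models)] for i in range(n)]


models = ["reason", "fast", "creative"]

# models= alone: one generator per model
print(assign_models(models))  # ['reason', 'fast', 'creative']

# models= plus ensemble_n=9: three generators per model
assignment = assign_models(models, ensemble_n=9)
print(Counter(assignment))  # Counter({'reason': 3, 'fast': 3, 'creative': 3})
```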
Interrupt with interrupt_when
Pause the graph for human review when a condition is met. This is the Operator pattern: human-in-the-loop checkpoints that require a checkpointer.
The interrupt_when= keyword accepts either a string (registered condition name) or a callable:
Callable form
    from neograph import node, construct_from_module, compile, run
    from langgraph.checkpoint.memory import MemorySaver
    import sys

    @node(outputs=Analysis)
    def analyze() -> Analysis:
        return Analysis(claims=["auth", "logging"], coverage_pct=55)

    @node(
        outputs=ValidationResult,
        interrupt_when=lambda state: (
            {"issues": state.check.issues, "message": "Please review"}
            if state.check and not state.check.passed
            else None
        ),
    )
    def check(analyze: Analysis) -> ValidationResult:
        if analyze.coverage_pct < 80:
            return ValidationResult(passed=False, issues=["Coverage below threshold"])
        return ValidationResult(passed=True, issues=[])

    pipeline = construct_from_module(sys.modules[__name__], name="review")
    graph = compile(pipeline, checkpointer=MemorySaver())

    config = {"configurable": {"thread_id": "review-001"}}
    result = run(graph, input={"node_id": "REQ-001"}, config=config)

    if "__interrupt__" in result:
        # Human reviews, then resume
        result = run(graph, resume={"approved": True}, config=config)

The callable receives the full graph state and returns either a dict (interrupt payload shown to the human) or None (no interrupt, continue). A checkpointer is required; compile() enforces this when an Operator is present.
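Since the condition is an ordinary function of state, its three outcomes (failed check, passed check, check not yet run) can be verified without compiling a graph. The sketch below assumes only that state exposes node outputs as attributes, as the lambda in the example implies; SimpleNamespace is a stand-in for the real state object, and review_condition is a hypothetical name for the same logic.

```python
from types import SimpleNamespace


def review_condition(state):
    """Return a payload dict to interrupt, or None to continue."""
    if state.check and not state.check.passed:
        return {"issues": state.check.issues, "message": "Please review"}
    return None


failed = SimpleNamespace(check=SimpleNamespace(passed=False, issues=["Coverage below threshold"]))
passed = SimpleNamespace(check=SimpleNamespace(passed=True, issues=[]))
not_run = SimpleNamespace(check=None)  # the check node has not produced output yet

print(review_condition(failed))
# {'issues': ['Coverage below threshold'], 'message': 'Please review'}
print(review_condition(passed))   # None
print(review_condition(not_run))  # None
```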
String form
    from neograph import register_condition

    def needs_review(state):
        val = state.check_results
        if val and not val.passed:
            return {"issues": val.issues}
        return None

    register_condition("needs_review", needs_review)

    @node(outputs=ValidationResult, interrupt_when="needs_review")
    def check(analyze: Analysis) -> ValidationResult: ...

Loop with loop_when and max_iterations
Repeat a node until a condition is met. The node’s output feeds back as its own input on the next iteration.

    from pydantic import BaseModel
    from neograph import node, construct_from_functions, compile, run

    class Draft(BaseModel, frozen=True):
        content: str
        score: float = 0.0
        iteration: int = 0

    @node(outputs=Draft)
    def seed() -> Draft:
        return Draft(content="initial", score=0.0)

    @node(outputs=Draft,
          loop_when=lambda d: d is None or d.score < 0.8,
          max_iterations=5)
    def refine(draft: Draft) -> Draft:
        return Draft(content=f"v{draft.iteration + 1}",
                     iteration=draft.iteration + 1,
                     score=draft.score + 0.3)

    pipeline = construct_from_functions("writer", [seed, refine])
    graph = compile(pipeline)
    result = run(graph, input={"node_id": "loop-demo"})

    # result["refine"] is a list (all iterations preserved)
    final = result["refine"][-1]
    print(f"Final: score={final.score}, iterations={final.iteration}")

How it works:

- loop_when= receives the node’s latest output. Return True to continue looping, False to exit. The value may be None on the first iteration (before the node has produced output), so the callable must be None-safe, e.g. lambda d: d is None or d.score < 0.8.
- max_iterations= caps the loop. Default is 10. When exceeded, behavior depends on on_exhaust=:
  - "error" (default): raises ExecutionError
  - "last": exits with the last result
- The result is an append-list: every iteration’s output is preserved.
- The parameter name doesn’t need to match the upstream. For self-loops, the framework resolves it by type — if exactly one upstream produces a compatible type, it wires automatically.
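The loop rules listed above can be traced by hand with a small driver. This is a sketch under stated assumptions, not the framework's implementation: run_loop is a hypothetical function, RuntimeError stands in for ExecutionError, and the exact point at which the real framework counts an iteration as "exceeded" may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Draft:  # stand-in for the Pydantic model
    content: str
    score: float = 0.0
    iteration: int = 0


def run_loop(step, seed, when, max_iterations=10, on_exhaust="error"):
    """Hypothetical driver mirroring the documented loop_when / max_iterations rules."""
    outputs, current, latest = [], seed, None
    while when(latest):  # latest is None before the first iteration
        if len(outputs) >= max_iterations:
            if on_exhaust == "last":
                break  # exit with the last result
            raise RuntimeError("max_iterations exceeded")  # stand-in for ExecutionError
        latest = step(current)
        outputs.append(latest)  # append-list: every iteration is preserved
        current = latest        # output feeds back as the next input
    return outputs


def refine(d):
    return Draft(f"v{d.iteration + 1}", score=d.score + 0.3, iteration=d.iteration + 1)


history = run_loop(refine, Draft("initial"),
                   when=lambda d: d is None or d.score < 0.8, max_iterations=5)
print(len(history), history[-1].content)  # 3 v3
```

With a step of 0.3 per iteration, the score passes 0.8 on the third pass, so the loop exits well before the cap of 5.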
Multi-node loop body
For loops involving multiple nodes (e.g., review + revise), wrap the loop body as a sub-construct with Loop:

    from neograph import Loop

    @node(outputs=ReviewResult)
    def review(draft: Draft) -> ReviewResult: ...

    @node(outputs=Draft)
    def revise(draft: Draft, review: ReviewResult) -> Draft: ...

    # Sub-construct loops as a unit: Draft in, Draft out
    refine = construct_from_functions(
        "refine",
        [review, revise],
        input=Draft,
        output=Draft,
    ) | Loop(when=lambda d: d is None or d.score < 0.8, max_iterations=10)

    pipeline = construct_from_functions("writer", [seed, refine, finalize])

The parent pipeline sees only Draft -> Draft. The iteration is hidden inside the sub-construct.
ForwardConstruct equivalent
In ForwardConstruct, use self.loop() for multi-node loop bodies:

    class Writer(ForwardConstruct):
        draft = Node(outputs=Draft, prompt='draft', model='fast')
        review = Node(outputs=ReviewResult, prompt='review', model='reason')
        revise = Node(outputs=Draft, prompt='revise', model='reason')

        def forward(self, topic):
            d = self.draft(topic)
            d = self.loop(
                body=[self.review, self.revise],
                when=lambda r: r.score < 0.8,
                max_iterations=5,
            )(d)
            return d

Python for/while loops in forward() don’t compile to graph cycles; they trace the body once (same limitation as torch.jit.trace). self.loop() is the explicit cycle primitive.
Restrictions
- loop_when= and map_over= cannot be combined on the same node. Use a sub-construct with internal loop inside an Each fan-out instead.
- Loop on a node is always a self-loop. Multi-node loops use construct | Loop(when=...) or self.loop() in ForwardConstruct.
Skip with skip_when and skip_value
Skip the LLM call when a condition is met. Useful for cost optimization: avoid expensive model invocations when the input is trivial or already resolved.

    from pydantic import BaseModel
    from neograph import node, construct_from_functions, compile, run

    class Analysis(BaseModel, frozen=True):
        text: str
        complexity: float = 0.0

    class Summary(BaseModel, frozen=True):
        content: str

    @node(outputs=Analysis)
    def analyze() -> Analysis:
        return Analysis(text="simple case", complexity=0.1)

    @node(
        outputs=Summary,
        prompt="rw/summarize",
        model="reason",
        skip_when=lambda a: a.complexity < 0.3,
        skip_value=lambda a: Summary(content=f"Trivial: {a.text}"),
    )
    def summarize(analyze: Analysis) -> Summary: ...

    pipeline = construct_from_functions("pipeline", [analyze, summarize])
    graph = compile(pipeline)
    result = run(graph, input={"node_id": "skip-demo"})

    # complexity < 0.3, so the LLM call is skipped entirely
    print(result["summarize"].content)  # "Trivial: simple case"

How it works:

- skip_when= receives the extracted input data (after _extract_input, before rendering). Returns True to skip the LLM call, False to proceed normally. For single-upstream nodes, the callable receives the typed value directly (not a dict wrapper).
- skip_value= produces the output when skipped. Receives the same input as skip_when. Required inside a Loop; without it, the loop counter still increments but no output is written.
- When skipped, the node logs node_skipped with reason="skip_when" and the elapsed time.
When to use:
- Cost optimization: skip expensive LLM calls when the input is trivial or a cached result suffices.
- Conditional execution: nodes that should only run when a quality threshold is not met.
- Loop exit: inside a Loop, skip_when + skip_value can short-circuit iterations when the output is already good enough.
skip_when applies to all LLM modes (think, agent, act). It does not apply to scripted nodes — use a conditional inside the function body instead.
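The control flow behind skip_when and skip_value can be sketched in plain Python to show that the expensive call is never reached when the predicate fires. Everything here is illustrative: run_node and expensive_llm are hypothetical stand-ins, and the real node also handles input extraction and logging, which this sketch omits.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Analysis:  # stand-in for the Pydantic model
    text: str
    complexity: float = 0.0


calls = []  # records which inputs reached the "model"


def expensive_llm(a):
    """Stand-in for the model call."""
    calls.append(a.text)
    return f"Summary of: {a.text}"


def run_node(a, skip_when, skip_value):
    """Hypothetical wrapper: check skip_when first, use skip_value if it fires."""
    if skip_when(a):
        return skip_value(a)
    return expensive_llm(a)


skip_when = lambda a: a.complexity < 0.3
skip_value = lambda a: f"Trivial: {a.text}"

print(run_node(Analysis("simple case", 0.1), skip_when, skip_value))  # Trivial: simple case
print(run_node(Analysis("hard case", 0.9), skip_when, skip_value))    # Summary of: hard case
print(calls)  # ['hard case']
```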
Context injection with context
Inject verbatim state fields into the LLM prompt alongside typed input. Unlike regular inputs, context values are not type-checked or rendered through the framework’s BAML renderer; they are passed as-is to the prompt compiler.

    from pydantic import BaseModel
    from neograph import node, construct_from_functions, compile, run

    class Catalog(BaseModel, frozen=True):
        content: str

    class Claims(BaseModel, frozen=True):
        items: list[str]

    class ScoredClaims(BaseModel, frozen=True):
        scored: list[dict]

    @node(outputs=Catalog)
    def build_catalog() -> Catalog:
        return Catalog(content=(
            "=== Graph Catalog (BFS from UC-001) ===\n"
            "UC-001: User Authentication [auth.py:10-85]\n"
            "  BR-001: Password min 12 chars\n"
            "  BR-002: Session timeout 30min\n"
        ))

    @node(outputs=Claims)
    def decompose() -> Claims:
        return Claims(items=["auth uses SSO", "data encrypted at rest"])

    @node(
        outputs=ScoredClaims,
        prompt="rw/score",
        model="reason",
        context=["build_catalog"],
    )
    def score(decompose: Claims) -> ScoredClaims: ...

    pipeline = construct_from_functions("pipeline", [build_catalog, decompose, score])

How it works:

- context= takes a list of state field names (strings). At runtime, the factory reads each named field from state and passes the values as a context dict to the prompt compiler: invoke_structured(..., context={"build_catalog": <value>}).
- Context values bypass the renderer dispatch chain. The prompt compiler receives them verbatim: no BAML describe_value, no XML/JSON wrapping. This is the right choice for pre-formatted content like graph catalogs, domain briefings, or cached summaries that are already crafted for LLM consumption.
- Context names are validated at compile time. The state model creates fields for each declared context name, and the validator checks that the names correspond to known upstream producers.
Difference from inputs:
| | inputs | context |
|---|---|---|
| Type-checked | Yes (Pydantic model matching) | No (passed as-is) |
| Rendered | Yes (BAML / renderer chain) | No (verbatim) |
| Topology wiring | Determines DAG edges | Does not affect topology |
| Prompt placement | Main input section | Separate context dict |
Context fields are forwarded into sub-constructs automatically. If a node inside a sub-construct declares context=["build_catalog"], the parent state value is copied into the sub-graph’s state at invocation time. See example 14 (examples/14_context_injection.py) for the full pattern with sub-constructs and Each fan-out.
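The difference between a rendered input and a verbatim context value can be illustrated with a toy prompt compiler. This is a loose sketch: compile_prompt and its template placeholders are hypothetical, and JSON is used here only as a stand-in for whatever the framework's renderer chain actually produces.

```python
import json


def compile_prompt(template, rendered_input, context):
    """Hypothetical prompt compiler: input arrives pre-rendered, context is pasted as-is."""
    return template.format(input=rendered_input, **context)


# Pre-formatted text, already laid out for LLM consumption
catalog = (
    "=== Graph Catalog (BFS from UC-001) ===\n"
    "UC-001: User Authentication [auth.py:10-85]\n"
)
claims = {"items": ["auth uses SSO", "data encrypted at rest"]}

prompt = compile_prompt(
    "Catalog:\n{build_catalog}\nClaims:\n{input}",
    rendered_input=json.dumps(claims, indent=2),  # JSON rendering is an assumption
    context={"build_catalog": catalog},            # inserted verbatim, no wrapping
)
print(prompt)
```

The catalog text appears in the prompt byte-for-byte, while the claims pass through a rendering step first.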
Loop history with loop_history
Collect every iteration’s output into a separate history field for debugging or audit trails.

    from pydantic import BaseModel
    from neograph import node, construct_from_functions, compile, run

    class Draft(BaseModel, frozen=True):
        content: str
        score: float = 0.0
        iteration: int = 0

    @node(outputs=Draft)
    def seed() -> Draft:
        return Draft(content="initial", score=0.0)

    @node(
        outputs=Draft,
        loop_when=lambda d: d is None or d.score < 0.8,
        max_iterations=5,
        loop_history=True,
    )
    def refine(draft: Draft) -> Draft:
        return Draft(
            content=f"v{draft.iteration + 1}",
            iteration=draft.iteration + 1,
            score=draft.score + 0.3,
        )

    pipeline = construct_from_functions("writer", [seed, refine])
    graph = compile(pipeline)
    result = run(graph, input={"node_id": "history-demo"})

    # The main output is the append-list of all iterations
    final = result["refine"][-1]
    print(f"Final: score={final.score}")

    # The history field collects each iteration independently
    history = result.get("neo_loop_history_refine", [])
    for i, draft in enumerate(history):
        print(f"  Iteration {i}: score={draft.score}")

How it works:

- loop_history=True (or Loop(history=True) in the programmatic API) creates an additional state field neo_loop_history_{node_name} that collects each iteration’s output into a list.
- The history field uses a list reducer; each iteration appends to it independently. This is separate from the node’s main output field (which is also an append-list for Loop nodes).
- The history field is written by _build_state_update in the factory layer alongside the main output and loop counter increment.
When to use:
- Debugging: inspect how the output evolved across iterations.
- Audit trail: keep a record of each refinement step for compliance or review.
- Analytics: measure convergence rate or detect oscillation.
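As one possible take on the analytics use case, the scores from a history list can be reduced to per-iteration deltas and checked for sign flips. This is a sketch with made-up numbers: score_deltas and oscillates are hypothetical helpers, and in practice the scores would be read from the history field, e.g. [d.score for d in history].

```python
def score_deltas(scores):
    """Change in score between consecutive iterations (rounded to tame float noise)."""
    return [round(b - a, 6) for a, b in zip(scores, scores[1:])]


def oscillates(scores):
    """True if successive non-zero score changes flip sign at least once."""
    deltas = [d for d in score_deltas(scores) if d != 0]
    return any(a * b < 0 for a, b in zip(deltas, deltas[1:]))


steady = [0.3, 0.6, 0.9]         # illustrative values only
bouncy = [0.5, 0.7, 0.6, 0.75]   # improves, regresses, improves again

print(score_deltas(steady))                    # [0.3, 0.3]
print(oscillates(steady), oscillates(bouncy))  # False True
```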
Restrictions:
- Only supported on Node-level Loops. Applying Loop(history=True) to a Construct raises ConstructError at pipe-composition time. For Construct-level loops, collect history inside the sub-construct’s internal nodes instead.
The programmatic equivalent:
    from neograph import Loop

    refine = Node("refine", mode="scripted", outputs=Draft) | Loop(
        when=lambda d: d is None or d.score < 0.8,
        max_iterations=5,
        history=True,
    )

Fan-out with .map()
The .map() method is sugar over | Each(over=..., key=...). It accepts either a lambda or a string path.
Lambda form (refactor-safe)
    @node(outputs=VerifyResult)
    def verify(cluster: ClusterGroup) -> VerifyResult:
        return VerifyResult(cluster_label=cluster.label, coverage_pct=85)

    verify = verify.map(lambda s: s.discover_clusters.groups, key="label")

The lambda is introspected once at definition time via a recording proxy. Pyright and Pylance catch typos in the attribute chain: renaming discover_clusters to find_clusters shows as a red squiggle, unlike a string path.
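For readers unfamiliar with the recording-proxy idea, the sketch below shows one way a lambda's attribute chain can be turned into a dotted path by calling it once on a proxy object. PathRecorder and path_of are hypothetical names; this illustrates the general technique and is not neograph's actual implementation.

```python
class PathRecorder:
    """Hypothetical recording proxy: each attribute access extends the recorded path."""

    def __init__(self, parts=()):
        self._parts = parts

    def __getattr__(self, name):
        # Only invoked for attributes not found normally, i.e. the user's chain.
        return PathRecorder(self._parts + (name,))


def path_of(fn):
    """Call fn once on a fresh recorder and return the dotted path it touched."""
    return ".".join(fn(PathRecorder())._parts)


print(path_of(lambda s: s.discover_clusters.groups))  # discover_clusters.groups
```

The recovered string is the same dotted path that the string form of .map() takes directly.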
String form
    verify = verify.map("discover_clusters.groups", key="label")

Equivalent to verify | Each(over="discover_clusters.groups", key="label"). Use this when the string path is computed at runtime or when you do not need static analysis coverage.
When to use which
| Form | Advantage |
|---|---|
| map_over= keyword | Inline with @node(...), no separate .map() call |
| .map(lambda ...) | Refactor-safe; static analysers catch typos |
| .map("string") | Escape hatch; works with dynamic paths |
| \| Each(over=..., key=...) | Programmatic API; used by LLM-driven runtime construction |
All four produce the same Each modifier on the node. The compiler, state builder, and factory see no difference.
Programmatic equivalent
Every modifier keyword compiles to the same Node | Modifier that the runtime API uses. If you prefer the pipe syntax (for example, when an LLM is constructing the graph at runtime), the equivalent is:

    from neograph import Node, Each, Oracle, Operator
    from neograph import Loop

    # Fan-out
    verify = Node("verify", mode="scripted", outputs=VerifyResult) | Each(over="clusters.groups", key="label")

    # Ensemble (same model)
    decompose = Node("decompose", mode="think", outputs=Claims, prompt="rw/decompose", model="reason") | Oracle(n=3, merge_fn="merge_claims")

    # Ensemble (multi-model)
    decompose = Node("decompose", mode="think", outputs=Claims, prompt="rw/decompose") | Oracle(models=["reason", "fast", "creative"], merge_fn="merge_claims")

    # Interrupt
    check = Node("check", mode="scripted", outputs=ValidationResult) | Operator(when="needs_review")

    # Loop (self-loop on node); when= must be None-safe
    refine = Node("refine", mode="scripted", outputs=Draft) | Loop(when=lambda d: d is None or d.score < 0.8, max_iterations=5)

    # Loop (on sub-construct)
    body = Construct("refine", input=Draft, output=Draft, nodes=[review, revise])
    body = body | Loop(when=lambda d: d is None or d.score < 0.8, max_iterations=10)

Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.