# Modifier Keywords
The `@node` decorator accepts keyword arguments that attach modifiers — `Each`, `Oracle`, and `Operator` — without touching the programmatic API. Each pattern shown below compiles to the same graph as its `Node | Modifier` equivalent.
## Fan-out with `map_over` and `map_key`
Process each item in a collection independently. The `map_over=` keyword specifies a dotted path to the collection. The `map_key=` keyword names the field used to key results in the output dict.
```python
from neograph import node, construct_from_module, compile, run
import sys

from pydantic import BaseModel


class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]


class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]


class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    coverage_pct: int


@node(output=Clusters)
def discover_clusters() -> Clusters:
    return Clusters(groups=[
        ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2"]),
        ClusterGroup(label="logging", claim_ids=["REQ-3"]),
    ])


@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
    coverage = {"authentication": 85, "logging": 60}
    return VerifyResult(
        cluster_label=cluster.label,
        coverage_pct=coverage.get(cluster.label, 0),
    )


pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline)
result = run(graph, input={"node_id": "analysis-001"})

# result["verify"] is dict[str, VerifyResult], keyed by label
for label, vr in result["verify"].items():
    print(f"{label}: {vr.coverage_pct}%")
```

How it works:
- `map_over="discover_clusters.groups"` tells the framework to iterate over the `groups` field of the `discover_clusters` output.
- `map_key="label"` means each result is keyed by `cluster.label` in the output dict.
- The `cluster` parameter does not match any upstream `@node` — the framework recognizes it as the fan-out item receiver and skips it in topology wiring.
- Under the hood, `@node` applies `| Each(over=..., key=...)` to the created `Node`.
Both `map_over` and `map_key` are required together. Passing one without the other raises `ConstructError`.
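The fan-out semantics can be sketched in plain Python. This is an illustration only, not the framework's internals; `fan_out`, `Group`, and the `state` dict here are hypothetical stand-ins for what the framework does with the dotted path and key field:

```python
from dataclasses import dataclass


@dataclass
class Group:
    label: str
    claim_ids: list


@dataclass
class Clusters:
    groups: list


def fan_out(state, map_over, map_key, fn):
    # Split "discover_clusters.groups" into upstream node name + field.
    node_name, field_name = map_over.split(".", 1)
    items = getattr(state[node_name], field_name)
    # One independent call per item; results keyed by the map_key field.
    return {getattr(item, map_key): fn(item) for item in items}


state = {
    "discover_clusters": Clusters(groups=[
        Group(label="authentication", claim_ids=["REQ-1", "REQ-2"]),
        Group(label="logging", claim_ids=["REQ-3"]),
    ])
}
out = fan_out(state, "discover_clusters.groups", "label", lambda g: len(g.claim_ids))
# out == {"authentication": 2, "logging": 1}
```

Each item is processed independently, which is what lets the framework schedule the per-item calls in parallel.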
## Ensemble with `ensemble_n`, `merge_fn`, and `merge_prompt`
Run a node N times in parallel, then merge the results. This is the Oracle pattern — multiple independent generations combined into a consensus.
```python
from neograph import node, register_scripted, construct_from_module, compile, run
import sys

from pydantic import BaseModel


class Claims(BaseModel, frozen=True):
    items: list[str]


def merge_claims(variants, config):
    """Deduplicate claims across all variants."""
    seen = set()
    merged = []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=merged)


register_scripted("merge_claims", merge_claims)


@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
    # Each of the 3 copies runs this function independently
    return Claims(items=["shall authenticate", "shall encrypt"])


pipeline = construct_from_module(sys.modules[__name__], name="oracle-demo")
```

The merge step can be either scripted or LLM-powered:
| Keyword | What it does |
|---|---|
| `merge_fn="name"` | Call a registered scripted function. Receives `(variants, config)`. |
| `merge_prompt="template"` | Call an LLM with the variants as context. Uses the node's `model=`. |
Exactly one of `merge_fn` or `merge_prompt` is required. Setting both raises `ConstructError`.

`ensemble_n` defaults to 3 if omitted when either merge keyword is present. It must be >= 2.
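The run-then-merge contract can be exercised outside the framework. A minimal plain-Python sketch — `run_ensemble` and `dedupe_merge` are hypothetical stand-ins, and plain lists stand in for the `Claims` model; only the `(variants, config)` signature comes from the table above:

```python
def run_ensemble(generate, n, merge_fn, config=None):
    # Oracle semantics: n independent generations, then one merge call.
    if n < 2:
        raise ValueError("ensemble_n must be >= 2")
    variants = [generate() for _ in range(n)]
    return merge_fn(variants, config)


def dedupe_merge(variants, config):
    # Same shape as a registered merge_fn: (variants, config) -> merged.
    seen, merged = set(), []
    for variant in variants:
        for claim in variant:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return merged


merged = run_ensemble(lambda: ["shall authenticate", "shall encrypt"], 3, dedupe_merge)
# merged == ["shall authenticate", "shall encrypt"]
```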
For LLM-mode nodes, the ensemble runs the same prompt N times (useful with non-zero temperature):
```python
@node(
    output=Claims,
    prompt="rw/decompose",
    model="reason",
    llm_config={"temperature": 0.8},
    ensemble_n=3,
    merge_fn="merge_claims",
)
def decompose() -> Claims: ...
```

## Interrupt with `interrupt_when`
Pause the graph for human review when a condition is met. This is the Operator pattern — human-in-the-loop checkpoints that require a checkpointer.
The `interrupt_when=` keyword accepts either a string (a registered condition name) or a callable:
### Callable form
```python
from neograph import node, construct_from_module, compile, run
from langgraph.checkpoint.memory import MemorySaver
import sys

from pydantic import BaseModel


class Analysis(BaseModel, frozen=True):
    claims: list[str]
    coverage_pct: int


class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]


@node(output=Analysis)
def analyze() -> Analysis:
    return Analysis(claims=["auth", "logging"], coverage_pct=55)


@node(
    output=ValidationResult,
    interrupt_when=lambda state: (
        {"issues": state.check.issues, "message": "Please review"}
        if state.check and not state.check.passed
        else None
    ),
)
def check(analyze: Analysis) -> ValidationResult:
    if analyze.coverage_pct < 80:
        return ValidationResult(passed=False, issues=["Coverage below threshold"])
    return ValidationResult(passed=True, issues=[])


pipeline = construct_from_module(sys.modules[__name__], name="review")
graph = compile(pipeline, checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "review-001"}}
result = run(graph, input={"node_id": "REQ-001"}, config=config)

if "__interrupt__" in result:
    # Human reviews, then resume
    result = run(graph, resume={"approved": True}, config=config)
```

The callable receives the full graph state and returns either a dict (the interrupt payload shown to the human) or `None` (no interrupt; continue). A checkpointer is required — `compile()` enforces this when an Operator is present.
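That payload-or-`None` contract can be tested on its own, without the graph. A sketch where `review_condition` and the `SimpleNamespace` objects are hypothetical stand-ins for the real graph state:

```python
from types import SimpleNamespace


def review_condition(state):
    # Same contract as interrupt_when: payload dict pauses, None continues.
    check = getattr(state, "check", None)
    if check and not check.passed:
        return {"issues": check.issues, "message": "Please review"}
    return None


failing = SimpleNamespace(
    check=SimpleNamespace(passed=False, issues=["Coverage below threshold"])
)
passing = SimpleNamespace(check=SimpleNamespace(passed=True, issues=[]))

review_condition(failing)  # payload dict -> graph pauses for review
review_condition(passing)  # None -> graph continues
```

Keeping the condition a pure function of state makes it easy to unit-test both branches before wiring it into a node.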
### String form
```python
from neograph import register_condition


def needs_review(state):
    val = state.check_results
    if val and not val.passed:
        return {"issues": val.issues}
    return None


register_condition("needs_review", needs_review)


@node(output=ValidationResult, interrupt_when="needs_review")
def check(analyze: Analysis) -> ValidationResult: ...
```

## Programmatic equivalent
Every modifier keyword compiles to the same `Node | Modifier` that the runtime API uses. If you prefer the pipe syntax (for example, when an LLM is constructing the graph at runtime), the equivalent is:
```python
from neograph import Node, Each, Oracle, Operator

# Fan-out
verify = Node("verify", mode="scripted", output=VerifyResult) | Each(
    over="discover_clusters.groups", key="label"
)

# Ensemble
decompose = Node(
    "decompose", mode="produce", output=Claims, prompt="rw/decompose", model="reason"
) | Oracle(n=3, merge_fn="merge_claims")

# Interrupt
check = Node("check", mode="scripted", output=ValidationResult) | Operator(
    when="needs_review"
)
```

Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.