Skip to content
Built by Postindustria. We help teams build agentic production systems.

Modifier Keywords

The @node decorator accepts keyword arguments that attach modifiers — Each, Oracle, and Operator — without touching the programmatic API. Each pattern shown below compiles to the same graph as its Node | Modifier equivalent.

Process each item in a collection independently. The map_over= keyword specifies a dotted path to the collection. The map_key= keyword names the field used to key results in the output dict.

from neograph import node, construct_from_module, compile, run
import sys
from pydantic import BaseModel
class ClusterGroup(BaseModel, frozen=True):
label: str
claim_ids: list[str]
class Clusters(BaseModel, frozen=True):
groups: list[ClusterGroup]
class VerifyResult(BaseModel, frozen=True):
cluster_label: str
coverage_pct: int
@node(output=Clusters)
def discover_clusters() -> Clusters:
return Clusters(groups=[
ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2"]),
ClusterGroup(label="logging", claim_ids=["REQ-3"]),
])
@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
coverage = {"authentication": 85, "logging": 60}
return VerifyResult(
cluster_label=cluster.label,
coverage_pct=coverage.get(cluster.label, 0),
)
pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline)
result = run(graph, input={"node_id": "analysis-001"})
# result["verify"] is dict[str, VerifyResult], keyed by label
for label, vr in result["verify"].items():
print(f"{label}: {vr.coverage_pct}%")

How it works:

  • map_over='discover_clusters.groups' tells the framework to iterate over the groups field of the discover_clusters output.
  • map_key='label' means each result is keyed by cluster.label in the output dict.
  • The cluster parameter does not match any upstream @node — the framework recognizes it as the fan-out item receiver and skips it in topology wiring.
  • Under the hood, @node applies | Each(over=..., key=...) to the created Node.

Both map_over and map_key are required together. Passing one without the other raises ConstructError.

Ensemble with ensemble_n, merge_fn, and merge_prompt

Section titled “Ensemble with ensemble_n, merge_fn, and merge_prompt”

Run a node N times in parallel, then merge the results. This is the Oracle pattern — multiple independent generations combined into a consensus.

from neograph import node, register_scripted, construct_from_module, compile, run
import sys
def merge_claims(variants, config):
"""Deduplicate claims across all variants."""
seen = set()
merged = []
for variant in variants:
for claim in variant.items:
if claim not in seen:
seen.add(claim)
merged.append(claim)
return Claims(items=merged)
register_scripted("merge_claims", merge_claims)
@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
# Each of the 3 copies runs this function independently
return Claims(items=["shall authenticate", "shall encrypt"])
pipeline = construct_from_module(sys.modules[__name__], name="oracle-demo")

The merge step can be either scripted or LLM-powered:

KeywordWhat it does
merge_fn="name"Call a registered scripted function. Receives (variants, config).
merge_prompt="template"Call an LLM with the variants as context. Uses the node’s model=.

Exactly one of merge_fn or merge_prompt is required. Setting both raises ConstructError.

ensemble_n defaults to 3 if omitted when either merge keyword is present. It must be >= 2.

For LLM-mode nodes, the ensemble runs the same prompt N times (useful with non-zero temperature):

@node(output=Claims, prompt='rw/decompose', model='reason',
llm_config={"temperature": 0.8},
ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

Pause the graph for human review when a condition is met. This is the Operator pattern — human-in-the-loop checkpoints that require a checkpointer.

The interrupt_when= keyword accepts either a string (registered condition name) or a callable:

from neograph import node, construct_from_module, compile, run
from langgraph.checkpoint.memory import MemorySaver
import sys
@node(output=Analysis)
def analyze() -> Analysis:
return Analysis(claims=["auth", "logging"], coverage_pct=55)
@node(
output=ValidationResult,
interrupt_when=lambda state: (
{"issues": state.check.issues, "message": "Please review"}
if state.check and not state.check.passed
else None
),
)
def check(analyze: Analysis) -> ValidationResult:
if analyze.coverage_pct < 80:
return ValidationResult(passed=False, issues=["Coverage below threshold"])
return ValidationResult(passed=True, issues=[])
pipeline = construct_from_module(sys.modules[__name__], name="review")
graph = compile(pipeline, checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "review-001"}}
result = run(graph, input={"node_id": "REQ-001"}, config=config)
if "__interrupt__" in result:
# Human reviews, then resume
result = run(graph, resume={"approved": True}, config=config)

The callable receives the full graph state and returns either a dict (interrupt payload shown to the human) or None (no interrupt, continue). A checkpointer is required — compile() enforces this when an Operator is present.

from neograph import register_condition
def needs_review(state):
val = state.check_results
if val and not val.passed:
return {"issues": val.issues}
return None
register_condition("needs_review", needs_review)
@node(output=ValidationResult, interrupt_when="needs_review")
def check(analyze: Analysis) -> ValidationResult: ...

Every modifier keyword compiles to the same Node | Modifier that the runtime API uses. If you prefer the pipe syntax (for example, when an LLM is constructing the graph at runtime), the equivalent is:

from neograph import Node, Each, Oracle, Operator
# Fan-out
verify = Node("verify", mode="scripted", output=VerifyResult) | Each(over="clusters.groups", key="label")
# Ensemble
decompose = Node("decompose", mode="produce", output=Claims,
prompt="rw/decompose", model="reason") | Oracle(n=3, merge_fn="merge_claims")
# Interrupt
check = Node("check", mode="scripted", output=ValidationResult) | Operator(when="needs_review")

Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.