
4. Each Fan-Out

The Each modifier takes a node and runs it once per item in a collection produced by a previous node. Results are collected into a dict[label, result] keyed by a field you specify.

This is the map phase of map-reduce: discover a set of items, process each one in parallel, gather the results.
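Stripped of the framework, the pattern is just a keyed comprehension. A sequential plain-Python sketch (stand-in functions, not the neograph API):

```python
# Sequential sketch of the map phase: discover items, process each one,
# collect results into a dict keyed by a field on the item.
def discover() -> list[dict]:
    return [{"label": "auth", "size": 3}, {"label": "logging", "size": 2}]

def process(item: dict) -> str:
    return f"verified {item['label']} ({item['size']} claims)"

results = {item["label"]: process(item) for item in discover()}
# results == {"auth": "verified auth (3 claims)",
#             "logging": "verified logging (2 claims)"}
```

Fan-out runs each `process` call in parallel rather than in a loop, but the shape of the collected result is the same.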

With @node, the Each modifier is applied via kwargs: map_over= and map_key= on the decorator.

This lesson covers:

  • Producing a collection from one node
  • Applying fan-out via @node kwargs (map_over=, map_key=)
  • How each parallel instance receives a single collection item
  • Reading fan-out results as a keyed dictionary
```python
from pydantic import BaseModel

class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]

class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]

class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    coverage_pct: int
    gaps: list[str]
```

The discover_clusters node produces a Clusters model containing a list of ClusterGroup items. The verify node processes each group independently:

```python
from neograph import node

@node(output=Clusters)
def discover_clusters() -> Clusters:
    """Simulate discovering requirement clusters from analysis."""
    return Clusters(groups=[
        ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2", "REQ-3"]),
        ClusterGroup(label="logging", claim_ids=["REQ-4", "REQ-5"]),
        ClusterGroup(label="performance", claim_ids=["REQ-6"]),
    ])

@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
    """Verify a single cluster -- check coverage against codebase."""
    coverage = {"authentication": 85, "logging": 60, "performance": 100}
    gaps_map = {
        "authentication": ["MFA not implemented"],
        "logging": ["no structured logging", "missing audit trail"],
        "performance": [],
    }
    return VerifyResult(
        cluster_label=cluster.label,
        coverage_pct=coverage.get(cluster.label, 0),
        gaps=gaps_map.get(cluster.label, ["unknown"]),
    )
```

map_over="discover_clusters.groups" tells the compiler: iterate over state.discover_clusters.groups. map_key="label" tells it: use each item’s .label as the dictionary key in the output.

The verify function receives a single ClusterGroup as its cluster parameter — one item per invocation. You write the function as if it processes one item; the framework handles the iteration.

"""Each Fan-Out: discover clusters, verify each in parallel.
Run:
python 04_each_fanout.py
"""
from __future__ import annotations
import sys
from pydantic import BaseModel
from neograph import compile, construct_from_module, node, run
# -- Schemas ----------------------------------------------------------------
class ClusterGroup(BaseModel, frozen=True):
label: str
claim_ids: list[str]
class Clusters(BaseModel, frozen=True):
groups: list[ClusterGroup]
class VerifyResult(BaseModel, frozen=True):
cluster_label: str
coverage_pct: int
gaps: list[str]
# -- Nodes ------------------------------------------------------------------
@node(output=Clusters)
def discover_clusters() -> Clusters:
"""Simulate discovering requirement clusters from analysis."""
return Clusters(groups=[
ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2", "REQ-3"]),
ClusterGroup(label="logging", claim_ids=["REQ-4", "REQ-5"]),
ClusterGroup(label="performance", claim_ids=["REQ-6"]),
])
@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
"""Verify a single cluster -- check coverage against codebase."""
coverage = {"authentication": 85, "logging": 60, "performance": 100}
gaps_map = {
"authentication": ["MFA not implemented"],
"logging": ["no structured logging", "missing audit trail"],
"performance": [],
}
return VerifyResult(
cluster_label=cluster.label,
coverage_pct=coverage.get(cluster.label, 0),
gaps=gaps_map.get(cluster.label, ["unknown"]),
)
# -- Build pipeline ---------------------------------------------------------
pipeline = construct_from_module(sys.modules[__name__])
# -- Run --------------------------------------------------------------------
if __name__ == "__main__":
graph = compile(pipeline)
result = run(graph, input={"node_id": "analysis-001"})
verify_results = result["verify"] # dict[str, VerifyResult]
print(f"Verified {len(verify_results)} clusters:\n")
for label, vr in verify_results.items():
status = "PASS" if vr.coverage_pct >= 80 else "GAPS"
print(f" [{status}] {label}: {vr.coverage_pct}% coverage")
for gap in vr.gaps:
print(f" - {gap}")
Running the script prints:

```
Verified 3 clusters:

  [PASS] authentication: 85% coverage
      - MFA not implemented
  [GAPS] logging: 60% coverage
      - no structured logging
      - missing audit trail
  [PASS] performance: 100% coverage
```
```python
@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult: ...
```

Two kwargs:

  • map_over — dotted path to the collection in state. "discover_clusters.groups" means: look at state.discover_clusters, then access its .groups attribute. The collection must be iterable.
  • map_key — field on each item used as the dictionary key for results. "label" means each ClusterGroup’s .label becomes the key in the output dict.
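How a dotted path resolves can be pictured with a small illustrative helper (an assumption about the mechanics, not the framework's actual code): the first segment names a state key, and the remaining segments are attribute lookups.

```python
from functools import reduce

def resolve_path(state: dict, path: str):
    """Resolve 'node.attr.attr' against state: dict key first, then attributes."""
    head, *rest = path.split(".")
    return reduce(getattr, rest, state[head])

# FakeClusters stands in for the Clusters model from the example above.
class FakeClusters:
    def __init__(self, groups):
        self.groups = groups

state = {"discover_clusters": FakeClusters(groups=["auth", "logging"])}
resolve_path(state, "discover_clusters.groups")  # -> ["auth", "logging"]
```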

This is equivalent to the pipe-operator style:

```python
verify = Node.scripted(
    "verify", fn="verify_cluster", input=ClusterGroup, output=VerifyResult
) | Each(over="discover_clusters.groups", key="label")
```

The result for a fan-out node is a dictionary keyed by the map_key field:

result["verify"]
# {
# "authentication": VerifyResult(cluster_label="authentication", coverage_pct=85, gaps=[...]),
# "logging": VerifyResult(cluster_label="logging", coverage_pct=60, gaps=[...]),
# "performance": VerifyResult(cluster_label="performance", coverage_pct=100, gaps=[]),
# }

Keys must be unique across the collection — if two items share a key, the compiler raises an error.
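The uniqueness requirement amounts to a pre-flight check along these lines (illustrative only; the framework's actual error type and message may differ):

```python
from collections import Counter
from types import SimpleNamespace

def check_unique_keys(items, key_field: str) -> None:
    """Raise if two items share the same value for key_field."""
    counts = Counter(getattr(item, key_field) for item in items)
    dupes = sorted(k for k, n in counts.items() if n > 1)
    if dupes:
        raise ValueError(f"duplicate map_key values: {dupes}")

groups = [SimpleNamespace(label="auth"), SimpleNamespace(label="logging")]
check_unique_keys(groups, "label")  # ok: all labels distinct
# check_unique_keys(groups + [SimpleNamespace(label="auth")], "label")  # would raise
```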

When it encounters map_over= and map_key=, the compiler expands the node into:

  1. Fan-out router — iterates over state.discover_clusters.groups, dispatches one Send() per item with the item set as neo_each_item in state
  2. Worker node — runs once per item, extracts the item from state, passes it as the function argument, wraps the result in {key_value: result}
  3. Barrier node (defer=True) — waits for all workers to complete, collects results via a dict-merge reducer

The state field for the node uses an Annotated dict reducer that merges results additively. Duplicate keys raise an error.
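Such a reducer can be sketched as follows (a minimal sketch mirroring the described behavior, not the framework's implementation):

```python
def merge_results(existing: dict, incoming: dict) -> dict:
    """Additive dict merge for fan-out results; duplicate keys are an error."""
    overlap = existing.keys() & incoming.keys()
    if overlap:
        raise ValueError(f"duplicate fan-out keys: {sorted(overlap)}")
    return {**existing, **incoming}

merged = merge_results({"authentication": 85}, {"logging": 60})
# merged == {"authentication": 85, "logging": 60}
```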


Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.