# 4. Each Fan-Out
The `Each` modifier takes a node and runs it once per item in a collection produced by a previous node. Results are collected into a `dict[label, result]` keyed by a field you specify.
This is the map phase of map-reduce: discover a set of items, process each one in parallel, gather the results.
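The map phase can be illustrated framework-agnostically with the standard library: discover a collection, process each item in parallel, gather a keyed dict. The item names and `process` function here are placeholders, not part of the framework:

```python
from concurrent.futures import ThreadPoolExecutor

def process(item: str) -> str:
    # Stand-in for per-item work (e.g., verifying one cluster).
    return item.upper()

# "Discover" a collection, process each item in parallel, gather keyed results.
items = ["authentication", "logging", "performance"]
with ThreadPoolExecutor() as pool:
    results = dict(zip(items, pool.map(process, items)))
```

`Executor.map` preserves input order, so zipping against `items` keys each result by the item that produced it.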
With `@node`, the `Each` modifier is applied via kwargs: `map_over=` and `map_key=` on the decorator.
## What you will learn

- Producing a collection from one node
- Applying fan-out via `@node` kwargs (`map_over=`, `map_key=`)
- How each parallel instance receives a single collection item
- Reading fan-out results as a keyed dictionary
## Schemas

```python
from pydantic import BaseModel

class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]

class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]

class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    coverage_pct: int
    gaps: list[str]
```

The `discover_clusters` node produces a `Clusters` model containing a list of `ClusterGroup` items. The `verify` node processes each group independently:
```python
from neograph import node

@node(output=Clusters)
def discover_clusters() -> Clusters:
    """Simulate discovering requirement clusters from analysis."""
    return Clusters(groups=[
        ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2", "REQ-3"]),
        ClusterGroup(label="logging", claim_ids=["REQ-4", "REQ-5"]),
        ClusterGroup(label="performance", claim_ids=["REQ-6"]),
    ])

@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
    """Verify a single cluster -- check coverage against codebase."""
    coverage = {"authentication": 85, "logging": 60, "performance": 100}
    gaps_map = {
        "authentication": ["MFA not implemented"],
        "logging": ["no structured logging", "missing audit trail"],
        "performance": [],
    }
    return VerifyResult(
        cluster_label=cluster.label,
        coverage_pct=coverage.get(cluster.label, 0),
        gaps=gaps_map.get(cluster.label, ["unknown"]),
    )
```

`map_over="discover_clusters.groups"` tells the compiler: iterate over `state.discover_clusters.groups`. `map_key="label"` tells it: use each item's `.label` as the dictionary key in the output.
The `verify` function receives a single `ClusterGroup` as its `cluster` parameter — one item per invocation. You write the function as if it processes one item; the framework handles the iteration.
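Conceptually, the fan-out is equivalent to a plain loop that calls the function once per item and keys the results. This is a simplified sketch (using a bare dataclass and a hypothetical `fan_out` helper, not the framework's actual machinery):

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ClusterGroup:
    label: str
    claim_ids: list[str] = field(default_factory=list)

def fan_out(items, fn, key: str) -> dict:
    # What the framework does for you: one call per item,
    # results collected into a dict keyed by the `key` attribute.
    results = {}
    for item in items:
        results[getattr(item, key)] = fn(item)
    return results

groups = [ClusterGroup("authentication"), ClusterGroup("logging")]
out = fan_out(groups, lambda g: len(g.label), key="label")
# → {'authentication': 14, 'logging': 7}
```

The real framework additionally runs the per-item calls in parallel rather than sequentially.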
## The complete pipeline

```python
"""Each Fan-Out: discover clusters, verify each in parallel.

Run: python 04_each_fanout.py
"""
from __future__ import annotations

import sys

from pydantic import BaseModel

from neograph import compile, construct_from_module, node, run

# -- Schemas ----------------------------------------------------------------

class ClusterGroup(BaseModel, frozen=True):
    label: str
    claim_ids: list[str]

class Clusters(BaseModel, frozen=True):
    groups: list[ClusterGroup]

class VerifyResult(BaseModel, frozen=True):
    cluster_label: str
    coverage_pct: int
    gaps: list[str]

# -- Nodes ------------------------------------------------------------------

@node(output=Clusters)
def discover_clusters() -> Clusters:
    """Simulate discovering requirement clusters from analysis."""
    return Clusters(groups=[
        ClusterGroup(label="authentication", claim_ids=["REQ-1", "REQ-2", "REQ-3"]),
        ClusterGroup(label="logging", claim_ids=["REQ-4", "REQ-5"]),
        ClusterGroup(label="performance", claim_ids=["REQ-6"]),
    ])

@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult:
    """Verify a single cluster -- check coverage against codebase."""
    coverage = {"authentication": 85, "logging": 60, "performance": 100}
    gaps_map = {
        "authentication": ["MFA not implemented"],
        "logging": ["no structured logging", "missing audit trail"],
        "performance": [],
    }
    return VerifyResult(
        cluster_label=cluster.label,
        coverage_pct=coverage.get(cluster.label, 0),
        gaps=gaps_map.get(cluster.label, ["unknown"]),
    )

# -- Build pipeline ---------------------------------------------------------

pipeline = construct_from_module(sys.modules[__name__])

# -- Run --------------------------------------------------------------------

if __name__ == "__main__":
    graph = compile(pipeline)
    result = run(graph, input={"node_id": "analysis-001"})

    verify_results = result["verify"]  # dict[str, VerifyResult]

    print(f"Verified {len(verify_results)} clusters:\n")
    for label, vr in verify_results.items():
        status = "PASS" if vr.coverage_pct >= 80 else "GAPS"
        print(f"  [{status}] {label}: {vr.coverage_pct}% coverage")
        for gap in vr.gaps:
            print(f"    - {gap}")
```
## Expected output

```
Verified 3 clusters:

  [PASS] authentication: 85% coverage
    - MFA not implemented
  [GAPS] logging: 60% coverage
    - no structured logging
    - missing audit trail
  [PASS] performance: 100% coverage
```
## The `map_over` and `map_key` kwargs

```python
@node(output=VerifyResult, map_over="discover_clusters.groups", map_key="label")
def verify(cluster: ClusterGroup) -> VerifyResult: ...
```

Two kwargs:

- `map_over` — dotted path to the collection in state. `"discover_clusters.groups"` means: look at `state.discover_clusters`, then access its `.groups` attribute. The collection must be iterable.
- `map_key` — field on each item used as the dictionary key for results. `"label"` means each `ClusterGroup`'s `.label` becomes the key in the output dict.
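A dotted-path lookup like this can be sketched as a chain of `getattr` calls. The `resolve_path` helper below is illustrative, not the framework's actual implementation:

```python
from functools import reduce
from types import SimpleNamespace

def resolve_path(state, path: str):
    # Walk "a.b.c" as getattr(getattr(getattr(state, "a"), "b"), "c").
    return reduce(getattr, path.split("."), state)

# A stand-in for pipeline state with a nested collection.
state = SimpleNamespace(
    discover_clusters=SimpleNamespace(groups=["g1", "g2"])
)
resolve_path(state, "discover_clusters.groups")  # → ['g1', 'g2']
```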
This is equivalent to the pipe-operator style:

```python
verify = Node.scripted(
    "verify", fn="verify_cluster", input=ClusterGroup, output=VerifyResult
) | Each(over="discover_clusters.groups", key="label")
```
## Reading fan-out results

The result for a fan-out node is a dictionary keyed by the `map_key` field:

```python
result["verify"]
# {
#     "authentication": VerifyResult(cluster_label="authentication", coverage_pct=85, gaps=[...]),
#     "logging": VerifyResult(cluster_label="logging", coverage_pct=60, gaps=[...]),
#     "performance": VerifyResult(cluster_label="performance", coverage_pct=100, gaps=[]),
# }
```

Keys must be unique across the collection — if two items share a key, the compiler raises an error.
## What the compiler generates

When it encounters `map_over=` and `map_key=`, the compiler expands the node into:

- **Fan-out router** — iterates over `state.discover_clusters.groups`, dispatches one `Send()` per item with the item set as `neo_each_item` in state
- **Worker node** — runs once per item, extracts the item from state, passes it as the function argument, wraps the result in `{key_value: result}`
- **Barrier node** (`defer=True`) — waits for all workers to complete, collects results via a dict-merge reducer

The state field for the node uses an `Annotated` dict reducer that merges results additively. Duplicate keys raise an error.
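Such a reducer could look like the following sketch — an additive dict merge that rejects duplicate keys. The `merge_unique` name and exact error behavior are assumptions; the framework's reducer may differ:

```python
def merge_unique(left: dict, right: dict) -> dict:
    # Additive merge of two partial result dicts; duplicate keys are an
    # error, matching the unique-key guarantee described above.
    duplicates = left.keys() & right.keys()
    if duplicates:
        raise ValueError(f"duplicate fan-out keys: {sorted(duplicates)}")
    return {**left, **right}

merge_unique({"authentication": 85}, {"logging": 60})
# → {'authentication': 85, 'logging': 60}
```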
Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.