3. Oracle Ensemble
An Oracle runs the same node multiple times in parallel — each instance gets a unique generator ID so it can produce a different perspective. A merge step combines the results into a single consensus output.
This is the ensemble pattern: multiple independent attempts at the same task, deduplicated or judged into one answer.
With @node, the Oracle modifier is applied via kwargs: ensemble_n= and merge_fn= on the decorator. No | pipe operator needed.
What you will learn
- Applying the Oracle modifier via @node kwargs (ensemble_n=, merge_fn=)
- Registering a scripted merge function with register_scripted()
- Thread-safe generation across parallel instances
- How the compiler expands Oracle into fan-out + barrier + merge
Schemas
from pydantic import BaseModel

class Topic(BaseModel, frozen=True):
    text: str

class Claims(BaseModel, frozen=True):
    items: list[str]

The Oracle node
Adding ensemble_n=3 and merge_fn="merge_claims" to @node tells the compiler to run this node 3 times in parallel, then merge the results:

from neograph import node, register_scripted

@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
    with _gen_counter_lock:
        idx = _gen_counter[0] % len(_perspectives)
        _gen_counter[0] += 1
    return Claims(items=_perspectives[idx])

Each parallel instance calls the same function. A lock-protected counter rotates through the perspectives so that each generator returns a different one. The helpers _perspectives, _gen_counter, and _gen_counter_lock are module-level names defined in the complete pipeline below.
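The locking pattern can be tried in isolation, without the framework. The following is a minimal standard-library sketch: the thread pool stands in for the parallel generator instances, and the names perspectives, counter, counter_lock, next_perspective, and run_parallel are illustrative, not part of neograph.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

perspectives = ["security", "reliability", "performance"]  # stand-in data
counter = [0]  # one-element list, so functions can mutate it without `global`
counter_lock = threading.Lock()


def next_perspective() -> str:
    # The read and the increment must happen as one atomic step; otherwise
    # two threads could read the same index and return the same perspective.
    with counter_lock:
        idx = counter[0] % len(perspectives)
        counter[0] += 1
    return perspectives[idx]


def run_parallel(n: int) -> list[str]:
    # Submit n calls to worker threads and wait for all of them.
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(next_perspective) for _ in range(n)]
        return [f.result() for f in futures]


picked = run_parallel(3)
print(sorted(picked))  # ['performance', 'reliability', 'security']
```

Which thread receives which perspective is not deterministic, which is why the sketch sorts before printing. The lock only guarantees that each index is handed out exactly once.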
The merge function
The merge function receives a list of all generator outputs and combines them:

def merge_claims(variants, config):
    """Merge N claim lists into one deduplicated list."""
    seen = set()
    merged = []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=merged)

register_scripted("merge_claims", merge_claims)

variants is [Claims, Claims, Claims], one per generator. The merge deduplicates by claim text and keeps the first occurrence of each claim.
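Because the merge function is plain Python, it can be exercised directly before it is registered. The sketch below assumes a dataclass stand-in for the pydantic Claims model so that it runs without extra dependencies, and it passes config=None because this merge never reads config. What the framework actually passes as config is not shown in this walkthrough.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Claims:  # stand-in for the pydantic Claims model (assumption)
    items: list[str]


def merge_claims(variants, config):
    """Same logic as above: first occurrence wins, order is preserved."""
    seen = set()
    merged = []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=merged)


variants = [
    Claims(items=["a", "b"]),
    Claims(items=["c"]),
    Claims(items=["b", "d"]),  # "b" repeats the first variant
]
result = merge_claims(variants, config=None)  # config is unused by this merge
print(result.items)  # ['a', 'b', 'c', 'd']
```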
The complete pipeline
"""Oracle Ensemble: 3 parallel generators + scripted merge.

Run: python 03_oracle_ensemble.py
"""

from __future__ import annotations

import sys
import threading

from pydantic import BaseModel

from neograph import compile, construct_from_module, node, register_scripted, run

# -- Schemas ----------------------------------------------------------------

class Topic(BaseModel, frozen=True):
    text: str

class Claims(BaseModel, frozen=True):
    items: list[str]

# -- Generator perspectives -------------------------------------------------

_perspectives = [
    ["security: must authenticate", "security: must encrypt"],
    ["reliability: must handle failures", "reliability: must log errors"],
    ["performance: must respond in 200ms", "security: must authenticate"],
]

_gen_counter_lock = threading.Lock()
_gen_counter = [0]

# -- Merge: combine and deduplicate claims from all generators --------------

def merge_claims(variants, config):
    """Merge N claim lists into one deduplicated list."""
    seen = set()
    merged = []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return Claims(items=merged)

register_scripted("merge_claims", merge_claims)

# -- Build pipeline ---------------------------------------------------------

@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
    with _gen_counter_lock:
        idx = _gen_counter[0] % len(_perspectives)
        _gen_counter[0] += 1
    return Claims(items=_perspectives[idx])

pipeline = construct_from_module(sys.modules[__name__], name="oracle-demo")

# -- Run --------------------------------------------------------------------

if __name__ == "__main__":
    _gen_counter[0] = 0  # reset for clean run
    graph = compile(pipeline)
    result = run(graph, input={"node_id": "REQ-001"})

    merged = result["decompose"]
    print(f"3 generators produced {len(merged.items)} unique claims:")
    for claim in merged.items:
        print(f"  - {claim}")
    # "security: must authenticate" appears in gen-0 and gen-2 but is deduplicated

Expected output
3 generators produced 5 unique claims:
  - security: must authenticate
  - security: must encrypt
  - reliability: must handle failures
  - reliability: must log errors
  - performance: must respond in 200ms

Note that "security: must authenticate" appears in both gen-0 and gen-2, but the merge function deduplicates it.
@node kwargs vs the | pipe operator
The @node decorator accepts Oracle kwargs directly:

# @node style (this walkthrough)
@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

This is equivalent to the pipe-operator style used in the Runtime/IR API:

# Pipe style (programmatic API)
decompose = Node.scripted("decompose", fn="generate", output=Claims) | Oracle(n=3, merge_fn="merge_claims")

Both compile to the same topology. Use @node kwargs when writing pipelines as modules. Use the pipe operator when building pipelines programmatically.
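To make the equivalence concrete, here is a toy sketch of how a pipe operator and a kwargs-style constructor can produce the same value. ToyNode, ToyOracle, and toy_node are hypothetical names invented for this illustration. They show the general Python mechanism (the __or__ method) and are not neograph's actual classes or implementation.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyOracle:  # hypothetical stand-in for an Oracle modifier
    n: int
    merge_fn: str


@dataclass(frozen=True)
class ToyNode:  # hypothetical stand-in for a node definition
    name: str
    oracle: Optional[ToyOracle] = None

    def __or__(self, modifier: ToyOracle) -> "ToyNode":
        # `node | modifier` returns a new node that carries the modifier.
        return replace(self, oracle=modifier)


def toy_node(name: str, ensemble_n: Optional[int] = None,
             merge_fn: Optional[str] = None) -> ToyNode:
    """Kwargs-style construction that yields the same value as the pipe."""
    oracle = ToyOracle(ensemble_n, merge_fn) if ensemble_n else None
    return ToyNode(name, oracle)


piped = ToyNode("decompose") | ToyOracle(n=3, merge_fn="merge_claims")
from_kwargs = toy_node("decompose", ensemble_n=3, merge_fn="merge_claims")
print(piped == from_kwargs)  # True
```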
Scripted merge vs LLM merge
Oracle requires exactly one of merge_fn or merge_prompt.
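The "exactly one of" rule can be expressed as a small check. This is an illustrative sketch only: check_merge_options is a hypothetical helper, and the walkthrough does not show how (or with what error) the framework enforces the rule.

```python
from typing import Optional


def check_merge_options(merge_fn: Optional[str] = None,
                        merge_prompt: Optional[str] = None) -> str:
    """Return the name of the single merge option that was provided."""
    options = (("merge_fn", merge_fn), ("merge_prompt", merge_prompt))
    provided = [name for name, value in options if value is not None]
    if len(provided) != 1:
        raise ValueError(
            f"expected exactly one of merge_fn or merge_prompt, "
            f"got {provided or 'neither'}"
        )
    return provided[0]


print(check_merge_options(merge_fn="merge_claims"))  # merge_fn
```

Supplying both options, or neither, raises ValueError in this sketch.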
Scripted merge (deterministic)
@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

Use scripted merge when the combination logic is straightforward (deduplication, union, voting).
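As an example of the voting case, a scripted merge might keep only the claims that enough generators agree on. The sketch below follows the same (variants, config) signature as merge_claims. The name merge_by_vote, the min_votes parameter, and the dataclass stand-in for Claims are assumptions made for this illustration.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class Claims:  # stand-in for the pydantic Claims model (assumption)
    items: list[str]


def merge_by_vote(variants, config, min_votes=2):
    """Keep claims that appear in at least min_votes generator outputs."""
    votes = Counter()
    for variant in variants:
        votes.update(set(variant.items))  # count each claim once per generator

    # Walk the variants again to preserve first-seen order.
    seen = set()
    kept = []
    for variant in variants:
        for claim in variant.items:
            if claim not in seen:
                seen.add(claim)
                if votes[claim] >= min_votes:
                    kept.append(claim)
    return Claims(items=kept)


variants = [
    Claims(items=["security: must authenticate", "security: must encrypt"]),
    Claims(items=["reliability: must handle failures", "reliability: must log errors"]),
    Claims(items=["performance: must respond in 200ms", "security: must authenticate"]),
]
print(merge_by_vote(variants, config=None).items)
# ['security: must authenticate']
```

With the three perspectives from this walkthrough, only the authentication claim is produced by two generators, so it is the only one that survives a two-vote threshold.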
LLM merge (judge)
@node(
    output=Claims,
    prompt="decompose",
    model="reason",
    ensemble_n=3,
    merge_prompt="rw/decompose-merge",
    merge_model="reason",
)
def decompose() -> Claims: ...

The framework passes all generator outputs to an LLM with the given prompt template. The LLM produces the final merged result as structured output. Use this when merging requires judgment (picking the best analysis, synthesizing contradictory results).
What the compiler generates
When it encounters ensemble_n=3, merge_fn="merge_claims", the compiler expands the single node into:
- Fan-out router: dispatches 3 parallel Send() calls to the generator node, each with a different neo_oracle_gen_id
- Generator node: runs 3 times in parallel, writes to a collector field with a list reducer
- Merge barrier (defer=True): waits for all 3 generators to complete, then runs the merge function
- Consumer field: the merged result is written to state.decompose (the node's name), not the collector
You never see this expansion. You write one @node with ensemble_n= and get the full topology.
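The four stages described above can be approximated in plain Python to build intuition. This is a conceptual simulation with a thread pool and a dictionary standing in for graph state. The names generator, merge, and run_oracle are illustrative, and none of this is the code the compiler actually emits.

```python
from concurrent.futures import ThreadPoolExecutor

PERSPECTIVES = [
    ["security: must authenticate", "security: must encrypt"],
    ["reliability: must handle failures", "reliability: must log errors"],
    ["performance: must respond in 200ms", "security: must authenticate"],
]


def generator(gen_id: int) -> list[str]:
    # Each instance receives its own generator ID and picks a perspective.
    return PERSPECTIVES[gen_id % len(PERSPECTIVES)]


def merge(variants: list[list[str]]) -> list[str]:
    # Order-preserving deduplication across all variants.
    return list(dict.fromkeys(claim for v in variants for claim in v))


def run_oracle(node_name: str, n: int) -> dict:
    state = {"collector": []}
    with ThreadPoolExecutor(max_workers=n) as pool:
        # Fan-out: dispatch n generator calls, each with a distinct ID.
        futures = [pool.submit(generator, gen_id) for gen_id in range(n)]
        # Barrier: result() blocks until that generator has finished.
        for future in futures:
            # List reducer: append each output to the collector.
            state["collector"] = state["collector"] + [future.result()]
    # Consumer field: the merged result goes under the node's name.
    state[node_name] = merge(state["collector"])
    return state


state = run_oracle("decompose", 3)
print(len(state["decompose"]))  # 5
```

The simulation collects results in submission order, so its merged list is deterministic.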
Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.