Skip to content
Built by Postindustria. We help teams build agentic production systems.

3. Oracle Ensemble

An Oracle runs the same node multiple times in parallel — each instance gets a unique generator ID so it can produce a different perspective. A merge step combines the results into a single consensus output.

This is the ensemble pattern: multiple independent attempts at the same task, deduplicated or judged into one answer.

With @node, the Oracle modifier is applied via kwargs: ensemble_n= and merge_fn= on the decorator. No | pipe operator needed.

  • Applying the Oracle modifier via @node kwargs (ensemble_n=, merge_fn=)
  • Registering a scripted merge function with register_scripted()
  • Thread-safe generation across parallel instances
  • How the compiler expands Oracle into fan-out + barrier + merge
from pydantic import BaseModel
class Topic(BaseModel, frozen=True):
text: str
class Claims(BaseModel, frozen=True):
items: list[str]

Adding ensemble_n=3 and merge_fn="merge_claims" to @node tells the compiler to run this node 3 times in parallel, then merge the results:

from neograph import node, register_scripted
@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
with _gen_counter_lock:
idx = _gen_counter[0] % len(_perspectives)
_gen_counter[0] += 1
return Claims(items=_perspectives[idx])

Each parallel instance calls the same function. A thread-safe counter rotates through perspectives so each generator returns different claims.

The merge function receives a list of all generator outputs and combines them:

def merge_claims(variants, config):
"""Merge N claim lists into one deduplicated list."""
seen = set()
merged = []
for variant in variants:
for claim in variant.items:
if claim not in seen:
seen.add(claim)
merged.append(claim)
return Claims(items=merged)
register_scripted("merge_claims", merge_claims)

variants is [Claims, Claims, Claims] — one per generator. The merge deduplicates by claim text.

"""Oracle Ensemble: 3 parallel generators + scripted merge.
Run:
python 03_oracle_ensemble.py
"""
from __future__ import annotations
import sys
import threading
from pydantic import BaseModel
from neograph import compile, construct_from_module, node, register_scripted, run
# -- Schemas ----------------------------------------------------------------
class Topic(BaseModel, frozen=True):
text: str
class Claims(BaseModel, frozen=True):
items: list[str]
# -- Generator perspectives -------------------------------------------------
_perspectives = [
["security: must authenticate", "security: must encrypt"],
["reliability: must handle failures", "reliability: must log errors"],
["performance: must respond in 200ms", "security: must authenticate"],
]
_gen_counter_lock = threading.Lock()
_gen_counter = [0]
# -- Merge: combine and deduplicate claims from all generators --------------
def merge_claims(variants, config):
"""Merge N claim lists into one deduplicated list."""
seen = set()
merged = []
for variant in variants:
for claim in variant.items:
if claim not in seen:
seen.add(claim)
merged.append(claim)
return Claims(items=merged)
register_scripted("merge_claims", merge_claims)
# -- Build pipeline ---------------------------------------------------------
@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims:
with _gen_counter_lock:
idx = _gen_counter[0] % len(_perspectives)
_gen_counter[0] += 1
return Claims(items=_perspectives[idx])
pipeline = construct_from_module(sys.modules[__name__], name="oracle-demo")
# -- Run --------------------------------------------------------------------
if __name__ == "__main__":
_gen_counter[0] = 0 # reset for clean run
graph = compile(pipeline)
result = run(graph, input={"node_id": "REQ-001"})
merged = result["decompose"]
print(f"3 generators produced {len(merged.items)} unique claims:")
for claim in merged.items:
print(f" - {claim}")
# "security: must authenticate" appears in gen-0 and gen-2 but is deduplicated
3 generators produced 5 unique claims:
- security: must authenticate
- security: must encrypt
- reliability: must handle failures
- reliability: must log errors
- performance: must respond in 200ms

Note that “security: must authenticate” appears in both gen-0 and gen-2, but the merge function deduplicates it.

The @node decorator accepts Oracle kwargs directly:

# @node style (this walkthrough)
@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

This is equivalent to the pipe-operator style used in the Runtime/IR API:

# Pipe style (programmatic API)
decompose = Node.scripted("decompose", fn="generate", output=Claims) | Oracle(n=3, merge_fn="merge_claims")

Both compile to the same topology. Use @node kwargs when writing pipelines as modules. Use the pipe operator when building pipelines programmatically.

Oracle requires exactly one of merge_fn or merge_prompt.

@node(output=Claims, ensemble_n=3, merge_fn="merge_claims")
def decompose() -> Claims: ...

Use scripted merge when the combination logic is straightforward (deduplication, union, voting).

@node(output=Claims, prompt="decompose", model="reason",
ensemble_n=3, merge_prompt="rw/decompose-merge", merge_model="reason")
def decompose() -> Claims: ...

The framework passes all generator outputs to an LLM with the given prompt template. The LLM produces the final merged result as structured output. Use this when merging requires judgment (picking the best analysis, synthesizing contradictory results).

When it encounters ensemble_n=3, merge_fn="merge_claims", the compiler expands the single node into:

  1. Fan-out router — dispatches 3 parallel Send() calls to the generator node, each with a different neo_oracle_gen_id
  2. Generator node — runs 3 times in parallel, writes to a collector field with a list reducer
  3. Merge barrier (defer=True) — waits for all 3 generators to complete, then runs the merge function
  4. Consumer field — the merged result is written to state.decompose (the node’s name), not the collector

You never see this expansion. You write one @node with ensemble_n= and get the full topology.


Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.