Built by Postindustria. We help teams build agentic production systems.

Why Not Just LangGraph?

LangGraph is the runtime. NeoGraph is a compiler that targets it. The question is not “LangGraph or NeoGraph” — it is “do you want to write the wiring by hand, or define what you mean and let the compiler generate it?”

This page compares the five most common LLM pipeline patterns. Every NeoGraph example compiles to a standard LangGraph graph. There is no magic runtime, no custom executor. The difference is authoring: you define logic, not wiring.

Pattern                        What NeoGraph eliminates
Sequential pipeline            TypedDict, add_node x N, add_edge x N+1
Tool-calling agent             Router function, conditional edges, tool loop cycle
Map-reduce (fan-out/fan-in)    2 state schemas, Send boilerplate, Annotated reducer
Human-in-the-loop              Interrupt node, Command routing, two-phase invocation
Subgraph composition           2 StateGraphs, 2 TypedDicts, manual state mapping wrapper

1. Sequential pipeline

Three LLM steps in order: decompose a topic into claims, classify them, summarize. In LangGraph:

from typing import TypedDict

from langgraph.graph import END, START, StateGraph

class PipelineState(TypedDict):
    topic: str
    claims: Claims | None
    classified: ClassifiedClaims | None
    summary: Summary | None

def decompose(state: PipelineState):
    result = llm.with_structured_output(Claims).invoke(
        f"Break this topic into 3-5 factual claims: {state['topic']}")
    return {"claims": result}

def classify(state: PipelineState):
    result = llm.with_structured_output(ClassifiedClaims).invoke(
        f"Classify each claim by category: {state['claims']}")
    return {"classified": result}

def summarize(state: PipelineState):
    result = llm.with_structured_output(Summary).invoke(
        f"Summarize these classified claims: {state['classified']}")
    return {"summary": result}

graph = StateGraph(PipelineState)
graph.add_node("decompose", decompose)
graph.add_node("classify", classify)
graph.add_node("summarize", summarize)
graph.add_edge(START, "decompose")
graph.add_edge("decompose", "classify")
graph.add_edge("classify", "summarize")
graph.add_edge("summarize", END)
app = graph.compile()

The TypedDict must list every field. Every node is added individually. Every edge is added individually. The START and END edges are mandatory. For N nodes, you write N add_node calls and N+1 add_edge calls. The same pipeline in NeoGraph:

import sys

from neograph import node, construct_from_module, compile, run

@node(outputs=Claims, prompt='decompose', model='fast')
def decompose() -> Claims: ...

@node(outputs=ClassifiedClaims, prompt='classify', model='fast')
def classify(decompose: Claims) -> ClassifiedClaims: ...

@node(outputs=Summary, prompt='summarize', model='fast')
def summarize(classify: ClassifiedClaims) -> Summary: ...

pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline)
result = run(graph, input={'topic': 'microservice authentication'})

No TypedDict. No add_node. No add_edge. The parameter name decompose in classify(decompose: Claims) IS the edge. The compiler infers the state schema from type annotations and wires edges from the dependency graph.
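How parameter names become edges can be sketched in a few lines of plain Python. This is an illustration of the inference idea only, not NeoGraph's actual implementation; the stub functions stand in for @node-decorated ones:

```python
import inspect

def decompose() -> str: ...
def classify(decompose: str) -> str: ...
def summarize(classify: str) -> str: ...

def infer_edges(*funcs):
    """Derive (source, target) edges from parameter names:
    a parameter named after another node is a dependency."""
    names = {f.__name__ for f in funcs}
    edges = []
    for f in funcs:
        for param in inspect.signature(f).parameters:
            if param in names:
                edges.append((param, f.__name__))
    return edges

edges = infer_edges(decompose, classify, summarize)
# edges == [("decompose", "classify"), ("classify", "summarize")]
```

Because the dependency graph is explicit in the signatures, there is nothing to keep in sync: renaming a node and its referencing parameter rewires the graph.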

2. Tool-calling agent

An LLM that decides which tools to call, with a budget limit on tool usage. In LangGraph:

from typing import Annotated, TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

tools = [search_web]
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)

def agent(state: AgentState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

# This router is IDENTICAL in every ReAct agent
def should_continue(state: AgentState):
    last = state["messages"][-1]
    if last.tool_calls:
        return "tools"
    return END

graph = StateGraph(AgentState)
graph.add_node("agent", agent)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_continue, ["tools", END])
graph.add_edge("tools", "agent")  # cycle back
app = graph.compile()

The should_continue router function is identical in every ReAct agent you build with LangGraph. It checks if the last message has tool calls and routes accordingly. This is pure boilerplate.

LangGraph also has no built-in concept of tool budgets. If you want to limit how many times the LLM can call a tool, you have to implement that yourself.
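What "implement that yourself" looks like in practice is a budget-aware router. The sketch below is stdlib-only: plain dicts stand in for LangChain message objects, the "type" and "tool_calls" keys stand in for the real attributes, and "__end__" stands in for LangGraph's END sentinel:

```python
TOOL_BUDGET = 3

def count_tool_calls(messages):
    """Count how many tool invocations have happened so far."""
    return sum(1 for m in messages if m.get("type") == "tool")

def should_continue(state):
    """Route to tools only while the last message requests a tool
    call AND the budget is not exhausted; otherwise finish."""
    messages = state["messages"]
    last = messages[-1]
    if last.get("tool_calls") and count_tool_calls(messages) < TOOL_BUDGET:
        return "tools"
    return "__end__"
```

In NeoGraph the same bookkeeping collapses into a single budget=3 argument: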

import sys

from neograph import node, construct_from_module, compile, run
from neograph import Tool

@node(outputs=ResearchResult, prompt='research', model='fast',
      tools=[Tool("search_web", budget=3)])
def research() -> ResearchResult: ...

pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline)
result = run(graph, input={'query': 'latest AI research'})

One @node with tools=. The compiler infers agent mode from the presence of tools and generates the full ReAct loop: call LLM, check for tool calls, execute tools, loop back. The budget=3 on the Tool enforces a hard limit — after 3 calls, the tool is removed and the LLM is forced to produce a final structured response.

3. Map-reduce (fan-out/fan-in)

Run N parallel generators and merge the results. Classic ensemble pattern. In LangGraph:

import operator
from typing import Annotated, TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import Send

# TWO state schemas needed
class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]  # manual reducer
    best_joke: str

class JokeState(TypedDict):
    subject: str

def generate_subjects(state: OverallState):
    result = llm.with_structured_output(Jokes).invoke(
        f"List 3 subtopics of: {state['topic']}")
    return {"subjects": result.items}

def generate_joke(state: JokeState):
    result = llm.invoke(f"Write a short joke about {state['subject']}")
    return {"jokes": [result.content]}

def fan_out_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def pick_best(state: OverallState):
    jokes_str = "\n".join(f"{i}: {j}" for i, j in enumerate(state["jokes"]))
    result = llm.with_structured_output(BestJoke).invoke(
        f"Pick the funniest joke:\n{jokes_str}")
    return {"best_joke": state["jokes"][result.id]}

graph = StateGraph(OverallState)
graph.add_node("generate_subjects", generate_subjects)
graph.add_node("generate_joke", generate_joke)
graph.add_node("pick_best", pick_best)
graph.add_edge(START, "generate_subjects")
graph.add_conditional_edges("generate_subjects", fan_out_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "pick_best")
graph.add_edge("pick_best", END)
app = graph.compile()

This requires two state schemas (parent and per-worker), an Annotated reducer to merge the parallel results, Send() dispatches for fan-out, and add_conditional_edges to wire the dispatch. The same ensemble in NeoGraph:

import sys

from neograph import node, construct_from_module, compile, run

@node(outputs=Jokes, prompt='generate', model='fast',
      ensemble_n=3, merge_prompt='pick-best')
def generate() -> Jokes: ...

pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline)
result = run(graph, input={'topic': 'programming'})

ensemble_n=3 with merge_prompt='pick-best' tells the compiler to: (1) fan out 3 parallel executions of the node via Send(), (2) collect results in a deferred barrier, (3) merge with an LLM judge call using the "pick-best" prompt template. One keyword argument replaces the two-schema, reducer, Send, conditional-edges setup.
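The fan-out / barrier / merge shape can be demonstrated without any graph machinery at all. A stdlib-only sketch with toy stand-ins for the generator and the pick-best judge (an illustration of the pattern, not NeoGraph's internals):

```python
from concurrent.futures import ThreadPoolExecutor

def ensemble(generate, judge, n=3):
    """Fan out n parallel generations, wait for all of them
    (the barrier), then merge by letting a judge pick one."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        candidates = list(pool.map(generate, range(n)))
    return judge(candidates)

# Toy stand-ins for the LLM calls:
def generate(i):
    return "ha" * (i + 1)  # variant i

def judge(candidates):
    return max(candidates, key=len)  # toy "pick-best": longest wins

best = ensemble(generate, judge, n=3)  # "hahaha"
```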

For scripted merging instead of LLM judging, use merge_fn instead of merge_prompt:

@node(outputs=Claims, prompt='decompose', model='reason',
      ensemble_n=3, merge_fn='combine_variants')
def decompose() -> Claims: ...
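The page does not define combine_variants, so purely as an illustration, a merge_fn of that shape might deduplicate claims across the N variants. Here claims are plain strings for simplicity; the real Claims outputs would be structured objects:

```python
def combine_variants(variants):
    """Merge N parallel outputs into one, keeping the first
    occurrence of each distinct claim (hypothetical merge_fn)."""
    seen, merged = set(), []
    for claims in variants:
        for claim in claims:
            if claim not in seen:
                seen.add(claim)
                merged.append(claim)
    return merged
```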

4. Human-in-the-loop

Pause the graph for human approval, then resume. In LangGraph:

from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt

class State(TypedDict):
    topic: str
    analysis: Analysis | None
    human_approved: bool
    report: str

def analyze(state: State):
    result = llm.with_structured_output(Analysis).invoke(
        f"Analyze '{state['topic']}'. List 3 claims, rate confidence 0-1.")
    return {"analysis": result}

def review_gate(state: State):
    if state["analysis"].confidence < 0.8:
        decision = interrupt({
            "message": f"Confidence {state['analysis'].confidence:.0%} is low. Approve?",
            "claims": state["analysis"].claims,
        })
        return {"human_approved": decision.get("approved", False)}
    return {"human_approved": True}

def route_after_review(state: State) -> Command:
    if state["human_approved"]:
        return Command(goto="report")
    return Command(goto=END)

def report(state: State):
    result = llm.with_structured_output(Report).invoke(
        f"Write a brief report based on: {state['analysis']}")
    return {"report": result.text}

memory = MemorySaver()
graph = StateGraph(State)
graph.add_node("analyze", analyze)
graph.add_node("review_gate", review_gate)
graph.add_node("route", route_after_review)
graph.add_node("report", report)
graph.add_edge(START, "analyze")
graph.add_edge("analyze", "review_gate")
graph.add_edge("review_gate", "route")
graph.add_edge("report", END)
app = graph.compile(checkpointer=memory)

You need a dedicated review_gate node just for the interrupt, a separate route_after_review node for Command routing, and a two-phase invocation to handle the pause/resume cycle. The same gate in NeoGraph:

import sys

from langgraph.checkpoint.memory import MemorySaver
from neograph import node, construct_from_module, compile, run

@node(outputs=Analysis, prompt='analyze', model='fast',
      interrupt_when=lambda s: (
          {"message": f"Confidence {s.analyze.confidence:.0%} is low. Approve?"}
          if s.analyze and s.analyze.confidence < 0.8
          else None
      ))
def analyze() -> Analysis: ...

@node(outputs=Report, prompt='report', model='fast')
def report(analyze: Analysis) -> Report: ...

pipeline = construct_from_module(sys.modules[__name__])
graph = compile(pipeline, checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "demo"}}
result = run(graph, input={"topic": "microservice auth"}, config=config)
# If interrupted, resume:
# result = run(graph, resume={"approved": True}, config=config)

interrupt_when= tells the compiler to insert a check node after analyze. If the lambda returns a truthy value, the graph interrupts. No dedicated gate node, no Command routing, no separate router function.
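The predicate contract is simple: given the state, return an interrupt payload (truthy) to pause, or None to continue. That logic can be exercised in isolation with stand-in objects; Analysis here is a toy dataclass, not the real schema:

```python
from dataclasses import dataclass
from types import SimpleNamespace

@dataclass
class Analysis:
    confidence: float
    claims: list

def interrupt_payload(s):
    """Return an interrupt payload when confidence is low, else None."""
    if s.analyze and s.analyze.confidence < 0.8:
        return {"message": f"Confidence {s.analyze.confidence:.0%} is low. Approve?"}
    return None

low = SimpleNamespace(analyze=Analysis(confidence=0.55, claims=["x"]))
high = SimpleNamespace(analyze=Analysis(confidence=0.95, claims=["x"]))
```

Returning a payload rather than a bare boolean lets the paused graph surface context (the message, the claims) to whoever approves.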

5. Subgraph composition

A parent pipeline with a nested sub-pipeline that has isolated state. In LangGraph:

from typing import TypedDict

from langgraph.graph import END, START, StateGraph

# --- CHILD GRAPH (separate state schema) ---
class EnrichState(TypedDict):
    claims: Claims | None
    scored: ScoredClaims | None

def enrich_lookup(state: EnrichState):
    result = llm.with_structured_output(ScoredClaims).invoke(
        f"Score each claim: {state['claims']}")
    return {"scored": result}

child_graph = StateGraph(EnrichState)
child_graph.add_node("lookup", enrich_lookup)
child_graph.add_edge(START, "lookup")
child_graph.add_edge("lookup", END)
child = child_graph.compile()

# --- PARENT GRAPH (different state schema) ---
class ParentState(TypedDict):
    topic: str
    claims: Claims | None
    enriched: ScoredClaims | None
    report: str

def decompose(state: ParentState):
    result = llm.with_structured_output(Claims).invoke(
        f"Break '{state['topic']}' into 3-5 claims.")
    return {"claims": result}

# MANUAL state mapping wrapper
def enrich_wrapper(state: ParentState):
    child_result = child.invoke({"claims": state["claims"]})
    return {"enriched": child_result["scored"]}

def report(state: ParentState):
    result = llm.with_structured_output(Report).invoke(
        f"Write a report from: {state['enriched']}")
    return {"report": result.text}

parent_graph = StateGraph(ParentState)
parent_graph.add_node("decompose", decompose)
parent_graph.add_node("enrich", enrich_wrapper)
parent_graph.add_node("report", report)
parent_graph.add_edge(START, "decompose")
parent_graph.add_edge("decompose", "enrich")
parent_graph.add_edge("enrich", "report")
parent_graph.add_edge("report", END)
app = parent_graph.compile()

Two TypedDict schemas, two StateGraph builds, two compile() calls, and a manual wrapper function that translates between parent and child state. The same composition in NeoGraph:

from neograph import Construct, Node, compile, node, run

# Sub-construct with isolated state boundary
enrich = Construct(
    "enrich",
    input=Claims,
    output=ScoredClaims,
    nodes=[Node(name="score", mode="think", inputs=Claims,
                outputs=ScoredClaims, model="fast", prompt="score")],
)

@node(outputs=Claims, prompt='decompose', model='fast')
def decompose() -> Claims: ...

@node(outputs=Report, prompt='report', model='fast')
def report(enrich: ScoredClaims) -> Report: ...

# Mix @node functions with programmatic sub-constructs
pipeline = Construct("analysis", nodes=[decompose, enrich, report])
graph = compile(pipeline)
result = run(graph, input={'topic': 'microservice auth'})

A Construct with input/output types becomes an isolated subgraph automatically. No wrapper function, no manual state mapping, no second StateGraph. The compiler handles the state boundary and type routing. @node functions and programmatic sub-constructs compose freely.
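What the compiler generates at that boundary can be approximated generically: given a child callable and the field names on each side, build the parent-state wrapper mechanically. A simplified stdlib sketch (the function and key names are illustrative, not NeoGraph's internals):

```python
def make_boundary_wrapper(child, in_key, child_in, child_out, out_key):
    """Build a parent-state node that maps parent state into the
    child's isolated state and maps the child's output back."""
    def wrapper(parent_state):
        child_result = child({child_in: parent_state[in_key]})
        return {out_key: child_result[child_out]}
    return wrapper

# Stand-in child graph: scores each claim.
def child(state):
    return {"scored": [(c, 1.0) for c in state["claims"]]}

enrich = make_boundary_wrapper(child, "claims", "claims", "scored", "enriched")
```

The manual enrich_wrapper in the LangGraph version is exactly this function, written out by hand for one specific pair of schemas.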

NeoGraph compiles to LangGraph. Everything LangGraph provides at runtime still works:

  • Checkpointing — pass checkpointer= to compile(), same as LangGraph
  • Streaming — the compiled graph supports .stream() and .astream_events()
  • Visualization — call .get_graph().draw_mermaid() on the compiled graph
  • LangGraph Platform — deploy the compiled graph with LangGraph Cloud or self-hosted
  • Callbacks — Langfuse, LangSmith, or any LangChain callback handler flows through via config

NeoGraph removes the repetitive authoring. It does not change the runtime.


Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.