5. Human-in-the-Loop
Some pipelines can’t run end-to-end without a human in the loop. A validation step finds a problem and needs review. An output is generated but must be approved before side effects. A privileged operation requires confirmation.
NeoGraph makes this a one-line addition: pass interrupt_when= to @node. When the condition returns a truthy value, the graph pauses via LangGraph’s interrupt() mechanism, checkpoints its state, and waits. Resume with run(graph, resume={...}, config=config).
The scenario
A pipeline analyzes a set of requirements and checks whether coverage meets a quality bar. If it doesn’t, a human must review the gaps before the final report is generated.
from __future__ import annotations
import sys

from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel
from neograph import compile, construct_from_module, node, run
# ── Schemas ──
class Analysis(BaseModel, frozen=True):
    claims: list[str]
    coverage_pct: int

class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]

class FinalReport(BaseModel, frozen=True):
    text: str
# ── Pipeline ──
@node(output=Analysis)
def analyze() -> Analysis:
    return Analysis(claims=["auth", "logging", "encryption"], coverage_pct=55)
@node(
    output=ValidationResult,
    interrupt_when=lambda state: (
        {"issues": state.check.issues, "message": "Please review and approve"}
        if state.check and not state.check.passed
        else None
    ),
)
def check(analyze: Analysis) -> ValidationResult:
    if analyze.coverage_pct < 80:
        return ValidationResult(
            passed=False,
            issues=[f"Coverage {analyze.coverage_pct}% is below 80% threshold"],
        )
    return ValidationResult(passed=True, issues=[])
@node(output=FinalReport)
def report(analyze: Analysis) -> FinalReport:
    return FinalReport(
        text=f"Report: {analyze.claims}, coverage: {analyze.coverage_pct}%"
    )
pipeline = construct_from_module(sys.modules[__name__], name="review-pipeline")

The interrupt_when kwarg
interrupt_when accepts either a callable or a registered condition name:

- Callable (inline): interrupt_when=lambda state: payload_or_none. The function receives the full pipeline state and returns either None (continue) or a dict (pause, with the dict as the interrupt payload).
- String (registered): interrupt_when='condition_name'. The name must be registered via register_condition('condition_name', lambda state: ...) before compile().
The inline form is usually cleaner. When the condition returns a dict, the graph pauses and LangGraph’s interrupt() fires with that payload as the reason.
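The condition callable’s contract can be illustrated without the framework. This sketch uses a plain stand-in for the pipeline state (the real state object is whatever NeoGraph passes in) and the same lambda shape as the check node above:

```python
from types import SimpleNamespace

# Same shape as the lambda passed to @node(interrupt_when=...) above.
should_pause = lambda state: (
    {"issues": state.check.issues, "message": "Please review and approve"}
    if state.check and not state.check.passed
    else None
)

# Failing validation -> a dict payload, so the graph would pause.
failing = SimpleNamespace(
    check=SimpleNamespace(passed=False, issues=["coverage too low"])
)
print(should_pause(failing))
# {'issues': ['coverage too low'], 'message': 'Please review and approve'}

# Passing validation -> None, so the graph runs straight through.
passing = SimpleNamespace(check=SimpleNamespace(passed=True, issues=[]))
print(should_pause(passing))  # None

# Node hasn't run yet -> also None (guarded by `state.check and ...`).
print(should_pause(SimpleNamespace(check=None)))  # None
```

The `state.check and` guard matters: the condition is evaluated against the full state, so it must tolerate the node’s output not existing yet.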
Running with a checkpointer
Interrupt/resume requires a checkpointer — LangGraph needs somewhere to persist state between the pause and the resume. The compiler enforces this: calling compile(pipeline) on a pipeline with interrupt_when and no checkpointer raises an error.
from langgraph.checkpoint.memory import MemorySaver
graph = compile(pipeline, checkpointer=MemorySaver())
# thread_id identifies this execution for later resume
config = {"configurable": {"thread_id": "review-001"}}

MemorySaver is fine for development. For production, use SqliteSaver, PostgresSaver, or any LangGraph-compatible checkpointer.
First run — pauses
result = run(graph, input={"node_id": "REQ-001"}, config=config)
if "__interrupt__" in result:
    interrupt_data = result["__interrupt__"]
    for interrupt in interrupt_data:
        print(f"Paused: {interrupt.value}")
        # Paused: {'issues': ['Coverage 55% is below 80% threshold'],
        #          'message': 'Please review and approve'}

When the graph pauses, run() returns with the partial state plus an __interrupt__ key containing the interrupt payloads. Everything up to the pause point is already in the checkpoint — analyze ran, check ran and produced a failing ValidationResult, then the interrupt fired.
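In a real application, the interrupt payload usually becomes a prompt for the reviewer. A minimal sketch, assuming the payload dict shape shown above (format_review_prompt is a hypothetical helper, not part of NeoGraph):

```python
def format_review_prompt(payload: dict) -> str:
    """Turn an interrupt payload into a human-readable review prompt."""
    lines = [payload.get("message", "Review required")]
    lines += [f"  - {issue}" for issue in payload.get("issues", [])]
    return "\n".join(lines)

payload = {
    "issues": ["Coverage 55% is below 80% threshold"],
    "message": "Please review and approve",
}
print(format_review_prompt(payload))
# Please review and approve
#   - Coverage 55% is below 80% threshold
```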
Resume with human feedback
Section titled “Resume with human feedback”result = run(graph, resume={"approved": True, "reviewer": "alice"}, config=config)
print(result["human_feedback"])  # {'approved': True, 'reviewer': 'alice'}
print(result["report"].text)     # The report ran after the resume

Calling run() with resume= instead of input= continues the paused graph. The resume dict is stored in state.human_feedback so downstream nodes can read the decision. The graph then continues from check onward — report runs, and you get the final state.
When the condition is falsy
If analyze.coverage_pct had been 85% instead of 55%, check would return passed=True, the lambda would return None, and the interrupt would never fire. The graph runs straight through to report as if interrupt_when weren’t there.
Why not Operator directly?
The Operator modifier still exists for runtime construction. For pipelines written with @node, interrupt_when= is the cleaner path — the condition lambda is co-located with the node it guards, and the graph wiring happens automatically at construct_from_module time.
Key takeaways
- interrupt_when= pauses the graph when the condition returns a truthy payload
- Inline lambdas are cleaner than registered conditions for most cases
- A checkpointer is required — MemorySaver for dev, persistent savers for prod
- run(resume={...}, config=config) continues from the checkpoint
- The resume payload lands in state.human_feedback
Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.