5. Human-in-the-Loop

Some pipelines can’t run end-to-end without a human in the loop. A validation step finds a problem that needs review. An output is generated but must be approved before it triggers side effects. A privileged operation requires confirmation.

NeoGraph makes this a one-line addition: pass interrupt_when= to @node. When the condition returns a truthy value, the graph pauses via LangGraph’s interrupt() mechanism, checkpoints its state, and waits. Resume with run(graph, resume={...}, config=config).

A pipeline analyzes a set of requirements and checks whether coverage meets a quality bar. If it doesn’t, a human must review the gaps before the final report is generated.

from __future__ import annotations

import sys

from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel

from neograph import compile, construct_from_module, node, run

# ── Schemas ──
class Analysis(BaseModel, frozen=True):
    claims: list[str]
    coverage_pct: int

class ValidationResult(BaseModel, frozen=True):
    passed: bool
    issues: list[str]

class FinalReport(BaseModel, frozen=True):
    text: str

# ── Pipeline ──
@node(output=Analysis)
def analyze() -> Analysis:
    return Analysis(claims=["auth", "logging", "encryption"], coverage_pct=55)

@node(
    output=ValidationResult,
    interrupt_when=lambda state: (
        {"issues": state.check.issues, "message": "Please review and approve"}
        if state.check and not state.check.passed
        else None
    ),
)
def check(analyze: Analysis) -> ValidationResult:
    if analyze.coverage_pct < 80:
        return ValidationResult(
            passed=False,
            issues=[f"Coverage {analyze.coverage_pct}% is below 80% threshold"],
        )
    return ValidationResult(passed=True, issues=[])

@node(output=FinalReport)
def report(analyze: Analysis) -> FinalReport:
    return FinalReport(
        text=f"Report: {analyze.claims}, coverage: {analyze.coverage_pct}%"
    )

pipeline = construct_from_module(sys.modules[__name__], name="review-pipeline")

interrupt_when accepts either a callable or a registered condition name:

  • Callable (inline): interrupt_when=lambda state: payload_or_none. The function receives the full pipeline state and returns either None (continue) or a dict (pause, with the dict as the interrupt payload).
  • String (registered): interrupt_when='condition_name'. The name must be registered via register_condition('condition_name', lambda state: ...) before compile().

The inline form is usually cleaner. When the condition returns a dict, the graph pauses and LangGraph’s interrupt() fires with that dict as its payload, which is what later shows up as interrupt.value in the result.
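Once a condition grows past a single expression, a named function can be easier to read and to unit-test than a lambda. The sketch below restates the lambda from the example as a plain function and exercises it without NeoGraph installed. The name review_needed is hypothetical, and the SimpleNamespace objects are stand-ins that only mimic the attribute access used above; they are not NeoGraph's real state type.

```python
from types import SimpleNamespace
from typing import Any, Optional


def review_needed(state: Any) -> Optional[dict]:
    """Return an interrupt payload if the check failed, otherwise None."""
    check = getattr(state, "check", None)
    if check is not None and not check.passed:
        return {"issues": check.issues, "message": "Please review and approve"}
    return None


# Stand-in states (assumption: the real state exposes `check` as an attribute).
failing = SimpleNamespace(
    check=SimpleNamespace(
        passed=False,
        issues=["Coverage 55% is below 80% threshold"],
    )
)
passing = SimpleNamespace(check=SimpleNamespace(passed=True, issues=[]))
not_run = SimpleNamespace(check=None)

print(review_needed(failing))  # a dict payload: the graph would pause
print(review_needed(passing))  # None: the graph would continue
print(review_needed(not_run))  # None: check has not produced output yet
```

Since interrupt_when accepts a callable, a function like this could be passed as interrupt_when=review_needed, or registered under a name with register_condition for the string form.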

Interrupt/resume requires a checkpointer — LangGraph needs somewhere to persist state between the pause and the resume. The compiler enforces this: calling compile(pipeline) on a pipeline with interrupt_when and no checkpointer raises an error.

from langgraph.checkpoint.memory import MemorySaver
graph = compile(pipeline, checkpointer=MemorySaver())
# thread_id identifies this execution for later resume
config = {"configurable": {"thread_id": "review-001"}}

MemorySaver is fine for development. For production, use SqliteSaver, PostgresSaver, or any LangGraph-compatible checkpointer.
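The thread_id in the config is what ties a later resume to the right checkpoint, so each review that may pause needs its own value. A minimal sketch of a helper that builds the config in one place follows; the name thread_config is hypothetical, and the dict layout is copied from the example above.

```python
from typing import Any


def thread_config(thread_id: str) -> dict[str, Any]:
    """Build the config dict that ties a run to one checkpointed thread."""
    if not thread_id:
        raise ValueError("thread_id must be a non-empty string")
    return {"configurable": {"thread_id": thread_id}}


# Two independent reviews get two independent checkpoint threads.
config_a = thread_config("review-001")
config_b = thread_config("review-002")

print(config_a)
print(config_b)
```

Passing the same config to the initial run() and to the later resume call is what lets the second call find the paused state.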

result = run(graph, input={"node_id": "REQ-001"}, config=config)
if "__interrupt__" in result:
    interrupt_data = result["__interrupt__"]
    for interrupt in interrupt_data:
        print(f"Paused: {interrupt.value}")
        # Paused: {'issues': ['Coverage 55% is below 80% threshold'],
        #          'message': 'Please review and approve'}

When the graph pauses, run() returns with the partial state plus an __interrupt__ key containing the interrupt payloads. Everything up to the pause point is already in the checkpoint — analyze ran, check ran and produced a failing ValidationResult, then the interrupt fired.
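Callers usually want the payloads as plain data, for example to show them in a review UI or put them on a queue. The helper below is a sketch of that extraction, relying only on what the example above shows: an optional __interrupt__ key whose items expose a .value attribute. The name interrupt_payloads is hypothetical, and the SimpleNamespace items stand in for the real interrupt objects.

```python
from types import SimpleNamespace
from typing import Any


def interrupt_payloads(result: dict[str, Any]) -> list[Any]:
    """Return the payload of every pending interrupt, or [] if none."""
    return [item.value for item in result.get("__interrupt__", [])]


# Stand-in results (assumption: real items only need a `.value` attribute).
paused = {
    "__interrupt__": [
        SimpleNamespace(
            value={
                "issues": ["Coverage 55% is below 80% threshold"],
                "message": "Please review and approve",
            }
        )
    ]
}
finished = {"report": "final report placeholder"}

for payload in interrupt_payloads(paused):
    print(payload["message"], payload["issues"])

print(interrupt_payloads(finished))  # [] because nothing is waiting
```

An empty list means the run finished without pausing, so the same call site can handle both outcomes.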

result = run(graph, resume={"approved": True, "reviewer": "alice"}, config=config)
print(result["human_feedback"]) # {'approved': True, 'reviewer': 'alice'}
print(result["report"].text) # The report ran after the resume

Calling run() with resume= instead of input= continues the paused graph. The resume dict is stored in state.human_feedback so downstream nodes can read the decision. The graph then continues from check onward — report runs, and you get the final state.
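What the feedback means is up to the application. As a sketch, the helper below treats a run as approved only when the reviewer explicitly sent approved=True. The key names come from the resume dict in this example rather than from NeoGraph, and is_approved is a hypothetical name.

```python
from typing import Any, Mapping, Optional


def is_approved(feedback: Optional[Mapping[str, Any]]) -> bool:
    """True only for an explicit approved=True in the resume payload."""
    return feedback is not None and feedback.get("approved") is True


print(is_approved({"approved": True, "reviewer": "alice"}))  # True
print(is_approved({"approved": False, "reviewer": "bob"}))   # False
print(is_approved(None))                                     # False
```

Applied to the result above, is_approved(result["human_feedback"]) would tell the caller whether to publish the report or discard it.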

If analyze.coverage_pct had been 85 instead of 55, check would return passed=True, the lambda would return None, and the interrupt would never fire. The graph would run straight through to report as if interrupt_when weren’t there.

The Operator modifier still exists for runtime construction. For pipelines written with @node, interrupt_when= is the cleaner path — the condition lambda is co-located with the node it guards, and the graph wiring happens automatically at construct_from_module time.

  • interrupt_when= pauses the graph when the condition returns a truthy payload
  • Inline lambdas are cleaner than registered conditions for most cases
  • A checkpointer is required — MemorySaver for dev, persistent savers for prod
  • run(resume={...}, config=config) continues from the checkpoint
  • The resume payload lands in state.human_feedback

Documentation © 2025-2026 Constantine Mirin, mirin.pro. Licensed under CC BY-ND 4.0.