LangGraph Persistence: Checkpointing Long-Running Workflows

Add checkpointing to LangGraph workflows so agents survive crashes, support human-in-the-loop pauses, and resume from exact state. Production patterns included.

Problem: Your LangGraph Agent Loses All State on Failure

You run a multi-step LangGraph agent — it completes 8 of 12 nodes, then an LLM timeout kills the process. You restart from scratch. Hours of work, API costs, and time wasted.

The same problem hits human-in-the-loop flows: you need an agent to pause and wait for user approval, but without a checkpointer LangGraph has no way to persist state between the pause and the resume.

You'll learn:

  • How to wire a SqliteSaver or PostgresSaver checkpointer into any graph
  • How to resume a workflow from an exact node after a crash or pause
  • How to implement human-in-the-loop approval with interrupt_before

Time: 20 min | Difficulty: Intermediate


Why LangGraph State Dies Without a Checkpointer

By default, LangGraph keeps the graph's state in process memory only. No checkpointer = no persistence: the moment your process exits, the state is gone.

LangGraph's checkpointing system saves a snapshot of the graph state at every node transition. Each snapshot is tied to a thread_id — a string you choose. To resume, you pass the same thread_id back.

Symptoms of missing persistence:

  • Restarting an agent from node 1 after any failure
  • No way to pause and wait for async human input
  • Can't replay or audit what an agent did step-by-step

Solution

Step 1: Install Dependencies

# LangGraph 0.2+ includes checkpointing; langgraph-checkpoint-sqlite for local dev
pip install langgraph langgraph-checkpoint-sqlite

# For production PostgreSQL persistence
pip install langgraph-checkpoint-postgres psycopg[binary]

Verify:

python -c "import langgraph; print(langgraph.__version__)"

Expected: 0.2.x or higher.


Step 2: Build a Graph Without Persistence First

Start with a simple three-node graph so you can see exactly where the checkpointer slots in.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class WorkflowState(TypedDict):
    task: str
    research: str
    draft: str
    approved: bool
    messages: Annotated[list, operator.add]  # append-only list

def research_node(state: WorkflowState) -> dict:
    # Simulates a slow LLM call — this is where crashes happen in production
    print(f"Researching: {state['task']}")
    return {"research": f"Research results for: {state['task']}"}

def draft_node(state: WorkflowState) -> dict:
    print("Writing draft...")
    return {"draft": f"Draft based on: {state['research']}"}

def review_node(state: WorkflowState) -> dict:
    print("Finalizing...")
    return {"approved": True}

# Build the graph — no checkpointer yet
builder = StateGraph(WorkflowState)
builder.add_node("research", research_node)
builder.add_node("draft", draft_node)
builder.add_node("review", review_node)

builder.set_entry_point("research")
builder.add_edge("research", "draft")
builder.add_edge("draft", "review")
builder.add_edge("review", END)

graph = builder.compile()  # No persistence — state dies with the process

Step 3: Add SqliteSaver for Local Development

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

# SQLite writes to a local file and survives process restarts.
# Note: SqliteSaver.from_conn_string is a context manager in recent releases,
# so build the saver from a plain connection for module-level use.
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# Compile the graph WITH the checkpointer
graph = builder.compile(checkpointer=checkpointer)

Now run it with a thread_id in the config:

config = {"configurable": {"thread_id": "workflow-run-001"}}

result = graph.invoke(
    {"task": "Write a report on LangGraph persistence", "messages": []},
    config=config
)
print(result["approved"])  # True

The key change: every node transition is now saved to checkpoints.db. If the process dies after research but before draft, you can resume from research's output — not from the start.


Step 4: Resume From a Checkpoint After a Crash

To resume, invoke the graph with None as input and the same thread_id. LangGraph fetches the latest checkpoint and continues from where it stopped.

# Simulate a crash mid-workflow by raising an exception in draft_node,
# then re-run with None input to resume

config = {"configurable": {"thread_id": "workflow-run-001"}}

# Resume — LangGraph replays from the last saved checkpoint
result = graph.invoke(None, config=config)
print(result)
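One way to stage the crash for local testing is a node that raises on its first call and succeeds on the resumed run; swap it in for draft_node before compiling. The CRASH_ONCE toggle is purely a demo device, not a LangGraph feature:

```python
# Hypothetical crash simulation: fails once, then behaves like draft_node.
CRASH_ONCE = {"armed": True}

def flaky_draft_node(state: dict) -> dict:
    if CRASH_ONCE["armed"]:
        CRASH_ONCE["armed"] = False   # the resumed invocation will succeed
        raise TimeoutError("simulated LLM timeout in draft_node")
    return {"draft": f"Draft based on: {state['research']}"}
```

Run the graph once to hit the simulated timeout, then invoke it again with None input and the same thread_id to watch it resume past "research" instead of restarting.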

To inspect what's saved before resuming:

# Get the latest checkpoint state for a thread
snapshot = graph.get_state(config)
print(snapshot.values)         # Current state dict
print(snapshot.next)           # Which nodes run next
print(snapshot.created_at)     # Timestamp of this checkpoint

If it fails:

  • KeyError: thread_id → You forgot "configurable" wrapper: use {"configurable": {"thread_id": "..."}}
  • No checkpoint found → The thread_id doesn't exist yet; pass full initial state on first run

Step 5: Add Human-in-the-Loop With interrupt_before

interrupt_before tells LangGraph to save state and halt before a specific node runs. The process can exit. When you resume later, the halted node executes.

# Halt before "review" so a human can inspect the draft
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["review"]  # List the node names to pause before
)

config = {"configurable": {"thread_id": "hitl-workflow-001"}}

# Run until the interrupt — stops before "review"
result = graph.invoke(
    {"task": "Quarterly AI report", "messages": []},
    config=config
)

print("Paused. Inspect draft:")
state = graph.get_state(config)
print(state.values["draft"])
print("Next nodes:", state.next)  # ('review',)

Now a human reviews the draft. When they approve, resume:

# Option A: Resume without modifying state — just continue
graph.invoke(None, config=config)

# Option B: Update state before resuming (e.g., edited draft)
graph.update_state(
    config,
    {"draft": "Human-edited draft content here"}
)
graph.invoke(None, config=config)

Step 6: Switch to PostgresSaver for Production

SQLite is single-writer — it breaks under concurrent workflows. For production, use PostgresSaver.

from langgraph.checkpoint.postgres import PostgresSaver
import psycopg

import os

# Read the connection string from the environment; never hardcode credentials
DB_URI = os.environ["DATABASE_URL"]  # e.g. postgresql://user:pass@localhost:5432/langgraph_db

with psycopg.connect(DB_URI, autocommit=True) as conn:
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Creates checkpoint tables on first run

    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "prod-workflow-001"}}
    result = graph.invoke(
        {"task": "Production run", "messages": []},
        config=config
    )

For async workloads (FastAPI, async agents), use AsyncPostgresSaver:

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import psycopg

async def run_workflow():
    async with await psycopg.AsyncConnection.connect(DB_URI) as conn:
        checkpointer = AsyncPostgresSaver(conn)
        await checkpointer.setup()

        graph = builder.compile(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": "async-001"}}

        result = await graph.ainvoke(
            {"task": "Async production run", "messages": []},
            config=config
        )
        return result

Verification

# List all checkpoints for a thread — confirms persistence is working
config = {"configurable": {"thread_id": "workflow-run-001"}}

for checkpoint in graph.get_state_history(config):
    print(checkpoint.created_at, "→", checkpoint.next)

You should see one entry per node transition, ordered newest-first:

2026-03-10T14:22:01 → ()
2026-03-10T14:21:58 → ('review',)
2026-03-10T14:21:54 → ('draft',)
2026-03-10T14:21:50 → ('research',)

To replay from a specific historical checkpoint, pass its checkpoint_id:

# Time-travel: re-run from after the research node
historical_config = {
    "configurable": {
        "thread_id": "workflow-run-001",
        "checkpoint_id": "<id from get_state_history>"
    }
}
graph.invoke(None, config=historical_config)

Production Considerations

Thread ID strategy matters. Use meaningful, unique IDs such as f"user-{user_id}-task-{task_id}". Random UUIDs make debugging painful; meaningful IDs let you query checkpoints by user or task.
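A tiny helper keeps the scheme consistent across services. The function name is a hypothetical convention, mirroring the f-string above:

```python
# Hypothetical helper: deterministic, human-readable thread IDs
def make_thread_id(user_id: str, task_id: str) -> str:
    return f"user-{user_id}-task-{task_id}"
```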

Checkpoint bloat. Long-running threads accumulate hundreds of checkpoints, and LangGraph doesn't auto-expire them. Prune with a scheduled job: for Postgres, delete a stale thread's rows from the checkpoint tables (checkpoints, checkpoint_writes, checkpoint_blobs) by thread_id, using your own records to decide which threads are stale.
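A pruning pass can be sketched as below. The table names are those created by PostgresSaver's setup(); the stale-thread list is assumed to come from your own bookkeeping, since the checkpoint tables key on thread_id:

```python
# Hypothetical pruning sketch: build DELETE statements for stale threads,
# to be executed inside one transaction with psycopg.
TABLES = ("checkpoint_writes", "checkpoint_blobs", "checkpoints")

def prune_statements(stale_thread_ids: list[str]) -> list[tuple[str, tuple]]:
    """Return (sql, params) pairs that remove all rows for the given threads."""
    return [
        (f"DELETE FROM {table} WHERE thread_id = ANY(%s)", (stale_thread_ids,))
        for table in TABLES
    ]
```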

Don't store secrets in state. The entire state dict is serialized to the checkpoint store. API keys, tokens, and PII written into state fields end up in your database in plaintext.

interrupt_before vs interrupt_after. interrupt_before halts before the node runs — use this when you want a human to approve inputs. interrupt_after halts after the node runs — use this when you want a human to review outputs before the next step.
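Switching the pause point is a one-line change to the compile call; this fragment assumes the same builder and checkpointer from the earlier steps:

```python
# Pause AFTER "draft" runs so a human reviews its output before "review"
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_after=["draft"],
)
```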


What You Learned

  • A checkpointer is the only thing separating a stateless graph from a resumable workflow
  • SqliteSaver is fine for local dev; PostgresSaver is required for concurrent production use
  • interrupt_before implements human-in-the-loop without polling or external queues
  • get_state_history enables time-travel debugging and full audit trails

Limitation: Checkpointing serializes state on every node transition. For graphs with large state payloads (e.g., embedded documents, image bytes), this adds latency and storage cost. Store large objects externally and keep only references in the graph state.
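One minimal content-addressed sketch of that reference pattern, using the local filesystem as a stand-in for object storage (BLOB_DIR and the path scheme are assumptions for this demo):

```python
# Sketch: persist a large payload outside the graph and checkpoint only a
# short reference string in state.
import hashlib
from pathlib import Path

BLOB_DIR = Path("blobs")

def store_blob(data: bytes) -> str:
    """Write data to disk; return the reference to keep in graph state."""
    BLOB_DIR.mkdir(exist_ok=True)
    ref = hashlib.sha256(data).hexdigest()
    (BLOB_DIR / ref).write_bytes(data)
    return ref  # checkpoint this string, not the bytes

def load_blob(ref: str) -> bytes:
    """Resolve a reference back to the payload inside a node."""
    return (BLOB_DIR / ref).read_bytes()
```

Nodes then pass `ref` around in state and call load_blob only when they actually need the payload.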

Tested on LangGraph 0.2.55, Python 3.12, PostgreSQL 16, Ubuntu 24.04