Problem: You Can't Debug What You Can't Replay
Your LangGraph agent made a bad decision at step 4 of 12. To reproduce it, you re-run the whole graph — burning tokens, waiting for tool calls, and hoping the LLM reproduces the same path.
There's a better way. LangGraph's time travel API lets you rewind to any checkpoint, replay from that point, and branch off alternate executions without touching the original run.
You'll learn:
- How LangGraph checkpoints work and what they store
- How to list, inspect, and replay past states
- How to branch a new execution from any historical checkpoint
- A practical debugging workflow for flaky agents
Time: 20 min | Difficulty: Intermediate
How LangGraph Checkpointing Works
Every time a LangGraph graph transitions between nodes, the Checkpointer writes a snapshot of the full state to a store. Each snapshot gets a unique checkpoint_id and is linked to a thread_id (a single conversation or run).
thread_id: "run-42"
checkpoint_id: "c1" → node: __start__ state: {messages: [...]}
checkpoint_id: "c2" → node: agent state: {messages: [...], tool_calls: [...]}
checkpoint_id: "c3" → node: tools state: {messages: [...], tool_results: [...]}
checkpoint_id: "c4" → node: agent state: {messages: [...]} ← bad decision here
Time travel works by loading a past checkpoint and resuming the graph from that node — either replaying the same path or injecting a modified state to branch a new one.
Two common backends:
| Backend | Use for |
|---|---|
| MemorySaver | Development and testing only — no persistence across restarts |
| AsyncPostgresSaver / SqliteSaver | Production — survives restarts, queryable |
This article uses MemorySaver for examples. Swap in SqliteSaver for persistence with zero API changes.
Setup
Install LangGraph 0.2+ with checkpointing support:
pip install langgraph langchain-openai --break-system-packages
Verify:
python -c "import langgraph; print(langgraph.__version__)"
# 0.2.x
Step 1: Build a Graph with a Checkpointer
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator
# State schema — messages accumulate via operator.add
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
step_count: int
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def agent_node(state: AgentState) -> AgentState:
response = llm.invoke(state["messages"])
return {
"messages": [response],
"step_count": state["step_count"] + 1,
}
def should_continue(state: AgentState) -> str:
last = state["messages"][-1]
# Stop after 3 turns or when the agent stops asking questions
if state["step_count"] >= 3 or "DONE" in last.content:
return "end"
return "continue"
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue, {
"continue": "agent",
"end": END,
})
# MemorySaver enables all time travel features
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
Every invocation now writes checkpoints automatically. No extra code needed in your nodes.
Step 2: Run the Graph and Capture Checkpoints
# thread_id groups all checkpoints for one run
config = {"configurable": {"thread_id": "debug-run-01"}}
initial_state = {
"messages": [HumanMessage(content="Plan a 3-step research task on quantum computing.")],
"step_count": 0,
}
result = graph.invoke(initial_state, config=config)
print(f"Final step count: {result['step_count']}")
print(f"Last message: {result['messages'][-1].content[:100]}")
Step 3: List All Checkpoints for a Thread
# get_state_history returns a generator of StateSnapshot objects
history = list(graph.get_state_history(config))
for snapshot in history:
print(f"checkpoint_id : {snapshot.config['configurable']['checkpoint_id']}")
print(f" node : {snapshot.next}") # which node runs NEXT from here
print(f" step_count : {snapshot.values['step_count']}")
print(f" msg count : {len(snapshot.values['messages'])}")
print()
Expected output:
checkpoint_id : 1ef4...
node : () ← graph finished
step_count : 3
msg count : 4
checkpoint_id : 1ef3...
node : ('agent',) ← agent would run next
step_count : 2
msg count : 3
checkpoint_id : 1ef2...
node : ('agent',)
step_count : 1
msg count : 2
checkpoint_id : 1ef1...
node : ('agent',)
step_count : 0
msg count : 1
history[0] is the most recent checkpoint. history[-1] is the initial state.
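Because the list is newest-first, finding "the checkpoint where the agent was at step N" means scanning from the end. A small helper can do this (a sketch; it assumes snapshots expose our AgentState's step_count key via .values, as shown above):

```python
def checkpoint_at_step(history, n):
    """Return the earliest snapshot whose step_count equals n.

    history is the newest-first list from get_state_history(),
    so we walk it in reverse to go oldest-first.
    """
    for snapshot in reversed(history):
        if snapshot.values.get("step_count") == n:
            return snapshot
    return None  # no checkpoint reached that step
```

Use it to jump straight to a step of interest: `snapshot = checkpoint_at_step(history, 2)`.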
Step 4: Inspect a Specific Checkpoint
# Load state at a specific checkpoint without resuming
target_checkpoint_id = history[2].config["configurable"]["checkpoint_id"]
past_config = {
"configurable": {
"thread_id": "debug-run-01",
"checkpoint_id": target_checkpoint_id,
}
}
snapshot = graph.get_state(past_config)
print(f"State at checkpoint: step={snapshot.values['step_count']}")
print(f"Next node: {snapshot.next}")
print(f"Last message: {snapshot.values['messages'][-1].content[:200]}")
This is read-only. Nothing runs. Use this to inspect state before deciding whether to replay or branch.
Step 5: Replay from a Past Checkpoint
Replaying resumes the graph from a checkpoint using the same state — the LLM will generate a new response, so results may differ from the original run due to sampling.
# Resume execution from checkpoint — same state, new LLM calls
replay_config = {
"configurable": {
"thread_id": "debug-run-01",
"checkpoint_id": target_checkpoint_id,
}
}
# Pass None as input — the state is loaded from the checkpoint
replay_result = graph.invoke(None, config=replay_config)
print(f"Replayed to step: {replay_result['step_count']}")
print(f"New last message: {replay_result['messages'][-1].content[:100]}")
Key point: replay uses thread_id: "debug-run-01", so the new checkpoints are written to the same thread and the thread's current state now reflects the replayed path. The original checkpoints remain in history, but the runs are interleaved on one thread. To keep them cleanly separated, branch into a new thread instead (Step 6).
Step 6: Branch a New History from a Checkpoint
Branching creates a new thread starting from a past checkpoint with a modified state. The original thread is untouched.
# Inject a modified state to change the agent's direction
modified_state = {
"messages": snapshot.values["messages"] + [
HumanMessage(content="Focus only on quantum error correction, ignore other topics.")
],
"step_count": snapshot.values["step_count"],
}
# New thread_id = new branch — original run is preserved
branch_config = {"configurable": {"thread_id": "debug-run-01-branch-a"}}
# Update the new thread's state to start from our modified snapshot
graph.update_state(branch_config, modified_state)
# Resume from this new starting point
branch_result = graph.invoke(None, config=branch_config)
print(f"Branch completed at step: {branch_result['step_count']}")
print(f"Branch last message: {branch_result['messages'][-1].content[:100]}")
You now have two threads:
- debug-run-01 — original run, intact
- debug-run-01-branch-a — alternate history branched from an earlier checkpoint with injected context
Step 7: Compare Branch Outputs
# List checkpoints for both threads to compare paths
original_history = list(graph.get_state_history(
{"configurable": {"thread_id": "debug-run-01"}}
))
branch_history = list(graph.get_state_history(
{"configurable": {"thread_id": "debug-run-01-branch-a"}}
))
print(f"Original run: {len(original_history)} checkpoints, {original_history[0].values['step_count']} steps")
print(f"Branch run : {len(branch_history)} checkpoints, {branch_history[0].values['step_count']} steps")
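Beyond counting steps, a small helper can walk both histories oldest-first and report where the message streams diverge. This is a sketch; it only assumes snapshots carry the messages key in .values, as in our AgentState:

```python
def summarize_runs(original_history, branch_history):
    """Compare two newest-first checkpoint histories and report the
    first checkpoint (oldest-first index) where message counts differ."""
    orig = list(reversed(original_history))    # oldest -> newest
    branch = list(reversed(branch_history))
    for i, (o, b) in enumerate(zip(orig, branch)):
        o_msgs = o.values.get("messages", [])
        b_msgs = b.values.get("messages", [])
        if len(o_msgs) != len(b_msgs):
            return f"diverged at checkpoint {i}: {len(o_msgs)} vs {len(b_msgs)} messages"
    return f"no divergence in first {min(len(orig), len(branch))} checkpoints"
```

Then `print(summarize_runs(original_history, branch_history))` pinpoints where the injected context changed the trajectory.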
Practical Debugging Workflow
Use this pattern when an agent behaves unexpectedly in production:
def debug_agent_run(graph, thread_id: str):
"""Find the first checkpoint where agent output looks wrong."""
config = {"configurable": {"thread_id": thread_id}}
history = list(graph.get_state_history(config))
# Walk history oldest-first
for snapshot in reversed(history):
messages = snapshot.values.get("messages", [])
if not messages:
continue
last = messages[-1]
# Flag AI messages with suspiciously short content
if hasattr(last, "content") and len(last.content) < 20:
print(f"⚠️ Suspect checkpoint: {snapshot.config['configurable']['checkpoint_id']}")
print(f" Step: {snapshot.values['step_count']}")
print(f" Message: {last.content!r}")
print(f" Next node: {snapshot.next}")
return snapshot
print("No suspect checkpoints found.")
return None
suspect = debug_agent_run(graph, "debug-run-01")
Once you find the bad checkpoint, branch from the one before it and inject corrected context.
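That last step can be wrapped in a helper. It relies on the newest-first ordering of get_state_history() — so history[idx + 1] is the checkpoint immediately before the suspect one — and takes an already-built corrective message (e.g. a HumanMessage) plus a new branch thread name, both of which are up to you:

```python
def branch_before(graph, history, suspect, fix_message, branch_thread_id):
    """Branch a new thread from the checkpoint immediately before a
    suspect snapshot, injecting a corrective message into the state."""
    idx = history.index(suspect)
    if idx + 1 >= len(history):
        raise ValueError("suspect is the initial checkpoint; nothing to rewind to")
    prior = history[idx + 1]          # history is newest-first
    branch_config = {"configurable": {"thread_id": branch_thread_id}}
    # Seed the new thread with the prior state plus the correction
    graph.update_state(branch_config, {
        "messages": prior.values["messages"] + [fix_message],
        "step_count": prior.values["step_count"],
    })
    # Resume from the corrected state
    return graph.invoke(None, config=branch_config)
```

For example: `branch_before(graph, history, suspect, HumanMessage(content="Reconsider: ..."), "debug-run-01-branch-b")`.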
Switching to a Persistent Backend
For production, replace MemorySaver with SqliteSaver — same API, survives restarts:
from langgraph.checkpoint.sqlite import SqliteSaver
# Writes to a local SQLite file — swap for AsyncPostgresSaver in prod
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
# All the same time travel calls work identically
config = {"configurable": {"thread_id": "prod-run-99"}}
result = graph.invoke(initial_state, config=config)
history = list(graph.get_state_history(config))
For async workloads with FastAPI or async LangGraph:
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
async def setup():
    # from_conn_string returns an async context manager (no await needed)
    async with AsyncPostgresSaver.from_conn_string(
        "postgresql://user:pass@localhost/checkpoints"
    ) as checkpointer:
        await checkpointer.setup()  # create tables on first use
        graph = builder.compile(checkpointer=checkpointer)
        # same API
Verification
Run through the full flow end-to-end:
python your_agent.py
Confirm you can:
# 1. Get history
history = list(graph.get_state_history(config))
assert len(history) > 1, "No checkpoints saved — checkpointer not attached"
# 2. Inspect a past state
snapshot = graph.get_state(history[1].config)
assert snapshot.values is not None
# 3. Branch from it
branch_cfg = {"configurable": {"thread_id": "verify-branch"}}
graph.update_state(branch_cfg, snapshot.values)
branch_result = graph.invoke(None, config=branch_cfg)
assert branch_result is not None
print("✅ Time travel working correctly")
What You Learned
- LangGraph writes a StateSnapshot at every node transition — no extra code needed in your nodes
- get_state_history() returns all checkpoints for a thread, newest first
- Replaying from a checkpoint reuses the stored state but makes new LLM calls — outputs may differ
- Branching with a new thread_id and update_state() preserves the original run entirely
- MemorySaver is for development only — use SqliteSaver or AsyncPostgresSaver in production
Limitation: Time travel only works within a single graph. If your graph calls subgraphs, each subgraph maintains its own checkpoint stream — you need to traverse them separately.
When NOT to use this: If your agent executes irreversible side effects (sending emails, writing to a database), replaying from before those steps will re-execute them. Add idempotency guards or dry-run modes before using time travel in production workflows with side effects.
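A minimal idempotency guard keys each side effect on a unique identifier, so a replayed node skips work it has already done. This is a sketch with an in-memory set; a real deployment would persist the keys durably (e.g. a database unique constraint), and run_once and the key format are illustrative names, not LangGraph APIs:

```python
import hashlib

_completed_effects: set[str] = set()  # swap for durable storage in production

def run_once(effect_key: str, action):
    """Execute action() only if this exact side effect hasn't run before.

    effect_key should uniquely identify the effect, e.g.
    f"{thread_id}:{node}:{payload}".
    """
    digest = hashlib.sha256(effect_key.encode()).hexdigest()
    if digest in _completed_effects:
        return "skipped (already executed)"
    result = action()
    _completed_effects.add(digest)
    return result
```

Inside a tool node you would wrap the irreversible call, e.g. `run_once(f"{thread_id}:send_email:{recipient}", lambda: send_email(recipient))` (send_email being your own function), so replaying past that node becomes safe.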
Tested on LangGraph 0.2.35, LangChain 0.3.x, Python 3.12, Ubuntu 24.04