LangGraph Time Travel: Replay and Branch Agent History

Use LangGraph's time travel API to replay past agent runs and branch alternate histories. Debug faster and test agent decisions without re-running from scratch.

Problem: You Can't Debug What You Can't Replay

Your LangGraph agent made a bad decision at step 4 of 12. To reproduce it, you re-run the whole graph — burning tokens, waiting for tool calls, and hoping the LLM reproduces the same path.

There's a better way. LangGraph's time travel API lets you rewind to any checkpoint, replay from that point, and branch off alternate executions without touching the original run.

You'll learn:

  • How LangGraph checkpoints work and what they store
  • How to list, inspect, and replay past states
  • How to branch a new execution from any historical checkpoint
  • A practical debugging workflow for flaky agents

Time: 20 min | Difficulty: Intermediate


How LangGraph Checkpointing Works

Every time a LangGraph graph transitions between nodes, the Checkpointer writes a snapshot of the full state to a store. Each snapshot gets a unique checkpoint_id and is linked to a thread_id (a single conversation or run).

thread_id: "run-42"

checkpoint_id: "c1"  →  node: __start__   state: {messages: [...]}
checkpoint_id: "c2"  →  node: agent        state: {messages: [...], tool_calls: [...]}
checkpoint_id: "c3"  →  node: tools        state: {messages: [...], tool_results: [...]}
checkpoint_id: "c4"  →  node: agent        state: {messages: [...]}  ← bad decision here

Time travel works by loading a past checkpoint and resuming the graph from that node — either replaying the same path or injecting a modified state to branch a new one.
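Concretely, a checkpoint is addressed by the pair (thread_id, checkpoint_id) inside a config dict. A minimal sketch of that addressing scheme (the helper name is ours, not part of LangGraph's API):

```python
def checkpoint_config(thread_id: str, checkpoint_id: str) -> dict:
    """Build the config dict that addresses one checkpoint in one thread.

    Passing this config to graph.get_state() inspects that snapshot;
    passing it to graph.invoke(None, config=...) resumes from it.
    """
    return {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_id": checkpoint_id,
        }
    }

# Address checkpoint "c2" of thread "run-42" from the diagram above
cfg = checkpoint_config("run-42", "c2")
print(cfg["configurable"]["checkpoint_id"])  # c2
```

Omitting checkpoint_id from the config addresses the thread's latest checkpoint.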

Two backends are available:

Backend                            Use for
MemorySaver                        Development and testing only — no persistence across restarts
AsyncPostgresSaver / SqliteSaver   Production — survives restarts, queryable

This article uses MemorySaver for examples. Swap in SqliteSaver for persistence with zero API changes.


Setup

Install LangGraph 0.2+ with checkpointing support, ideally inside a virtual environment so you don't have to override system package protections:

python -m venv .venv && source .venv/bin/activate
pip install langgraph langchain-openai

Verify:

python -c "import langgraph; print(langgraph.__version__)"
# 0.2.x

Step 1: Build a Graph with a Checkpointer

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator

# State schema — messages accumulate via operator.add
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    step_count: int

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def agent_node(state: AgentState) -> AgentState:
    response = llm.invoke(state["messages"])
    return {
        "messages": [response],
        "step_count": state["step_count"] + 1,
    }

def should_continue(state: AgentState) -> str:
    last = state["messages"][-1]
    # Stop after 3 turns or when the agent stops asking questions
    if state["step_count"] >= 3 or "DONE" in last.content:
        return "end"
    return "continue"

builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue, {
    "continue": "agent",
    "end": END,
})

# MemorySaver enables all time travel features
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

Every invocation now writes checkpoints automatically. No extra code needed in your nodes.


Step 2: Run the Graph and Capture Checkpoints

# thread_id groups all checkpoints for one run
config = {"configurable": {"thread_id": "debug-run-01"}}

initial_state = {
    "messages": [HumanMessage(content="Plan a 3-step research task on quantum computing.")],
    "step_count": 0,
}

result = graph.invoke(initial_state, config=config)
print(f"Final step count: {result['step_count']}")
print(f"Last message: {result['messages'][-1].content[:100]}")

Step 3: List All Checkpoints for a Thread

# get_state_history returns a generator of StateSnapshot objects
history = list(graph.get_state_history(config))

for snapshot in history:
    print(f"checkpoint_id : {snapshot.config['configurable']['checkpoint_id']}")
    print(f"  node        : {snapshot.next}")          # which node runs NEXT from here
    print(f"  step_count  : {snapshot.values['step_count']}")
    print(f"  msg count   : {len(snapshot.values['messages'])}")
    print()

Expected output:

checkpoint_id : 1ef4...
  node        : ()                  ← graph finished
  step_count  : 3
  msg count   : 4

checkpoint_id : 1ef3...
  node        : ('agent',)          ← agent would run next
  step_count  : 2
  msg count   : 3

checkpoint_id : 1ef2...
  node        : ('agent',)
  step_count  : 1
  msg count   : 2

checkpoint_id : 1ef1...
  node        : ('agent',)
  step_count  : 0
  msg count   : 1

history[0] is the most recent checkpoint. history[-1] is the initial state.
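Because the history is newest-first, it's easy to trip over ordering when scanning a run from the beginning. A tiny helper (ours, not a LangGraph API) makes chronological walks explicit; here it's demonstrated with placeholder strings standing in for snapshots:

```python
def chronological(history: list) -> list:
    """Return checkpoints oldest-first, paired with a 0-based position.

    get_state_history yields newest-first, so we reverse before enumerating.
    """
    return list(enumerate(reversed(history)))

# With the 4-checkpoint history above, position 0 is the initial state
# and position 3 is the finished run.
for pos, snapshot in chronological(["c4", "c3", "c2", "c1"]):
    print(pos, snapshot)
```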


Step 4: Inspect a Specific Checkpoint

# Load state at a specific checkpoint without resuming
target_checkpoint_id = history[2].config["configurable"]["checkpoint_id"]

past_config = {
    "configurable": {
        "thread_id": "debug-run-01",
        "checkpoint_id": target_checkpoint_id,
    }
}

snapshot = graph.get_state(past_config)
print(f"State at checkpoint: step={snapshot.values['step_count']}")
print(f"Next node: {snapshot.next}")
print(f"Last message: {snapshot.values['messages'][-1].content[:200]}")

This is read-only. Nothing runs. Use this to inspect state before deciding whether to replay or branch.
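If you inspect checkpoints often, the boilerplate above is worth wrapping. A convenience helper (our function, not part of LangGraph; it only reads the StateSnapshot attributes shown above):

```python
def summarize_snapshot(snapshot) -> dict:
    """Condense a StateSnapshot into the fields that matter when debugging.

    Works on any object exposing .config, .next, and .values, as
    StateSnapshot does.
    """
    return {
        "checkpoint_id": snapshot.config["configurable"]["checkpoint_id"],
        "next_node": snapshot.next,
        "step_count": snapshot.values.get("step_count"),
        "message_count": len(snapshot.values.get("messages", [])),
    }
```

Then `print(summarize_snapshot(graph.get_state(past_config)))` gives a one-line view of any checkpoint.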


Step 5: Replay from a Past Checkpoint

Replaying resumes the graph from a checkpoint using the same state — the LLM will generate a new response, so results may differ from the original run due to sampling.

# Resume execution from checkpoint — same state, new LLM calls
replay_config = {
    "configurable": {
        "thread_id": "debug-run-01",
        "checkpoint_id": target_checkpoint_id,
    }
}

# Pass None as input — the state is loaded from the checkpoint
replay_result = graph.invoke(None, config=replay_config)

print(f"Replayed to step: {replay_result['step_count']}")
print(f"New last message: {replay_result['messages'][-1].content[:100]}")

Key point: replay reuses thread_id: "debug-run-01", so the new checkpoints are written into the same thread — the thread's latest state now follows the replayed path rather than the original one. To leave the original thread's tip untouched, branch to a new thread_id instead (Step 6).


Step 6: Branch a New History from a Checkpoint

Branching creates a new thread starting from a past checkpoint with a modified state. The original thread is untouched.

# Inject a modified state to change the agent's direction
modified_state = {
    "messages": snapshot.values["messages"] + [
        HumanMessage(content="Focus only on quantum error correction, ignore other topics.")
    ],
    "step_count": snapshot.values["step_count"],
}

# New thread_id = new branch — original run is preserved
branch_config = {"configurable": {"thread_id": "debug-run-01-branch-a"}}

# Update the new thread's state to start from our modified snapshot
graph.update_state(branch_config, modified_state)

# Resume from this new starting point
branch_result = graph.invoke(None, config=branch_config)

print(f"Branch completed at step: {branch_result['step_count']}")
print(f"Branch last message: {branch_result['messages'][-1].content[:100]}")

You now have two threads:

  • debug-run-01 — original run, intact
  • debug-run-01-branch-a — alternate history from the step-1 checkpoint (history[2]) with injected context

Step 7: Compare Branch Outputs

# List checkpoints for both threads to compare paths
original_history = list(graph.get_state_history(
    {"configurable": {"thread_id": "debug-run-01"}}
))

branch_history = list(graph.get_state_history(
    {"configurable": {"thread_id": "debug-run-01-branch-a"}}
))

print(f"Original run: {len(original_history)} checkpoints, {original_history[0].values['step_count']} steps")
print(f"Branch run  : {len(branch_history)} checkpoints, {branch_history[0].values['step_count']} steps")

Practical Debugging Workflow

Use this pattern when an agent behaves unexpectedly in production:

def debug_agent_run(graph, thread_id: str):
    """Find the first checkpoint where agent output looks wrong."""
    config = {"configurable": {"thread_id": thread_id}}
    history = list(graph.get_state_history(config))

    # Walk history oldest-first
    for snapshot in reversed(history):
        messages = snapshot.values.get("messages", [])
        if not messages:
            continue

        last = messages[-1]
        # Flag AI messages with suspiciously short content
        if hasattr(last, "content") and len(last.content) < 20:
            print(f"⚠️  Suspect checkpoint: {snapshot.config['configurable']['checkpoint_id']}")
            print(f"   Step: {snapshot.values['step_count']}")
            print(f"   Message: {last.content!r}")
            print(f"   Next node: {snapshot.next}")
            return snapshot

    print("No suspect checkpoints found.")
    return None

suspect = debug_agent_run(graph, "debug-run-01")

Once you find the bad checkpoint, branch from the one before it and inject corrected context.
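Finding "the one before it" is a small index exercise, since history is newest-first. A sketch (checkpoint_before is our name, not a LangGraph API; it takes the list returned by get_state_history):

```python
def checkpoint_before(history: list, suspect_checkpoint_id: str):
    """Return the snapshot immediately preceding the suspect one in time.

    history is newest-first (as get_state_history returns it), so the
    chronologically previous checkpoint sits at index i + 1.
    """
    for i, snapshot in enumerate(history):
        cid = snapshot.config["configurable"]["checkpoint_id"]
        if cid == suspect_checkpoint_id:
            if i + 1 < len(history):
                return history[i + 1]
            return None  # suspect is the very first checkpoint
    return None  # suspect not found in this thread

# Then branch exactly as in Step 6:
# prior = checkpoint_before(history, suspect_id)
# graph.update_state({"configurable": {"thread_id": "fix-branch"}}, prior.values)
# graph.invoke(None, config={"configurable": {"thread_id": "fix-branch"}})
```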


Switching to a Persistent Backend

For production, replace MemorySaver with SqliteSaver — same API, survives restarts:

from langgraph.checkpoint.sqlite import SqliteSaver

# Writes to a local SQLite file — swap for AsyncPostgresSaver in prod
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

    # All the same time travel calls work identically
    config = {"configurable": {"thread_id": "prod-run-99"}}
    result = graph.invoke(initial_state, config=config)
    history = list(graph.get_state_history(config))

For async workloads with FastAPI or async LangGraph:

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def setup():
    # from_conn_string returns an async context manager (no await needed)
    async with AsyncPostgresSaver.from_conn_string(
        "postgresql://user:pass@localhost/checkpoints"
    ) as checkpointer:
        await checkpointer.setup()  # create checkpoint tables on first run
        graph = builder.compile(checkpointer=checkpointer)
        # same API

Verification

Run through the full flow end-to-end:

python your_agent.py

Confirm you can:

# 1. Get history
history = list(graph.get_state_history(config))
assert len(history) > 1, "No checkpoints saved — checkpointer not attached"

# 2. Inspect a past state
snapshot = graph.get_state(history[1].config)
assert snapshot.values is not None

# 3. Branch from it
branch_cfg = {"configurable": {"thread_id": "verify-branch"}}
graph.update_state(branch_cfg, snapshot.values)
branch_result = graph.invoke(None, config=branch_cfg)
assert branch_result is not None

print("✅ Time travel working correctly")

What You Learned

  • LangGraph writes a StateSnapshot at every node transition — no extra code needed in your nodes
  • get_state_history() returns all checkpoints for a thread, newest first
  • Replaying from a checkpoint reuses the stored state but makes new LLM calls — outputs may differ
  • Branching with a new thread_id and update_state() preserves the original run entirely
  • MemorySaver is for development only — use SqliteSaver or AsyncPostgresSaver in production

Limitation: Time travel only works within a single graph. If your graph calls subgraphs, each subgraph maintains its own checkpoint stream — you need to traverse them separately.

When NOT to use this: If your agent executes irreversible side effects (sending emails, writing to a database), replaying from before those steps will re-execute them. Add idempotency guards or dry-run modes before using time travel in production workflows with side effects.
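One cheap idempotency guard is to key each side effect on (thread_id, checkpoint_id, action) and skip repeats. A sketch of the idea (the in-memory set is a placeholder; back it with a durable store in production so the ledger survives restarts):

```python
# Ledger of side effects already executed, keyed by
# (thread_id, checkpoint_id, action). In-memory here for illustration.
_executed: set = set()

def run_once(thread_id: str, checkpoint_id: str, action: str, fn):
    """Execute fn() only if this (thread, checkpoint, action) hasn't run yet."""
    key = (thread_id, checkpoint_id, action)
    if key in _executed:
        return None  # replayed path — skip the irreversible effect
    _executed.add(key)
    return fn()

# First run executes; a replay of the same checkpoint is a no-op.
print(run_once("run-42", "c3", "send_email", lambda: "sent"))  # sent
print(run_once("run-42", "c3", "send_email", lambda: "sent"))  # None
```

Wrap each irreversible call in a tool node with run_once, and replays through that node become safe.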

Tested on LangGraph 0.2.35, LangChain 0.3.x, Python 3.12, Ubuntu 24.04