Troubleshoot LangGraph State Management in 20 Minutes

Fix common LangGraph workflow bugs: state not persisting, nodes overwriting each other, and checkpointer misconfiguration.

Problem: Your LangGraph Workflow State Is Broken

You built a multi-node LangGraph workflow, but nodes are either overwriting each other's state, changes aren't persisting between turns, or your graph silently drops data mid-run.

You'll learn:

  • Why LangGraph state updates fail silently and how to catch them
  • How to configure reducers so nodes don't clobber each other
  • How to wire up a checkpointer for persistent multi-turn memory

Time: 20 min | Level: Intermediate


Why This Happens

LangGraph defines its state as a TypedDict schema, and every node returns a partial state update — not the full state. If two nodes write to the same key without a reducer, the last write wins. If you want appending behavior (e.g., a message history), you need to tell LangGraph how to merge updates explicitly.

Persistence is a separate concern. Without a checkpointer, every graph invocation starts from scratch. This catches a lot of people off guard when building chatbots or agentic loops.
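The merge rule above can be modeled in a few lines of plain Python. This is a simplified sketch of the behavior, not LangGraph's internals — `apply_update` and the `reducers` dict are illustrative names, not LangGraph APIs:

```python
# Simplified model of how per-key state updates merge (assumption: this
# mimics the behavior, it is NOT LangGraph's actual implementation).
import operator

def apply_update(state, update, reducers):
    """Merge a node's partial update into state, key by key."""
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is None:
            merged[key] = value  # no reducer: last write wins
        else:
            merged[key] = reducer(merged.get(key, []), value)  # reducer merges
    return merged

reducers = {"messages": operator.add}  # lists append instead of replacing

state = {"messages": ["hi"], "context": "a"}
state = apply_update(state, {"messages": ["there"], "context": "b"}, reducers)
print(state)  # {'messages': ['hi', 'there'], 'context': 'b'}
```

Note how `messages` accumulated because it has a reducer, while `context` was simply replaced — exactly the split the symptoms below trace back to.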

Common symptoms:

  • messages list only contains the last message, not the full history
  • A node's output disappears by the time it reaches the next node
  • Multi-turn conversations "forget" previous exchanges
  • KeyError or None when accessing state inside a node

Solution

Step 1: Define Your State Schema With Reducers

The most common mistake is defining state without a reducer for list fields.

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

# ❌ Bad: last write wins — nodes will overwrite messages
class BadState(TypedDict):
    messages: list

# ✅ Good: add_messages reducer appends instead of replacing
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    context: str  # Single-value fields don't need a reducer

add_messages is LangGraph's built-in reducer for chat history. It handles both appending new messages and updating existing ones by message ID.

Expected: Your messages field accumulates across every node invocation in the graph.

If it fails:

  • Unexpected TypeError during an update: The update shape is wrong for the Annotated field — make sure the node returns {"messages": [HumanMessage(...)]} with message objects, not a bare string.
  • Messages duplicating: You're appending manually inside the node and the reducer is also appending. Return the new messages only; the reducer handles merging.
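The ID-based merge that add_messages performs can be sketched in plain Python. This is a simplified model for intuition only — `add_messages_sketch` is a hypothetical name, and the real reducer also handles message coercion and deletion:

```python
# Simplified model of add_messages' ID-based merge (illustration only,
# NOT LangGraph's actual implementation).
def add_messages_sketch(left, right):
    merged = list(left)
    index = {m["id"]: i for i, m in enumerate(merged)}
    for msg in right:
        if msg["id"] in index:
            merged[index[msg["id"]]] = msg  # same ID: update in place
        else:
            merged.append(msg)              # new ID: append
    return merged

history = [{"id": "1", "content": "Hi"}]
history = add_messages_sketch(history, [{"id": "2", "content": "Hello!"}])
history = add_messages_sketch(history, [{"id": "2", "content": "Hello! (edited)"}])
print(len(history))  # 2 — the second update replaced message "2", not appended
```

This is also why manual appending inside a node causes duplicates: the reducer already appends anything with a new ID.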

Step 2: Fix Node Return Values

Nodes must return a dict containing only the state keys they changed. A common bug is mutating and returning the full state object — with an append reducer, re-returning existing messages duplicates them, and plain keys get overwritten unexpectedly.

from langchain_core.messages import AIMessage

# ❌ Bad: returns full state dict — risky if keys overlap unexpectedly
def bad_node(state: AgentState):
    reply = call_llm(state["messages"])
    state["messages"].append(reply)  # Mutating state directly
    return state

# ✅ Good: return only what this node is responsible for
def agent_node(state: AgentState):
    reply = call_llm(state["messages"])
    # Return just the new message — add_messages handles the merge
    return {"messages": [AIMessage(content=reply)]}

Never mutate the incoming state dict directly. LangGraph passes state by reference in some configurations, and mutations can cause subtle race conditions in async graphs.

Expected: Each node touches only its own keys, and the graph assembles the final state cleanly.
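The mutation hazard is easy to see with plain Python dict aliasing — nothing LangGraph-specific here, just two references to the same object:

```python
# Why in-place mutation is risky: any other holder of the same state dict
# sees the change. A plain-Python illustration, not LangGraph-specific.
incoming = {"messages": ["Hi"]}
snapshot = incoming  # another reference to the SAME dict (e.g. the runtime's copy)

def bad_node(state):
    state["messages"].append("reply")  # mutates the shared list in place
    return state

bad_node(incoming)
print(snapshot["messages"])  # ['Hi', 'reply'] — the "snapshot" changed too
```

Returning a fresh partial dict instead of mutating sidesteps this entirely, which is why the good pattern in Step 2 never touches the incoming state.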


Step 3: Add a Checkpointer for Persistence

Without a checkpointer, your graph has no memory between .invoke() calls. Wire one up before compiling.

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver  # pip install langgraph-checkpoint-sqlite
import sqlite3

# Build your graph
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
builder.add_edge("agent", END)

# ✅ Attach checkpointer before compiling — this is the step people miss
# Note: in recent versions SqliteSaver.from_conn_string() is a context
# manager, so construct the saver from a connection directly instead.
conn = sqlite3.connect(":memory:", check_same_thread=False)  # In-memory for dev
memory = SqliteSaver(conn)
graph = builder.compile(checkpointer=memory)

For production, point the connection at a real SQLite file instead of :memory:, or use AsyncPostgresSaver if you need horizontal scaling.

Invoke with a thread ID so LangGraph knows which conversation to resume:

from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "user-123-session-1"}}

# First turn
result = graph.invoke(
    {"messages": [HumanMessage(content="What's the capital of France?")]},
    config=config
)

# Second turn — the graph automatically loads the previous state for this thread_id
result = graph.invoke(
    {"messages": [HumanMessage(content="And what's its population?")]},
    config=config
)

If it fails:

  • ValueError mentioning the checkpointer or thread_id: Either you called .compile() without passing checkpointer=, or you invoked a checkpointed graph without a thread_id in config. Add whichever is missing.
  • Second turn doesn't remember first: Check that thread_id is identical in both calls — it's case-sensitive and whitespace-sensitive.
  • sqlite3.OperationalError: database is locked: You have two graph instances sharing the same SQLite file. Use one shared instance per process or switch to Postgres.

Step 4: Debug State Mid-Graph With Streaming

If you can't tell where state is going wrong, stream the graph and inspect state after each node.

# Stream mode "values" emits full state after every node
for step in graph.stream(
    {"messages": [HumanMessage(content="Hello")]},
    config=config,
    stream_mode="values"
):
    print("--- State snapshot ---")
    for key, val in step.items():
        print(f"  {key}: {val}")

This makes it immediately obvious which node is dropping or overwriting data.

Expected: You see a state snapshot after each node, with messages growing as expected.
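To pinpoint the offending node faster, you can diff consecutive snapshots instead of eyeballing them. `diff_snapshots` is a small helper of our own (not part of LangGraph) that works on the plain dicts `stream_mode="values"` emits:

```python
# Small helper (our own, not a LangGraph API) to diff consecutive state
# snapshots and spot which step dropped or overwrote a key.
def diff_snapshots(prev: dict, curr: dict) -> dict:
    """Return {key: (old, new)} for every key that changed."""
    changes = {}
    for key in curr:
        if key not in prev:
            changes[key] = (None, curr[key])
        elif prev[key] != curr[key]:
            changes[key] = (prev[key], curr[key])
    return changes

prev = {"messages": ["Hi"], "context": "a"}
curr = {"messages": ["Hi", "reply"], "context": "a"}
print(diff_snapshots(prev, curr))  # {'messages': (['Hi'], ['Hi', 'reply'])}
```

In the streaming loop, keep the previous snapshot around and print diff_snapshots(prev, step) each iteration — a key that should have grown but instead got replaced shows up immediately.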


Verification

python -m pytest tests/test_graph.py -v

Write a minimal test that runs two turns and asserts the full message history is present:

# Assumes graph and HumanMessage are imported from your app module
def test_state_persists_across_turns():
    config = {"configurable": {"thread_id": "test-thread"}}
    
    graph.invoke({"messages": [HumanMessage(content="Hi")]}, config=config)
    result = graph.invoke({"messages": [HumanMessage(content="Remember me?")]}, config=config)
    
    messages = result["messages"]
    assert len(messages) >= 3  # HumanMessage, AIMessage, HumanMessage at minimum
    assert any("Hi" in str(m.content) for m in messages)

You should see: All tests pass, with message history accumulating correctly across turns.


What You Learned

  • State fields that need merging (like message lists) require a reducer via Annotated — without one, the last write wins silently.
  • Nodes should return only the keys they own, never mutate incoming state directly.
  • Persistence requires a checkpointer wired in at .compile() time, plus a consistent thread_id per conversation.
  • Streaming with stream_mode="values" is your best debugging tool for tracing state through a graph.

Limitation: SqliteSaver is single-process only. If you're running multiple workers, use AsyncPostgresSaver or LangGraph Platform's built-in persistence layer.

When NOT to use reducers: Single-value fields like a user_id or context string don't need them — returning a new value is the correct behavior.


Tested on LangGraph 0.2.x, LangChain 0.3.x, Python 3.12