Problem: Your LangGraph Workflow State Is Broken
You built a multi-node LangGraph workflow, but nodes are either overwriting each other's state, changes aren't persisting between turns, or your graph silently drops data mid-run.
You'll learn:
- Why LangGraph state updates fail silently and how to catch them
- How to configure reducers so nodes don't clobber each other
- How to wire up a checkpointer for persistent multi-turn memory
Time: 20 min | Level: Intermediate
Why This Happens
LangGraph defines its state schema as a TypedDict, and every node returns a partial state update, not the full state. If two nodes write to the same key without a reducer, the last write wins. If you want appending behavior (e.g., a message history), you must tell LangGraph explicitly how to merge updates.
Persistence is a separate concern. Without a checkpointer, every graph invocation starts from scratch. This catches a lot of people off guard when building chatbots or agentic loops.
Common symptoms:
- `messages` list only contains the last message, not the full history
- A node's output disappears by the time it reaches the next node
- Multi-turn conversations "forget" previous exchanges
- `KeyError` or `None` when accessing state inside a node
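The last-write-wins behavior is easy to see in plain Python, no LangGraph required. The two merge helpers below are illustrative stand-ins for how LangGraph applies a node's partial update to a state channel, not LangGraph APIs:

```python
import operator

# Without a reducer, applying an update is plain assignment:
# the incoming value replaces whatever was there.
def merge_without_reducer(state: dict, update: dict) -> dict:
    return {**state, **update}

# With a reducer (here operator.add, which concatenates lists),
# the old and new values are combined instead.
def merge_with_reducer(state: dict, update: dict, reducer=operator.add) -> dict:
    merged = dict(state)
    for key, value in update.items():
        merged[key] = reducer(merged[key], value) if key in merged else value
    return merged

state = {"messages": ["hi"]}
print(merge_without_reducer(state, {"messages": ["there"]}))  # {'messages': ['there']}
print(merge_with_reducer(state, {"messages": ["there"]}))     # {'messages': ['hi', 'there']}
```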
Solution
Step 1: Define Your State Schema With Reducers
The most common mistake is defining state without a reducer for list fields.
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
# ❌ Bad: last write wins — nodes will overwrite messages
class BadState(TypedDict):
    messages: list

# ✅ Good: add_messages reducer appends instead of replacing
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    context: str  # Single-value fields don't need a reducer
add_messages is LangGraph's built-in reducer for chat history. It handles both appending new messages and updating existing ones by message ID.
Expected: Your messages field accumulates across every node invocation in the graph.
If it fails:
- `TypeError: unhashable type`: You're passing a raw list to an `Annotated` field; make sure you're returning `{"messages": [HumanMessage(...)]}`, not a flat string.
- Messages duplicating: You're appending manually inside the node and the reducer is also appending. Return the new messages only; the reducer handles merging.
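`add_messages` is specific to chat history; for plain list fields, any binary merge function works as a reducer. A minimal sketch using the standard library's `operator.add` (the `PipelineState` name and its fields are hypothetical):

```python
import operator
from typing import Annotated, TypedDict

class PipelineState(TypedDict):
    # operator.add concatenates lists, so two nodes can each return
    # {"documents": [...]} and both contributions are kept
    documents: Annotated[list, operator.add]
    summary: str  # plain field: the latest write replaces the value
```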
Step 2: Fix Node Return Values
Nodes must return a dict matching a subset of your state keys. A common bug is returning the full state object, which can cause unexpected overwrites.
from langchain_core.messages import AIMessage
# ❌ Bad: returns full state dict — risky if keys overlap unexpectedly
def bad_node(state: AgentState):
    reply = call_llm(state["messages"])
    state["messages"].append(reply)  # Mutating state directly
    return state

# ✅ Good: return only what this node is responsible for
def agent_node(state: AgentState):
    reply = call_llm(state["messages"])
    # Return just the new message — add_messages handles the merge
    return {"messages": [AIMessage(content=reply)]}
Never mutate the incoming state dict directly. LangGraph passes state by reference in some configurations, and mutations can cause subtle race conditions in async graphs.
Expected: Each node touches only its own keys, and the graph assembles the final state cleanly.
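The same "read anything, write only your own keys" pattern applies to non-message fields too. Here's a small illustrative sketch (`summarize_node` is a hypothetical node, written as plain Python so you can run it standalone):

```python
def summarize_node(state: dict) -> dict:
    # Read any part of the state you need...
    latest = state["messages"][-1]
    # ...but return only the key this node owns. LangGraph merges
    # this partial update into the shared state for you.
    return {"context": f"last message: {latest}"}

update = summarize_node({"messages": ["Hi", "Remember me?"], "context": ""})
print(update)  # {'context': 'last message: Remember me?'}
```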
Step 3: Add a Checkpointer for Persistence
Without a checkpointer, your graph has no memory between .invoke() calls. Wire one up before compiling.
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
# Build your graph
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.set_entry_point("agent")
builder.add_edge("agent", END)
import sqlite3

# ✅ Attach the checkpointer before compiling — this is the step people miss.
# Note: in recent langgraph-checkpoint-sqlite releases, from_conn_string()
# is a context manager, so constructing the saver from a connection directly
# is the simpler option for long-lived graphs.
conn = sqlite3.connect(":memory:", check_same_thread=False)  # In-memory for dev
memory = SqliteSaver(conn)
graph = builder.compile(checkpointer=memory)
For production, swap :memory: for a real SQLite path or use AsyncPostgresSaver if you need horizontal scaling.
Invoke with a thread ID so LangGraph knows which conversation to resume:
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "user-123-session-1"}}

# First turn
result = graph.invoke(
    {"messages": [HumanMessage(content="What's the capital of France?")]},
    config=config,
)

# Second turn: the graph automatically loads the saved state for this thread_id
result = graph.invoke(
    {"messages": [HumanMessage(content="And what's its population?")]},
    config=config,
)
If it fails:
- `ValueError: Checkpointer not configured`: You called `.compile()` without passing `checkpointer=`. Add it.
- Second turn doesn't remember first: Check that `thread_id` is identical in both calls; it's case-sensitive and whitespace-sensitive.
- `sqlite3.OperationalError: database is locked`: You have two graph instances sharing the same SQLite file. Use one shared instance per process or switch to Postgres.
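Why exact `thread_id` matching matters becomes obvious if you think of a checkpointer as a mapping from thread ID to saved state. This is a conceptual sketch only (plain dicts, not the real SqliteSaver API):

```python
# Conceptual model: checkpointer ≈ mapping of thread_id -> saved state,
# where lookup is an exact string match.
checkpoints: dict[str, dict] = {}

def load(thread_id: str) -> dict:
    return checkpoints.get(thread_id, {"messages": []})

def save(thread_id: str, state: dict) -> None:
    checkpoints[thread_id] = state

state = load("user-123-session-1")
state = {"messages": state["messages"] + ["Hi"]}
save("user-123-session-1", state)

# A stray space or different casing silently starts a new, empty thread:
print(load("user-123-session-1"))   # {'messages': ['Hi']}
print(load("user-123-session-1 "))  # {'messages': []}
```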
Step 4: Debug State Mid-Graph With Streaming
If you can't tell where state is going wrong, stream the graph and inspect state after each node.
# Stream mode "values" emits full state after every node
for step in graph.stream(
    {"messages": [HumanMessage(content="Hello")]},
    config=config,
    stream_mode="values",
):
    print("--- State snapshot ---")
    for key, val in step.items():
        print(f"  {key}: {val}")
This makes it immediately obvious which node is dropping or overwriting data.
Expected: You see a state snapshot after each node, with messages growing as expected.
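One quick invariant you can check over the snapshots: with `add_messages` in place, the message list should only grow. The `snapshots` list below is hard-coded sample data standing in for what `graph.stream(..., stream_mode="values")` would yield:

```python
# Sample snapshots; in real usage, collect these from graph.stream().
snapshots = [
    {"messages": ["Hello"]},
    {"messages": ["Hello", "Hi there!"]},
]

# If the history ever shrinks between snapshots, some node is
# overwriting the messages channel instead of appending to it.
lengths = [len(s["messages"]) for s in snapshots]
assert all(a <= b for a, b in zip(lengths, lengths[1:])), "a node dropped messages"
print(lengths)  # [1, 2]
```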
Verification
python -m pytest tests/test_graph.py -v
Write a minimal test that runs two turns and asserts the full message history is present:
def test_state_persists_across_turns():
    config = {"configurable": {"thread_id": "test-thread"}}
    graph.invoke({"messages": [HumanMessage(content="Hi")]}, config=config)
    result = graph.invoke({"messages": [HumanMessage(content="Remember me?")]}, config=config)

    messages = result["messages"]
    assert len(messages) >= 3  # HumanMessage, AIMessage, HumanMessage at minimum
    assert any("Hi" in str(m.content) for m in messages)
You should see: All tests pass, with message history accumulating correctly across turns.
What You Learned
- State fields that need merging (like message lists) require a reducer via `Annotated`; without one, the last write wins silently.
- Nodes should return only the keys they own, and never mutate incoming state directly.
- Persistence requires a checkpointer wired in at `.compile()` time, plus a consistent `thread_id` per conversation.
- Streaming with `stream_mode="values"` is your best debugging tool for tracing state through a graph.
Limitation: SqliteSaver is single-process only. If you're running multiple workers, use AsyncPostgresSaver or LangGraph Platform's built-in persistence layer.
When NOT to use reducers: Single-value fields like a user_id or context string don't need them — returning a new value is the correct behavior.
Tested on LangGraph 0.2.x, LangChain 0.3.x, Python 3.12