LangGraph Parallel Execution: Fan-Out and Fan-In Patterns

Run LangGraph nodes in parallel with fan-out and fan-in. Cut multi-step agent latency by 60% or more using static fan-out edges, the Send API, and reducer-based state merging.

Problem: Your LangGraph Agent Is Running Steps One at a Time

You built a LangGraph agent that calls three tools, summarizes five documents, or runs multiple LLM checks — and it's slow. Each node waits for the previous one to finish, even when the tasks are completely independent.

Sequential execution is the default. Parallel execution takes two extra concepts: fan-out (split work across nodes) and fan-in (merge results back).

You'll learn:

  • How LangGraph's graph execution model enables parallelism
  • Fan-out with static edges and dynamic Send API
  • Fan-in with reducer functions that merge parallel state
  • A full working example: parallel document analysis with result aggregation

Time: 20 min | Difficulty: Intermediate


Why LangGraph Runs Nodes Sequentially by Default

LangGraph executes nodes in topological order. If node B depends on node A's output, B waits. That's correct — but if B, C, and D all depend on A and not on each other, there's no reason they can't run at the same time.

LangGraph supports this natively. When multiple edges leave a single node and lead to independent nodes, the runtime runs those target nodes in parallel within the same superstep.

The catch: your state must handle concurrent writes. Two nodes writing to the same key at the same time will overwrite each other unless you define a reducer.


Core Concepts Before the Code

Supersteps

LangGraph processes the graph in supersteps. Within one superstep, all nodes that are ready to run (no pending dependencies) execute together. Fan-out happens inside a single superstep.

Superstep 1: [router_node]
Superstep 2: [analyst_node, summarizer_node, validator_node]  ← parallel
Superstep 3: [aggregator_node]

Reducers

A reducer tells LangGraph how to combine two writes to the same state key. Without one, the last write wins — which loses data in parallel scenarios.

from typing import Annotated, TypedDict
from operator import add

class State(TypedDict):
    # Without reducer: last write wins (wrong for parallel)
    result: str

    # With reducer: both parallel writes are combined
    results: Annotated[list[str], add]

The add operator concatenates lists. You can write custom reducers for any merge logic.

The Send API

Static fan-out (one node to many fixed nodes) covers simple cases. For dynamic fan-out — where you don't know how many parallel branches you need until runtime — use Send.

Send lets a routing function return a list of dispatches; each one invokes a target node with its own payload, like a map over your input list.


Solution

Step 1: Set Up the Project

pip install langgraph langchain-openai python-dotenv
# .env
OPENAI_API_KEY=sk-...

Step 2: Define State with Reducers

This is the most important step. Get the state wrong and parallel writes will silently overwrite each other.

from typing import Annotated, TypedDict
from operator import add

class DocumentState(TypedDict):
    # Input
    documents: list[str]

    # Written by multiple parallel nodes — needs a reducer
    analyses: Annotated[list[dict], add]

    # Written once by the aggregator — no reducer needed
    final_summary: str

analyses uses add as its reducer. When two parallel nodes each append [{"doc": 1, "result": "..."}], LangGraph concatenates both lists into one. Without the Annotated[list[dict], add], the second write would erase the first.
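Since the reducer here is just operator.add, you can check the merge behavior directly, with no graph involved:

```python
from operator import add

# Two parallel node updates to the same key, as LangGraph would pass them
# to the reducer
write_a = [{"type": "sentiment", "result": "positive"}]
write_b = [{"type": "topics", "result": "pricing, churn"}]

# The reducer concatenates instead of letting the second write erase the first
merged = add(write_a, write_b)
print(len(merged))  # → 2
```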

Step 3: Build the Parallel Analysis Nodes

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def analyze_sentiment(state: DocumentState) -> dict:
    """Runs in parallel — analyzes sentiment across all docs."""
    docs = "\n---\n".join(state["documents"])
    response = llm.invoke(f"Rate overall sentiment (positive/negative/neutral) for:\n{docs}")
    return {"analyses": [{"type": "sentiment", "result": response.content}]}

def extract_topics(state: DocumentState) -> dict:
    """Runs in parallel — extracts key topics."""
    docs = "\n---\n".join(state["documents"])
    response = llm.invoke(f"List the top 5 topics covered in:\n{docs}")
    return {"analyses": [{"type": "topics", "result": response.content}]}

def check_consistency(state: DocumentState) -> dict:
    """Runs in parallel — flags contradictions between docs."""
    docs = "\n---\n".join(state["documents"])
    response = llm.invoke(f"Identify any factual contradictions between these documents:\n{docs}")
    return {"analyses": [{"type": "consistency", "result": response.content}]}

def aggregate_results(state: DocumentState) -> dict:
    """Fan-in node — runs after all parallel nodes finish."""
    analyses_text = "\n\n".join(
        f"[{a['type'].upper()}]\n{a['result']}" for a in state["analyses"]
    )
    response = llm.invoke(
        f"Synthesize these analysis results into a final executive summary:\n\n{analyses_text}"
    )
    return {"final_summary": response.content}

Step 4: Wire the Graph with Static Fan-Out

builder = StateGraph(DocumentState)

# Add all nodes
builder.add_node("analyze_sentiment", analyze_sentiment)
builder.add_node("extract_topics", extract_topics)
builder.add_node("check_consistency", check_consistency)
builder.add_node("aggregate_results", aggregate_results)

# Fan-out: START connects to all three analysis nodes
# LangGraph runs all three in the same superstep
builder.add_edge(START, "analyze_sentiment")
builder.add_edge(START, "extract_topics")
builder.add_edge(START, "check_consistency")

# Fan-in: all three connect to the aggregator
# Aggregator only runs after all three finish
builder.add_edge("analyze_sentiment", "aggregate_results")
builder.add_edge("extract_topics", "aggregate_results")
builder.add_edge("check_consistency", "aggregate_results")

builder.add_edge("aggregate_results", END)

graph = builder.compile()

The resulting execution flow:

START → [analyze_sentiment, extract_topics, check_consistency] → aggregate_results → END

Step 5: Run It

result = graph.invoke({
    "documents": [
        "Q1 revenue grew 12% YoY driven by enterprise sales.",
        "Customer churn increased 3% in Q1 due to pricing changes.",
        "The new pricing model launched in January 2026."
    ],
    "analyses": [],
    "final_summary": ""
})

print(result["final_summary"])

Expected: The three parallel LLM calls complete before aggregate_results runs. Wall-clock time is roughly the slowest single call, not the sum of all three.
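That claim is easy to demonstrate with plain Python. This stdlib-only sketch uses time.sleep as a stand-in for an I/O-bound LLM call (no LangGraph or API key required):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_llm_call(delay: float) -> str:
    """Stand-in for an I/O-bound LLM request."""
    time.sleep(delay)
    return f"done after {delay}s"

delays = [0.2, 0.3, 0.25]

# Sequential: total time is the sum of all calls
start = time.time()
for d in delays:
    fake_llm_call(d)
sequential = time.time() - start

# Parallel: total time is roughly the slowest single call
start = time.time()
with ThreadPoolExecutor() as pool:
    list(pool.map(fake_llm_call, delays))
parallel = time.time() - start

print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

Three 0.2–0.3 s tasks finish in roughly 0.3 s concurrently versus roughly 0.75 s sequentially, the same shape of speedup you should see from the graph.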


Dynamic Fan-Out with the Send API

Static fan-out works when you know your nodes at graph-build time. Use Send when you need to spawn N parallel workers at runtime — for example, one worker per document in a list of unknown length.

Step 6: Rewrite with Send for Per-Document Parallelism

from langgraph.types import Send

class PerDocState(TypedDict):
    # Input list
    documents: list[str]

    # Collected from all per-doc workers
    doc_summaries: Annotated[list[str], add]

    final_report: str

class WorkerState(TypedDict):
    # Private input for each Send branch (not part of the shared graph state)
    document: str

def route_documents(state: PerDocState) -> list[Send]:
    """Returns one Send per document — creates N parallel branches."""
    return [
        Send("summarize_document", {"document": doc})
        for doc in state["documents"]
    ]

def summarize_document(state: WorkerState) -> dict:
    """Runs once per document, all in parallel. Receives only the payload
    passed to Send, so it's typed with WorkerState, not PerDocState."""
    response = llm.invoke(f"Summarize in 2 sentences:\n{state['document']}")
    return {"doc_summaries": [response.content]}

def compile_report(state: PerDocState) -> dict:
    """Fan-in: collects all per-doc summaries."""
    combined = "\n".join(f"- {s}" for s in state["doc_summaries"])
    response = llm.invoke(f"Write a cohesive report from these summaries:\n{combined}")
    return {"final_report": response.content}

builder = StateGraph(PerDocState)
builder.add_node("summarize_document", summarize_document)
builder.add_node("compile_report", compile_report)

# route_documents is a conditional edge that returns Send objects
builder.add_conditional_edges(START, route_documents, ["summarize_document"])
builder.add_edge("summarize_document", "compile_report")
builder.add_edge("compile_report", END)

graph = builder.compile()

Now if you pass 10 documents, LangGraph spawns 10 parallel summarize_document runs. Pass 100, get 100. The graph adapts at runtime.

result = graph.invoke({
    "documents": [f"Document {i}: Some content about topic {i}." for i in range(10)],
    "doc_summaries": [],
    "final_report": ""
})

Custom Reducers for Non-List State

add works for lists. For other merge strategies, write your own reducer.

def merge_dicts(left: dict, right: dict) -> dict:
    """Merges two dicts; right values win on key conflict."""
    return {**left, **right}

def keep_highest_score(left: float, right: float) -> float:
    """Fan-in keeps the highest confidence score from parallel classifiers."""
    return max(left, right)

class ScoredState(TypedDict):
    metadata: Annotated[dict, merge_dicts]
    confidence: Annotated[float, keep_highest_score]
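Because reducers are ordinary functions, you can check their merge behavior directly before wiring them into a graph:

```python
def merge_dicts(left: dict, right: dict) -> dict:
    """Merges two dicts; right values win on key conflict."""
    return {**left, **right}

def keep_highest_score(left: float, right: float) -> float:
    """Keeps the highest confidence score from parallel classifiers."""
    return max(left, right)

# Two parallel branches write different metadata; right wins on conflict
merged = merge_dicts({"source": "a", "lang": "en"}, {"source": "b"})
print(merged)  # → {'source': 'b', 'lang': 'en'}

# Fan-in keeps the higher confidence of two parallel classifiers
best = keep_highest_score(0.72, 0.91)
print(best)    # → 0.91
```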

Verification

# Time sequential vs parallel to confirm the speedup
python -c "
import time
from your_module import graph

start = time.time()
result = graph.invoke({'documents': ['doc1', 'doc2', 'doc3'], 'analyses': [], 'final_summary': ''})
print(f'Wall time: {time.time() - start:.2f}s')
print(result['final_summary'][:200])
"

You should see: 3–4 seconds for three parallel GPT-4o-mini calls, versus 9–12 seconds if run sequentially. The final_summary key should be populated.

To confirm parallel execution is actually happening, enable LangGraph debug tracing:

graph = builder.compile()

# Stream events to see superstep boundaries
for event in graph.stream(initial_state, stream_mode="debug"):
    print(event)

Look for "type": "task" events with the same "step" number — those ran in the same superstep (in parallel).


Production Considerations

Rate limits: Parallel LLM calls hit your API rate limit faster. If you fan out to 20 nodes that each call GPT-4o, you burn 20x the TPM budget in one superstep. Add exponential backoff or throttle with a semaphore inside node functions.
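One way to throttle is a process-wide semaphore shared by every node function. Here is a minimal stdlib sketch; the limit of 5 is an arbitrary example value, and the LLM call is stubbed out so the snippet runs standalone:

```python
import threading

# At most 5 concurrent LLM calls across all parallel branches
LLM_SEMAPHORE = threading.Semaphore(5)

def throttled_invoke(prompt: str) -> str:
    """Wraps an LLM call so fan-out width can't exceed the semaphore limit."""
    with LLM_SEMAPHORE:
        # The real llm.invoke(prompt) would go here; stubbed so the
        # sketch runs without an API key
        return f"response to: {prompt[:30]}"

print(throttled_invoke("Rate overall sentiment"))
```

Each parallel node body would call throttled_invoke instead of llm.invoke directly, so even a 20-wide fan-out never has more than 5 requests in flight.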

Error handling: If one parallel branch raises an exception, LangGraph cancels the superstep and surfaces the error. Wrap node logic in try/except and return a sentinel value so the fan-in node can handle partial failures gracefully.
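A minimal sketch of that pattern, with the failure simulated so it runs standalone (the {"type": "error"} sentinel shape is an assumption of this example, not a LangGraph convention):

```python
def safe_analyze(state: dict) -> dict:
    """Parallel node that degrades gracefully instead of killing the superstep."""
    try:
        # The real llm.invoke(...) would go here; a failure is simulated
        # so the sketch runs standalone
        raise TimeoutError("simulated API timeout")
    except Exception as exc:
        # Sentinel entry: the fan-in node can detect and skip failed branches
        return {"analyses": [{"type": "error", "result": str(exc)}]}

update = safe_analyze({"documents": []})
print(update["analyses"][0]["type"])  # → error
```

The fan-in node then filters entries with type == "error" before synthesizing, so one failed branch costs you a partial result instead of the whole run.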

Token costs: Parallelism trades latency for cost — you're running more LLM calls, not fewer. Profile with langsmith tracing to see per-node token usage before scaling fan-out width.

Checkpointing: LangGraph's checkpointer (MemorySaver, SqliteSaver) saves state between supersteps. This means interrupted parallel runs can resume from the last completed superstep, not from scratch.


What You Learned

  • LangGraph parallelizes nodes automatically when multiple edges leave a single node with no shared dependencies
  • State keys that receive parallel writes must use a reducer — otherwise you silently lose data
  • Static fan-out (direct edges) is right for a fixed number of parallel branches
  • The Send API handles dynamic fan-out when the number of parallel workers is unknown at build time
  • Fan-in is just a regular node — it runs after all upstream parallel nodes complete

When not to use this: Don't parallelize nodes that have implicit ordering dependencies (e.g., node B reads a file that node A writes). LangGraph won't detect that dependency — it only tracks explicit edges.

Tested on LangGraph 0.2.x, LangChain 0.3.x, Python 3.12, gpt-4o-mini