Build LlamaIndex Workflows: Complex Agentic RAG Patterns 2026

Build complex agentic RAG pipelines with LlamaIndex Workflows — multi-step retrieval, tool-calling agents, and state machines. Python 3.12 + LlamaIndex 0.11.

LlamaIndex Workflows give you a first-class event-driven primitive for building agentic RAG systems that go beyond a single retrieve-then-generate call. Standard RAG breaks the moment a question requires multi-hop reasoning, tool use between retrieval steps, or dynamic routing based on what was retrieved. Workflows solve this by modeling your pipeline as a state machine where steps communicate through typed events.

This guide builds three progressively complex patterns: a routed single-agent RAG, a multi-agent RAG with specialized sub-retrievers, and a self-correcting critic loop. All examples run on Python 3.12 and LlamaIndex 0.11 (the llama-index-core split release).

You'll learn:

  • How to model retrieval as typed events instead of function chains
  • How to fan out to multiple specialized retrievers in parallel and merge results
  • How to implement a critic-reflect loop that re-queries when confidence is low

Time: 30 min | Difficulty: Advanced


Why Standard RAG Pipelines Break at Scale

Single-pass RAG — embed query, retrieve top-k, stuff into prompt — works for simple Q&A. It fails when:

  • The answer requires facts from two different document sets that must be reconciled
  • Retrieved chunks contradict each other and the model silently picks one
  • The retrieval step needs to call an external API based on what was retrieved
  • A low-confidence answer should trigger a second, more targeted retrieval pass

The root problem is linearity. A chain executes left to right with no branching, no parallel execution, and no feedback from the generator back to the retriever.

LlamaIndex Workflows model your pipeline as a directed event graph. Each @step is an async function that receives one or more event types and emits one or more event types. The runtime handles concurrency, fan-out, and fan-in automatically.
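
To make the event model concrete before wiring in LlamaIndex, here is a toy version in plain asyncio and dataclasses — an illustration of the step/event contract (typed event in, typed event out), not the LlamaIndex API itself:

```python
# Conceptual illustration only — plain asyncio, not LlamaIndex.
import asyncio
from dataclasses import dataclass

@dataclass
class QueryEvent:
    query: str

@dataclass
class NodesEvent:
    query: str
    chunks: list[str]

async def retrieve_step(ev: QueryEvent) -> NodesEvent:
    # A real step would call a vector store here
    return NodesEvent(query=ev.query, chunks=[f"chunk for {ev.query!r}"])

async def generate_step(ev: NodesEvent) -> str:
    # A real step would call an LLM with the retrieved chunks
    return f"answer({ev.query}) from {len(ev.chunks)} chunks"

async def main() -> str:
    return await generate_step(await retrieve_step(QueryEvent(query="rate limits")))

print(asyncio.run(main()))
```

The Workflows runtime replaces the manual `await` chaining in `main` with event routing, which is what makes branching and fan-out possible.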

[Diagram] Event-driven flow: QueryEvent fans out to parallel retrievers, MergeEvent combines chunks, CriticEvent triggers re-retrieval on low confidence.


Why This Approach Works

LlamaIndex Workflows borrow from actor-model concurrency. Each step is stateless — it receives an event, does work, and emits an event. State lives in Context, a typed key-value store scoped to the workflow run. This means:

  • Steps can run in parallel when they share no data dependencies
  • You can checkpoint and resume a run by serializing Context
  • Debugging is deterministic — replay any run from its event log

Symptoms of pipelines that need Workflows:

  • LangChain SequentialChain with 4+ links that all read from the same memory object
  • RAG that calls the same retriever twice with slightly different queries
  • LLM calls wrapped in try/except that silently fall back to an empty string

Setup

Step 1: Install dependencies

# LlamaIndex split packages — only install what you need
pip install \
  llama-index-core==0.11.* \
  llama-index-llms-openai==0.3.* \
  llama-index-embeddings-openai==0.2.* \
  llama-index-vector-stores-chroma==0.2.* \
  chromadb==0.5.* \
  openai>=1.30
# Verify workflow runtime is available
python -c "from llama_index.core.workflow import Workflow, StartEvent, StopEvent; print('OK')"

Expected output: OK

Step 2: Create a shared config module

# config.py
import os
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

Settings.llm = OpenAI(
    model="gpt-4o",
    api_key=os.environ["OPENAI_API_KEY"],
    temperature=0.1,  # low temp for factual retrieval tasks
)
Settings.embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=1536,
)
Settings.chunk_size = 512
Settings.chunk_overlap = 64

Pattern 1: Routed Single-Agent RAG

Step 3: Define typed events

Events are the API contract between steps. Define them before writing any step logic — it forces you to think about what data flows where.

# events.py
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore
# No typing import needed: list[...] is a builtin generic on Python 3.12

class QueryClassifiedEvent(Event):
    query: str
    domain: str  # "technical" | "policy" | "general"

class RetrievedEvent(Event):
    query: str
    nodes: list[NodeWithScore]

class AnswerEvent(Event):
    answer: str
    confidence: float  # 0.0–1.0, extracted from LLM structured output

Step 4: Build the routed workflow

# routed_rag.py
import config  # noqa: F401 — applies Settings globally
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
from llama_index.core import VectorStoreIndex
from llama_index.core.llms import LLM
from events import QueryClassifiedEvent, RetrievedEvent, AnswerEvent

class RoutedRAGWorkflow(Workflow):
    def __init__(
        self,
        technical_index: VectorStoreIndex,
        policy_index: VectorStoreIndex,
        general_index: VectorStoreIndex,
        llm: LLM,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.retrievers = {
            "technical": technical_index.as_retriever(similarity_top_k=5),
            "policy": policy_index.as_retriever(similarity_top_k=5),
            "general": general_index.as_retriever(similarity_top_k=3),
        }
        self.llm = llm

    @step
    async def classify_query(
        self, ctx: Context, ev: StartEvent
    ) -> QueryClassifiedEvent:
        query = ev.get("query", "")
        # Single LLM call — cheaper than embedding-based routing
        response = await self.llm.acomplete(
            f"Classify this query into exactly one of: technical, policy, general.\n"
            f"Query: {query}\nRespond with only the category word."
        )
        domain = response.text.strip().lower()
        if domain not in ("technical", "policy", "general"):
            domain = "general"  # safe fallback
        await ctx.set("original_query", query)
        return QueryClassifiedEvent(query=query, domain=domain)

    @step
    async def retrieve(
        self, ctx: Context, ev: QueryClassifiedEvent
    ) -> RetrievedEvent:
        retriever = self.retrievers[ev.domain]
        nodes = await retriever.aretrieve(ev.query)
        return RetrievedEvent(query=ev.query, nodes=nodes)

    @step
    async def generate(
        self, ctx: Context, ev: RetrievedEvent
    ) -> StopEvent:
        context_str = "\n\n".join(n.get_content() for n in ev.nodes)
        response = await self.llm.acomplete(
            f"Context:\n{context_str}\n\nQuestion: {ev.query}\n\n"
            f"Answer concisely. End your response with CONFIDENCE: 0.X where X is your confidence 0-9."
        )
        text = response.text
        # Extract confidence without a second LLM call
        conf = 0.7
        if "CONFIDENCE:" in text:
            try:
                conf = float(text.split("CONFIDENCE:")[-1].strip()[:3])
            except ValueError:
                pass
        answer = text.split("CONFIDENCE:")[0].strip()
        return StopEvent(result={"answer": answer, "confidence": conf})
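
The slice-based parse above truncates multi-digit scores like 0.85 to 0.8 and silently keeps the marker text if parsing fails. A more forgiving regex variant, shown as a standalone helper (illustrative — not part of the LlamaIndex API):

```python
import re

def parse_confidence(text: str, default: float = 0.7) -> tuple[str, float]:
    """Split an LLM answer from a trailing 'CONFIDENCE: 0.x' marker."""
    m = re.search(r"CONFIDENCE:\s*([01](?:\.\d+)?)", text)
    conf = float(m.group(1)) if m else default
    # Everything before the marker is the answer proper
    answer = re.split(r"CONFIDENCE:", text)[0].strip()
    return answer, conf

print(parse_confidence("The limit is 10k rpm.\nCONFIDENCE: 0.85"))
# → ('The limit is 10k rpm.', 0.85)
```

Drop-in replacement for the extraction block in `generate` if you need scores with more than one decimal place.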

Step 5: Run it

# run_routed.py
import asyncio
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI
import config  # noqa: F401
from routed_rag import RoutedRAGWorkflow

async def main():
    # In production, load from a persistent vector store
    tech_docs = SimpleDirectoryReader("./docs/technical").load_data()
    policy_docs = SimpleDirectoryReader("./docs/policy").load_data()

    tech_index = VectorStoreIndex.from_documents(tech_docs)
    policy_index = VectorStoreIndex.from_documents(policy_docs)
    general_index = VectorStoreIndex.from_documents(tech_docs + policy_docs)

    workflow = RoutedRAGWorkflow(
        technical_index=tech_index,
        policy_index=policy_index,
        general_index=general_index,
        llm=OpenAI(model="gpt-4o"),
        timeout=60,  # seconds — prevents hung workflows
        verbose=True,
    )

    result = await workflow.run(query="What is the maximum API rate limit?")
    print(result["answer"])
    print(f"Confidence: {result['confidence']}")

asyncio.run(main())

Expected output:

The API rate limit is 10,000 requests per minute for Tier 2 accounts...
Confidence: 0.9

Pattern 2: Parallel Multi-Retriever RAG

When a question spans multiple domains simultaneously, sequential routing loses half the relevant context. The Workflows runtime supports fan-out: one step emits multiple events of the same type, and a collector step waits for all of them before proceeding.

Step 6: Add fan-out events

# events.py — additions
class SubQueryEvent(Event):
    """Fan-out: emitted once per sub-retriever."""
    sub_query: str
    domain: str

class SubRetrievedEvent(Event):
    """Fan-in: one per domain. The runtime collects all of these before the merge step fires."""
    domain: str
    nodes: list[NodeWithScore]

class MergeEvent(Event):
    query: str
    all_nodes: list[NodeWithScore]

Step 7: Build the parallel workflow

# parallel_rag.py
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
from events import SubQueryEvent, SubRetrievedEvent, MergeEvent
import config  # noqa: F401

DOMAINS = ["technical", "policy", "legal"]

class ParallelRAGWorkflow(Workflow):
    def __init__(self, indexes: dict, llm, **kwargs):
        super().__init__(**kwargs)
        self.retrievers = {d: idx.as_retriever(similarity_top_k=4) for d, idx in indexes.items()}
        self.llm = llm

    @step
    async def decompose(self, ctx: Context, ev: StartEvent) -> SubQueryEvent:
        query = ev.get("query", "")
        await ctx.set("query", query)
        await ctx.set("expected_count", len(DOMAINS))
        # Emit one SubQueryEvent per domain — runtime fans out in parallel
        for domain in DOMAINS:
            ctx.send_event(SubQueryEvent(sub_query=query, domain=domain))

    @step(num_workers=len(DOMAINS))  # allow all domain retrievals to run concurrently
    async def sub_retrieve(self, ctx: Context, ev: SubQueryEvent) -> SubRetrievedEvent:
        nodes = await self.retrievers[ev.domain].aretrieve(ev.sub_query)
        return SubRetrievedEvent(domain=ev.domain, nodes=nodes)

    @step
    async def merge(self, ctx: Context, ev: SubRetrievedEvent) -> MergeEvent | None:
        # ctx.collect_events buffers events until expected_count have arrived
        results = ctx.collect_events(
            ev, [SubRetrievedEvent] * await ctx.get("expected_count")
        )
        if results is None:
            return None  # still waiting for other domains
        # All domains returned — merge and deduplicate by node ID
        seen = set()
        merged = []
        for collected in results:
            for node in collected.nodes:
                if node.node_id not in seen:
                    seen.add(node.node_id)
                    merged.append(node)
        # Sort by score descending, keep top 10
        merged.sort(key=lambda n: n.score or 0, reverse=True)
        return MergeEvent(query=await ctx.get("query"), all_nodes=merged[:10])

    @step
    async def generate(self, ctx: Context, ev: MergeEvent) -> StopEvent:
        context_str = "\n\n---\n\n".join(
            f"[{n.node.metadata.get('domain', 'unknown')}]\n{n.get_content()}"
            for n in ev.all_nodes
        )
        response = await self.llm.acomplete(
            f"Using the following multi-domain context, answer the question.\n\n"
            f"Context:\n{context_str}\n\nQuestion: {ev.query}"
        )
        return StopEvent(result={"answer": response.text, "node_count": len(ev.all_nodes)})

The key method is ctx.collect_events. It buffers incoming events of the specified types and returns None until the expected count arrives — at which point it returns all of them at once. This is how you implement a join gate without any shared mutable state.
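
A toy analogue of that join gate in plain Python — illustrative only, not how collect_events is implemented internally:

```python
class JoinGate:
    """Toy analogue of ctx.collect_events: buffer events, release all at once."""

    def __init__(self, expected: int):
        self.expected = expected
        self.buffer = []

    def collect(self, event):
        self.buffer.append(event)
        if len(self.buffer) < self.expected:
            return None  # still waiting — caller's step returns None too
        released, self.buffer = self.buffer, []
        return released  # all arrived — downstream step can fire

gate = JoinGate(expected=3)
assert gate.collect("technical") is None
assert gate.collect("policy") is None
print(gate.collect("legal"))  # → ['technical', 'policy', 'legal']
```

The `return None` convention is the important part: a step returning None tells the runtime "no output yet", and the runtime re-invokes it when the next event arrives.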


Pattern 3: Critic-Reflect Self-Correction Loop

Low-confidence answers should trigger a targeted re-retrieval rather than returning a bad answer to the user. This pattern adds a critic step that inspects the generated answer, scores it, and either accepts it or emits a refined query back to the retrieval step.

Step 8: Add loop events

# events.py — additions
class CriticEvent(Event):
    original_query: str
    answer: str
    confidence: float
    attempt: int

class RefinedQueryEvent(Event):
    refined_query: str
    attempt: int

Step 9: Build the critic workflow

# critic_rag.py
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
from events import RetrievedEvent, CriticEvent, RefinedQueryEvent
import config  # noqa: F401

MAX_ATTEMPTS = 3  # prevents infinite loops
CONFIDENCE_THRESHOLD = 0.75

class CriticRAGWorkflow(Workflow):
    def __init__(self, index, llm, **kwargs):
        super().__init__(**kwargs)
        self.retriever = index.as_retriever(similarity_top_k=6)
        self.llm = llm

    @step
    async def retrieve(
        self, ctx: Context, ev: StartEvent | RefinedQueryEvent
    ) -> RetrievedEvent:
        if isinstance(ev, StartEvent):
            query = ev.get("query", "")
            attempt = 1
            await ctx.set("original_query", query)
        else:
            query = ev.refined_query
            attempt = ev.attempt
        nodes = await self.retriever.aretrieve(query)
        await ctx.set("attempt", attempt)
        return RetrievedEvent(query=query, nodes=nodes)

    @step
    async def generate(self, ctx: Context, ev: RetrievedEvent) -> CriticEvent:
        context_str = "\n\n".join(n.get_content() for n in ev.nodes)
        response = await self.llm.acomplete(
            f"Context:\n{context_str}\n\nQuestion: {ev.query}\n\n"
            f"Provide a factual answer. End with CONFIDENCE: 0.X"
        )
        text = response.text
        conf = 0.5
        if "CONFIDENCE:" in text:
            try:
                conf = float(text.split("CONFIDENCE:")[-1].strip()[:3])
            except ValueError:
                pass
        return CriticEvent(
            original_query=await ctx.get("original_query"),
            answer=text.split("CONFIDENCE:")[0].strip(),
            confidence=conf,
            attempt=await ctx.get("attempt"),
        )

    @step
    async def critic(
        self, ctx: Context, ev: CriticEvent
    ) -> StopEvent | RefinedQueryEvent:
        if ev.confidence >= CONFIDENCE_THRESHOLD or ev.attempt >= MAX_ATTEMPTS:
            # Accept: confidence met, or we've exhausted retries
            return StopEvent(result={
                "answer": ev.answer,
                "confidence": ev.confidence,
                "attempts": ev.attempt,
            })
        # Reject: ask the LLM to rewrite the query to target the gap
        refined = await self.llm.acomplete(
            f"The following answer had low confidence ({ev.confidence}).\n"
            f"Original question: {ev.original_query}\n"
            f"Weak answer: {ev.answer}\n\n"
            f"Write a more specific retrieval query that would find better evidence. "
            f"Return only the query, no explanation."
        )
        return RefinedQueryEvent(
            refined_query=refined.text.strip(),
            attempt=ev.attempt + 1,
        )

If it fails:

  • WorkflowTimeoutError → Increase timeout= on the Workflow constructor. The critic loop adds one full round-trip per retry; set at least timeout=120 for 3 attempts.
  • ctx.collect_events returns None forever → expected_count doesn't match the number of events emitted. Add verbose=True to trace which events are queued.

Verification

python -c "
import asyncio
from critic_rag import CriticRAGWorkflow
from llama_index.core import VectorStoreIndex, Document
from llama_index.llms.openai import OpenAI
import config

async def test():
    docs = [Document(text='The SLA uptime guarantee is 99.95% for Enterprise plans billed in USD.')]
    index = VectorStoreIndex.from_documents(docs)
    wf = CriticRAGWorkflow(index=index, llm=OpenAI(model='gpt-4o'), timeout=90, verbose=True)
    result = await wf.run(query='What is the uptime guarantee?')
    print(result)

asyncio.run(test())
"

You should see: A result dict with answer, confidence >= 0.75, and attempts: 1 (because the single relevant document gives high confidence on the first pass).


Workflow Comparison: LlamaIndex vs LangGraph

Feature        LlamaIndex Workflows 0.11            LangGraph 0.2
Primitive      Event class + @step                  Node function + edge dict
Parallelism    Built-in via ctx.collect_events      Requires explicit Send + conditional edges
State          Typed Context key-value store        TypedDict state passed through edges
Streaming      handler.stream_events() iterator     graph.stream()
Persistence    Custom WorkflowCheckpointer          Built-in checkpointers (MemorySaver, SqliteSaver)
Best for       RAG-heavy, event-driven pipelines    Complex agent graphs with many conditional branches

LlamaIndex Workflows have less boilerplate for RAG-centric patterns because the event model maps naturally to retrieval fan-out. LangGraph gives you more control over conditional branching when you have many agent roles that hand off to each other.


What You Learned

  • StartEvent / StopEvent are the entry and exit points; every step in between emits and receives typed events
  • ctx.collect_events is the join primitive — it buffers events until a count threshold is met, enabling parallel retrieval without shared mutable state
  • Union type annotations on @step input (StartEvent | RefinedQueryEvent) let a step serve as the entry point for both initial queries and retry events, which is how you build loops without recursion
  • Set MAX_ATTEMPTS on any critic loop — an uncapped loop will exhaust your LLM budget silently
  • verbose=True on the Workflow constructor prints every event transition, which is the fastest debugging path

Tested on LlamaIndex 0.11.x, Python 3.12, OpenAI gpt-4o, Ubuntu 22.04 & macOS Sequoia


FAQ

Q: Do LlamaIndex Workflows work with local LLMs like Ollama? A: Yes. Replace OpenAI(model="gpt-4o") with Ollama(model="llama3.3", base_url="http://localhost:11434") from llama-index-llms-ollama. The async acomplete calls work identically. Performance on the critic loop will depend on model quality — models under 13B often produce CONFIDENCE scores that are uncalibrated.

Q: How do I persist workflow state between runs? A: Implement a WorkflowCheckpointer by serializing ctx to JSON after each step. LlamaIndex 0.11 does not include a built-in checkpointer — you hook into workflow.run_step() to yield control and snapshot state. For production, store snapshots in durable storage such as S3 or a database; if checkpoint reads are latency-critical, a low-latency store like Redis is a better fit than object storage.
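
A minimal sketch of that snapshot idea, assuming your Context values are JSON-serializable (write_snapshot and read_snapshot are hypothetical helpers, not LlamaIndex API):

```python
import json
from pathlib import Path

def write_snapshot(path: Path, ctx_data: dict) -> None:
    """Persist a Context dump. Write to a temp file, then rename, so a
    crash mid-write never leaves a half-written snapshot behind."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(ctx_data))
    tmp.rename(path)

def read_snapshot(path: Path) -> dict:
    """Reload a snapshot to seed a resumed run."""
    return json.loads(path.read_text())

snap = Path("run_42.json")
write_snapshot(snap, {"original_query": "uptime SLA?", "attempt": 2})
print(read_snapshot(snap))  # → {'original_query': 'uptime SLA?', 'attempt': 2}
snap.unlink()  # cleanup
```

Swap the Path calls for S3 put/get (or Redis SET/GET) without changing the interface.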

Q: What is the minimum context window needed for Pattern 2? A: 10 merged nodes at 512 tokens each plus prompt overhead puts you around 6,000–7,000 tokens. GPT-4o (128k context) handles this fine. If you're using a model with a 4k or 8k context, reduce similarity_top_k to 2 per domain and increase chunk overlap to maintain coherence.
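
The arithmetic as a back-of-envelope helper (512 tokens per node matches Settings.chunk_size above; the 1,500-token overhead for question and instructions is an assumption):

```python
def context_budget(nodes: int, tokens_per_node: int = 512, overhead: int = 1500) -> int:
    """Rough prompt size: retrieved chunks plus question/instruction overhead."""
    return nodes * tokens_per_node + overhead

print(context_budget(10))     # 6620 — Pattern 2 defaults, fits an 8k window tightly
print(context_budget(2 * 3))  # 4572 — reduced similarity_top_k=2 across 3 domains
```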

Q: Can I mix LlamaIndex Workflows with LangChain retrievers? A: Yes. Wrap a LangChain retriever in a standard Python async function and call it inside a @step. The workflow runtime is retriever-agnostic — it only cares about the events a step emits, not what happens inside.
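
That wrapper pattern, sketched with a stand-in synchronous retriever (lc_retriever here is a stub standing in for a real LangChain retriever call):

```python
import asyncio

def lc_retriever(query: str) -> list[str]:
    # Stand-in for a blocking LangChain call, e.g. retriever.invoke(query)
    return [f"doc about {query}"]

async def retrieve_via_langchain(query: str) -> list[str]:
    # Run the blocking retriever off the event loop so it is awaitable
    # inside a @step without stalling other parallel steps
    return await asyncio.to_thread(lc_retriever, query)

print(asyncio.run(retrieve_via_langchain("rate limits")))  # → ['doc about rate limits']
```

Inside a @step you would await retrieve_via_langchain and wrap the returned documents in whatever event type your downstream steps expect.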