LlamaIndex Workflows give you a first-class event-driven primitive for building agentic RAG systems that go beyond a single retrieve-then-generate call. Standard RAG breaks the moment a question requires multi-hop reasoning, tool use between retrieval steps, or dynamic routing based on what was retrieved. Workflows solve this by modeling your pipeline as a state machine where steps communicate through typed events.
This guide builds three progressively complex patterns: a routed single-agent RAG, a multi-agent RAG with specialized sub-retrievers, and a self-correcting critic loop. All examples run on Python 3.12 and LlamaIndex 0.11 (the llama-index-core split release).
You'll learn:
- How to model retrieval as typed events instead of function chains
- How to fan out to multiple specialized retrievers in parallel and merge results
- How to implement a critic-reflect loop that re-queries when confidence is low
Time: 30 min | Difficulty: Advanced
Why Standard RAG Pipelines Break at Scale
Single-pass RAG — embed query, retrieve top-k, stuff into prompt — works for simple Q&A. It fails when:
- The answer requires facts from two different document sets that must be reconciled
- Retrieved chunks contradict each other and the model silently picks one
- The retrieval step needs to call an external API based on what was retrieved
- A low-confidence answer should trigger a second, more targeted retrieval pass
The root problem is linearity. A chain executes left to right with no branching, no parallel execution, and no feedback from the generator back to the retriever.
LlamaIndex Workflows model your pipeline as a directed event graph. Each @step is an async function that receives one or more event types and emits one or more event types. The runtime handles concurrency, fan-out, and fan-in automatically.
Event-driven flow: QueryEvent fans out to parallel retrievers, MergeEvent combines chunks, CriticEvent triggers re-retrieval on low confidence.
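Before touching the LlamaIndex API, the event-driven idea can be sketched in plain Python. This is a toy dispatcher, not the Workflow runtime: each step declares the event type it consumes, and a loop routes events by type until no step matches.

```python
from dataclasses import dataclass

@dataclass
class QueryEvent:
    query: str

@dataclass
class RetrievedEvent:
    chunks: list

@dataclass
class AnswerEvent:
    answer: str

def retrieve(ev: QueryEvent) -> RetrievedEvent:
    # stand-in for a real vector search
    return RetrievedEvent(chunks=[f"chunk about {ev.query}"])

def generate(ev: RetrievedEvent) -> AnswerEvent:
    return AnswerEvent(answer=" / ".join(ev.chunks))

# the "runtime": route each event to the step typed to consume it
STEPS = {QueryEvent: retrieve, RetrievedEvent: generate}

def run(start):
    ev = start
    while type(ev) in STEPS:
        ev = STEPS[type(ev)](ev)
    return ev

result = run(QueryEvent(query="rate limits"))
```

The real runtime adds concurrency, typed validation, and shared Context on top of exactly this routing idea.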
Why This Approach Works
LlamaIndex Workflows borrow from actor-model concurrency. Each step is stateless — it receives an event, does work, and emits an event. State lives in Context, a typed key-value store scoped to the workflow run. This means:
- Steps can run in parallel when they share no data dependencies
- You can checkpoint and resume a run by serializing Context
- Debugging is deterministic — replay any run from its event log
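The checkpoint-and-resume claim in miniature — a hypothetical sketch using a plain dict, not the actual Context API: because run state lives in one store, any step boundary is a resumable snapshot.

```python
import json

# hypothetical run state as it might look between steps
state = {"original_query": "What is the rate limit?", "attempt": 2}

snapshot = json.dumps(state)    # checkpoint: persist after a step completes
resumed = json.loads(snapshot)  # resume: rehydrate in a fresh process
```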
Symptoms of pipelines that need Workflows:
- A LangChain SequentialChain with 4+ links that all read from the same memory object
- RAG that calls the same retriever twice with slightly different queries
- LLM calls wrapped in try/except that silently fall back to an empty string
Setup
Step 1: Install dependencies
# LlamaIndex split packages — only install what you need
pip install \
    llama-index-core==0.11.* \
    llama-index-llms-openai==0.3.* \
    llama-index-embeddings-openai==0.2.* \
    llama-index-vector-stores-chroma==0.2.* \
    chromadb==0.5.* \
    openai>=1.30
# Verify workflow runtime is available
python -c "from llama_index.core.workflow import Workflow, StartEvent, StopEvent; print('OK')"
Expected output: OK
Step 2: Create a shared config module
# config.py
import os

from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

Settings.llm = OpenAI(
    model="gpt-4o",
    api_key=os.environ["OPENAI_API_KEY"],
    temperature=0.1,  # low temp for factual retrieval tasks
)
Settings.embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=1536,
)
Settings.chunk_size = 512
Settings.chunk_overlap = 64
Pattern 1: Routed Single-Agent RAG
Step 3: Define typed events
Events are the API contract between steps. Define them before writing any step logic — it forces you to think about what data flows where.
# events.py
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class QueryClassifiedEvent(Event):
    query: str
    domain: str  # "technical" | "policy" | "general"

class RetrievedEvent(Event):
    query: str
    nodes: list[NodeWithScore]

class AnswerEvent(Event):
    answer: str
    confidence: float  # 0.0–1.0, extracted from LLM structured output
Step 4: Build the routed workflow
# routed_rag.py
import config  # noqa: F401 — applies Settings globally
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
from llama_index.core import VectorStoreIndex
from llama_index.core.llms import LLM
from events import QueryClassifiedEvent, RetrievedEvent, AnswerEvent

class RoutedRAGWorkflow(Workflow):
    def __init__(
        self,
        technical_index: VectorStoreIndex,
        policy_index: VectorStoreIndex,
        general_index: VectorStoreIndex,
        llm: LLM,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.retrievers = {
            "technical": technical_index.as_retriever(similarity_top_k=5),
            "policy": policy_index.as_retriever(similarity_top_k=5),
            "general": general_index.as_retriever(similarity_top_k=3),
        }
        self.llm = llm

    @step
    async def classify_query(
        self, ctx: Context, ev: StartEvent
    ) -> QueryClassifiedEvent:
        query = ev.get("query", "")
        # Single LLM call — cheaper than embedding-based routing
        response = await self.llm.acomplete(
            f"Classify this query into exactly one of: technical, policy, general.\n"
            f"Query: {query}\nRespond with only the category word."
        )
        domain = response.text.strip().lower()
        if domain not in ("technical", "policy", "general"):
            domain = "general"  # safe fallback
        await ctx.set("original_query", query)
        return QueryClassifiedEvent(query=query, domain=domain)

    @step
    async def retrieve(
        self, ctx: Context, ev: QueryClassifiedEvent
    ) -> RetrievedEvent:
        retriever = self.retrievers[ev.domain]
        nodes = await retriever.aretrieve(ev.query)
        return RetrievedEvent(query=ev.query, nodes=nodes)

    @step
    async def generate(
        self, ctx: Context, ev: RetrievedEvent
    ) -> StopEvent:
        context_str = "\n\n".join(n.get_content() for n in ev.nodes)
        response = await self.llm.acomplete(
            f"Context:\n{context_str}\n\nQuestion: {ev.query}\n\n"
            f"Answer concisely. End your response with CONFIDENCE: 0.X where X is your confidence 0-9."
        )
        text = response.text
        # Extract confidence without a second LLM call
        conf = 0.7
        if "CONFIDENCE:" in text:
            try:
                conf = float(text.split("CONFIDENCE:")[-1].strip()[:3])
            except ValueError:
                pass
        answer = text.split("CONFIDENCE:")[0].strip()
        return StopEvent(result={"answer": answer, "confidence": conf})
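Note that the `[:3]` slice in the confidence extraction truncates two-digit scores like 0.85 down to 0.8. A regex variant is more tolerant — a drop-in sketch, not part of the original workflow:

```python
import re

def parse_confidence(text: str, default: float = 0.7) -> tuple[str, float]:
    """Split an LLM response into (answer, confidence), tolerating scores like 0.85."""
    match = re.search(r"CONFIDENCE:\s*([01](?:\.\d+)?)", text)
    conf = float(match.group(1)) if match else default
    answer = re.split(r"CONFIDENCE:", text)[0].strip()
    return answer, conf

answer, conf = parse_confidence("The limit is 10k rpm.\nCONFIDENCE: 0.85")
```

Swapping this helper into `generate` keeps the single-LLM-call extraction while preserving the full precision of the reported score.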
Step 5: Run it
# run_routed.py
import asyncio

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI

import config  # noqa: F401
from routed_rag import RoutedRAGWorkflow

async def main():
    # In production, load from a persistent vector store
    tech_docs = SimpleDirectoryReader("./docs/technical").load_data()
    policy_docs = SimpleDirectoryReader("./docs/policy").load_data()
    tech_index = VectorStoreIndex.from_documents(tech_docs)
    policy_index = VectorStoreIndex.from_documents(policy_docs)
    general_index = VectorStoreIndex.from_documents(tech_docs + policy_docs)

    workflow = RoutedRAGWorkflow(
        technical_index=tech_index,
        policy_index=policy_index,
        general_index=general_index,
        llm=OpenAI(model="gpt-4o"),
        timeout=60,  # seconds — prevents hung workflows
        verbose=True,
    )
    result = await workflow.run(query="What is the maximum API rate limit?")
    print(result["answer"])
    print(f"Confidence: {result['confidence']}")

asyncio.run(main())
Expected output:
The API rate limit is 10,000 requests per minute for Tier 2 accounts...
Confidence: 0.9
Pattern 2: Parallel Multi-Retriever RAG
When a question spans multiple domains simultaneously, sequential routing loses half the relevant context. The Workflows runtime supports fan-out: one step emits multiple events of the same type, and a collector step waits for all of them before proceeding.
Step 6: Add fan-out events
A step can only collect events it receives, so the fan-in needs its own result event: each sub-retriever emits a SubRetrievedEvent carrying its nodes, and the merge step collects those.
# events.py — additions
class SubQueryEvent(Event):
    """Emitted once per sub-retriever by the decompose step."""
    sub_query: str
    domain: str

class SubRetrievedEvent(Event):
    """One per domain. The merge step collects all of these before firing."""
    domain: str
    nodes: list[NodeWithScore]

class MergeEvent(Event):
    query: str
    all_nodes: list[NodeWithScore]
Step 7: Build the parallel workflow
# parallel_rag.py
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
from events import SubQueryEvent, SubRetrievedEvent, MergeEvent
import config  # noqa: F401

DOMAINS = ["technical", "policy", "legal"]

class ParallelRAGWorkflow(Workflow):
    def __init__(self, indexes: dict, llm, **kwargs):
        super().__init__(**kwargs)
        self.retrievers = {d: idx.as_retriever(similarity_top_k=4) for d, idx in indexes.items()}
        self.llm = llm

    @step
    async def decompose(self, ctx: Context, ev: StartEvent) -> SubQueryEvent:
        query = ev.get("query", "")
        await ctx.set("query", query)
        await ctx.set("expected_count", len(DOMAINS))
        # Emit one SubQueryEvent per domain — the runtime fans out
        for domain in DOMAINS:
            ctx.send_event(SubQueryEvent(sub_query=query, domain=domain))

    @step(num_workers=3)  # run the three domain retrievals concurrently
    async def sub_retrieve(self, ctx: Context, ev: SubQueryEvent) -> SubRetrievedEvent:
        nodes = await self.retrievers[ev.domain].aretrieve(ev.sub_query)
        return SubRetrievedEvent(domain=ev.domain, nodes=nodes)

    @step
    async def merge(self, ctx: Context, ev: SubRetrievedEvent) -> MergeEvent | None:
        # ctx.collect_events buffers until expected_count events arrive
        expected = await ctx.get("expected_count")
        results = ctx.collect_events(ev, [SubRetrievedEvent] * expected)
        if results is None:
            return None  # still waiting for other domains
        # All domains returned — merge and deduplicate by node ID
        seen = set()
        merged = []
        for sub_ev in results:
            for node in sub_ev.nodes:
                if node.node_id not in seen:
                    seen.add(node.node_id)
                    merged.append(node)
        # Sort by score descending, keep top 10
        merged.sort(key=lambda n: n.score or 0, reverse=True)
        return MergeEvent(query=await ctx.get("query"), all_nodes=merged[:10])

    @step
    async def generate(self, ctx: Context, ev: MergeEvent) -> StopEvent:
        context_str = "\n\n---\n\n".join(
            f"[{n.node.metadata.get('domain', 'unknown')}]\n{n.get_content()}"
            for n in ev.all_nodes
        )
        response = await self.llm.acomplete(
            f"Using the following multi-domain context, answer the question.\n\n"
            f"Context:\n{context_str}\n\nQuestion: {ev.query}"
        )
        return StopEvent(result={"answer": response.text, "node_count": len(ev.all_nodes)})
The key method is ctx.collect_events. It buffers incoming events of the specified types and returns None until the expected count arrives — at which point it returns all of them at once. This is how you implement a join gate without any shared mutable state.
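The same join-gate semantics can be sketched with plain asyncio, outside the Workflow runtime (toy retrievers, not the LlamaIndex API): gather is the fan-in point, and the merge code cannot run until every sub-retrieval has returned.

```python
import asyncio

async def retrieve(domain: str, query: str) -> list[str]:
    # stand-in for a real async retriever call
    await asyncio.sleep(0)
    return [f"{domain}: chunk about {query}"]

async def fan_out_and_merge(query: str, domains: list[str]) -> list[str]:
    # fan-out: one coroutine per domain, all running concurrently
    per_domain = await asyncio.gather(*(retrieve(d, query) for d in domains))
    # fan-in: this line is the join gate — it runs only once all have returned
    return [chunk for chunks in per_domain for chunk in chunks]

merged = asyncio.run(fan_out_and_merge("rate limits", ["technical", "policy", "legal"]))
```

The Workflow version does the same thing, but the join gate is expressed declaratively through event counts rather than a gather call you must remember to write.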
Pattern 3: Critic-Reflect Self-Correction Loop
Low-confidence answers should trigger a targeted re-retrieval rather than returning a bad answer to the user. This pattern adds a critic step that inspects the generated answer, scores it, and either accepts it or emits a refined query back to the retrieval step.
Step 8: Add loop events
# events.py — additions
class CriticEvent(Event):
    original_query: str
    answer: str
    confidence: float
    attempt: int

class RefinedQueryEvent(Event):
    refined_query: str
    attempt: int
Step 9: Build the critic workflow
# critic_rag.py
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
from events import RetrievedEvent, CriticEvent, RefinedQueryEvent
import config  # noqa: F401

MAX_ATTEMPTS = 3  # prevents infinite loops
CONFIDENCE_THRESHOLD = 0.75

class CriticRAGWorkflow(Workflow):
    def __init__(self, index, llm, **kwargs):
        super().__init__(**kwargs)
        self.retriever = index.as_retriever(similarity_top_k=6)
        self.llm = llm

    @step
    async def retrieve(
        self, ctx: Context, ev: StartEvent | RefinedQueryEvent
    ) -> RetrievedEvent:
        if isinstance(ev, StartEvent):
            query = ev.get("query", "")
            attempt = 1
            await ctx.set("original_query", query)
        else:
            query = ev.refined_query
            attempt = ev.attempt
        nodes = await self.retriever.aretrieve(query)
        await ctx.set("attempt", attempt)
        return RetrievedEvent(query=query, nodes=nodes)

    @step
    async def generate(self, ctx: Context, ev: RetrievedEvent) -> CriticEvent:
        context_str = "\n\n".join(n.get_content() for n in ev.nodes)
        response = await self.llm.acomplete(
            f"Context:\n{context_str}\n\nQuestion: {ev.query}\n\n"
            f"Provide a factual answer. End with CONFIDENCE: 0.X"
        )
        text = response.text
        conf = 0.5
        if "CONFIDENCE:" in text:
            try:
                conf = float(text.split("CONFIDENCE:")[-1].strip()[:3])
            except ValueError:
                pass
        return CriticEvent(
            original_query=await ctx.get("original_query"),
            answer=text.split("CONFIDENCE:")[0].strip(),
            confidence=conf,
            attempt=await ctx.get("attempt"),
        )

    @step
    async def critic(
        self, ctx: Context, ev: CriticEvent
    ) -> StopEvent | RefinedQueryEvent:
        if ev.confidence >= CONFIDENCE_THRESHOLD or ev.attempt >= MAX_ATTEMPTS:
            # Accept: confidence met, or we've exhausted retries
            return StopEvent(result={
                "answer": ev.answer,
                "confidence": ev.confidence,
                "attempts": ev.attempt,
            })
        # Reject: ask the LLM to rewrite the query to target the gap
        refined = await self.llm.acomplete(
            f"The following answer had low confidence ({ev.confidence}).\n"
            f"Original question: {ev.original_query}\n"
            f"Weak answer: {ev.answer}\n\n"
            f"Write a more specific retrieval query that would find better evidence. "
            f"Return only the query, no explanation."
        )
        return RefinedQueryEvent(
            refined_query=refined.text.strip(),
            attempt=ev.attempt + 1,
        )
If it fails:
- WorkflowTimeoutError → increase timeout= on the Workflow constructor. The critic loop adds one full round-trip per retry; set at least timeout=120 for 3 attempts.
- ctx.collect_events returns None forever → expected_count doesn't match the number of events emitted. Add verbose=True to trace which events are queued.
Verification
python -c "
import asyncio
from critic_rag import CriticRAGWorkflow
from llama_index.core import VectorStoreIndex, Document
from llama_index.llms.openai import OpenAI
import config

async def test():
    docs = [Document(text='The SLA uptime guarantee is 99.95% for Enterprise plans billed in USD.')]
    index = VectorStoreIndex.from_documents(docs)
    wf = CriticRAGWorkflow(index=index, llm=OpenAI(model='gpt-4o'), timeout=90, verbose=True)
    result = await wf.run(query='What is the uptime guarantee?')
    print(result)

asyncio.run(test())
"
You should see: A result dict with answer, confidence >= 0.75, and attempts: 1 (because the single relevant document gives high confidence on the first pass).
Workflow Comparison: LlamaIndex vs LangGraph
| | LlamaIndex Workflows 0.11 | LangGraph 0.2 |
|---|---|---|
| Primitive | Event class + @step | Node function + edge dict |
| Parallelism | Built-in via ctx.collect_events | Requires explicit send + conditional edges |
| State | Typed Context key-value store | TypedDict state passed through edges |
| Streaming | workflow.run_step() iterator | graph.stream() |
| Persistence | Custom WorkflowCheckpointer | LangGraph Platform ($99+/month USD) |
| Best for | RAG-heavy, event-driven pipelines | Complex agent graphs with many conditional branches |
LlamaIndex Workflows have less boilerplate for RAG-centric patterns because the event model maps naturally to retrieval fan-out. LangGraph gives you more control over conditional branching when you have many agent roles that hand off to each other.
What You Learned
- StartEvent / StopEvent are the entry and exit points; every step in between emits and receives typed events
- ctx.collect_events is the join primitive — it buffers events until a count threshold is met, enabling parallel retrieval without shared mutable state
- Union type annotations on @step input (StartEvent | RefinedQueryEvent) let a step serve as the entry point for both initial queries and retry events, which is how you build loops without recursion
- Set MAX_ATTEMPTS on any critic loop — an uncapped loop will exhaust your LLM budget silently
- verbose=True on the Workflow constructor prints every event transition, which is the fastest debugging path
Tested on LlamaIndex 0.11.x, Python 3.12, OpenAI gpt-4o, Ubuntu 22.04 & macOS Sequoia
FAQ
Q: Do LlamaIndex Workflows work with local LLMs like Ollama?
A: Yes. Replace OpenAI(model="gpt-4o") with Ollama(model="llama3.3", base_url="http://localhost:11434") from llama-index-llms-ollama. The async acomplete calls work identically. Performance on the critic loop will depend on model quality — models under 13B often produce CONFIDENCE scores that are uncalibrated.
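A sketch of the swap, assuming the llama-index-llms-ollama package is installed and an Ollama server is running locally (the model name and timeout are illustrative):

```python
# config.py variant for local inference — requires: pip install llama-index-llms-ollama
from llama_index.core import Settings
from llama_index.llms.ollama import Ollama

Settings.llm = Ollama(
    model="llama3.3",                   # any model you have pulled locally
    base_url="http://localhost:11434",  # default Ollama endpoint
    request_timeout=120.0,              # local generation is slower than hosted APIs
)
```

None of the workflow code changes — the steps only depend on the async acomplete interface.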
Q: How do I persist workflow state between runs?
A: Implement your own WorkflowCheckpointer by serializing ctx to JSON after each step. LlamaIndex 0.11 does not include a built-in checkpointer — you hook into workflow.run_step() to yield control and snapshot state. For production use on AWS, store snapshots in S3; if resume latency matters, a key-value store like DynamoDB offers single-digit-millisecond reads that S3 cannot match.
Q: What is the minimum context window needed for Pattern 2?
A: 10 merged nodes at 512 tokens each plus prompt overhead puts you around 6,000–7,000 tokens. GPT-4o (128k context) handles this fine. If you're using a model with a 4k or 8k context, reduce similarity_top_k to 2 per domain and increase chunk overlap to maintain coherence.
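The budget arithmetic, spelled out (the 1,500-token overhead figure is a rough assumption covering instructions, separators, and the question itself):

```python
chunk_size = 512        # tokens per node, from Settings.chunk_size
merged_nodes = 10       # Pattern 2 keeps the top 10 after dedup
prompt_overhead = 1500  # assumed: instructions + separators + question

total = merged_nodes * chunk_size + prompt_overhead  # 6,620 tokens
fits_in_8k = total + 1024 <= 8192  # leave ~1k tokens for the answer
```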
Q: Can I mix LlamaIndex Workflows with LangChain retrievers?
A: Yes. Wrap a LangChain retriever in a standard Python async function and call it inside a @step. The workflow runtime is retriever-agnostic — it only cares about the events a step emits, not what happens inside.
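A minimal adapter sketch, under the assumption that the retriever exposes LangChain's async ainvoke(query) returning documents with a page_content attribute; the Fake classes here just stand in for real LangChain objects so the shape is visible:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FakeDoc:
    page_content: str

class FakeLCRetriever:
    """Stand-in for a real LangChain retriever (same ainvoke protocol)."""
    async def ainvoke(self, query: str) -> list[FakeDoc]:
        return [FakeDoc(page_content=f"doc for {query}")]

async def lc_retrieve(lc_retriever, query: str) -> list[str]:
    # call any LangChain-style retriever from inside a @step body
    docs = await lc_retriever.ainvoke(query)
    return [d.page_content for d in docs]

chunks = asyncio.run(lc_retrieve(FakeLCRetriever(), "rate limits"))
```

Inside a real @step you would wrap the returned strings in TextNode / NodeWithScore objects before emitting a RetrievedEvent.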