Problem: Multi-Agent Pipelines Are Hard to Control
You've built a CrewAI crew. It runs — but you can't control what happens between steps. You can't branch on a condition, pause mid-pipeline, or trigger a crew only when a previous step succeeds. You end up duct-taping logic with raw Python around your crew calls.
CrewAI Flows solves this. It gives you a structured, event-driven execution engine that sits above individual crews and tasks.
You'll learn:
- How Flows differ from Crews and when to use each
- How to wire steps with the @start, @listen, and @router decorators
- How to manage shared state across steps with FlowState
- How to embed a full Crew inside a Flow step
Time: 25 min | Difficulty: Intermediate
Why Flows Exist
A Crew is great at parallel collaboration — multiple agents working on a shared goal. But Crews don't give you sequential control. You can't easily say: "Run step A, check the result, then choose between step B or step C."
Flows are an orchestration layer. They let you:
- Chain steps in explicit sequence
- Branch conditionally with routers
- Maintain typed state across all steps
- Trigger Crews as sub-units inside a larger pipeline
Think of it this way: a Crew is a team, a Flow is the project plan that coordinates multiple teams.
Flow
├── Step 1: fetch_data (@start)
├── Step 2: validate_data (@listen to fetch_data)
├── Router: route_on_quality (@router, based on validation)
│ ├── "good" → Step 3a: run_analysis_crew
│ └── "poor" → Step 3b: run_cleanup_crew
└── Step 4: generate_report (@listen to both crews)
Setup
You need CrewAI 0.80+ for Flows to be stable. Install with the tools extra:
# uv is the fastest way to manage this
uv add "crewai[tools]>=0.80.0"
# Or pip (inside a virtual environment)
pip install "crewai[tools]>=0.80.0"
# Verify
python -c "import crewai; print(crewai.__version__)"
Expected: 0.80.x or higher.
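If you want scripts to fail fast on an old install, a crude version gate works. This is a hypothetical helper, not part of CrewAI — and for anything beyond plain "X.Y.Z" strings (pre-releases, suffixes), prefer `packaging.version.Version`:

```python
def version_at_least(installed: str, required: str) -> bool:
    """Compare plain X.Y.Z version strings numerically."""
    def parse(v: str) -> tuple[int, ...]:
        # Take the first three dot-separated numeric parts
        return tuple(int(p) for p in v.split(".")[:3])
    return parse(installed) >= parse(required)
```

Then guard with `version_at_least(crewai.__version__, "0.80.0")` before building any flows.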
Set your LLM key in the environment:
export OPENAI_API_KEY="sk-..."
# Or use .env — crewai loads it automatically via python-dotenv
Solution
Step 1: Define Your Flow State
State is a Pydantic model that all steps read from and write to. It persists across the entire flow execution — no globals, no passing arguments manually.
# flow_state.py
from pydantic import BaseModel
class ResearchFlowState(BaseModel):
    topic: str = ""
    raw_data: str = ""
    quality_score: float = 0.0
    analysis_result: str = ""
    final_report: str = ""
Use specific field names — you'll reference these directly in step methods. Keep the model flat; nested models work but add complexity without much benefit for most flows.
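Every field needs a default because `kickoff(inputs={...})` typically supplies only a subset of them. A quick check of that behavior, assuming Pydantic v2 (which CrewAI depends on):

```python
from pydantic import BaseModel

class ResearchFlowState(BaseModel):
    topic: str = ""
    raw_data: str = ""
    quality_score: float = 0.0
    analysis_result: str = ""
    final_report: str = ""

# Only `topic` is supplied; every other field falls back to its default
state = ResearchFlowState(topic="quantum computing")
```

If a field lacked a default, constructing the state from a partial inputs dict would raise a ValidationError before the first step ran.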
Step 2: Create the Flow Class
# research_flow.py
from crewai.flow.flow import Flow, listen, or_, router, start
from flow_state import ResearchFlowState
class ResearchFlow(Flow[ResearchFlowState]):
    """Event-driven research pipeline with conditional branching."""

    @start()
    def fetch_data(self):
        # @start marks this as the entry point — runs first, no trigger needed
        print(f"Fetching data for topic: {self.state.topic}")
        # Simulate a data fetch (replace with real tool call or API)
        self.state.raw_data = f"Raw research data about {self.state.topic}. " * 20
        self.state.quality_score = 0.85  # In real use: score this with an LLM or heuristic

    @listen(fetch_data)
    def validate_data(self):
        # @listen(fetch_data) means this runs after fetch_data completes
        print(f"Validating data — quality score: {self.state.quality_score}")
        # No return value needed; state mutations are the output
The self.state object is shared. Any step can read or write it. CrewAI handles serialization between steps.
Step 3: Add a Router for Conditional Branching
Routers inspect state and return a string that maps to a downstream step.
    @router(validate_data)
    def route_on_quality(self):
        # The return value must match a string passed to a downstream @listen
        if self.state.quality_score >= 0.7:
            return "high_quality"
        return "low_quality"

    @listen("high_quality")
    def analyze_data(self):
        print("Running full analysis on high-quality data")
        self.state.analysis_result = f"Deep analysis complete: {self.state.raw_data[:100]}..."

    @listen("low_quality")
    def clean_and_retry(self):
        print("Data quality too low — cleaning and re-scoring")
        # Fix the data, bump the score, then set analysis result
        self.state.raw_data = self.state.raw_data.strip()
        self.state.quality_score = 0.75
        self.state.analysis_result = "Cleaned analysis: limited depth due to source quality."
The router strings "high_quality" and "low_quality" must exactly match the strings the @listen decorators receive. A typo here causes a silent skip — the step never runs.
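Because a mismatched label fails silently, a cheap guard can turn the typo into a loud error. `ROUTE_LABELS` and `check_route` are hypothetical names, not CrewAI APIs — just one module-level set that both the router and the @listen strings are written against:

```python
# Single source of truth for route labels — not a CrewAI API, just a convention
ROUTE_LABELS = {"high_quality", "low_quality"}

def check_route(label: str) -> str:
    """Raise immediately if a router is about to return an unknown label."""
    if label not in ROUTE_LABELS:
        raise ValueError(
            f"router returned unknown label {label!r}; "
            f"expected one of {sorted(ROUTE_LABELS)}"
        )
    return label
```

Inside the router, `return check_route("high_quality")` then fails at runtime with a clear message instead of silently skipping the downstream step.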
Step 4: Embed a Crew Inside a Flow Step
This is the most powerful pattern. You run a full multi-agent Crew as one step in a larger Flow.
# analysis_crew.py
from crewai import Agent, Crew, Task, Process
def build_analysis_crew(data: str) -> Crew:
    analyst = Agent(
        role="Data Analyst",
        goal="Extract key insights from research data",
        backstory="Expert analyst with 10 years in research synthesis.",
        verbose=False,
    )
    summarizer = Agent(
        role="Report Writer",
        goal="Write a clear, structured summary of analysis findings",
        backstory="Technical writer who makes complex data readable.",
        verbose=False,
    )
    analyze_task = Task(
        description=f"Analyze this data and identify the top 3 insights:\n\n{data}",
        expected_output="Three bullet points, each under 30 words.",
        agent=analyst,
    )
    write_task = Task(
        description="Write a 150-word report based on the analyst's findings.",
        expected_output="A structured paragraph report.",
        agent=summarizer,
        context=[analyze_task],  # summarizer sees analyst output
    )
    return Crew(
        agents=[analyst, summarizer],
        tasks=[analyze_task, write_task],
        process=Process.sequential,
        verbose=False,
    )
Now call that crew inside the flow step:
# Back in research_flow.py
from analysis_crew import build_analysis_crew
    @listen(analyze_data)
    def run_analysis_crew(self):
        print("Kicking off analysis crew...")
        crew = build_analysis_crew(self.state.analysis_result)
        result = crew.kickoff()
        # crew.kickoff() returns a CrewOutput object; .raw gives you the string
        self.state.final_report = result.raw
Step 5: Add the Final Step and Run the Flow
    # requires: from crewai.flow.flow import or_
    @listen(or_(run_analysis_crew, clean_and_retry))
    def generate_report(self):
        # or_ fans in from BOTH branches — runs regardless of which path was taken
        print("Generating final report...")
        print("\n" + "=" * 50)
        print("FINAL REPORT")
        print("=" * 50)
        print(self.state.final_report or self.state.analysis_result)
        print("=" * 50 + "\n")
To listen to multiple upstream steps (fan-in), wrap them in or_() inside a single @listen — the step then runs as soon as either upstream completes. Use and_() instead if the step should wait for all of them. Stacking bare @listen decorators on one method is not a supported fan-in pattern.
Now wire it all together and run:
# main.py
from research_flow import ResearchFlow
if __name__ == "__main__":
    flow = ResearchFlow()
    # Initial state values go in the inputs dict passed to kickoff()
    flow.kickoff(inputs={"topic": "quantum computing applications in drug discovery"})
python main.py
Expected output:
Fetching data for topic: quantum computing applications in drug discovery
Validating data — quality score: 0.85
Running full analysis on high-quality data
Kicking off analysis crew...
Generating final report...
==================================================
FINAL REPORT
==================================================
[LLM-generated report here]
==================================================
Complete Flow File
Here's the full research_flow.py assembled:
# research_flow.py
from crewai.flow.flow import Flow, listen, or_, router, start
from pydantic import BaseModel
from analysis_crew import build_analysis_crew

class ResearchFlowState(BaseModel):
    topic: str = ""
    raw_data: str = ""
    quality_score: float = 0.0
    analysis_result: str = ""
    final_report: str = ""

class ResearchFlow(Flow[ResearchFlowState]):
    @start()
    def fetch_data(self):
        self.state.raw_data = f"Raw research data about {self.state.topic}. " * 20
        self.state.quality_score = 0.85

    @listen(fetch_data)
    def validate_data(self):
        print(f"Quality score: {self.state.quality_score}")

    @router(validate_data)
    def route_on_quality(self):
        if self.state.quality_score >= 0.7:
            return "high_quality"
        return "low_quality"

    @listen("high_quality")
    def analyze_data(self):
        self.state.analysis_result = f"Deep analysis: {self.state.raw_data[:100]}..."

    @listen("low_quality")
    def clean_and_retry(self):
        self.state.raw_data = self.state.raw_data.strip()
        self.state.analysis_result = "Cleaned analysis: limited depth."

    @listen(analyze_data)
    def run_analysis_crew(self):
        crew = build_analysis_crew(self.state.analysis_result)
        result = crew.kickoff()
        self.state.final_report = result.raw

    @listen(or_(run_analysis_crew, clean_and_retry))
    def generate_report(self):
        print("\nFINAL REPORT\n" + "=" * 40)
        print(self.state.final_report or self.state.analysis_result)
Verification
# Plot the flow graph to verify wiring before running LLM calls
python -c "
from research_flow import ResearchFlow
flow = ResearchFlow()
flow.plot('flow_graph')
print('Graph saved to flow_graph.html')
"
Open flow_graph.html in a browser. You should see all nodes connected correctly — fetch → validate → router → two branches → crew → report.
If a step is missing from the graph: Check that the @listen decorator references the correct method name, not a string typo.
Common Errors
AttributeError: 'ResearchFlow' object has no attribute 'state'
You forgot the type parameter. Change class ResearchFlow(Flow): to class ResearchFlow(Flow[ResearchFlowState]):.
Step silently never runs
The string in @listen("some_string") doesn't match what your router returns. Python won't warn you — check capitalization and spelling.
ValidationError on flow kickoff
Your inputs={} dict contains a key not defined in your FlowState model. Add the field to the model or remove it from inputs.
Crew output is empty
crew.kickoff() returns a CrewOutput object. Use .raw for the string output, not str(result) — that gives you the object repr.
Production Considerations
Async flows: For I/O-heavy steps (API calls, database queries), use async def on step methods and run with await flow.kickoff_async(inputs={...}). This prevents blocking between steps.
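What async steps buy you can be seen with plain asyncio, no CrewAI required. `fetch` and `main` here are illustrative names standing in for I/O-bound flow steps:

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    # Stand-in for an I/O-bound step (API call, database query)
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> list[str]:
    # Both "steps" wait concurrently instead of back to back —
    # the same overlap async flow steps get between LLM/API calls
    return list(await asyncio.gather(fetch("step_a", 0.05), fetch("step_b", 0.05)))

results = asyncio.run(main())
```

With synchronous steps the two waits would add up; with async they overlap, so total wall time approaches the longest single wait.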
Persistence: Flow state is in-memory by default. For long-running flows or crash recovery, serialize self.state.model_dump() to Redis or a database at the end of each step.
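A minimal file-based checkpoint sketch, using only the standard library — `checkpoint` and `restore` are hypothetical names, and the JSON file is a stand-in for Redis or a database:

```python
import json
import os
import tempfile

def checkpoint(state_dict: dict, path: str) -> None:
    # Write atomically: dump to a temp file, then rename over the target,
    # so a crash mid-write never leaves a half-written checkpoint behind
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(state_dict, f)
    os.replace(tmp, path)

def restore(path: str) -> dict:
    with open(path) as f:
        return json.load(f)
```

Calling `checkpoint(self.state.model_dump(), "flow_state.json")` at the end of each step gives you a recovery point; on restart, feed the restored dict back in as kickoff inputs.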
Observability: Connect LangSmith or Langfuse to your LLM calls inside the crew. The Flow itself doesn't emit traces, but each crew kickoff does if you set LANGCHAIN_TRACING_V2=true.
Parallelism: To run two steps simultaneously, decorate both with @listen(same_upstream_step). CrewAI will execute them in parallel automatically — no threading code needed.
What You Learned
- @start marks the entry point; @listen(method) chains steps by method reference
- @router returns a string that routes to matching @listen("string") steps
- FlowState (a Pydantic model) is the single source of truth — no argument passing
- Wrapping upstream steps in or_() inside @listen creates a fan-in (runs when any upstream completes)
- crew.kickoff() returns CrewOutput — use .raw for the string result
When NOT to use Flows: If your task is a single crew with no branching, skip Flows entirely. The overhead isn't worth it for linear single-crew pipelines. Use Flows when you have conditional logic, multiple crews, or need inspectable state between steps.
Tested on CrewAI 0.80.0, Python 3.12, Ubuntu 24.04 and macOS Sequoia