CrewAI Flows: Event-Driven Agent Orchestration Tutorial 2026

Build event-driven AI pipelines with CrewAI Flows. Learn state management, conditional branching, and crew integration with working code examples.

Problem: Multi-Agent Pipelines Are Hard to Control

You've built a CrewAI crew. It runs — but you can't control what happens between steps. You can't branch on a condition, pause mid-pipeline, or trigger a crew only when a previous step succeeds. You end up duct-taping logic with raw Python around your crew calls.

CrewAI Flows solves this. It gives you a structured, event-driven execution engine that sits above individual crews and tasks.

You'll learn:

  • How Flows differ from Crews and when to use each
  • How to wire steps with @start, @listen, and @router decorators
  • How to manage shared state across steps with FlowState
  • How to embed a full Crew inside a Flow step

Time: 25 min | Difficulty: Intermediate


Why Flows Exist

A Crew is great at collaboration: multiple agents working toward a shared goal. But a Crew doesn't give you control over what happens between steps. You can't easily say: "Run step A, check the result, then choose between step B or step C."

Flows are an orchestration layer. They let you:

  • Chain steps in explicit sequence
  • Branch conditionally with routers
  • Maintain typed state across all steps
  • Trigger Crews as sub-units inside a larger pipeline

Think of it this way: a Crew is a team, a Flow is the project plan that coordinates multiple teams.

Flow
 ├── Step 1: fetch_data         (@start)
 ├── Step 2: validate_data      (@listen to fetch_data)
 ├── Router: route_on_quality   (@router, based on validation)
 │    ├── "good"  → Step 3a: run_analysis_crew
 │    └── "poor"  → Step 3b: run_cleanup_crew
 └── Step 4: generate_report    (@listen to both crews)
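
To make the trigger semantics in the diagram concrete, here is a toy, dependency-free model of event-driven dispatch. It is not CrewAI's implementation: the names MiniFlow, on, and emit are hypothetical and exist only for illustration. A finished step fires an event named after itself, and a returned string is treated like a router label.

```python
from collections import defaultdict
from typing import Callable, Optional


class MiniFlow:
    """Toy event dispatcher (hypothetical; not CrewAI's implementation)."""

    def __init__(self) -> None:
        self.listeners: dict[str, list[Callable[[], Optional[str]]]] = defaultdict(list)
        self.trace: list[str] = []  # names of steps in the order they ran

    def on(self, event: str):
        """Register a step to run when `event` fires."""
        def decorator(func):
            self.listeners[event].append(func)
            return func
        return decorator

    def emit(self, event: str) -> None:
        """Run every step registered for `event`, then fire follow-up events."""
        for step in list(self.listeners.get(event, [])):
            self.trace.append(step.__name__)
            label = step()
            # Completing a step fires an event named after the step...
            self.emit(step.__name__)
            # ...and a returned string acts like a router label.
            if isinstance(label, str):
                self.emit(label)


def run_demo(quality_score: float) -> list[str]:
    """Wire up the diagram's steps and return the order in which they ran."""
    flow = MiniFlow()

    @flow.on("start")
    def fetch_data():
        return None

    @flow.on("fetch_data")
    def validate_data():
        return None

    @flow.on("validate_data")
    def route_on_quality():
        return "good" if quality_score >= 0.7 else "poor"

    @flow.on("good")
    def run_analysis_crew():
        return None

    @flow.on("poor")
    def run_cleanup_crew():
        return None

    def generate_report():
        return None

    # Fan-in: the report step is registered for both branches.
    for upstream in ("run_analysis_crew", "run_cleanup_crew"):
        flow.on(upstream)(generate_report)

    flow.emit("start")
    return flow.trace
```

Calling run_demo(0.85) follows the "good" branch and run_demo(0.4) follows the "poor" branch; both end at generate_report.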

Setup

You need CrewAI 0.80+ for Flows to be stable. Install with the tools extra:

# uv is the fastest way to manage this
uv add "crewai[tools]>=0.80.0"

# Or pip (inside a virtual environment)
pip install "crewai[tools]>=0.80.0"

# Verify
python -c "import crewai; print(crewai.__version__)"

Expected: 0.80.x or higher.

Set your LLM key in the environment:

export OPENAI_API_KEY="sk-..."
# Or use .env — crewai loads it automatically via python-dotenv

Solution

Step 1: Define Your Flow State

State is a Pydantic model that all steps read from and write to. It persists across the entire flow execution — no globals, no passing arguments manually.

# flow_state.py
from pydantic import BaseModel

class ResearchFlowState(BaseModel):
    topic: str = ""
    raw_data: str = ""
    quality_score: float = 0.0
    analysis_result: str = ""
    final_report: str = ""

Use specific field names — you'll reference these directly in step methods. Keep the model flat; nested models work but add complexity without much benefit for most flows.


Step 2: Create the Flow Class

# research_flow.py
from crewai.flow.flow import Flow, listen, router, start
from flow_state import ResearchFlowState

class ResearchFlow(Flow[ResearchFlowState]):
    """Event-driven research pipeline with conditional branching."""

    @start()
    def fetch_data(self):
        # @start marks this as the entry point — runs first, no trigger needed
        print(f"Fetching data for topic: {self.state.topic}")

        # Simulate a data fetch (replace with real tool call or API)
        self.state.raw_data = f"Raw research data about {self.state.topic}. " * 20
        self.state.quality_score = 0.85  # In real use: score this with an LLM or heuristic

    @listen(fetch_data)
    def validate_data(self):
        # @listen(fetch_data) means this runs after fetch_data completes
        print(f"Validating data — quality score: {self.state.quality_score}")
        # No return value needed; state mutations are the output

The self.state object is shared. Any step can read or write it, and changes made in one step are visible to every step that runs after it.
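
The hard-coded 0.85 in fetch_data stands in for a real score. One cheap option is a heuristic. The sketch below uses a hypothetical score_quality helper that returns the fraction of unique sentences, so highly repetitive text scores low. The metric is an illustrative assumption, not something CrewAI provides.

```python
def score_quality(text: str) -> float:
    """Return the share of unique sentences in `text`, from 0.0 to 1.0.

    Hypothetical heuristic for illustration: repeated sentences lower the score.
    """
    sentences = [s.strip().lower() for s in text.split(".") if s.strip()]
    if not sentences:
        return 0.0
    return len(set(sentences)) / len(sentences)


if __name__ == "__main__":
    repetitive = "Raw research data about qubits. " * 20
    varied = "Qubits decohere quickly. Error correction helps. Costs remain high."
    print(score_quality(repetitive))  # 0.05
    print(score_quality(varied))      # 1.0
```

Note that the simulated raw_data in this tutorial repeats one sentence 20 times, so this particular heuristic would send it down the low-quality path.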


Step 3: Add a Router for Conditional Branching

Routers inspect state and return a string that maps to a downstream step.

    @router(validate_data)
    def route_on_quality(self):
        # Return value must exactly match the string passed to a downstream @listen("...")
        if self.state.quality_score >= 0.7:
            return "high_quality"
        return "low_quality"

    @listen("high_quality")
    def analyze_data(self):
        print("Running full analysis on high-quality data")
        self.state.analysis_result = f"Deep analysis complete: {self.state.raw_data[:100]}..."

    @listen("low_quality")
    def clean_and_retry(self):
        print("Data quality too low — cleaning and re-scoring")
        # Fix the data, bump the score, then set analysis result
        self.state.raw_data = self.state.raw_data.strip()
        self.state.quality_score = 0.75
        self.state.analysis_result = "Cleaned analysis: limited depth due to source quality."

The router strings "high_quality" and "low_quality" must exactly match the strings passed to the @listen decorators. A typo here causes a silent skip: the step never runs.
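
One way to reduce the typo risk is to define each label once and reuse the constant in both the router and the listeners. The sketch below is plain Python. The constant names and the pick_route helper are hypothetical, and the 0.7 threshold is taken from the router above.

```python
# Hypothetical label constants: each route string is defined exactly once.
HIGH_QUALITY = "high_quality"
LOW_QUALITY = "low_quality"


def pick_route(quality_score: float, threshold: float = 0.7) -> str:
    """Return the route label for a score, using the same comparison as the router."""
    return HIGH_QUALITY if quality_score >= threshold else LOW_QUALITY
```

Inside the flow, the router would return pick_route(self.state.quality_score) and the listeners would be declared as @listen(HIGH_QUALITY) and @listen(LOW_QUALITY). A misspelled constant name then raises a NameError when the class is defined instead of silently skipping a step.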


Step 4: Embed a Crew Inside a Flow Step

This is the most powerful pattern. You run a full multi-agent Crew as one step in a larger Flow.

# analysis_crew.py
from crewai import Agent, Crew, Task, Process

def build_analysis_crew(data: str) -> Crew:
    analyst = Agent(
        role="Data Analyst",
        goal="Extract key insights from research data",
        backstory="Expert analyst with 10 years in research synthesis.",
        verbose=False,
    )

    summarizer = Agent(
        role="Report Writer",
        goal="Write a clear, structured summary of analysis findings",
        backstory="Technical writer who makes complex data readable.",
        verbose=False,
    )

    analyze_task = Task(
        description=f"Analyze this data and identify the top 3 insights:\n\n{data}",
        expected_output="Three bullet points, each under 30 words.",
        agent=analyst,
    )

    write_task = Task(
        description="Write a 150-word report based on the analyst's findings.",
        expected_output="A structured paragraph report.",
        agent=summarizer,
        context=[analyze_task],  # summarizer sees analyst output
    )

    return Crew(
        agents=[analyst, summarizer],
        tasks=[analyze_task, write_task],
        process=Process.sequential,
        verbose=False,
    )

Now call that crew inside the flow step:

# Back in research_flow.py
from analysis_crew import build_analysis_crew

    @listen(analyze_data)
    def run_analysis_crew(self):
        print("Kicking off analysis crew...")
        crew = build_analysis_crew(self.state.analysis_result)
        result = crew.kickoff()
        # crew.kickoff() returns a CrewOutput object; .raw gives you the string
        self.state.final_report = result.raw

Step 5: Add the Final Step and Run the Flow

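    # Requires or_ in the import line at the top of research_flow.py:
    # from crewai.flow.flow import Flow, listen, or_, router, start
    @listen(or_(run_analysis_crew, clean_and_retry))
    def generate_report(self):
        # or_ fires this step when EITHER upstream finishes, so it runs on both paths
        print("Generating final report...")
        print("\n" + "="*50)
        print("FINAL REPORT")
        print("="*50)
        print(self.state.final_report or self.state.analysis_result)
        print("="*50 + "\n")

To listen to multiple upstream steps (fan-in), wrap them in or_() inside a single @listen. The step runs as soon as any one of them completes. Don't stack @listen decorators for this: each decorator sets the method's trigger, so the outer one replaces the inner one rather than combining with it. Use and_() instead if the step should wait until all upstream steps have completed.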

Now wire it all together and run:

# main.py
from research_flow import ResearchFlow

if __name__ == "__main__":
    flow = ResearchFlow()
    # Initial state values go in the inputs dict; keys must match the state model's field names
    flow.kickoff(inputs={"topic": "quantum computing applications in drug discovery"})

python main.py

Expected output:

Fetching data for topic: quantum computing applications in drug discovery
Validating data — quality score: 0.85
Running full analysis on high-quality data
Kicking off analysis crew...
Generating final report...
==================================================
FINAL REPORT
==================================================
[LLM-generated report here]
==================================================

Complete Flow File

Here's the full research_flow.py assembled:

# research_flow.py
from crewai.flow.flow import Flow, listen, or_, router, start
from pydantic import BaseModel
from analysis_crew import build_analysis_crew


class ResearchFlowState(BaseModel):
    topic: str = ""
    raw_data: str = ""
    quality_score: float = 0.0
    analysis_result: str = ""
    final_report: str = ""


class ResearchFlow(Flow[ResearchFlowState]):

    @start()
    def fetch_data(self):
        self.state.raw_data = f"Raw research data about {self.state.topic}. " * 20
        self.state.quality_score = 0.85

    @listen(fetch_data)
    def validate_data(self):
        print(f"Quality score: {self.state.quality_score}")

    @router(validate_data)
    def route_on_quality(self):
        if self.state.quality_score >= 0.7:
            return "high_quality"
        return "low_quality"

    @listen("high_quality")
    def analyze_data(self):
        self.state.analysis_result = f"Deep analysis: {self.state.raw_data[:100]}..."

    @listen("low_quality")
    def clean_and_retry(self):
        self.state.raw_data = self.state.raw_data.strip()
        self.state.analysis_result = "Cleaned analysis: limited depth."

    @listen(analyze_data)
    def run_analysis_crew(self):
        crew = build_analysis_crew(self.state.analysis_result)
        result = crew.kickoff()
        self.state.final_report = result.raw

    # or_ triggers the report after whichever branch actually ran
    @listen(or_(run_analysis_crew, clean_and_retry))
    def generate_report(self):
        print("\nFINAL REPORT\n" + "="*40)
        print(self.state.final_report or self.state.analysis_result)

Verification

# Plot the flow graph to verify wiring before running LLM calls
python -c "
from research_flow import ResearchFlow
flow = ResearchFlow()
flow.plot('flow_graph')
print('Graph saved to flow_graph.html')
"

Open flow_graph.html in a browser. You should see all nodes connected correctly — fetch → validate → router → two branches → crew → report.

If a step is missing from the graph: Check that the @listen decorator references the correct method name, not a string typo.


Common Errors

AttributeError: 'dict' object has no attribute 'topic'
You forgot the type parameter, so self.state falls back to a plain dict. Change class ResearchFlow(Flow): to class ResearchFlow(Flow[ResearchFlowState]):.

Step silently never runs
The string in @listen("some_string") doesn't match what your router returns. Python won't warn you — check capitalization and spelling.

ValidationError on flow kickoff
Your inputs={} dict contains a key not defined in your FlowState model. Add the field to the model or remove it from inputs.

Crew output is empty
crew.kickoff() returns a CrewOutput object, not a string. Use .raw for the plain string output. str(result) is less predictable because it can return the Pydantic or JSON form instead when a task defines structured output.


Production Considerations

Async flows: For I/O-heavy steps (API calls, database queries), use async def on step methods and run with await flow.kickoff_async(inputs={...}). This prevents blocking between steps.
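
The benefit of async steps is easiest to see outside CrewAI. This standard-library sketch runs two simulated I/O-bound steps with asyncio.gather and records the order of events. The step names and delays are made up for illustration, and the sketch does not use the Flow API.

```python
import asyncio


async def io_step(name: str, delay: float, events: list[str]) -> None:
    """Simulate an I/O-bound step; asyncio.sleep stands in for a network call."""
    events.append(f"{name}:start")
    await asyncio.sleep(delay)  # yields control so other steps can run meanwhile
    events.append(f"{name}:end")


async def run_steps() -> list[str]:
    """Run both steps together and return the recorded event order."""
    events: list[str] = []
    # Both steps are scheduled together; neither blocks the other while waiting.
    await asyncio.gather(
        io_step("fetch_api", 0.05, events),
        io_step("query_db", 0.10, events),
    )
    return events


if __name__ == "__main__":
    print(asyncio.run(run_steps()))
    # ['fetch_api:start', 'query_db:start', 'fetch_api:end', 'query_db:end']
```

Both steps start before either finishes, which is the overlap you lose when a step blocks on a synchronous call.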

Persistence: Flow state is in-memory by default. For long-running flows or crash recovery, serialize self.state.model_dump() to Redis or a database at the end of each step.
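
A minimal sketch of that checkpointing idea, using a JSON file as a stand-in for Redis or a database. The save_checkpoint and load_checkpoint helpers and the file layout are hypothetical. In a flow step you would pass self.state.model_dump() as the state dict.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Optional


def save_checkpoint(path: Path, step: str, state: dict[str, Any]) -> None:
    """Atomically write the last completed step and its state to `path`."""
    payload = json.dumps({"step": step, "state": state})
    # Write to a temp file first so a crash never leaves a half-written checkpoint.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as tmp:
        tmp.write(payload)
    os.replace(tmp_name, path)  # atomic rename over the old checkpoint


def load_checkpoint(path: Path) -> Optional[dict[str, Any]]:
    """Return the saved checkpoint, or None if no checkpoint exists yet."""
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))
```

On restart, load_checkpoint tells you which step last completed and gives you the state values to pass back in through inputs.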

Observability: Connect LangSmith or Langfuse to your LLM calls inside the crew. The Flow itself doesn't emit traces, but each crew kickoff does if you set LANGCHAIN_TRACING_V2=true.

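Parallelism: To run two steps off the same trigger, decorate both with @listen(same_upstream_step). CrewAI schedules both listeners when the upstream step completes, with no threading code needed. Depending on the CrewAI version, synchronous steps may still run one after another; async def steps that await their I/O overlap reliably.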


What You Learned

  • @start marks the entry point; @listen(method) chains steps by method reference
  • @router returns a string that routes to matching @listen("string") steps
  • FlowState (a Pydantic model) is the single source of truth — no argument passing
  • Wrapping upstream steps in or_() inside one @listen creates a fan-in (runs when any upstream completes); and_() waits for all of them
  • crew.kickoff() returns CrewOutput — use .raw for the string result

When NOT to use Flows: If your task is a single crew with no branching, skip Flows entirely. The overhead isn't worth it for linear single-crew pipelines. Use Flows when you have conditional logic, multiple crews, or need inspectable state between steps.

Tested on CrewAI 0.80.0, Python 3.12, Ubuntu 24.04 and macOS Sequoia