
# CrewAI Flows: Event-Driven Multi-Agent Pipelines

## Summary

Build event-driven multi-agent pipelines with CrewAI Flows: `@start`, `@listen`, `@router`, and typed state.

## Why CrewAI Flows, why now
Single-crew CrewAI scripts are great for a linear demo: researcher → analyst → writer, and you are done. The moment you need conditional branching, parallel work that must converge, or human approval between steps, that simple `kickoff()` stops being enough. CrewAI Flows (stable in CrewAI 0.95+) is the event-driven layer that sits above crews. Decorators (`@start`, `@listen`, `@router`) wire methods into a directed graph, a Pydantic model holds typed state across steps, and the runtime fans out, fans in, and routes for you.

This is a hands-on guide. We will build a 4-step research pipeline that runs two crews in parallel, joins them with `and_`, routes on a quality score, and either retries or publishes. By the end you will know exactly which decorator to reach for and the gotchas that bite teams shipping Flows to production.
Prerequisites
- Python 3.10+ and an OpenAI or Anthropic API key (Flows is LLM-agnostic via litellm).
- Comfort with Python decorators and basic Pydantic models.
pip install 'crewai>=0.95' 'crewai-tools' python-dotenv
## Step 1 — Define typed state with Pydantic

Flows are parameterized with a Pydantic model, so every method gets autocomplete and validation on `self.state`. Untyped dict state works, but you will regret it the first time a typo silently overwrites a field.

```python
from pydantic import BaseModel

from crewai.flow.flow import Flow, start, listen, router, and_, or_


class ResearchState(BaseModel):
    topic: str = ""
    sources: list[str] = []
    summary: str = ""
    draft: str = ""
    quality: float = 0.0
    attempts: int = 0
```
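A quick illustration of why typed state matters: with Pydantic v2, assigning to a field that doesn't exist raises immediately, instead of silently creating a new key the way a plain dict would:

```python
state = ResearchState(topic="WebGPU vs WebGL in 2026")

try:
    state.sumary = "oops"  # typo: missing 'm'
except ValueError as err:
    print(err)  # "ResearchState" object has no field "sumary"
```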
## Step 2 — Mark the entry point with @start

Every Flow needs at least one `@start()` method. It runs as soon as you call `flow.kickoff()`. Multiple `@start()` methods run in parallel — handy when you need to seed state from two independent sources, as in the sketch after this block.

```python
class ResearchFlow(Flow[ResearchState]):
    @start()
    def seed_topic(self):
        self.state.topic = "WebGPU vs WebGL in 2026"
        return self.state.topic
```
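A hedged sketch of that multi-entry pattern; the second seed method is illustrative:

```python
class DualSeedFlow(Flow[ResearchState]):
    # Both @start() methods fire concurrently on kickoff().
    @start()
    def seed_topic(self):
        self.state.topic = "WebGPU vs WebGL in 2026"

    @start()
    def seed_attempt_budget(self):
        # Hypothetical second seed: reset the retry counter up front.
        self.state.attempts = 0
```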
## Step 3 — Run two crews in parallel and join with and_

Each `@listen(seed_topic)` below fires concurrently the moment `seed_topic` finishes. To wait for all of them before moving on, wrap the dependencies in `and_(...)`. Use `or_(...)` if any one is enough.

```python
from crewai import Agent, Task, Crew

# ...continuing inside ResearchFlow:

    @listen(seed_topic)
    def research_web(self, topic):
        researcher = Agent(role="Web researcher", goal="Find 5 fresh sources",
                           backstory="Skims arXiv, GitHub, vendor blogs.")
        task = Task(description=f"List 5 URLs about: {topic}", agent=researcher,
                    expected_output="Bullet list of URLs")
        out = Crew(agents=[researcher], tasks=[task]).kickoff()
        # += rather than =, so the parallel papers branch isn't clobbered.
        self.state.sources += str(out).splitlines()
        return "web_done"

    @listen(seed_topic)
    def research_papers(self, topic):
        # Imagine an arXiv tool here; hardcoded for the demo.
        self.state.sources += ["arxiv:2503.12345", "arxiv:2504.99887"]
        return "papers_done"

    @listen(and_(research_web, research_papers))
    def synthesize(self, _):
        analyst = Agent(role="Analyst", goal="One-paragraph synthesis",
                        backstory="Skeptical engineer.")
        task = Task(description=f"Summarize {self.state.sources}",
                    agent=analyst, expected_output="<= 120 words")
        self.state.summary = str(Crew(agents=[analyst], tasks=[task]).kickoff())
        return self.state.summary
```
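For contrast, a sketch of the `or_` variant, which fires on whichever research branch finishes first. It suits fastest-source-wins designs, not this pipeline's join:

```python
    # Sketch only: fires as soon as EITHER branch completes.
    @listen(or_(research_web, research_papers))
    def first_result(self, label):
        print("First branch done:", label)  # "web_done" or "papers_done"
```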
## Step 4 — Branch with @router on a quality score

`@router` returns a string label, and the runtime fires whichever `@listen("label")` matches. This is how you implement retry loops, human-in-the-loop gates, or publish/discard branches without smearing if/else across your code.

```python
    @router(synthesize)
    def gate(self, summary):
        # Cheap heuristic; in real code, call an LLM judge.
        self.state.quality = min(1.0, len(summary) / 600)
        self.state.attempts += 1
        if self.state.quality >= 0.6:
            return "publish"
        if self.state.attempts >= 3:
            return "give_up"
        return "retry"

    @listen("retry")
    def retry_research(self):
        # Direct call re-runs the research but bypasses the Flow runtime
        # (no re-join on and_) -- see the pitfalls below for the cleaner pattern.
        self.state.sources = []
        return self.research_web(self.state.topic)

    @listen("publish")
    def publish(self, _=None):
        self.state.draft = f"# {self.state.topic}\n\n{self.state.summary}"
        print("PUBLISHED:", self.state.draft[:80])

    @listen("give_up")
    def give_up(self, _=None):
        print("Quality never crossed threshold after", self.state.attempts, "tries")
```
## Step 5 — Run it and inspect state

```python
if __name__ == "__main__":
    flow = ResearchFlow()
    flow.kickoff()
    print(flow.state.model_dump_json(indent=2))
    # Bonus: render the flow graph to HTML.
    flow.plot("research_flow")  # writes research_flow.html
```
Example output (trimmed):

```text
PUBLISHED: # WebGPU vs WebGL in 2026\n\nWebGPU now ships in...
{
  "topic": "WebGPU vs WebGL in 2026",
  "sources": ["https://...", "arxiv:2503.12345", ...],
  "summary": "WebGPU is the default for new...",
  "draft": "# WebGPU vs WebGL in 2026\\n\\nWebGPU is...",
  "quality": 0.74,
  "attempts": 1
}
```
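One more option before moving on: if you'd rather not hardcode the topic, `kickoff()` also accepts an `inputs` dict that seeds matching state fields before the start methods run (check this against your CrewAI version):

```python
flow = ResearchFlow()
# Seeds ResearchState.topic before seed_topic fires; remove the
# hardcoded assignment in seed_topic or it will overwrite this value.
flow.kickoff(inputs={"topic": "WASM GC in 2026"})
```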
## Common pitfalls

- Forgetting `and_` — two independent `@listen` methods fire on the first dependency completing. If you need both, wrap them.
- Returning nothing from a `@router` — the chain dies silently. Always return a string label, even for the default branch.
- Mutating shared lists from parallel `@listen` methods can race. Use `self.state.sources += [...]` on a fresh list per branch, or guard with a lock.
- Re-entry loops — `@listen("retry")` calling another listener directly bypasses the runtime. Prefer routing back into the graph by returning a label that another method listens to.
- Long-running steps — Flows have no built-in timeout. Wrap external calls in `asyncio.wait_for` or run them under your own task supervisor, as in the sketch after this list.
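On that last point, a hedged sketch of bounding a crew run. It assumes your CrewAI version exposes `Crew.kickoff_async()` (present in recent releases) and that the calling step is an `async def` listener:

```python
import asyncio

from crewai import Crew


async def run_crew_bounded(crew: Crew, timeout_s: float = 120.0):
    # Bound a potentially long crew run; on timeout the caller decides
    # whether to retry, degrade, or give up.
    try:
        return await asyncio.wait_for(crew.kickoff_async(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None
```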
## Quick reference

| Decorator | When it fires | Returns |
|---|---|---|
| `@start()` | On `flow.kickoff()` | Value passed to listeners |
| `@listen(fn)` | When `fn` finishes | Value passed to next listeners |
| `@listen(and_(a, b))` | When BOTH `a` and `b` finish | Joined value |
| `@listen(or_(a, b))` | When EITHER `a` or `b` finishes first | First value |
| `@router(fn)` | When `fn` finishes | String label that other listeners match |
## Next steps

- Add `flow.plot()` to CI to catch dead branches before review.
- Replace the heuristic gate with an LLM-as-judge crew that scores groundedness.
- Persist `self.state` to Postgres between steps for crash recovery (see the round-trip sketch after this list).
- Wire approval emails into a `@listen("publish")` step before the final write.