Want to simplify creating and managing ai workflows? CrewAI flows offer structured patterns to orchestrate ai agent interactions. They allow developers to effectively combine coding tasks and teams, offering a powerful framework for developing ai automation. With Agentic Flows in CrewAI, you can design structured, event-driven workflows that streamline task coordination, manage state, and control execution within your ai applications.
What are crews?
crewAI teams enable the orchestration of ai agents for task automation. It facilitates fluid collaboration between agents to solve complex problems. So why does it “flow”, you ask? CrewAI flows are structured patterns to orchestrate the interactions of ai agents. They define how agents collaborate and communicate to accomplish tasks. These flows consist of a sequence of tasks where the result of one task can trigger the next, and the system provides flexible mechanisms for state management and conditional execution control.
What are flows?
Flows are event-driven, meaning they react to specific conditions or triggers. You can design workflows that dynamically adjust based on the results of executing different tasks, enabling seamless execution of complex ai processes.
Workflow control
In CrewAI Flows, developers can structure the flow of tasks and control how information passes between them. Tasks can be chained together, creating a logical flow of operations. This structure allows developers to define a sequence of operations where certain tasks are conditionally executed based on the outcome of previous tasks.
State Management
We can use Structured State Management, which uses a predefined schema, usually through Pydantic's BaseModel, to ensure that data passed between tasks follows a specific structure. It provides the benefits of type safety, validation, and easier management of complex data states.
Input flexibility
Flows can accept input to initialize or update their state at any point during execution. Depending on the workflow requirements, these inputs can be provided at the beginning, during, or after execution.
Event-driven architecture
CrewAI Flows allows workflows to dynamically adjust based on the results of different tasks. Tasks can listen to the results of previous steps, allowing for a reactive system where new tasks can be triggered based on the results of previous ones. The @listen() and @router() decorators allow this level of flexibility, allowing developers to conditionally and dynamically link tasks based on their results. Remember that the @start() decorator is used to mark the start point of a Flow.
Decorators and conditional logic | Description |
@hear() | Create listener methods triggered by specific events or task results. |
@router() | Enables conditional routing within the flow, allowing different execution paths based on the results of previous steps. Useful for managing results of success or failure. |
either_ | Triggers a listener when any of the specified methods emit output. Ideal to continue after completing any task. |
and_ | Ensures that a listener is triggered only when all specified methods emit results. Useful for waiting until multiple conditions or tasks are completed before continuing. |
Task routing
Flows also allow the use of routing to control how execution proceeds based on specific conditions. He @router() Decorator makes this easy by allowing methods to choose their execution path based on the results of previous tasks. For example, a method could check the result of a previous task and decide whether to continue down one path or another, depending on whether certain conditions are met.
Also Read: Creating Collaborative ai Agents with CrewAI
Flows in action
Let's build an agent system using CrewAI streams that recommends movies based on genre. This will be a simple agent system that will help us understand what works behind it.
Facilities
!pip install crewai -U
!pip install crewai-tools
Warning control
import warnings
warnings.filterwarnings('ignore')
Load environment variables
Gonna Snake and get your Serper API key for Google search. We will use OpenAI's 4o-mini model.
import os
os.environ("OPENAI_API_KEY") = ‘’
os.environ('OPENAI_MODEL_NAME') = 'gpt-4o-mini-2024-07-18'
os.environ("SERPER_API_KEY")=''
Import required modules
from crewai import Agent, Task, Crew
from crewai.flow.flow import listen, start, and_, or_, router
from crewai_tools import SerperDevTool
from crewai import Flow
from pydantic import BaseModel
define agent
To keep things simple, we will define a single agent that all tasks will use. This agent will have a Google search tool that all tasks can use.
movie_agent = Agent(
role="Recommend popular movie specific to the genre",
goal="Provide a list of movies based on user preferences",
backstory="You are a cinephile, "
"you recommend good movies to your friends, "
"the movies should be of the same genre",
tools=(SerperDevTool()),
verbose=True
)
Define tasks
action_task = Task(
name="ActionTask",
description="Recommends a popular action movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
comedy_task = Task(
name="ComedyTask",
description="Recommends a popular comedy movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
drama_task = Task(
name="DramaTask",
description="Recommends a popular drama movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
sci_fi_task = Task(
name="SciFiTask",
description="Recommends a sci-fi movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
Define teams for each gender.
action_crew = Crew(
agents=(movie_agent),
tasks=(action_task),
verbose=True,
)
comedy_crew = Crew(
agents=(movie_agent),
tasks=(comedy_task),
verbose=True
)
drama_crew = Crew(
agents=(movie_agent),
tasks=(drama_task),
verbose=True
)
sci_fi_crew = Crew(
agents=(movie_agent),
tasks=(sci_fi_task),
verbose=True
)
Define Genres and GenreState
GENRES = ("action", "comedy", "drama", "sci-fi")
class GenreState(BaseModel):
genre: str = ""
Define movie recommendation flow
We define a class that inherits from the Flow class and we can optionally use the state functionality to add or modify state attributes, here we have already defined GenreState with a genre attribute of type string. (Note that our pydantic model “GenreState” is passed in square brackets)
class MovieRecommendationFlow(Flow(GenreState)):
@start()
def input_genre(self):
genre = input("Enter a genre: ")
print(f"Genre input received: {genre}")
self.state.genre = genre
return genre
@router(input_genre)
def route_to_crew(self):
genre = self.state.genre
if genre not in GENRES:
raise ValueError(f"Invalid genre: {genre}")
if genre == "action":
return "action"
elif genre == "comedy":
return "comedy"
elif genre == "drama":
return "drama"
elif genre == "sci-fi":
return "sci-fi"
@listen("action")
def action_movies(self, genre):
recommendations = action_crew.kickoff()
return recommendations
@listen("comedy")
def comedy_movies(self, genre):
recommendations = comedy_crew.kickoff()
return recommendations
@listen("drama")
def drama_movies(self, genre):
recommendations = drama_crew.kickoff()
return recommendations
@listen("sci-fi")
def sci_fi_movies(self, genre):
recommendations = sci_fi_crew.kickoff()
return recommendations
@listen(or_("action_movies", "comedy_movies", "drama_movies", "sci_fi_movies"))
def finalize_recommendation(self, recommendations):
print("Final movie recommendations:")
return recommendations
We should specify the execution flow by @listen decorator by passing the above method and the output of the above method can be passed as argument in the next method. (Note: the flow must start with the @start decorator)
The router is used here to decide which method should be executed based on the output of the method inside the @router decorator.
or_ specifies that the method should be executed as soon as any of the or_( ) method's methods return output.
(Note: use and_( ) if you want the method to wait until all output is received.)
Trace the flow
flow = MovieRecommendationFlow()
flow.plot()
Show flowchart
from IPython.core.display import display, HTML
with open('/content/crewai_flow.html', 'r') as f:
html_content = f.read()
display(HTML(html_content))
This diagram should give you a better understanding of what is happening. Based on the “input genre”, only one of the four tasks is executed, and when one of them returns a result, the “Completion Recommendation” is triggered.
Start the flow
recommendations = await flow.kickoff_async()
I entered science fiction as input when asked.
After all the Flow running, this is the result I got for the sci-fi movie recommendations I wanted.
Also read: CrewAI multi-agent system to write articles from YouTube videos
Conclusion
In this article, we have explored how ai Crew Event-driven workflows simplify the creation and management of ai task orchestration. By leveraging structured flows, developers can design dynamic and efficient ai systems that enable seamless coordination between tasks and agents. With powerful tools like @listen(), @router() and state management mechanisms, CrewAI enables flexible and adaptable workflows. The practical example of a movie recommendation system highlights how Agentic Flows in CrewAI can be used effectively to personalize ai applications, with tasks that react to user input and dynamically adjust to conditions.
Also, if you are looking for an online ai agent course, explore: Pioneer ai Agent Program.
Frequently asked question
Answer. Use the kickoff() method with an input parameter: flow.kickoff(inputs={“counter”: 10}). Here, “counter” can be a variable used within task or agent definitions.
Answer. @start() marks methods as flow start points that run in parallel. @listen() marks methods that are executed when the specified tasks are completed.
Answer. Use flow.plot() or run crewai's flow plot command to generate an interactive HTML visualization.
Answer. Yes, CrewAI flows support human feedback.