Plantilla de Cheat Multi-Agent CrewAI
Sinopsis
CrewAI es un revolucionario marco de orquestación multiagente de código abierto que transforma la forma en que los desarrolladores construyen e implementan aplicaciones impulsadas por IA. Creado por João Moura, este marco basado en Python permite a múltiples agentes de IA trabajar juntos como unidad cohesiva, cada uno asumiendo roles específicos y compartiendo responsabilidades para realizar tareas complejas que serían difíciles para un solo agente para manejar solo.
Lo que distingue a CrewAI es su capacidad de orquestar sofisticados sistemas multiagentes donde los agentes pueden delegar sus tareas autónomamente, colaborar en la solución de problemas y aprovechar herramientas y capacidades especializadas. El marco proporciona simplicidad de alto nivel para el desarrollo rápido y control preciso de bajo nivel para escenarios complejos, lo que lo hace ideal para crear agentes autónomos de inteligencia artificial adaptados a cualquier requisito técnico o de negocios.
CrewAI aborda la creciente necesidad de sistemas de IA que puedan manejar desafíos multifacéticos derribandolos en componentes manejables, asignando agentes especializados a cada componente y coordinando sus esfuerzos para lograr resultados superiores en comparación con los enfoques tradicionales de un solo agente.
Conceptos básicos
Agentes
Los agentes son los pilares fundamentales de los sistemas CrewAI. Cada agente está diseñado con funciones, metas y capacidades específicas, funcionando como entidades autónomas que pueden razonar, planificar y ejecutar tareas dentro de su dominio de experiencia.
Crews
Una tripulación es una colección de agentes trabajando juntos hacia un objetivo común. Crews define la estructura y flujo de trabajo de la colaboración multiagente, estableciendo cómo interactúan los agentes, delegar tareas y compartir información.
Tareas
Las tareas representan objetivos o actividades específicos que deben completarse. Pueden ser asignados a agentes individuales o distribuidos a través de múltiples agentes dentro de una tripulación, dependiendo de la complejidad y requisitos.
Herramientas
Las herramientas amplían las capacidades de los agentes proporcionando acceso a servicios externos, API, bases de datos o funciones especializadas. Los agentes pueden usar herramientas para realizar acciones más allá de sus capacidades de modelo de lenguaje básico.
Instalación y configuración
Instalación básica
# Install CrewAI using pip
pip install crewai
# Install with additional tools
pip install 'crewai[tools]'
# Install development version
pip install git+https://github.com/crewAIInc/crewAI.git
Environment Setup
import os
from crewai import Agent, Task, Crew, Process
# Set up API keys for LLM providers
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
# or
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-api-key"
# or other supported providers
Estructura del proyecto
my_crew_project/
├── agents/
│ ├── __init__.py
│ ├── researcher.py
│ └── writer.py
├── tasks/
│ ├── __init__.py
│ ├── research_tasks.py
│ └── writing_tasks.py
├── tools/
│ ├── __init__.py
│ └── custom_tools.py
├── crews/
│ ├── __init__.py
│ └── content_crew.py
└── main.py
Configuración del agente
Creación de agentes básicos
from crewai import Agent
# Create a basic agent
researcher = Agent(
role='Research Specialist',
goal='Conduct thorough research on given topics',
backstory="""You are an experienced researcher with expertise in
gathering, analyzing, and synthesizing information from multiple sources.
You have a keen eye for detail and can identify reliable sources.""",
verbose=True,
allow_delegation=False
)
Configuración avanzada del agente
from crewai import Agent
from crewai_tools import SerperDevTool, WebsiteSearchTool
# Create an agent with tools and advanced settings
research_agent = Agent(
role='Senior Research Analyst',
goal='Provide comprehensive analysis and insights on market trends',
backstory="""You are a senior research analyst with 10+ years of experience
in market research and competitive analysis. You excel at identifying patterns,
trends, and actionable insights from complex data sets.""",
tools=[SerperDevTool(), WebsiteSearchTool()],
verbose=True,
allow_delegation=True,
max_iter=5,
memory=True,
step_callback=lambda step: print(f"Agent step: \\\\{step\\\\}"),
system_template="""You are \\\\{role\\\\}. \\\\{backstory\\\\}
Your goal is: \\\\{goal\\\\}
Always provide detailed analysis with supporting evidence."""
)
Agente con LLM personalizada
from langchain.llms import OpenAI
from crewai import Agent
# Use custom LLM configuration
custom_llm = OpenAI(temperature=0.7, model_name="gpt-4")
analyst = Agent(
role='Data Analyst',
goal='Analyze data and provide statistical insights',
backstory='Expert in statistical analysis and data interpretation',
llm=custom_llm,
verbose=True
)
Multimodal Agent
from crewai import Agent
# Agent with multimodal capabilities
visual_analyst = Agent(
role='Visual Content Analyst',
goal='Analyze images and visual content for insights',
backstory='Specialist in visual content analysis and interpretation',
multimodal=True, # Enable multimodal capabilities
tools=[image_analysis_tool],
verbose=True
)
Definición de tareas y gestión
Creación de tareas básicas
from crewai import Task
# Define a simple task
research_task = Task(
description="""Conduct comprehensive research on artificial intelligence
trends in 2024. Focus on:
1. Emerging AI technologies
2. Market adoption rates
3. Key industry players
4. Future predictions
Provide a detailed report with sources and citations.""",
agent=researcher,
expected_output="A comprehensive research report with citations"
)
Configuración de tareas avanzada
from crewai import Task
# Task with dependencies and callbacks
analysis_task = Task(
description="""Analyze the research findings and create strategic
recommendations for AI adoption in enterprise environments.""",
agent=analyst,
expected_output="Strategic recommendations document with actionable insights",
context=[research_task], # Depends on research_task completion
callback=lambda output: save_to_database(output),
async_execution=False,
output_file="analysis_report.md"
)
La tarea con la fijación de productos personalizados
from crewai import Task
from pydantic import BaseModel
from typing import List
class ResearchOutput(BaseModel):
title: str
summary: str
key_findings: List[str]
sources: List[str]
confidence_score: float
structured_task = Task(
description="Research AI market trends and provide structured output",
agent=researcher,
expected_output="Structured research findings",
output_pydantic=ResearchOutput
)
Ejecución de tareas condicional
from crewai import Task
def should_execute_task(context):
# Custom logic to determine if task should execute
return len(context.get('findings', [])) > 5
conditional_task = Task(
description="Perform detailed analysis if sufficient data is available",
agent=analyst,
expected_output="Detailed analysis report",
condition=should_execute_task
)
Crew Orchestration
Basic Crew Setup
from crewai import Crew, Process
# Create a basic crew
content_crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
verbose=2,
process=Process.sequential
)
# Execute the crew
result = content_crew.kickoff()
print(result)
Configuración avanzada de la tripulación
from crewai import Crew, Process
from crewai.memory import LongTermMemory
# Advanced crew with memory and custom settings
advanced_crew = Crew(
agents=[researcher, analyst, writer, reviewer],
tasks=[research_task, analysis_task, writing_task, review_task],
process=Process.hierarchical,
memory=LongTermMemory(),
verbose=2,
manager_llm=manager_llm,
function_calling_llm=function_llm,
max_rpm=10,
share_crew=True,
step_callback=crew_step_callback,
task_callback=crew_task_callback
)
Proceso jerárquico
from crewai import Crew, Process, Agent
# Manager agent for hierarchical process
manager = Agent(
role='Project Manager',
goal='Coordinate team activities and ensure quality deliverables',
backstory='Experienced project manager with strong leadership skills',
allow_delegation=True
)
hierarchical_crew = Crew(
agents=[manager, researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.hierarchical,
manager_agent=manager,
verbose=2
)
Ejecución de tareas paralelas
from crewai import Crew, Process
# Crew with parallel task execution
parallel_crew = Crew(
agents=[researcher1, researcher2, researcher3],
tasks=[task1, task2, task3],
process=Process.sequential, # Overall sequential, but tasks can run in parallel
max_execution_time=3600, # 1 hour timeout
verbose=2
)
# Execute with parallel capabilities
result = parallel_crew.kickoff(inputs=\\\\{
'topic': 'AI in Healthcare',
'deadline': '2024-12-31'
\\\\})
Integración de herramientas
Herramientas incorporadas
from crewai_tools import (
SerperDevTool,
WebsiteSearchTool,
FileReadTool,
DirectoryReadTool,
CodeDocsSearchTool,
YoutubeVideoSearchTool
)
# Configure built-in tools
search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()
file_tool = FileReadTool()
code_tool = CodeDocsSearchTool()
# Agent with multiple tools
multi_tool_agent = Agent(
role='Research Assistant',
goal='Gather information from multiple sources',
backstory='Versatile researcher with access to various information sources',
tools=[search_tool, web_tool, file_tool, code_tool],
verbose=True
)
Desarrollo de herramientas personalizadas
from crewai_tools import BaseTool
from typing import Type
from pydantic import BaseModel, Field
class DatabaseQueryInput(BaseModel):
query: str = Field(description="SQL query to execute")
database: str = Field(description="Database name")
class DatabaseQueryTool(BaseTool):
name: str = "Database Query Tool"
description: str = "Execute SQL queries against specified databases"
args_schema: Type[BaseModel] = DatabaseQueryInput
def _run(self, query: str, database: str) -> str:
# Implement database query logic
try:
# Connect to database and execute query
result = execute_database_query(database, query)
return f"Query executed successfully: \\\\{result\\\\}"
except Exception as e:
return f"Query failed: \\\\{str(e)\\\\}"
# Use custom tool
db_tool = DatabaseQueryTool()
database_agent = Agent(
role='Database Analyst',
goal='Query and analyze database information',
backstory='Expert in database operations and SQL',
tools=[db_tool],
verbose=True
)
API Integration Tool
from crewai_tools import BaseTool
import requests
class APIIntegrationTool(BaseTool):
name: str = "API Integration Tool"
description: str = "Make HTTP requests to external APIs"
def _run(self, endpoint: str, method: str = "GET", data: dict = None) -> str:
try:
if method.upper() == "GET":
response = requests.get(endpoint)
elif method.upper() == "POST":
response = requests.post(endpoint, json=data)
return response.json()
except Exception as e:
return f"API request failed: \\\\{str(e)\\\\}"
# Agent with API capabilities
api_agent = Agent(
role='API Integration Specialist',
goal='Interact with external services via APIs',
backstory='Expert in API integration and data retrieval',
tools=[APIIntegrationTool()],
verbose=True
)
Memory and Context Management
Memoria a largo plazo
from crewai.memory import LongTermMemory
from crewai import Crew
# Crew with persistent memory
memory_crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
memory=LongTermMemory(),
verbose=2
)
# Memory persists across executions
result1 = memory_crew.kickoff(inputs=\\\\{'topic': 'AI Ethics'\\\\})
result2 = memory_crew.kickoff(inputs=\\\\{'topic': 'AI Regulation'\\\\})
Compartir contexto
from crewai import Task, Agent
# Tasks that share context
context_task1 = Task(
description="Research market trends",
agent=researcher,
expected_output="Market trend analysis"
)
context_task2 = Task(
description="Analyze the market trends and provide recommendations",
agent=analyst,
expected_output="Strategic recommendations",
context=[context_task1] # Uses output from context_task1
)
Aplicación de memoria personalizada
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
class CustomMemory(LongTermMemory):
def __init__(self, storage_path: str = "./custom_memory"):
super().__init__(storage_path=storage_path)
self.custom_entities = \\\\{\\\\}
def save_entity(self, entity_name: str, entity_data: dict):
self.custom_entities[entity_name] = entity_data
# Implement custom storage logic
def retrieve_entity(self, entity_name: str) -> dict:
return self.custom_entities.get(entity_name, \\\\{\\\\})
# Use custom memory
custom_memory = CustomMemory("./project_memory")
crew_with_custom_memory = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
memory=custom_memory
)
Características avanzadas
Agent Delegation
from crewai import Agent
# Senior agent that can delegate
senior_researcher = Agent(
role='Senior Research Director',
goal='Oversee research projects and delegate tasks',
backstory='Experienced research director with team management skills',
allow_delegation=True,
max_delegation=3,
verbose=True
)
# Junior agents that can receive delegated tasks
junior_researcher1 = Agent(
role='Junior Researcher - Technology',
goal='Research technology trends and innovations',
backstory='Specialized in technology research',
allow_delegation=False
)
junior_researcher2 = Agent(
role='Junior Researcher - Market Analysis',
goal='Analyze market conditions and competitive landscape',
backstory='Specialized in market research and analysis',
allow_delegation=False
)
# Crew with delegation hierarchy
delegation_crew = Crew(
agents=[senior_researcher, junior_researcher1, junior_researcher2],
tasks=[complex_research_task],
process=Process.hierarchical,
verbose=2
)
Razonamiento y planificación
from crewai import Agent
# Agent with enhanced reasoning capabilities
reasoning_agent = Agent(
role='Strategic Planner',
goal='Develop comprehensive strategies with detailed reasoning',
backstory='Expert strategic planner with strong analytical skills',
reasoning=True, # Enable reasoning capabilities
planning=True, # Enable planning capabilities
verbose=True,
max_iter=10
)
# Task that requires complex reasoning
strategic_task = Task(
description="""Develop a comprehensive 5-year strategic plan for AI adoption
in the healthcare industry. Consider:
1. Current market conditions
2. Regulatory environment
3. Technology readiness
4. Competitive landscape
5. Implementation challenges
Provide detailed reasoning for each recommendation.""",
agent=reasoning_agent,
expected_output="Comprehensive strategic plan with detailed reasoning"
)
Callback and Monitoring
from crewai import Crew, Agent, Task
def agent_step_callback(agent_output):
print(f"Agent \\\\{agent_output.agent\\\\} completed step: \\\\{agent_output.step\\\\}")
# Log to monitoring system
log_agent_activity(agent_output)
def task_completion_callback(task_output):
print(f"Task completed: \\\\{task_output.description\\\\}")
# Send notification or update dashboard
notify_task_completion(task_output)
def crew_step_callback(crew_output):
print(f"Crew step completed: \\\\{crew_output\\\\}")
# Update progress tracking
update_progress(crew_output)
# Crew with comprehensive monitoring
monitored_crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
step_callback=crew_step_callback,
task_callback=task_completion_callback,
verbose=2
)
# Agents with individual monitoring
monitored_agent = Agent(
role='Monitored Researcher',
goal='Conduct research with detailed monitoring',
backstory='Researcher with comprehensive activity tracking',
step_callback=agent_step_callback,
verbose=True
)
Manejo de errores y resiliencia
Retry Logic
from crewai import Task, Agent
import time
def retry_callback(attempt, error):
print(f"Task failed on attempt \\\\{attempt\\\\}: \\\\{error\\\\}")
time.sleep(2 ** attempt) # Exponential backoff
resilient_task = Task(
description="Perform web research with retry logic",
agent=researcher,
expected_output="Research findings",
max_retries=3,
retry_callback=retry_callback
)
Recuperación de errores
from crewai import Crew, Process
def error_handler(error, context):
print(f"Error occurred: \\\\{error\\\\}")
# Implement recovery logic
if "rate_limit" in str(error).lower():
time.sleep(60) # Wait for rate limit reset
return True # Retry
return False # Don't retry
error_resilient_crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
error_handler=error_handler,
max_retries=3,
verbose=2
)
Agentes de Fallback
from crewai import Agent, Task, Crew
# Primary agent
primary_researcher = Agent(
role='Primary Researcher',
goal='Conduct comprehensive research',
backstory='Expert researcher with specialized tools',
tools=[advanced_search_tool, database_tool]
)
# Fallback agent with basic capabilities
fallback_researcher = Agent(
role='Backup Researcher',
goal='Conduct basic research when primary agent fails',
backstory='Reliable researcher with basic tools',
tools=[basic_search_tool]
)
# Task with fallback logic
research_with_fallback = Task(
description="Conduct research with fallback support",
agent=primary_researcher,
fallback_agent=fallback_researcher,
expected_output="Research findings"
)
Optimización del rendimiento
Ejecución paralela
from crewai import Crew, Process
import asyncio
# Async crew execution
async def run_crew_async():
crew = Crew(
agents=[researcher1, researcher2, researcher3],
tasks=[task1, task2, task3],
process=Process.sequential,
verbose=2
)
result = await crew.kickoff_async(inputs=\\\\{'topic': 'AI Trends'\\\\})
return result
# Run multiple crews in parallel
async def run_multiple_crews():
crews = [create_crew(topic) for topic in ['AI', 'ML', 'NLP']]
results = await asyncio.gather(*[crew.kickoff_async() for crew in crews])
return results
Gestión de los recursos
from crewai import Crew
import threading
class ResourceManager:
def __init__(self, max_concurrent_agents=5):
self.semaphore = threading.Semaphore(max_concurrent_agents)
self.active_agents = 0
def acquire_agent_slot(self):
self.semaphore.acquire()
self.active_agents += 1
def release_agent_slot(self):
self.semaphore.release()
self.active_agents -= 1
resource_manager = ResourceManager(max_concurrent_agents=3)
# Crew with resource management
resource_managed_crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
resource_manager=resource_manager,
verbose=2
)
Caching y Optimización
from crewai import Agent, Task
from functools import lru_cache
# Agent with caching capabilities
class CachedAgent(Agent):
@lru_cache(maxsize=100)
def cached_execution(self, task_description):
return super().execute_task(task_description)
cached_researcher = CachedAgent(
role='Cached Researcher',
goal='Perform research with caching',
backstory='Efficient researcher with caching capabilities'
)
# Task with caching
cached_task = Task(
description="Research AI trends (cached)",
agent=cached_researcher,
expected_output="Cached research results",
cache_results=True
)
Patrones de integración
Aplicación de la Web de Flask
from flask import Flask, request, jsonify
from crewai import Crew, Agent, Task
app = Flask(__name__)
# Initialize crew components
researcher = Agent(
role='API Researcher',
goal='Research topics via web API',
backstory='Researcher accessible via web API'
)
@app.route('/research', methods=['POST'])
def research_endpoint():
data = request.json
topic = data.get('topic')
# Create dynamic task
research_task = Task(
description=f"Research the topic: \\\\{topic\\\\}",
agent=researcher,
expected_output="Research findings"
)
# Execute crew
crew = Crew(agents=[researcher], tasks=[research_task])
result = crew.kickoff()
return jsonify(\\\\{'result': result\\\\})
if __name__ == '__main__':
app.run(debug=True)
Celery Background Tareas
from celery import Celery
from crewai import Crew, Agent, Task
app = Celery('crewai_tasks')
@app.task
def execute_crew_task(topic, agents_config, tasks_config):
# Reconstruct agents and tasks from config
agents = [create_agent_from_config(config) for config in agents_config]
tasks = [create_task_from_config(config) for config in tasks_config]
# Execute crew
crew = Crew(agents=agents, tasks=tasks)
result = crew.kickoff(inputs=\\\\{'topic': topic\\\\})
return result
# Usage
result = execute_crew_task.delay(
topic="AI in Healthcare",
agents_config=[researcher_config, analyst_config],
tasks_config=[research_config, analysis_config]
)
Integración de bases de datos
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from crewai import Crew, Agent, Task
import datetime
Base = declarative_base()
class CrewExecution(Base):
__tablename__ = 'crew_executions'
id = Column(Integer, primary_key=True)
crew_name = Column(String(100))
input_data = Column(Text)
output_data = Column(Text)
execution_time = Column(DateTime, default=datetime.datetime.utcnow)
status = Column(String(50))
# Database-integrated crew
class DatabaseCrew(Crew):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.engine = create_engine('sqlite:///crew_executions.db')
Base.metadata.create_all(self.engine)
Session = sessionmaker(bind=self.engine)
self.session = Session()
def kickoff(self, inputs=None):
# Log execution start
execution = CrewExecution(
crew_name=self.__class__.__name__,
input_data=str(inputs),
status='running'
)
self.session.add(execution)
self.session.commit()
try:
result = super().kickoff(inputs)
execution.output_data = str(result)
execution.status = 'completed'
except Exception as e:
execution.status = f'failed: \\\\{str(e)\\\\}'
raise
finally:
self.session.commit()
return result
Buenas prácticas
Principios de diseño del agente
- Single Responsibility: Cada agente debe tener un papel claro y centrado
- ** Objetivos globales**: Definir objetivos específicos y mensurables para cada agente
- Rich Backstories: Proporcionar contexto detallado para mejorar el comportamiento del agente
- Apropiados Herramientas: Agentes de equidad con herramientas relevantes para sus funciones
- ** Estrategia de delegación**: Use delegation thoughtfully to avoid complex
Organización de tareas
- ** Descripción completa**: Escribir descripciones de tareas detalladas e inequívocas
- Expected Outputs: Especifique exactamente qué formato de salida se espera
- ** Dependencias de contexto**: Definir claramente las dependencias de tareas y compartir contextos
- Manejo del espejo: Implementar mecanismos robustos de manipulación y recuperación de errores
- ** Vigilancia de la ejecución**: Seguimiento de la ejecución de tareas y métricas de rendimiento
Crew Orchestration
- ** Selección de Procesos**: Elija el tipo de proceso apropiado (secuencial, jerárquico, paralelo)
- ** Gestión de memoria**: Utilice la memoria estratégicamente para la retención de contexto
- ** Limitaciones de recursos**: establecer límites adecuados para el tiempo de ejecución y las iteraciones
- Monitoring: Implementación de registros completos y vigilancia
- Testing: Probando el comportamiento de la tripulación con varias entradas
Optimización del rendimiento
- Especialización urgente: Crear agentes especializados para dominios específicos
- ** Optimización total**: Utiliza herramientas eficientes y minimiza las llamadas externas de API
- Caching: Implementar caching para datos a los que se accede con frecuencia
- ** Ejecución paralela**: Procesamiento paralelo en caso necesario
- Resource Management: Monitor and manage computational resources
Solución de problemas
Cuestiones comunes
Agente no responde
# Debug agent configuration
agent = Agent(
role='Debug Agent',
goal='Test agent responsiveness',
backstory='Agent for debugging purposes',
verbose=True, # Enable verbose output
max_iter=1, # Limit iterations for testing
allow_delegation=False
)
# Test with simple task
test_task = Task(
description="Say hello and confirm you are working",
agent=agent,
expected_output="Simple greeting message"
)
Cuestiones de memoria
# Clear memory if needed
crew.memory.clear()
# Check memory usage
print(f"Memory entities: \\\\{len(crew.memory.entities)\\\\}")
print(f"Memory size: \\\\{crew.memory.get_memory_size()\\\\}")
Problemas de integración de herramientas
# Test tool functionality
tool = SerperDevTool()
try:
result = tool._run("test query")
print(f"Tool working: \\\\{result\\\\}")
except Exception as e:
print(f"Tool error: \\\\{e\\\\}")
Cuestiones de ejecución
# Monitor execution time
import time
start_time = time.time()
result = crew.kickoff()
execution_time = time.time() - start_time
print(f"Execution time: \\\\{execution_time\\\\} seconds")
# Profile memory usage
import tracemalloc
tracemalloc.start()
result = crew.kickoff()
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage: \\\\{current / 1024 / 1024:.2f\\\\} MB")
print(f"Peak memory usage: \\\\{peak / 1024 / 1024:.2f\\\\} MB")
-...
*Esta completa hoja de trampolín CrewAI proporciona todo lo necesario para construir sofisticados sistemas de inteligencia artificial multiagente. Desde la configuración básica hasta patrones avanzados de orquestación, utilice estos ejemplos y mejores prácticas para crear potentes aplicaciones de IA que apalanquen el poder colaborativo de múltiples agentes especializados. *