CrewAI マルチエージェントフレームワーク チートシート
概要
CrewAIは、開発者がAI搭載アプリケーションを構築および展開する方法を変革する革新的なオープンソースのマルチエージェント統合フレームワークです。João Mouraによって作成されたこのPythonベースのフレームワークは、複数のAIエージェントが結束したユニットとして機能し、それぞれが特定の役割を担い、責任を共有して、単一のエージェントでは処理が困難な複雑なタスクを達成することを可能にします。
CrewAIの特徴は、エージェントが自律的にタスクを委任し、問題解決に協力し、専門的なツールと機能を活用できる洗練されたマルチエージェントシステムを統合できることです。このフレームワークは、迅速な開発のための高レベルの簡潔さと、複雑なシナリオのための正確な低レベルの制御の両方を提供し、あらゆるビジネスや技術要件に合わせた自律型AIエージェントの作成に理想的です。
CrewAIは、多面的な課題を管理できるAIシステムへの高まるニーズに対応し、それらを管理可能なコンポーネントに分解し、各コンポーネントに専門エージェントを割り当て、従来の単一エージェントアプローチと比較して優れた結果を達成するためにその取り組みを調整します。
コアコンセプト
エージェント
エージェントは、CrewAIシステムの基本的な構成要素です。各エージェントは、特定の役割、目標、能力を備えて設計され、専門分野内でタスクを推論、計画、実行できる自律的なエンティティとして機能します。
クルー
クルーは、共通の目的に向けて協力する複数のエージェントのコレクションです。クルーは、マルチエージェントコラボレーションの構造とワークフローを定義し、エージェントがどのように対話し、タスクを委任し、情報を共有するかを確立します。
タスク
タスクは、完了する必要がある特定の目的または活動を表します。複雑さと要件に応じて、個々のエージェントまたはクルー内の複数のエージェントに割り当てることができます。
ツール
ツールは、外部サービス、API、データベース、または特殊な機能へのアクセスを提供することで、エージェントの機能を拡張します。エージェントは、コアの言語モデル機能を超えるアクションを実行するためにツールを使用できます。
Would you like me to continue with the remaining sections?```bash
Install CrewAI using pip
pip install crewai
Install with additional tools
pip install ‘crewai[tools]‘
Install development version
pip install git+https://github.com/crewAIInc/crewAI.git
### Environment Setup
```python
import os
from crewai import Agent, Task, Crew, Process
# Set up API keys for LLM providers
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
# or
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-api-key"
# or other supported providers
Project Structure
my_crew_project/
├── agents/
│ ├── __init__.py
│ ├── researcher.py
│ └── writer.py
├── tasks/
│ ├── __init__.py
│ ├── research_tasks.py
│ └── writing_tasks.py
├── tools/
│ ├── __init__.py
│ └── custom_tools.py
├── crews/
│ ├── __init__.py
│ └── content_crew.py
└── main.py
Agent Configuration
Basic Agent Creation
from crewai import Agent
# Create a basic agent
researcher = Agent(
role='Research Specialist',
goal='Conduct thorough research on given topics',
backstory="""You are an experienced researcher with expertise in
gathering, analyzing, and synthesizing information from multiple sources.
You have a keen eye for detail and can identify reliable sources.""",
verbose=True,
allow_delegation=False
)
Advanced Agent Configuration
from crewai import Agent
from crewai_tools import SerperDevTool, WebsiteSearchTool
# Create an agent with tools and advanced settings
research_agent = Agent(
role='Senior Research Analyst',
goal='Provide comprehensive analysis and insights on market trends',
backstory="""You are a senior research analyst with 10+ years of experience
in market research and competitive analysis. You excel at identifying patterns,
trends, and actionable insights from complex data sets.""",
tools=[SerperDevTool(), WebsiteSearchTool()],
verbose=True,
allow_delegation=True,
max_iter=5,
memory=True,
step_callback=lambda step: print(f"Agent step: \\\\{step\\\\}"),
system_template="""You are \\\\{role\\\\}. \\\\{backstory\\\\}
Your goal is: \\\\{goal\\\\}
Always provide detailed analysis with supporting evidence."""
)
Agent with Custom LLM
from langchain.llms import OpenAI
from crewai import Agent
# Use custom LLM configuration
custom_llm = OpenAI(temperature=0.7, model_name="gpt-4")
analyst = Agent(
role='Data Analyst',
goal='Analyze data and provide statistical insights',
backstory='Expert in statistical analysis and data interpretation',
llm=custom_llm,
verbose=True
)
Multimodal Agent
from crewai import Agent
# Agent with multimodal capabilities
visual_analyst = Agent(
role='Visual Content Analyst',
goal='Analyze images and visual content for insights',
backstory='Specialist in visual content analysis and interpretation',
multimodal=True, # Enable multimodal capabilities
tools=[image_analysis_tool],
verbose=True
)
Task Definition and Management
Basic Task Creation
from crewai import Task
# Define a simple task
research_task = Task(
description="""Conduct comprehensive research on artificial intelligence
trends in 2024. Focus on:
1. Emerging AI technologies
2. Market adoption rates
3. Key industry players
4. Future predictions
Provide a detailed report with sources and citations.""",
agent=researcher,
expected_output="A comprehensive research report with citations"
)
Advanced Task Configuration
from crewai import Task
# Task with dependencies and callbacks
analysis_task = Task(
description="""Analyze the research findings and create strategic
recommendations for AI adoption in enterprise environments.""",
agent=analyst,
expected_output="Strategic recommendations document with actionable insights",
context=[research_task], # Depends on research_task completion
callback=lambda output: save_to_database(output),
async_execution=False,
output_file="analysis_report.md"
)
Task with Custom Output Parsing
from crewai import Task
from pydantic import BaseModel
from typing import List
class ResearchOutput(BaseModel):
title: str
summary: str
key_findings: List[str]
sources: List[str]
confidence_score: float
structured_task = Task(
description="Research AI market trends and provide structured output",
agent=researcher,
expected_output="Structured research findings",
output_pydantic=ResearchOutput
)
Conditional Task Execution
from crewai import Task
def should_execute_task(context):
# Custom logic to determine if task should execute
return len(context.get('findings', [])) > 5
conditional_task = Task(
description="Perform detailed analysis if sufficient data is available",
agent=analyst,
expected_output="Detailed analysis report",
condition=should_execute_task
)
Crew Orchestration
Basic Crew Setup
from crewai import Crew, Process
# Create a basic crew
content_crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
verbose=2,
process=Process.sequential
)
# Execute the crew
result = content_crew.kickoff()
print(result)
Advanced Crew Configuration
from crewai import Crew, Process
from crewai.memory import LongTermMemory
# Advanced crew with memory and custom settings
advanced_crew = Crew(
agents=[researcher, analyst, writer, reviewer],
tasks=[research_task, analysis_task, writing_task, review_task],
process=Process.hierarchical,
memory=LongTermMemory(),
verbose=2,
manager_llm=manager_llm,
function_calling_llm=function_llm,
max_rpm=10,
share_crew=True,
step_callback=crew_step_callback,
task_callback=crew_task_callback
)
Hierarchical Process
from crewai import Crew, Process, Agent
# Manager agent for hierarchical process
manager = Agent(
role='Project Manager',
goal='Coordinate team activities and ensure quality deliverables',
backstory='Experienced project manager with strong leadership skills',
allow_delegation=True
)
hierarchical_crew = Crew(
agents=[manager, researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.hierarchical,
manager_agent=manager,
verbose=2
)
Parallel Task Execution
from crewai import Crew, Process
# Crew with parallel task execution
parallel_crew = Crew(
agents=[researcher1, researcher2, researcher3],
tasks=[task1, task2, task3],
process=Process.sequential, # Overall sequential, but tasks can run in parallel
max_execution_time=3600, # 1 hour timeout
verbose=2
)
# Execute with parallel capabilities
result = parallel_crew.kickoff(inputs=\\\\{
'topic': 'AI in Healthcare',
'deadline': '2024-12-31'
\\\\})
Tool Integration
Built-in Tools
from crewai_tools import (
SerperDevTool,
WebsiteSearchTool,
FileReadTool,
DirectoryReadTool,
CodeDocsSearchTool,
YoutubeVideoSearchTool
)
# Configure built-in tools
search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()
file_tool = FileReadTool()
code_tool = CodeDocsSearchTool()
# Agent with multiple tools
multi_tool_agent = Agent(
role='Research Assistant',
goal='Gather information from multiple sources',
backstory='Versatile researcher with access to various information sources',
tools=[search_tool, web_tool, file_tool, code_tool],
verbose=True
)
Custom Tool Development
from crewai_tools import BaseTool
from typing import Type
from pydantic import BaseModel, Field
class DatabaseQueryInput(BaseModel):
query: str = Field(description="SQL query to execute")
database: str = Field(description="Database name")
class DatabaseQueryTool(BaseTool):
name: str = "Database Query Tool"
description: str = "Execute SQL queries against specified databases"
args_schema: Type[BaseModel] = DatabaseQueryInput
def _run(self, query: str, database: str) -> str:
# Implement database query logic
try:
# Connect to database and execute query
result = execute_database_query(database, query)
return f"Query executed successfully: \\\\{result\\\\}"
except Exception as e:
return f"Query failed: \\\\{str(e)\\\\}"
# Use custom tool
db_tool = DatabaseQueryTool()
database_agent = Agent(
role='Database Analyst',
goal='Query and analyze database information',
backstory='Expert in database operations and SQL',
tools=[db_tool],
verbose=True
)
API Integration Tool
from crewai_tools import BaseTool
import requests
class APIIntegrationTool(BaseTool):
name: str = "API Integration Tool"
description: str = "Make HTTP requests to external APIs"
def _run(self, endpoint: str, method: str = "GET", data: dict = None) -> str:
try:
if method.upper() == "GET":
response = requests.get(endpoint)
elif method.upper() == "POST":
response = requests.post(endpoint, json=data)
return response.json()
except Exception as e:
return f"API request failed: \\\\{str(e)\\\\}"
# Agent with API capabilities
api_agent = Agent(
role='API Integration Specialist',
goal='Interact with external services via APIs',
backstory='Expert in API integration and data retrieval',
tools=[APIIntegrationTool()],
verbose=True
)
Memory and Context Management
Long-Term Memory
from crewai.memory import LongTermMemory
from crewai import Crew
# Crew with persistent memory
memory_crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
memory=LongTermMemory(),
verbose=2
)
# Memory persists across executions
result1 = memory_crew.kickoff(inputs=\\\\{'topic': 'AI Ethics'\\\\})
result2 = memory_crew.kickoff(inputs=\\\\{'topic': 'AI Regulation'\\\\})
Context Sharing
from crewai import Task, Agent
# Tasks that share context
context_task1 = Task(
description="Research market trends",
agent=researcher,
expected_output="Market trend analysis"
)
context_task2 = Task(
description="Analyze the market trends and provide recommendations",
agent=analyst,
expected_output="Strategic recommendations",
context=[context_task1] # Uses output from context_task1
)
```### カスタムメモリ実装
```python
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
class CustomMemory(LongTermMemory):
def __init__(self, storage_path: str = "./custom_memory"):
super().__init__(storage_path=storage_path)
self.custom_entities = \\\\{\\\\}
def save_entity(self, entity_name: str, entity_data: dict):
self.custom_entities[entity_name] = entity_data
# Implement custom storage logic
def retrieve_entity(self, entity_name: str) -> dict:
return self.custom_entities.get(entity_name, \\\\{\\\\})
# Use custom memory
custom_memory = CustomMemory("./project_memory")
crew_with_custom_memory = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
memory=custom_memory
)
```## 高度な機能
### エージェント委任
```python
from crewai import Agent
# Senior agent that can delegate
senior_researcher = Agent(
role='Senior Research Director',
goal='Oversee research projects and delegate tasks',
backstory='Experienced research director with team management skills',
allow_delegation=True,
max_delegation=3,
verbose=True
)
# Junior agents that can receive delegated tasks
junior_researcher1 = Agent(
role='Junior Researcher - Technology',
goal='Research technology trends and innovations',
backstory='Specialized in technology research',
allow_delegation=False
)
junior_researcher2 = Agent(
role='Junior Researcher - Market Analysis',
goal='Analyze market conditions and competitive landscape',
backstory='Specialized in market research and analysis',
allow_delegation=False
)
# Crew with delegation hierarchy
delegation_crew = Crew(
agents=[senior_researcher, junior_researcher1, junior_researcher2],
tasks=[complex_research_task],
process=Process.hierarchical,
verbose=2
)
```### 推論と計画
```python
from crewai import Agent
# Agent with enhanced reasoning capabilities
reasoning_agent = Agent(
role='Strategic Planner',
goal='Develop comprehensive strategies with detailed reasoning',
backstory='Expert strategic planner with strong analytical skills',
reasoning=True, # Enable reasoning capabilities
planning=True, # Enable planning capabilities
verbose=True,
max_iter=10
)
# Task that requires complex reasoning
strategic_task = Task(
description="""Develop a comprehensive 5-year strategic plan for AI adoption
in the healthcare industry. Consider:
1. Current market conditions
2. Regulatory environment
3. Technology readiness
4. Competitive landscape
5. Implementation challenges
Provide detailed reasoning for each recommendation.""",
agent=reasoning_agent,
expected_output="Comprehensive strategic plan with detailed reasoning"
)
```### コールバックと監視
```python
from crewai import Crew, Agent, Task
def agent_step_callback(agent_output):
print(f"Agent \\\\{agent_output.agent\\\\} completed step: \\\\{agent_output.step\\\\}")
# Log to monitoring system
log_agent_activity(agent_output)
def task_completion_callback(task_output):
print(f"Task completed: \\\\{task_output.description\\\\}")
# Send notification or update dashboard
notify_task_completion(task_output)
def crew_step_callback(crew_output):
print(f"Crew step completed: \\\\{crew_output\\\\}")
# Update progress tracking
update_progress(crew_output)
# Crew with comprehensive monitoring
monitored_crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
step_callback=crew_step_callback,
task_callback=task_completion_callback,
verbose=2
)
# Agents with individual monitoring
monitored_agent = Agent(
role='Monitored Researcher',
goal='Conduct research with detailed monitoring',
backstory='Researcher with comprehensive activity tracking',
step_callback=agent_step_callback,
verbose=True
)
```## エラー処理と回復力
### リトライロジック
```python
from crewai import Task, Agent
import time
def retry_callback(attempt, error):
print(f"Task failed on attempt \\\\{attempt\\\\}: \\\\{error\\\\}")
time.sleep(2 ** attempt) # Exponential backoff
resilient_task = Task(
description="Perform web research with retry logic",
agent=researcher,
expected_output="Research findings",
max_retries=3,
retry_callback=retry_callback
)
```### エラー回復
```python
from crewai import Crew, Process
def error_handler(error, context):
print(f"Error occurred: \\\\{error\\\\}")
# Implement recovery logic
if "rate_limit" in str(error).lower():
time.sleep(60) # Wait for rate limit reset
return True # Retry
return False # Don't retry
error_resilient_crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
error_handler=error_handler,
max_retries=3,
verbose=2
)
```### フォールバックエージェント
```python
from crewai import Agent, Task, Crew
# Primary agent
primary_researcher = Agent(
role='Primary Researcher',
goal='Conduct comprehensive research',
backstory='Expert researcher with specialized tools',
tools=[advanced_search_tool, database_tool]
)
# Fallback agent with basic capabilities
fallback_researcher = Agent(
role='Backup Researcher',
goal='Conduct basic research when primary agent fails',
backstory='Reliable researcher with basic tools',
tools=[basic_search_tool]
)
# Task with fallback logic
research_with_fallback = Task(
description="Conduct research with fallback support",
agent=primary_researcher,
fallback_agent=fallback_researcher,
expected_output="Research findings"
)
```## パフォーマンス最適化
### 並列実行
```python
from crewai import Crew, Process
import asyncio
# Async crew execution
async def run_crew_async():
crew = Crew(
agents=[researcher1, researcher2, researcher3],
tasks=[task1, task2, task3],
process=Process.sequential,
verbose=2
)
result = await crew.kickoff_async(inputs=\\\\{'topic': 'AI Trends'\\\\})
return result
# Run multiple crews in parallel
async def run_multiple_crews():
crews = [create_crew(topic) for topic in ['AI', 'ML', 'NLP']]
results = await asyncio.gather(*[crew.kickoff_async() for crew in crews])
return results
```### リソース管理
```python
from crewai import Crew
import threading
class ResourceManager:
def __init__(self, max_concurrent_agents=5):
self.semaphore = threading.Semaphore(max_concurrent_agents)
self.active_agents = 0
def acquire_agent_slot(self):
self.semaphore.acquire()
self.active_agents += 1
def release_agent_slot(self):
self.semaphore.release()
self.active_agents -= 1
resource_manager = ResourceManager(max_concurrent_agents=3)
# Crew with resource management
resource_managed_crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
resource_manager=resource_manager,
verbose=2
)
```### キャッシングと最適化
```python
from crewai import Agent, Task
from functools import lru_cache
# Agent with caching capabilities
class CachedAgent(Agent):
@lru_cache(maxsize=100)
def cached_execution(self, task_description):
return super().execute_task(task_description)
cached_researcher = CachedAgent(
role='Cached Researcher',
goal='Perform research with caching',
backstory='Efficient researcher with caching capabilities'
)
# Task with caching
cached_task = Task(
description="Research AI trends (cached)",
agent=cached_researcher,
expected_output="Cached research results",
cache_results=True
)
```## 統合パターン
### Flaskウェブアプリケーション
```python
from flask import Flask, request, jsonify
from crewai import Crew, Agent, Task
app = Flask(__name__)
# Initialize crew components
researcher = Agent(
role='API Researcher',
goal='Research topics via web API',
backstory='Researcher accessible via web API'
)
@app.route('/research', methods=['POST'])
def research_endpoint():
data = request.json
topic = data.get('topic')
# Create dynamic task
research_task = Task(
description=f"Research the topic: \\\\{topic\\\\}",
agent=researcher,
expected_output="Research findings"
)
# Execute crew
crew = Crew(agents=[researcher], tasks=[research_task])
result = crew.kickoff()
return jsonify(\\\\{'result': result\\\\})
if __name__ == '__main__':
app.run(debug=True)
```### Celeryバックグラウンドタスク
```python
from celery import Celery
from crewai import Crew, Agent, Task
app = Celery('crewai_tasks')
@app.task
def execute_crew_task(topic, agents_config, tasks_config):
# Reconstruct agents and tasks from config
agents = [create_agent_from_config(config) for config in agents_config]
tasks = [create_task_from_config(config) for config in tasks_config]
# Execute crew
crew = Crew(agents=agents, tasks=tasks)
result = crew.kickoff(inputs=\\\\{'topic': topic\\\\})
return result
# Usage
result = execute_crew_task.delay(
topic="AI in Healthcare",
agents_config=[researcher_config, analyst_config],
tasks_config=[research_config, analysis_config]
)
```### データベース統合
```python
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from crewai import Crew, Agent, Task
import datetime
Base = declarative_base()
class CrewExecution(Base):
__tablename__ = 'crew_executions'
id = Column(Integer, primary_key=True)
crew_name = Column(String(100))
input_data = Column(Text)
output_data = Column(Text)
execution_time = Column(DateTime, default=datetime.datetime.utcnow)
status = Column(String(50))
# Database-integrated crew
class DatabaseCrew(Crew):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.engine = create_engine('sqlite:///crew_executions.db')
Base.metadata.create_all(self.engine)
Session = sessionmaker(bind=self.engine)
self.session = Session()
def kickoff(self, inputs=None):
# Log execution start
execution = CrewExecution(
crew_name=self.__class__.__name__,
input_data=str(inputs),
status='running'
)
self.session.add(execution)
self.session.commit()
try:
result = super().kickoff(inputs)
execution.output_data = str(result)
execution.status = 'completed'
except Exception as e:
execution.status = f'failed: \\\\{str(e)\\\\}'
raise
finally:
self.session.commit()
return result
```## ベストプラクティス
### エージェント設計原則
- **単一責任**: 各エージェントは明確で焦点の定まった役割を持つべき
- **明確な目標**: 各エージェントの具体的で測定可能な目的を定義する
- **豊かな背景**: エージェントの動作を改善するための詳細なコンテキストを提供する
- **適切なツール**: エージェントの役割に関連するツールを装備する
- **委任戦略**: 複雑さを避けるために慎重に委任を使用する
### タスク編成
- **明確な説明**: 詳細で曖昧さのないタスク説明を書く
- **期待される出力**: 正確に出力形式を指定する
- **コンテキスト依存関係**: タスクの依存関係とコンテキスト共有を明確に定義する
- **エラー処理**: 堅牢なエラー処理と回復メカニズムを実装する
- **パフォーマンス監視**: タスク実行とパフォーマンスメトリクスを追跡する
### クルー編成
- **プロセス選択**: 適切なプロセスタイプを選択する(順次、階層的、並列)
- **メモリ管理**: コンテキスト保持のためにメモリを戦略的に使用する
- **リソース制限**: 実行時間と反復回数に適切な制限を設定する
- **監視**: 包括的なログ記録と監視を実装する
- **テスト**: さまざまな入力でクルーの動作を徹底的にテストする
### パフォーマンス最適化
- **エージェント専門化**: 特定のドメイン用に専門化されたエージェントを作成する
- **ツール最適化**: 効率的なツールを使用し、外部APIコールを最小限に抑える
- **キャッシング**: 頻繁にアクセスされるデータのキャッシングを実装する
- **並列実行**: 適切な場所で並列処理を活用する
- **リソース管理**: 計算リソースを監視および管理する
## トラブルシューティング
### 一般的な問題
#### エージェントが応答しない
```python
# Debug agent configuration
agent = Agent(
role='Debug Agent',
goal='Test agent responsiveness',
backstory='Agent for debugging purposes',
verbose=True, # Enable verbose output
max_iter=1, # Limit iterations for testing
allow_delegation=False
)
# Test with simple task
test_task = Task(
description="Say hello and confirm you are working",
agent=agent,
expected_output="Simple greeting message"
)
メモリの問題
# Clear memory if needed
crew.memory.clear()
# Check memory usage
print(f"Memory entities: \\\\{len(crew.memory.entities)\\\\}")
print(f"Memory size: \\\\{crew.memory.get_memory_size()\\\\}")
ツール統合の問題
# Test tool functionality
tool = SerperDevTool()
try:
result = tool._run("test query")
print(f"Tool working: \\\\{result\\\\}")
except Exception as e:
print(f"Tool error: \\\\{e\\\\}")
パフォーマンスの問題
# Monitor execution time
import time
start_time = time.time()
result = crew.kickoff()
execution_time = time.time() - start_time
print(f"Execution time: \\\\{execution_time\\\\} seconds")
# Profile memory usage
import tracemalloc
tracemalloc.start()
result = crew.kickoff()
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage: \\\\{current / 1024 / 1024:.2f\\\\} MB")
print(f"Peak memory usage: \\\\{peak / 1024 / 1024:.2f\\\\} MB")
この包括的なCrewAIチートシートは、洗練された複数エージェントAIシステムを構築するために必要なすべてを提供します。基本的なセットアップから高度な編成パターンまで、これらの例とベストプラクティスを使用して、複数の専門化されたエージェントのコラボレーティブパワーを活用する強力なAIアプリケーションを作成します。
Note: Texts 15-18 were left blank as no original text was provided for translation.