LangGraph Framework Cheat Sheet
Overview
LangGraph is a stateful orchestration framework for building AI agent workflows, developed by the LangChain team. It addresses the limitations of traditional linear agent frameworks with a graph-based approach: complex workflows are structured as interconnected nodes and edges, enabling sophisticated multi-agent systems that can handle branching logic, conditional execution, and stateful interactions, while giving developers fine-grained control and flexibility over agent-based applications.
What sets LangGraph apart is its ability to model complex real-world workflows that require decision points, parallel processing, and dynamic routing based on intermediate results. Unlike simple chain-based approaches, LangGraph allows developers to create workflows where agents can collaborate, compete, or operate independently while maintaining shared state and context. This makes it particularly powerful for applications requiring sophisticated reasoning, multi-step problem solving, and adaptive behavior based on changing conditions.
The framework combines the flexibility of graph-based computation with the reliability needed for production systems, offering features like persistence, streaming, debugging support, and seamless deployment through the LangGraph Platform. This positions LangGraph as the go-to solution for developers building next-generation AI applications that require more than simple request-response patterns.
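To make the node-and-edge model above concrete, here is a minimal sketch of a two-node graph (installation is covered below; the node names and the trivial state schema are illustrative, not part of any particular application):
python
from typing import TypedDict
from langgraph.graph import StateGraph, END

class HelloState(TypedDict):
    text: str

def greet(state: HelloState) -> HelloState:
    # Each node receives the current state and returns an update to it
    return {"text": state["text"] + " -> greeted"}

def shout(state: HelloState) -> HelloState:
    return {"text": state["text"].upper()}

graph = StateGraph(HelloState)
graph.add_node("greet", greet)
graph.add_node("shout", shout)
graph.set_entry_point("greet")
graph.add_edge("greet", "shout")   # plain edge: greet always flows into shout
graph.add_edge("shout", END)       # END marks the terminal node
app = graph.compile()

print(app.invoke({"text": "hello"}))  # expected: {'text': 'HELLO -> GREETED'}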
Installation and Setup
Basic Installation
bash
# Install LangGraph
pip install langgraph
# Install with additional dependencies
pip install "langgraph[all]"
# Install development version
pip install git+https://github.com/langchain-ai/langgraph.git
# Install LangGraph Platform CLI (for deployment)
pip install langgraph-cli
Environment Configuration
python
import os
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# Set up API keys
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-api-key"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
# Initialize LLMs
openai_llm = ChatOpenAI(model="gpt-4", temperature=0)
anthropic_llm = ChatAnthropic(model="claude-3-sonnet-20240229")
Project Structure
langgraph_project/
├── graphs/
│ ├── __init__.py
│ ├── research_graph.py
│ └── analysis_graph.py
├── nodes/
│ ├── __init__.py
│ ├── agent_nodes.py
│ └── tool_nodes.py
├── state/
│ ├── __init__.py
│ └── state_schemas.py
├── tools/
│ ├── __init__.py
│ └── custom_tools.py
├── checkpoints/
│ └── memory.db
├── config/
│ ├── __init__.py
│ └── settings.py
└── main.py
Core Concepts
State Management
python
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph
# Define state schema
class AgentState(TypedDict):
messages: List[str]
current_task: str
completed_tasks: List[str]
research_data: Optional[dict]
analysis_results: Optional[dict]
final_output: Optional[str]
iteration_count: int
error_messages: List[str]
# Initialize state
initial_state = AgentState(
messages=[],
current_task="",
completed_tasks=[],
research_data=None,
analysis_results=None,
final_output=None,
iteration_count=0,
error_messages=[]
)
Graph Construction
python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
# Create graph with state management
workflow = StateGraph(AgentState)
# Add nodes (functions that process state)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("synthesize", synthesize_node)
workflow.add_node("review", review_node)
# Define edges (transitions between nodes)
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "synthesize")
workflow.add_edge("synthesize", "review")
# Conditional edges based on state
def should_continue(state: AgentState) -> str:
if state["iteration_count"] > 5:
return END  # too many iterations, stop
elif state["error_messages"]:
return "research"  # retry research if errors occurred
else:
return END  # review passed, finish
workflow.add_conditional_edges(
"review",
should_continue,
{
END: END,
"research": "research"
}
)
# Set entry point
workflow.set_entry_point("research")
# Compile graph with checkpointing
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
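Once compiled with a checkpointer, the graph is invoked with a thread_id so state persists across calls. A minimal usage sketch (the thread id value is arbitrary):
python
# Illustrative invocation of the compiled graph above; the thread_id value is arbitrary
config = {"configurable": {"thread_id": "demo-thread-1"}}
result = app.invoke(initial_state, config=config)
print(result["completed_tasks"])
# Re-invoking with the same thread_id resumes from the checkpoint saved for that thread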
Node Implementation
Basic Node Functions
python
from datetime import datetime
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
# Note: extract_sources() and calculate_confidence() used below are assumed user-defined helpers
def research_node(state: AgentState) -> AgentState:
"""Research node that gathers information"""
current_task = state["current_task"]
# Perform research using LLM
research_prompt = f"""
Research the following topic comprehensively:
{current_task}
Provide detailed findings including:
1. Key facts and statistics
2. Current trends and developments
3. Expert opinions and analysis
4. Relevant sources and references
"""
response = llm.invoke([HumanMessage(content=research_prompt)])
# Update state
state["research_data"] = {
"findings": response.content,
"timestamp": datetime.now().isoformat(),
"sources": extract_sources(response.content)
}
state["completed_tasks"].append("research")
state["messages"].append(f"Research completed for: {current_task}")
return state
def analyze_node(state: AgentState) -> AgentState:
"""Analysis node that processes research data"""
research_data = state["research_data"]
if not research_data:
state["error_messages"].append("No research data available for analysis")
return state
analysis_prompt = f"""
Analyze the following research findings:
{research_data['findings']}
Provide:
1. Key insights and patterns
2. Strengths and weaknesses
3. Opportunities and threats
4. Strategic recommendations
"""
response = llm.invoke([HumanMessage(content=analysis_prompt)])
state["analysis_results"] = {
"insights": response.content,
"timestamp": datetime.now().isoformat(),
"confidence_score": calculate_confidence(response.content)
}
state["completed_tasks"].append("analysis")
state["messages"].append("Analysis completed")
return state
def synthesize_node(state: AgentState) -> AgentState:
"""Synthesis node that combines research and analysis"""
research_data = state["research_data"]
analysis_results = state["analysis_results"]
synthesis_prompt = f"""
Synthesize the following research and analysis into a comprehensive report:
Research Findings:
{research_data['findings']}
Analysis Results:
{analysis_results['insights']}
Create a cohesive narrative that:
1. Integrates all key findings
2. Provides actionable recommendations
3. Addresses potential concerns
4. Outlines next steps
"""
response = llm.invoke([HumanMessage(content=synthesis_prompt)])
state["final_output"] = response.content
state["completed_tasks"].append("synthesis")
state["messages"].append("Synthesis completed")
return state
Advanced Node Patterns
python
from typing import Dict, Any
from datetime import datetime
import asyncio
class AsyncResearchNode:
"""Async node for parallel research operations"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
async def __call__(self, state: AgentState) -> AgentState:
current_task = state["current_task"]
# Parallel research tasks
research_tasks = [
self.web_research(current_task),
self.academic_research(current_task),
self.news_research(current_task)
]
results = await asyncio.gather(*research_tasks, return_exceptions=True)
# Combine results
combined_research = {
"web_findings": results[0] if not isinstance(results[0], Exception) else None,
"academic_findings": results[1] if not isinstance(results[1], Exception) else None,
"news_findings": results[2] if not isinstance(results[2], Exception) else None,
"timestamp": datetime.now().isoformat()
}
state["research_data"] = combined_research
state["completed_tasks"].append("parallel_research")
return state
async def web_research(self, topic: str) -> Dict[str, Any]:
# Implement web research logic
pass
async def academic_research(self, topic: str) -> Dict[str, Any]:
# Implement academic research logic
pass
async def news_research(self, topic: str) -> Dict[str, Any]:
# Implement news research logic
pass
# Multi-agent collaboration node
class CollaborativeNode:
"""Node that coordinates multiple agents"""
def __init__(self, agents: Dict[str, Any], synthesizer_llm):
self.agents = agents
self.synthesizer_llm = synthesizer_llm  # LLM used to combine the individual agent results
# create_agent_task() and format_agent_results() below are assumed helper methods
def __call__(self, state: AgentState) -> AgentState:
current_task = state["current_task"]
# Assign tasks to different agents
agent_results = {}
for agent_name, agent in self.agents.items():
agent_task = self.create_agent_task(current_task, agent_name)
result = agent.invoke(agent_task)
agent_results[agent_name] = result
# Synthesize agent results
synthesis_prompt = f"""
Multiple agents have worked on the task: {current_task}
Agent Results:
{self.format_agent_results(agent_results)}
Synthesize these perspectives into a cohesive solution.
"""
final_result = self.synthesizer_llm.invoke(synthesis_prompt)
state["collaborative_results"] = {
"individual_results": agent_results,
"synthesized_result": final_result.content,
"participating_agents": list(self.agents.keys())
}
return state
Tool Integration Nodes
python
from typing import List, Dict
from langgraph.prebuilt import ToolExecutor, ToolInvocation
from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
# Create tool executor (WikipediaQueryRun requires an explicit api_wrapper)
tools = [DuckDuckGoSearchRun(), WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())]
tool_executor = ToolExecutor(tools)
def tool_calling_node(state: AgentState) -> AgentState:
"""Node that uses tools to gather information"""
current_task = state["current_task"]
# Determine which tools to use
tool_selection_prompt = f"""
For the task: {current_task}
Which tools should be used and what queries should be made?
Available tools: {[tool.name for tool in tools]}
Respond with a JSON list of tool calls:
[{{"tool": "tool_name", "query": "search_query"}}, ...]
"""
tool_plan = llm.invoke([HumanMessage(content=tool_selection_prompt)])
tool_calls = parse_tool_calls(tool_plan.content)
# Execute tools
tool_results = []
for tool_call in tool_calls:
try:
result = tool_executor.invoke(
ToolInvocation(tool=tool_call["tool"], tool_input=tool_call["query"])
)
tool_results.append({
"tool": tool_call["tool"],
"query": tool_call["query"],
"result": result,
"success": True
})
except Exception as e:
tool_results.append({
"tool": tool_call["tool"],
"query": tool_call["query"],
"error": str(e),
"success": False
})
state["tool_results"] = tool_results
state["completed_tasks"].append("tool_execution")
return state
def parse_tool_calls(content: str) -> List[Dict[str, str]]:
"""Parse tool calls from LLM response"""
import json
try:
return json.loads(content)
except json.JSONDecodeError:
# Fallback parsing logic
return [{"tool": "search", "query": content}]
Conditional Logic and Routing
Dynamic Routing
python
def route_based_on_content(state: AgentState) -> str:
"""Route based on content analysis"""
current_task = state["current_task"].lower()
if "research" in current_task or "analyze" in current_task:
return "research_path"
elif "create" in current_task or "generate" in current_task:
return "creation_path"
elif "review" in current_task or "evaluate" in current_task:
return "review_path"
else:
return "general_path"
def quality_gate(state: AgentState) -> str:
"""Quality gate for output validation"""
if not state["final_output"]:
return "retry"
# Quality assessment
quality_score = assess_output_quality(state["final_output"])
if quality_score > 0.8:
return "approve"
elif quality_score > 0.6:
return "revise"
else:
return "retry"
# Add conditional routing to graph
workflow.add_conditional_edges(
"initial_analysis",
route_based_on_content,
{
"research_path": "research_node",
"creation_path": "creation_node",
"review_path": "review_node",
"general_path": "general_node"
}
)
workflow.add_conditional_edges(
"final_review",
quality_gate,
{
"approve": END,
"revise": "revision_node",
"retry": "research_node"
}
)
Complex Decision Trees
python
def multi_criteria_router(state: AgentState) -> str:
"""Multi-criteria decision routing"""
criteria = {
"complexity": assess_task_complexity(state["current_task"]),
"urgency": assess_task_urgency(state),
"resources": assess_available_resources(state),
"expertise_needed": assess_expertise_requirements(state["current_task"])
}
# Decision matrix
if criteria["complexity"] > 0.8 and criteria["expertise_needed"] > 0.7:
return "expert_collaboration"
elif criteria["urgency"] > 0.9:
return "fast_track"
elif criteria["resources"] < 0.3:
return "resource_optimization"
else:
return "standard_processing"
def adaptive_retry_logic(state: AgentState) -> str:
"""Adaptive retry with different strategies"""
error_count = len(state["error_messages"])
last_error = state["error_messages"][-1] if state["error_messages"] else ""
if error_count == 0:
return "proceed"
elif error_count == 1:
if "timeout" in last_error.lower():
return "retry_with_timeout"
else:
return "retry_with_different_approach"
elif error_count == 2:
return "escalate_to_human"
else:
return "abort"
# Complex routing configuration
workflow.add_conditional_edges(
"task_assessment",
multi_criteria_router,
{
"expert_collaboration": "expert_node",
"fast_track": "fast_track_node",
"resource_optimization": "optimized_node",
"standard_processing": "standard_node"
}
)
Parallel Processing and Concurrency
Parallel Node Execution
python
from langgraph.graph import StateGraph, START
import asyncio
class ParallelProcessingGraph:
"""Graph with parallel processing capabilities"""
def __init__(self):
self.workflow = StateGraph(AgentState)
self.setup_parallel_nodes()
def setup_parallel_nodes(self):
# Add parallel processing nodes
self.workflow.add_node("parallel_research", self.parallel_research_node)
self.workflow.add_node("parallel_analysis", self.parallel_analysis_node)
self.workflow.add_node("synthesis", self.synthesis_node)
# Parallel execution pattern
self.workflow.add_edge("parallel_research", "synthesis")
self.workflow.add_edge("parallel_analysis", "synthesis")
self.workflow.set_entry_point("parallel_research")
self.workflow.set_entry_point("parallel_analysis")
async def parallel_research_node(self, state: AgentState) -> AgentState:
"""Execute multiple research tasks in parallel"""
research_tasks = [
self.web_search(state["current_task"]),
self.database_query(state["current_task"]),
self.expert_consultation(state["current_task"])
]
results = await asyncio.gather(*research_tasks, return_exceptions=True)
state["parallel_research_results"] = {
"web_search": results[0] if not isinstance(results[0], Exception) else None,
"database_query": results[1] if not isinstance(results[1], Exception) else None,
"expert_consultation": results[2] if not isinstance(results[2], Exception) else None
}
return state
async def parallel_analysis_node(self, state: AgentState) -> AgentState:
"""Execute multiple analysis tasks in parallel"""
analysis_tasks = [
self.sentiment_analysis(state["current_task"]),
self.trend_analysis(state["current_task"]),
self.competitive_analysis(state["current_task"])
]
results = await asyncio.gather(*analysis_tasks, return_exceptions=True)
state["parallel_analysis_results"] = {
"sentiment": results[0] if not isinstance(results[0], Exception) else None,
"trends": results[1] if not isinstance(results[1], Exception) else None,
"competitive": results[2] if not isinstance(results[2], Exception) else None
}
return state
# Fan-out/Fan-in pattern
def fan_out_node(state: AgentState) -> AgentState:
"""Split work into parallel streams"""
task = state["current_task"]
# Create subtasks
subtasks = split_task_into_subtasks(task)
state["subtasks"] = subtasks
state["parallel_streams"] = len(subtasks)
return state
def fan_in_node(state: AgentState) -> AgentState:
"""Combine results from parallel streams"""
subtask_results = state.get("subtask_results", {})
if len(subtask_results) < state["parallel_streams"]:
# Not all streams completed yet
return state
# Combine all results
combined_result = combine_subtask_results(subtask_results)
state["final_output"] = combined_result
return state
Load Balancing and Resource Management
python
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
class ResourceManagedGraph:
"""Graph with resource management and load balancing"""
def __init__(self, max_workers=4, max_concurrent_llm_calls=2):
self.max_workers = max_workers
self.max_concurrent_llm_calls = max_concurrent_llm_calls
self.llm_semaphore = threading.Semaphore(max_concurrent_llm_calls)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = Queue()
def resource_aware_node(self, state: AgentState) -> AgentState:
"""Node that manages resource usage"""
with self.llm_semaphore:
# LLM call with resource management
result = self.make_llm_call(state["current_task"])
state["resource_managed_result"] = result
return state
def load_balanced_processing(self, state: AgentState) -> AgentState:
"""Distribute work across available resources"""
tasks = state.get("pending_tasks", [])
if not tasks:
return state
# Submit tasks to thread pool
futures = []
for task in tasks:
future = self.executor.submit(self.process_single_task, task)
futures.append(future)
# Collect results
results = []
for future in futures:
try:
result = future.result(timeout=30)
results.append(result)
except Exception as e:
results.append({"error": str(e)})
state["load_balanced_results"] = results
return state
def adaptive_scaling_node(self, state: AgentState) -> AgentState:
"""Adapt resource usage based on workload"""
current_load = self.assess_current_load()
if current_load > 0.8:
# High load - reduce concurrency
self.llm_semaphore = threading.Semaphore(1)
elif current_load < 0.3:
# Low load - increase concurrency
self.llm_semaphore = threading.Semaphore(self.max_concurrent_llm_calls * 2)
return state
Memory and Persistence
Checkpointing and State Persistence
python
from datetime import datetime
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
import sqlite3
# SQLite checkpointing
def setup_sqlite_checkpointing():
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
memory = SqliteSaver(conn)
return memory
# PostgreSQL checkpointing for production
def setup_postgres_checkpointing():
connection_string = "postgresql://user:password@localhost/langgraph_db"
memory = PostgresSaver.from_conn_string(connection_string)
return memory
# Custom checkpointing implementation
class CustomCheckpointer:
"""Custom checkpointing with additional features"""
def __init__(self, storage_backend):
self.storage = storage_backend
self.checkpoint_history = {}
def save_checkpoint(self, thread_id: str, state: AgentState, step: int):
checkpoint_data = {
"thread_id": thread_id,
"state": state,
"step": step,
"timestamp": datetime.now().isoformat(),
"metadata": self.extract_metadata(state)
}
self.storage.save(f"{thread_id}_{step}", checkpoint_data)
# Keep checkpoint history
if thread_id not in self.checkpoint_history:
self.checkpoint_history[thread_id] = []
self.checkpoint_history[thread_id].append(step)
def load_checkpoint(self, thread_id: str, step: int = None):
if step is None:
# Load latest checkpoint
steps = self.checkpoint_history.get(thread_id, [])
if not steps:
return None
step = max(steps)
return self.storage.load(f"{thread_id}_{step}")
def rollback_to_checkpoint(self, thread_id: str, step: int):
checkpoint = self.load_checkpoint(thread_id, step)
if checkpoint:
return checkpoint["state"]
return None
# Usage with custom checkpointing (storage_backend is any object exposing save()/load();
# to be accepted by workflow.compile(), a checkpointer must implement LangGraph's
# BaseCheckpointSaver interface, which this sketch does not fully do)
custom_memory = CustomCheckpointer(storage_backend)
app = workflow.compile(checkpointer=custom_memory)
Thread Management and Session Handling
python
from typing import Dict, Any
from datetime import datetime, timedelta
import uuid
class ThreadManager:
"""Manage multiple conversation threads"""
def __init__(self, checkpointer):
self.checkpointer = checkpointer
self.active_threads = {}
self.thread_metadata = {}
def create_thread(self, user_id: str, initial_state: AgentState) -> str:
thread_id = str(uuid.uuid4())
self.active_threads[thread_id] = {
"user_id": user_id,
"created_at": datetime.now(),
"last_activity": datetime.now(),
"state": initial_state
}
self.thread_metadata[thread_id] = {
"user_id": user_id,
"session_count": 0,
"total_messages": 0
}
return thread_id
def get_thread_state(self, thread_id: str) -> AgentState:
if thread_id in self.active_threads:
return self.active_threads[thread_id]["state"]
# Try to load from checkpoint
checkpoint = self.checkpointer.load_checkpoint(thread_id)
if checkpoint:
return checkpoint["state"]
return None
def update_thread_state(self, thread_id: str, new_state: AgentState):
if thread_id in self.active_threads:
self.active_threads[thread_id]["state"] = new_state
self.active_threads[thread_id]["last_activity"] = datetime.now()
# Save checkpoint
self.checkpointer.save_checkpoint(thread_id, new_state,
self.get_current_step(thread_id))
def cleanup_inactive_threads(self, max_age_hours: int = 24):
cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
inactive_threads = [
thread_id for thread_id, data in self.active_threads.items()
if data["last_activity"] < cutoff_time
]
for thread_id in inactive_threads:
del self.active_threads[thread_id]
# Session-aware graph execution
def execute_with_session(app, thread_id: str, input_data: Dict[str, Any]):
config = {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": f"session_{thread_id}"
}
}
result = app.invoke(input_data, config=config)
return result
Long-term Memory Integration
python
from datetime import datetime
from langchain.memory import ConversationSummaryBufferMemory
from langchain_community.vectorstores import FAISS
from langchain_core.messages import HumanMessage
from langchain_openai import OpenAIEmbeddings
class LongTermMemoryGraph:
"""Graph with long-term memory capabilities"""
def __init__(self, llm):
self.llm = llm
self.conversation_memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000,
return_messages=True
)
self.episodic_memory = FAISS.from_texts(
["Initial memory"],
OpenAIEmbeddings()
)
self.semantic_memory = {}
def memory_enhanced_node(self, state: AgentState) -> AgentState:
"""Node that leverages long-term memory"""
current_task = state["current_task"]
# Retrieve relevant memories
relevant_episodes = self.episodic_memory.similarity_search(
current_task, k=3
)
conversation_context = self.conversation_memory.load_memory_variables({})
# Enhance prompt with memory context
enhanced_prompt = f"""
Current Task: {current_task}
Relevant Past Experiences:
{self.format_episodes(relevant_episodes)}
Conversation History:
{conversation_context.get('history', '')}
Semantic Knowledge:
{self.get_relevant_semantic_knowledge(current_task)}
Based on this context, proceed with the task.
"""
response = self.llm.invoke([HumanMessage(content=enhanced_prompt)])
# Update memories
self.update_episodic_memory(current_task, response.content)
self.conversation_memory.save_context(
{"input": current_task},
{"output": response.content}
)
state["memory_enhanced_result"] = response.content
return state
def update_episodic_memory(self, task: str, result: str):
"""Update episodic memory with new experience"""
episode = f"Task: {task}\nResult: {result}\nTimestamp: {datetime.now()}"
# Add to vector store
self.episodic_memory.add_texts([episode])
def update_semantic_memory(self, concept: str, knowledge: str):
"""Update semantic memory with new knowledge"""
self.semantic_memory[concept] = knowledge
def get_relevant_semantic_knowledge(self, query: str) -> str:
"""Retrieve relevant semantic knowledge"""
relevant_concepts = []
for concept, knowledge in self.semantic_memory.items():
if any(word in query.lower() for word in concept.lower().split()):
relevant_concepts.append(f"{concept}: {knowledge}")
return "\n".join(relevant_concepts)
Streaming and Real-time Processing
Stream Processing
python
from langgraph.graph import StateGraph
import asyncio
from datetime import datetime
from typing import AsyncIterator, Dict, Any
class StreamingGraph:
"""Graph with streaming capabilities"""
def __init__(self):
self.workflow = StateGraph(AgentState)
self.setup_streaming_nodes()
def setup_streaming_nodes(self):
self.workflow.add_node("streaming_research", self.streaming_research_node)
self.workflow.add_node("streaming_analysis", self.streaming_analysis_node)
self.workflow.set_entry_point("streaming_research")
self.app = self.workflow.compile()  # astream() is exposed by the compiled graph, not the builder
async def streaming_research_node(self, state: AgentState) -> AsyncIterator[AgentState]:
"""Node that yields intermediate results"""
current_task = state["current_task"]
# Yield initial status
state["status"] = "Starting research..."
yield state
# Perform research in chunks
research_steps = [
"Gathering initial sources...",
"Analyzing primary data...",
"Cross-referencing information...",
"Finalizing research findings..."
]
for i, step in enumerate(research_steps):
state["status"] = step
state["progress"] = (i + 1) / len(research_steps)
# Simulate research work
await asyncio.sleep(1)
# Yield intermediate state
yield state
# Final research results
state["research_data"] = {
"findings": f"Research completed for: {current_task}",
"confidence": 0.85,
"sources": ["source1", "source2", "source3"]
}
state["status"] = "Research completed"
yield state
async def stream_execution(self, initial_state: AgentState) -> AsyncIterator[AgentState]:
"""Execute graph with streaming output"""
async for state_update in self.app.astream(initial_state):
yield state_update
# Usage
async def main():
streaming_graph = StreamingGraph()
initial_state = AgentState(current_task="Research AI trends", messages=[])
async for state in streaming_graph.stream_execution(initial_state):
print(f"Status: {state.get('status', 'Processing...')}")
print(f"Progress: {state.get('progress', 0) * 100:.1f}%")
print("---")
# Real-time event processing
class EventDrivenGraph:
"""Graph that responds to real-time events"""
def __init__(self, graph_app):
self.graph_app = graph_app  # a compiled LangGraph application used for streaming execution
self.event_queue = asyncio.Queue()
self.active_streams = {}
async def event_listener(self):
"""Listen for incoming events"""
while True:
event = await self.event_queue.get()
await self.process_event(event)
async def process_event(self, event: Dict[str, Any]):
"""Process incoming event"""
event_type = event.get("type")
if event_type == "new_task":
await self.handle_new_task(event)
elif event_type == "update_request":
await self.handle_update_request(event)
elif event_type == "cancellation":
await self.handle_cancellation(event)
async def handle_new_task(self, event: Dict[str, Any]):
"""Handle new task event"""
task_id = event["task_id"]
task_data = event["data"]
# Create new streaming execution
initial_state = AgentState(
current_task=task_data["description"],
task_id=task_id
)
stream = self.graph_app.astream(initial_state)
self.active_streams[task_id] = stream
# Process stream
async for state_update in stream:
await self.emit_update(task_id, state_update)
async def emit_update(self, task_id: str, state: AgentState):
"""Emit state update to subscribers"""
update_event = {
"task_id": task_id,
"state": state,
"timestamp": datetime.now().isoformat()
}
# Send to subscribers (WebSocket, SSE, etc.)
await self.notify_subscribers(update_event)
WebSocket Integration
python
import websockets
import json
import uuid
import asyncio
from datetime import datetime
from typing import Set, Dict, Any
class WebSocketGraphServer:
"""WebSocket server for real-time graph execution"""
def __init__(self, graph_app):
self.graph_app = graph_app
self.connected_clients: Set[websockets.WebSocketServerProtocol] = set()
self.client_sessions = {}
async def register_client(self, websocket: websockets.WebSocketServerProtocol):
"""Register new WebSocket client"""
self.connected_clients.add(websocket)
session_id = str(uuid.uuid4())
self.client_sessions[websocket] = session_id
await websocket.send(json.dumps({
"type": "connection_established",
"session_id": session_id
}))
async def unregister_client(self, websocket: websockets.WebSocketServerProtocol):
"""Unregister WebSocket client"""
self.connected_clients.discard(websocket)
if websocket in self.client_sessions:
del self.client_sessions[websocket]
async def handle_client_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
"""Handle message from WebSocket client"""
try:
data = json.loads(message)
message_type = data.get("type")
if message_type == "execute_graph":
await self.execute_graph_for_client(websocket, data)
elif message_type == "get_status":
await self.send_status_to_client(websocket, data)
except json.JSONDecodeError:
await websocket.send(json.dumps({
"type": "error",
"message": "Invalid JSON format"
}))
async def execute_graph_for_client(self, websocket: websockets.WebSocketServerProtocol, data: Dict[str, Any]):
"""Execute graph and stream results to client"""
session_id = self.client_sessions[websocket]
initial_state = AgentState(**data.get("initial_state", {}))
config = {
"configurable": {
"thread_id": session_id
}
}
try:
async for state_update in self.graph_app.astream(initial_state, config=config):
await websocket.send(json.dumps({
"type": "state_update",
"session_id": session_id,
"state": self.serialize_state(state_update),
"timestamp": datetime.now().isoformat()
}))
except Exception as e:
await websocket.send(json.dumps({
"type": "execution_error",
"error": str(e)
}))
async def broadcast_to_all_clients(self, message: Dict[str, Any]):
"""Broadcast message to all connected clients"""
if self.connected_clients:
await asyncio.gather(
*[client.send(json.dumps(message)) for client in self.connected_clients],
return_exceptions=True
)
def serialize_state(self, state: AgentState) -> Dict[str, Any]:
"""Serialize state for JSON transmission"""
# Convert state to JSON-serializable format
return {
key: value for key, value in state.items()
if isinstance(value, (str, int, float, bool, list, dict, type(None)))
}
# Start WebSocket server
async def start_websocket_server(graph_app, host="localhost", port=8765):
server = WebSocketGraphServer(graph_app)
async def handle_client(websocket, path):
await server.register_client(websocket)
try:
async for message in websocket:
await server.handle_client_message(websocket, message)
except websockets.exceptions.ConnectionClosed:
pass
finally:
await server.unregister_client(websocket)
async with websockets.serve(handle_client, host, port):
await asyncio.Future()  # run until cancelled
Production Deployment
LangGraph Platform Deployment
bash
# Install LangGraph CLI
pip install langgraph-cli
# Initialize LangGraph project
langgraph init my-graph-project
# Configure deployment
cat > langgraph.json << EOF
{
"dependencies": [
"langgraph",
"langchain-openai",
"langchain-community"
],
"graphs": {
"my_graph": "./src/graph.py:workflow"
},
"env": {
"OPENAI_API_KEY": "your-api-key"
}
}
EOF
# Deploy to LangGraph Platform
langgraph deploy --name my-production-graph
Docker Deployment
dockerfile
# Dockerfile for LangGraph application
FROM python:3.11-slim
WORKDIR /app
# Install system and Python dependencies (curl is needed for the HEALTHCHECK below)
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application code
COPY src/ ./src/
COPY config/ ./config/
# Set environment variables
ENV PYTHONPATH=/app
ENV LANGCHAIN_TRACING_V2=true
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Start application
CMD ["python", "-m", "uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
yaml
# docker-compose.yml
version: '3.8'
services:
langgraph-app:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_URL=postgresql://user:password@postgres:5432/langgraph_db
depends_on:
- postgres
- redis
volumes:
- ./checkpoints:/app/checkpoints
restart: unless-stopped
postgres:
image: postgres:15
environment:
POSTGRES_DB: langgraph_db
POSTGRES_USER: user
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:
Kubernetes Deployment
yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: langgraph-app
labels:
app: langgraph-app
spec:
replicas: 3
selector:
matchLabels:
app: langgraph-app
template:
metadata:
labels:
app: langgraph-app
spec:
containers:
- name: langgraph-app
image: your-registry/langgraph-app:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-keys
key: openai-api-key
- name: DATABASE_URL
valueFrom:
configMapKeyRef:
name: app-config
key: database-url
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: langgraph-service
spec:
selector:
app: langgraph-app
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
Monitoring and Observability
Comprehensive Monitoring Setup
python
from langsmith import Client
import logging
import time
import json
import uuid
from datetime import datetime
from typing import Dict, Any
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
# Prometheus metrics
GRAPH_EXECUTIONS = Counter('langgraph_executions_total', 'Total graph executions', ['graph_name', 'status'])
EXECUTION_DURATION = Histogram('langgraph_execution_duration_seconds', 'Graph execution duration')
ACTIVE_THREADS = Gauge('langgraph_active_threads', 'Number of active threads')
NODE_EXECUTIONS = Counter('langgraph_node_executions_total', 'Total node executions', ['node_name', 'status'])
class MonitoringWrapper:
"""Comprehensive monitoring wrapper for LangGraph"""
def __init__(self, graph_app, langsmith_client: Client = None):
self.graph_app = graph_app
self.langsmith_client = langsmith_client
self.logger = self.setup_logging()
self.active_executions = {}
def setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('langgraph.log'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
async def execute_with_monitoring(self, initial_state: AgentState, config: Dict[str, Any] = None):
"""Execute graph with comprehensive monitoring"""
execution_id = str(uuid.uuid4())
thread_id = (config or {}).get('configurable', {}).get('thread_id', 'default')
# Start monitoring
start_time = time.time()
self.active_executions[execution_id] = {
'thread_id': thread_id,
'start_time': start_time,
'status': 'running'
}
ACTIVE_THREADS.inc()
try:
self.logger.info(f"Starting graph execution {execution_id} for thread {thread_id}")
# Execute graph with streaming
final_state = None
async for state_update in self.graph_app.astream(initial_state, config=config):
# Log state updates
self.log_state_update(execution_id, state_update)
# Send to LangSmith if configured
if self.langsmith_client:
await self.send_to_langsmith(execution_id, state_update)
final_state = state_update
# Success metrics
execution_time = time.time() - start_time
GRAPH_EXECUTIONS.labels(graph_name='main', status='success').inc()
EXECUTION_DURATION.observe(execution_time)
self.logger.info(f"Graph execution {execution_id} completed successfully in {execution_time:.2f}s")
return final_state
except Exception as e:
# Error metrics
execution_time = time.time() - start_time
GRAPH_EXECUTIONS.labels(graph_name='main', status='error').inc()
EXECUTION_DURATION.observe(execution_time)
self.logger.error(f"Graph execution {execution_id} failed: {str(e)}")
raise
finally:
# Cleanup
ACTIVE_THREADS.dec()
if execution_id in self.active_executions:
del self.active_executions[execution_id]
def log_state_update(self, execution_id: str, state: AgentState):
"""Log state updates with structured logging"""
log_data = {
'execution_id': execution_id,
'timestamp': datetime.now().isoformat(),
'completed_tasks': state.get('completed_tasks', []),
'current_task': state.get('current_task', ''),
'iteration_count': state.get('iteration_count', 0)
}
self.logger.info(f"State update: {json.dumps(log_data)}")
async def send_to_langsmith(self, execution_id: str, state: AgentState):
"""Send execution data to LangSmith"""
if not self.langsmith_client:
return
try:
self.langsmith_client.create_run(  # Client.create_run is synchronous
name=f"langgraph_execution_{execution_id}",
run_type="chain",
inputs={"state": state},
extra={"execution_id": execution_id}
)
except Exception as e:
self.logger.warning(f"Failed to send data to LangSmith: {str(e)}")
# Health check endpoints (monitoring_wrapper, get_memory_usage, and get_cpu_usage are assumed to be defined elsewhere)
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return JSONResponse({
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"active_threads": len(monitoring_wrapper.active_executions)
})
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
return Response(content=prometheus_client.generate_latest(), media_type="text/plain")
@app.get("/status")
async def status():
"""Detailed status information"""
return JSONResponse({
"active_executions": len(monitoring_wrapper.active_executions),
"execution_details": monitoring_wrapper.active_executions,
"system_info": {
"memory_usage": get_memory_usage(),
"cpu_usage": get_cpu_usage()
}
})
Best Practices and Patterns
Graph Design Principles
- Single Responsibility: Each node should have a clear, focused purpose
- State Management: Design state schema carefully to avoid bloat
- Error Handling: Implement comprehensive error handling and recovery
- Resource Management: Monitor and manage computational resources
- Testing: Create unit tests for individual nodes and integration tests for workflows (a minimal node test sketch follows this list)
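Because LangGraph nodes are plain functions over a state dict, they can be unit-tested without compiling a graph. A minimal pytest-style sketch against the analyze_node defined earlier (it exercises the early-return error path, so no LLM call is made):
python
# Hypothetical pytest-style test for the analyze_node defined earlier;
# empty research_data triggers the early-return error path, so no LLM call is made
def test_analyze_node_requires_research_data():
    state = AgentState(
        messages=[], current_task="test task", completed_tasks=[],
        research_data=None, analysis_results=None, final_output=None,
        iteration_count=0, error_messages=[]
    )
    result = analyze_node(state)
    assert "No research data available for analysis" in result["error_messages"]
    assert result["analysis_results"] is None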
Performance Optimization
- Parallel Processing: Leverage parallel execution where appropriate
- Caching: Implement caching for expensive operations (see the sketch after this list)
- Resource Pooling: Use connection pooling for external services
- Batch Processing: Group similar operations for efficiency
- Memory Management: Monitor and optimize memory usage
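One lightweight way to apply the caching point above is to memoize expensive LLM calls inside a node, keyed on the prompt text. A minimal sketch, assuming the llm and HumanMessage objects from the earlier examples (the cache and helper names are illustrative):
python
# Illustrative in-memory memoization of expensive LLM calls, keyed on the prompt text
import hashlib

_llm_cache: dict = {}

def cached_llm_call(prompt: str) -> str:
    key = hashlib.sha256(prompt.encode()).hexdigest()
    if key not in _llm_cache:
        # Only call the model when this exact prompt has not been seen before
        _llm_cache[key] = llm.invoke([HumanMessage(content=prompt)]).content
    return _llm_cache[key]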
Security Considerations
- Input Validation: Validate all inputs to prevent injection attacks (a small example follows this list)
- Access Control: Implement proper authentication and authorization
- Data Privacy: Handle sensitive data according to privacy regulations
- API Security: Secure API endpoints and manage rate limiting
- Audit Logging: Maintain comprehensive audit trails
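As an example of the input-validation point, incoming task descriptions can be checked before they ever reach a node. A minimal sketch using pydantic (the length limit, field names, and helper are illustrative assumptions, not part of LangGraph):
python
# Illustrative input validation before raw user input is turned into graph state
from pydantic import BaseModel, Field, ValidationError

class TaskRequest(BaseModel):
    current_task: str = Field(min_length=1, max_length=2000)
    user_id: str = Field(pattern=r"^[A-Za-z0-9_-]+$")

def build_initial_state(raw: dict) -> AgentState:
    try:
        req = TaskRequest(**raw)
    except ValidationError as e:
        raise ValueError(f"Rejected input: {e}") from e
    return AgentState(
        messages=[], current_task=req.current_task, completed_tasks=[],
        research_data=None, analysis_results=None, final_output=None,
        iteration_count=0, error_messages=[]
    )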
This comprehensive LangGraph cheat sheet provides everything needed to build sophisticated, stateful AI agent workflows. From basic graph construction to advanced production deployment patterns, use these examples and best practices to create powerful, scalable AI applications with LangGraph's graph-based approach.