LangGraph Framework Cheat Sheet

Overview

LangGraph represents a paradigm shift in building AI agent workflows, offering a stateful orchestration framework that brings fine-grained control and flexibility to agent-based applications. Developed by the LangChain team, LangGraph addresses the limitations of traditional linear agent frameworks by introducing a graph-based approach in which complex workflows are structured as interconnected nodes and edges, enabling sophisticated multi-agent systems that can handle branching logic, conditional execution, and stateful interactions.

What sets LangGraph apart is its ability to model complex real-world workflows that require decision points, parallel processing, and dynamic routing based on intermediate results. Unlike simple chain-based approaches, LangGraph allows developers to create workflows where agents can collaborate, compete, or operate independently while maintaining shared state and context. This makes it particularly powerful for applications requiring sophisticated reasoning, multi-step problem solving, and adaptive behavior based on changing conditions.

The framework combines the flexibility of graph-based computation with the reliability needed for production systems, offering features like persistence, streaming, debugging support, and seamless deployment through the LangGraph Platform. This positions LangGraph as the go-to solution for developers building next-generation AI applications that require more than simple request-response patterns.

Installation and Setup

Basic Installation

bash
# Install LangGraph
pip install langgraph

# Install with additional dependencies
pip install "langgraph[all]"

# Install development version
pip install git+https://github.com/langchain-ai/langgraph.git

# Install LangGraph Platform CLI (for deployment)
pip install langgraph-cli

Environment Configuration

python
import os
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor  # removed in newer langgraph releases in favor of ToolNode
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# Set up API keys
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-api-key"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# Initialize LLMs
openai_llm = ChatOpenAI(model="gpt-4", temperature=0)
anthropic_llm = ChatAnthropic(model="claude-3-sonnet-20240229")

Project Structure

langgraph_project/
├── graphs/
│   ├── __init__.py
│   ├── research_graph.py
│   └── analysis_graph.py
├── nodes/
│   ├── __init__.py
│   ├── agent_nodes.py
│   └── tool_nodes.py
├── state/
│   ├── __init__.py
│   └── state_schemas.py
├── tools/
│   ├── __init__.py
│   └── custom_tools.py
├── checkpoints/
│   └── memory.db
├── config/
│   ├── __init__.py
│   └── settings.py
└── main.py

Core Concepts

State Management

python
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph

# Define state schema
class AgentState(TypedDict):
    messages: List[str]
    current_task: str
    completed_tasks: List[str]
    research_data: Optional[dict]
    analysis_results: Optional[dict]
    final_output: Optional[str]
    iteration_count: int
    error_messages: List[str]

# Initialize state
initial_state = AgentState(
    messages=[],
    current_task="",
    completed_tasks=[],
    research_data=None,
    analysis_results=None,
    final_output=None,
    iteration_count=0,
    error_messages=[]
)

Graph Construction

python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Create graph with state management
workflow = StateGraph(AgentState)

# Add nodes (functions that process state)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("synthesize", synthesize_node)
workflow.add_node("review", review_node)

# Define edges (transitions between nodes)
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "synthesize")
workflow.add_edge("synthesize", "review")

# Conditional edges based on state
# (note: a node must increment iteration_count, otherwise the review self-loop
# below never reaches the exit condition)
def should_continue(state: AgentState) -> str:
    if state["iteration_count"] > 5:
        return END
    elif state["error_messages"]:
        return "research"  # Retry research if errors
    else:
        return "review"

workflow.add_conditional_edges(
    "review",
    should_continue,
    {
        END: END,
        "research": "research",
        "review": "review"
    }
)

# Set entry point
workflow.set_entry_point("research")

# Compile graph with checkpointing
# (needs the langgraph-checkpoint-sqlite package; in recent releases
# SqliteSaver.from_conn_string() is a context manager, so you may need
# `with SqliteSaver.from_conn_string(":memory:") as memory:` instead)
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
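
Once compiled with a checkpointer, every invocation must carry a `thread_id` so LangGraph knows which conversation's state to load and save. A minimal usage sketch (the topic string and thread id are placeholders):

python
# Run the compiled graph; the thread_id scopes checkpoints to one conversation
config = {"configurable": {"thread_id": "demo-thread-1"}}

result = app.invoke(
    {**initial_state, "current_task": "Impact of LLMs on software testing"},
    config=config,
)
print(result["final_output"])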

Node Implementation

Basic Node Functions

python
from datetime import datetime

from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

# Note: extract_sources() and calculate_confidence() used below are
# app-specific helpers you would implement yourself.

def research_node(state: AgentState) -> AgentState:
    """Research node that gathers information"""
    current_task = state["current_task"]
    
    # Perform research using LLM
    research_prompt = f"""
    Research the following topic comprehensively:
    {current_task}
    
    Provide detailed findings including:
    1. Key facts and statistics
    2. Current trends and developments
    3. Expert opinions and analysis
    4. Relevant sources and references
    """
    
    response = llm.invoke([HumanMessage(content=research_prompt)])
    
    # Update state
    state["research_data"] = {
        "findings": response.content,
        "timestamp": datetime.now().isoformat(),
        "sources": extract_sources(response.content)
    }
    state["completed_tasks"].append("research")
    state["messages"].append(f"Research completed for: {current_task}")
    
    return state

def analyze_node(state: AgentState) -> AgentState:
    """Analysis node that processes research data"""
    research_data = state["research_data"]
    
    if not research_data:
        state["error_messages"].append("No research data available for analysis")
        return state
    
    analysis_prompt = f"""
    Analyze the following research findings:
    {research_data['findings']}
    
    Provide:
    1. Key insights and patterns
    2. Strengths and weaknesses
    3. Opportunities and threats
    4. Strategic recommendations
    """
    
    response = llm.invoke([HumanMessage(content=analysis_prompt)])
    
    state["analysis_results"] = {
        "insights": response.content,
        "timestamp": datetime.now().isoformat(),
        "confidence_score": calculate_confidence(response.content)
    }
    state["completed_tasks"].append("analysis")
    state["messages"].append("Analysis completed")
    
    return state

def synthesize_node(state: AgentState) -> AgentState:
    """Synthesis node that combines research and analysis"""
    research_data = state["research_data"]
    analysis_results = state["analysis_results"]
    
    synthesis_prompt = f"""
    Synthesize the following research and analysis into a comprehensive report:
    
    Research Findings:
    {research_data['findings']}
    
    Analysis Results:
    {analysis_results['insights']}
    
    Create a cohesive narrative that:
    1. Integrates all key findings
    2. Provides actionable recommendations
    3. Addresses potential concerns
    4. Outlines next steps
    """
    
    response = llm.invoke([HumanMessage(content=synthesis_prompt)])
    
    state["final_output"] = response.content
    state["completed_tasks"].append("synthesis")
    state["messages"].append("Synthesis completed")
    
    return state

Advanced Node Patterns

python
from datetime import datetime
from typing import Dict, Any
import asyncio

class AsyncResearchNode:
    """Async node for parallel research operations"""
    
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
    
    async def __call__(self, state: AgentState) -> AgentState:
        current_task = state["current_task"]
        
        # Parallel research tasks
        research_tasks = [
            self.web_research(current_task),
            self.academic_research(current_task),
            self.news_research(current_task)
        ]
        
        results = await asyncio.gather(*research_tasks, return_exceptions=True)
        
        # Combine results
        combined_research = {
            "web_findings": results[0] if not isinstance(results[0], Exception) else None,
            "academic_findings": results[1] if not isinstance(results[1], Exception) else None,
            "news_findings": results[2] if not isinstance(results[2], Exception) else None,
            "timestamp": datetime.now().isoformat()
        }
        
        state["research_data"] = combined_research
        state["completed_tasks"].append("parallel_research")
        
        return state
    
    async def web_research(self, topic: str) -> Dict[str, Any]:
        # Implement web research logic
        pass
    
    async def academic_research(self, topic: str) -> Dict[str, Any]:
        # Implement academic research logic
        pass
    
    async def news_research(self, topic: str) -> Dict[str, Any]:
        # Implement news research logic
        pass

# Multi-agent collaboration node
class CollaborativeNode:
    """Node that coordinates multiple agents"""
    
    def __init__(self, agents: Dict[str, Any], synthesizer_llm):
        self.agents = agents
        self.synthesizer_llm = synthesizer_llm  # LLM used to merge the agents' outputs
    
    def __call__(self, state: AgentState) -> AgentState:
        current_task = state["current_task"]
        
        # Assign tasks to different agents
        agent_results = {}
        
        for agent_name, agent in self.agents.items():
            agent_task = self.create_agent_task(current_task, agent_name)
            result = agent.invoke(agent_task)
            agent_results[agent_name] = result
        
        # Synthesize agent results
        synthesis_prompt = f"""
        Multiple agents have worked on the task: {current_task}
        
        Agent Results:
        {self.format_agent_results(agent_results)}
        
        Synthesize these perspectives into a cohesive solution.
        """
        
        final_result = self.synthesizer_llm.invoke(synthesis_prompt)
        
        state["collaborative_results"] = {
            "individual_results": agent_results,
            "synthesized_result": final_result.content,
            "participating_agents": list(self.agents.keys())
        }
        
        return state

Tool Integration Nodes

python
from langgraph.prebuilt import ToolExecutor  # older prebuilt API; newer releases provide ToolNode instead
from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

# Create tool executor (WikipediaQueryRun requires an explicit API wrapper)
tools = [DuckDuckGoSearchRun(), WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())]
tool_executor = ToolExecutor(tools)

def tool_calling_node(state: AgentState) -> AgentState:
    """Node that uses tools to gather information"""
    current_task = state["current_task"]
    
    # Determine which tools to use
    tool_selection_prompt = f"""
    For the task: {current_task}
    
    Which tools should be used and what queries should be made?
    Available tools: {[tool.name for tool in tools]}
    
    Respond with a JSON list of tool calls:
    [{"tool": "tool_name", "query": "search_query"}, ...]
    """
    
    tool_plan = llm.invoke([HumanMessage(content=tool_selection_prompt)])
    tool_calls = parse_tool_calls(tool_plan.content)
    
    # Execute tools
    tool_results = []
    for tool_call in tool_calls:
        try:
            result = tool_executor.invoke({
                "tool": tool_call["tool"],
                "tool_input": tool_call["query"]
            })
            tool_results.append({
                "tool": tool_call["tool"],
                "query": tool_call["query"],
                "result": result,
                "success": True
            })
        except Exception as e:
            tool_results.append({
                "tool": tool_call["tool"],
                "query": tool_call["query"],
                "error": str(e),
                "success": False
            })
    
    state["tool_results"] = tool_results
    state["completed_tasks"].append("tool_execution")
    
    return state

def parse_tool_calls(content: str) -> List[Dict[str, str]]:
    """Parse tool calls from LLM response"""
    import json
    try:
        return json.loads(content)
    except (json.JSONDecodeError, TypeError):
        # Fallback: treat the whole response as a single search query
        return [{"tool": "search", "query": content}]

Conditional Logic and Routing

Dynamic Routing

python
def route_based_on_content(state: AgentState) -> str:
    """Route based on content analysis"""
    current_task = state["current_task"].lower()
    
    if "research" in current_task or "analyze" in current_task:
        return "research_path"
    elif "create" in current_task or "generate" in current_task:
        return "creation_path"
    elif "review" in current_task or "evaluate" in current_task:
        return "review_path"
    else:
        return "general_path"

def quality_gate(state: AgentState) -> str:
    """Quality gate for output validation"""
    if not state["final_output"]:
        return "retry"
    
    # Quality assessment
    quality_score = assess_output_quality(state["final_output"])
    
    if quality_score > 0.8:
        return "approve"
    elif quality_score > 0.6:
        return "revise"
    else:
        return "retry"

# Add conditional routing to graph
workflow.add_conditional_edges(
    "initial_analysis",
    route_based_on_content,
    {
        "research_path": "research_node",
        "creation_path": "creation_node",
        "review_path": "review_node",
        "general_path": "general_node"
    }
)

workflow.add_conditional_edges(
    "final_review",
    quality_gate,
    {
        "approve": END,
        "revise": "revision_node",
        "retry": "research_node"
    }
)

Complex Decision Trees

python
def multi_criteria_router(state: AgentState) -> str:
    """Multi-criteria decision routing"""
    criteria = {
        "complexity": assess_task_complexity(state["current_task"]),
        "urgency": assess_task_urgency(state),
        "resources": assess_available_resources(state),
        "expertise_needed": assess_expertise_requirements(state["current_task"])
    }
    
    # Decision matrix
    if criteria["complexity"] > 0.8 and criteria["expertise_needed"] > 0.7:
        return "expert_collaboration"
    elif criteria["urgency"] > 0.9:
        return "fast_track"
    elif criteria["resources"] < 0.3:
        return "resource_optimization"
    else:
        return "standard_processing"

def adaptive_retry_logic(state: AgentState) -> str:
    """Adaptive retry with different strategies"""
    error_count = len(state["error_messages"])
    last_error = state["error_messages"][-1] if state["error_messages"] else ""
    
    if error_count == 0:
        return "proceed"
    elif error_count == 1:
        if "timeout" in last_error.lower():
            return "retry_with_timeout"
        else:
            return "retry_with_different_approach"
    elif error_count == 2:
        return "escalate_to_human"
    else:
        return "abort"

# Complex routing configuration
workflow.add_conditional_edges(
    "task_assessment",
    multi_criteria_router,
    {
        "expert_collaboration": "expert_node",
        "fast_track": "fast_track_node",
        "resource_optimization": "optimized_node",
        "standard_processing": "standard_node"
    }
)

Parallel Processing and Concurrency

Parallel Node Execution

python
from langgraph.graph import StateGraph, START
import asyncio

class ParallelProcessingGraph:
    """Graph with parallel processing capabilities"""
    
    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.setup_parallel_nodes()
    
    def setup_parallel_nodes(self):
        # Add parallel processing nodes
        self.workflow.add_node("parallel_research", self.parallel_research_node)
        self.workflow.add_node("parallel_analysis", self.parallel_analysis_node)
        self.workflow.add_node("synthesis", self.synthesis_node)
        
        # Fan out from START so both branches run in parallel, then fan in at synthesis.
        # (A graph has a single entry point; calling set_entry_point() twice would
        # simply overwrite the first call.)
        self.workflow.add_edge(START, "parallel_research")
        self.workflow.add_edge(START, "parallel_analysis")
        self.workflow.add_edge("parallel_research", "synthesis")
        self.workflow.add_edge("parallel_analysis", "synthesis")
    
    async def parallel_research_node(self, state: AgentState) -> AgentState:
        """Execute multiple research tasks in parallel"""
        research_tasks = [
            self.web_search(state["current_task"]),
            self.database_query(state["current_task"]),
            self.expert_consultation(state["current_task"])
        ]
        
        results = await asyncio.gather(*research_tasks, return_exceptions=True)
        
        state["parallel_research_results"] = {
            "web_search": results[0] if not isinstance(results[0], Exception) else None,
            "database_query": results[1] if not isinstance(results[1], Exception) else None,
            "expert_consultation": results[2] if not isinstance(results[2], Exception) else None
        }
        
        return state
    
    async def parallel_analysis_node(self, state: AgentState) -> AgentState:
        """Execute multiple analysis tasks in parallel"""
        analysis_tasks = [
            self.sentiment_analysis(state["current_task"]),
            self.trend_analysis(state["current_task"]),
            self.competitive_analysis(state["current_task"])
        ]
        
        results = await asyncio.gather(*analysis_tasks, return_exceptions=True)
        
        state["parallel_analysis_results"] = {
            "sentiment": results[0] if not isinstance(results[0], Exception) else None,
            "trends": results[1] if not isinstance(results[1], Exception) else None,
            "competitive": results[2] if not isinstance(results[2], Exception) else None
        }
        
        return state

# Fan-out/Fan-in pattern
def fan_out_node(state: AgentState) -> AgentState:
    """Split work into parallel streams"""
    task = state["current_task"]
    
    # Create subtasks
    subtasks = split_task_into_subtasks(task)
    state["subtasks"] = subtasks
    state["parallel_streams"] = len(subtasks)
    
    return state

def fan_in_node(state: AgentState) -> AgentState:
    """Combine results from parallel streams"""
    subtask_results = state.get("subtask_results", {})
    
    if len(subtask_results) < state["parallel_streams"]:
        # Not all streams completed yet
        return state
    
    # Combine all results
    combined_result = combine_subtask_results(subtask_results)
    state["final_output"] = combined_result
    
    return state

Load Balancing and Resource Management

python
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

class ResourceManagedGraph:
    """Graph with resource management and load balancing"""
    
    def __init__(self, max_workers=4, max_concurrent_llm_calls=2):
        self.max_workers = max_workers
        self.max_concurrent_llm_calls = max_concurrent_llm_calls
        self.llm_semaphore = threading.Semaphore(max_concurrent_llm_calls)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.task_queue = Queue()
    
    def resource_aware_node(self, state: AgentState) -> AgentState:
        """Node that manages resource usage"""
        with self.llm_semaphore:
            # LLM call with resource management
            result = self.make_llm_call(state["current_task"])
            
        state["resource_managed_result"] = result
        return state
    
    def load_balanced_processing(self, state: AgentState) -> AgentState:
        """Distribute work across available resources"""
        tasks = state.get("pending_tasks", [])
        
        if not tasks:
            return state
        
        # Submit tasks to thread pool
        futures = []
        for task in tasks:
            future = self.executor.submit(self.process_single_task, task)
            futures.append(future)
        
        # Collect results
        results = []
        for future in futures:
            try:
                result = future.result(timeout=30)
                results.append(result)
            except Exception as e:
                results.append({"error": str(e)})
        
        state["load_balanced_results"] = results
        return state
    
    def adaptive_scaling_node(self, state: AgentState) -> AgentState:
        """Adapt resource usage based on workload"""
        current_load = self.assess_current_load()
        
        if current_load > 0.8:
            # High load - reduce concurrency
            self.llm_semaphore = threading.Semaphore(1)
        elif current_load < 0.3:
            # Low load - increase concurrency
            self.llm_semaphore = threading.Semaphore(self.max_concurrent_llm_calls * 2)
        
        return state

Memory and Persistence

Checkpointing and State Persistence

python
# Checkpoint savers live in separate packages:
#   pip install langgraph-checkpoint-sqlite langgraph-checkpoint-postgres
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
from datetime import datetime
import sqlite3

# SQLite checkpointing
def setup_sqlite_checkpointing():
    conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
    memory = SqliteSaver(conn)
    return memory

# PostgreSQL checkpointing for production
def setup_postgres_checkpointing():
    connection_string = "postgresql://user:password@localhost/langgraph_db"
    # In recent releases from_conn_string() is a context manager; if so, use
    # `with PostgresSaver.from_conn_string(...) as memory:` and call memory.setup()
    memory = PostgresSaver.from_conn_string(connection_string)
    return memory

# Custom checkpointing implementation
class CustomCheckpointer:
    """Custom checkpointing with additional features"""
    
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.checkpoint_history = {}
    
    def save_checkpoint(self, thread_id: str, state: AgentState, step: int):
        checkpoint_data = {
            "thread_id": thread_id,
            "state": state,
            "step": step,
            "timestamp": datetime.now().isoformat(),
            "metadata": self.extract_metadata(state)
        }
        
        self.storage.save(f"{thread_id}_{step}", checkpoint_data)
        
        # Keep checkpoint history
        if thread_id not in self.checkpoint_history:
            self.checkpoint_history[thread_id] = []
        self.checkpoint_history[thread_id].append(step)
    
    def load_checkpoint(self, thread_id: str, step: int = None):
        if step is None:
            # Load latest checkpoint
            steps = self.checkpoint_history.get(thread_id, [])
            if not steps:
                return None
            step = max(steps)
        
        return self.storage.load(f"{thread_id}_{step}")
    
    def rollback_to_checkpoint(self, thread_id: str, step: int):
        checkpoint = self.load_checkpoint(thread_id, step)
        if checkpoint:
            return checkpoint["state"]
        return None

# Usage with custom checkpointing
# Note: to plug into workflow.compile(), a custom checkpointer must implement the
# BaseCheckpointSaver interface; the class above is an illustrative sketch, and
# `storage_backend` stands in for whatever key-value store you use.
custom_memory = CustomCheckpointer(storage_backend)
app = workflow.compile(checkpointer=custom_memory)

Thread Management and Session Handling

python
from datetime import datetime, timedelta
from typing import Dict, Any
import uuid

class ThreadManager:
    """Manage multiple conversation threads"""
    
    def __init__(self, checkpointer):
        self.checkpointer = checkpointer
        self.active_threads = {}
        self.thread_metadata = {}
    
    def create_thread(self, user_id: str, initial_state: AgentState) -> str:
        thread_id = str(uuid.uuid4())
        
        self.active_threads[thread_id] = {
            "user_id": user_id,
            "created_at": datetime.now(),
            "last_activity": datetime.now(),
            "state": initial_state
        }
        
        self.thread_metadata[thread_id] = {
            "user_id": user_id,
            "session_count": 0,
            "total_messages": 0
        }
        
        return thread_id
    
    def get_thread_state(self, thread_id: str) -> AgentState:
        if thread_id in self.active_threads:
            return self.active_threads[thread_id]["state"]
        
        # Try to load from checkpoint
        checkpoint = self.checkpointer.load_checkpoint(thread_id)
        if checkpoint:
            return checkpoint["state"]
        
        return None
    
    def update_thread_state(self, thread_id: str, new_state: AgentState):
        if thread_id in self.active_threads:
            self.active_threads[thread_id]["state"] = new_state
            self.active_threads[thread_id]["last_activity"] = datetime.now()
        
        # Save checkpoint
        self.checkpointer.save_checkpoint(thread_id, new_state, 
                                         self.get_current_step(thread_id))
    
    def cleanup_inactive_threads(self, max_age_hours: int = 24):
        cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
        
        inactive_threads = [
            thread_id for thread_id, data in self.active_threads.items()
            if data["last_activity"] < cutoff_time
        ]
        
        for thread_id in inactive_threads:
            del self.active_threads[thread_id]

# Session-aware graph execution
def execute_with_session(app, thread_id: str, input_data: Dict[str, Any]):
    config = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": f"session_{thread_id}"
        }
    }
    
    result = app.invoke(input_data, config=config)
    return result

Long-term Memory Integration

python
from datetime import datetime

from langchain.memory import ConversationSummaryBufferMemory
from langchain_community.vectorstores import FAISS
from langchain_core.messages import HumanMessage
from langchain_openai import OpenAIEmbeddings

class LongTermMemoryGraph:
    """Graph with long-term memory capabilities"""
    
    def __init__(self, llm):
        self.llm = llm
        self.conversation_memory = ConversationSummaryBufferMemory(
            llm=llm,
            max_token_limit=2000,
            return_messages=True
        )
        self.episodic_memory = FAISS.from_texts(
            ["Initial memory"], 
            OpenAIEmbeddings()
        )
        self.semantic_memory = {}
    
    def memory_enhanced_node(self, state: AgentState) -> AgentState:
        """Node that leverages long-term memory"""
        current_task = state["current_task"]
        
        # Retrieve relevant memories
        relevant_episodes = self.episodic_memory.similarity_search(
            current_task, k=3
        )
        
        conversation_context = self.conversation_memory.load_memory_variables({})
        
        # Enhance prompt with memory context
        enhanced_prompt = f"""
        Current Task: {current_task}
        
        Relevant Past Experiences:
        {self.format_episodes(relevant_episodes)}
        
        Conversation History:
        {conversation_context.get('history', '')}
        
        Semantic Knowledge:
        {self.get_relevant_semantic_knowledge(current_task)}
        
        Based on this context, proceed with the task.
        """
        
        response = self.llm.invoke([HumanMessage(content=enhanced_prompt)])
        
        # Update memories
        self.update_episodic_memory(current_task, response.content)
        self.conversation_memory.save_context(
            {"input": current_task},
            {"output": response.content}
        )
        
        state["memory_enhanced_result"] = response.content
        return state
    
    def update_episodic_memory(self, task: str, result: str):
        """Update episodic memory with new experience"""
        episode = f"Task: {task}\nResult: {result}\nTimestamp: {datetime.now()}"
        
        # Add to vector store
        self.episodic_memory.add_texts([episode])
    
    def update_semantic_memory(self, concept: str, knowledge: str):
        """Update semantic memory with new knowledge"""
        self.semantic_memory[concept] = knowledge
    
    def get_relevant_semantic_knowledge(self, query: str) -> str:
        """Retrieve relevant semantic knowledge"""
        relevant_concepts = []
        for concept, knowledge in self.semantic_memory.items():
            if any(word in query.lower() for word in concept.lower().split()):
                relevant_concepts.append(f"{concept}: {knowledge}")
        
        return "\n".join(relevant_concepts)

Streaming and Real-time Processing

Stream Processing

python
from langgraph.graph import StateGraph
import asyncio
from typing import AsyncIterator

class StreamingGraph:
    """Graph with streaming capabilities"""
    
    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.setup_streaming_nodes()
    
    def setup_streaming_nodes(self):
        self.workflow.add_node("streaming_research", self.streaming_research_node)
        self.workflow.add_node("streaming_analysis", self.streaming_analysis_node)
        self.workflow.set_entry_point("streaming_research")
    
    async def streaming_research_node(self, state: AgentState) -> AsyncIterator[AgentState]:
        """Node that yields intermediate results.

        (Conceptual sketch: a standard LangGraph node returns a single state
        update; intermediate progress is normally surfaced via the compiled
        graph's stream modes rather than by yielding from the node itself.)
        """
        current_task = state["current_task"]
        
        # Yield initial status
        state["status"] = "Starting research..."
        yield state
        
        # Perform research in chunks
        research_steps = [
            "Gathering initial sources...",
            "Analyzing primary data...",
            "Cross-referencing information...",
            "Finalizing research findings..."
        ]
        
        for i, step in enumerate(research_steps):
            state["status"] = step
            state["progress"] = (i + 1) / len(research_steps)
            
            # Simulate research work
            await asyncio.sleep(1)
            
            # Yield intermediate state
            yield state
        
        # Final research results
        state["research_data"] = {
            "findings": f"Research completed for: {current_task}",
            "confidence": 0.85,
            "sources": ["source1", "source2", "source3"]
        }
        state["status"] = "Research completed"
        yield state
    
    async def stream_execution(self, initial_state: AgentState) -> AsyncIterator[AgentState]:
        """Execute graph with streaming output"""
        # A StateGraph must be compiled before it can be streamed
        app = self.workflow.compile()
        async for state_update in app.astream(initial_state):
            yield state_update

# Usage
async def main():
    streaming_graph = StreamingGraph()
    initial_state = AgentState(current_task="Research AI trends", messages=[])
    
    async for state in streaming_graph.stream_execution(initial_state):
        print(f"Status: {state.get('status', 'Processing...')}")
        print(f"Progress: {state.get('progress', 0) * 100:.1f}%")
        print("---")

# Real-time event processing
class EventDrivenGraph:
    """Graph that responds to real-time events"""
    
    def __init__(self, workflow):
        self.workflow = workflow  # a compiled LangGraph app (supports .astream)
        self.event_queue = asyncio.Queue()
        self.active_streams = {}
    
    async def event_listener(self):
        """Listen for incoming events"""
        while True:
            event = await self.event_queue.get()
            await self.process_event(event)
    
    async def process_event(self, event: Dict[str, Any]):
        """Process incoming event"""
        event_type = event.get("type")
        
        if event_type == "new_task":
            await self.handle_new_task(event)
        elif event_type == "update_request":
            await self.handle_update_request(event)
        elif event_type == "cancellation":
            await self.handle_cancellation(event)
    
    async def handle_new_task(self, event: Dict[str, Any]):
        """Handle new task event"""
        task_id = event["task_id"]
        task_data = event["data"]
        
        # Create new streaming execution
        initial_state = AgentState(
            current_task=task_data["description"],
            task_id=task_id
        )
        
        stream = self.workflow.astream(initial_state)
        self.active_streams[task_id] = stream
        
        # Process stream
        async for state_update in stream:
            await self.emit_update(task_id, state_update)
    
    async def emit_update(self, task_id: str, state: AgentState):
        """Emit state update to subscribers"""
        update_event = {
            "task_id": task_id,
            "state": state,
            "timestamp": datetime.now().isoformat()
        }
        
        # Send to subscribers (WebSocket, SSE, etc.)
        await self.notify_subscribers(update_event)

WebSocket Integration

python
import asyncio
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Set

import websockets

class WebSocketGraphServer:
    """WebSocket server for real-time graph execution"""
    
    def __init__(self, graph_app):
        self.graph_app = graph_app
        self.connected_clients: Set[websockets.WebSocketServerProtocol] = set()
        self.client_sessions = {}
    
    async def register_client(self, websocket: websockets.WebSocketServerProtocol):
        """Register new WebSocket client"""
        self.connected_clients.add(websocket)
        session_id = str(uuid.uuid4())
        self.client_sessions[websocket] = session_id
        
        await websocket.send(json.dumps({
            "type": "connection_established",
            "session_id": session_id
        }))
    
    async def unregister_client(self, websocket: websockets.WebSocketServerProtocol):
        """Unregister WebSocket client"""
        self.connected_clients.discard(websocket)
        if websocket in self.client_sessions:
            del self.client_sessions[websocket]
    
    async def handle_client_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
        """Handle message from WebSocket client"""
        try:
            data = json.loads(message)
            message_type = data.get("type")
            
            if message_type == "execute_graph":
                await self.execute_graph_for_client(websocket, data)
            elif message_type == "get_status":
                await self.send_status_to_client(websocket, data)
            
        except json.JSONDecodeError:
            await websocket.send(json.dumps({
                "type": "error",
                "message": "Invalid JSON format"
            }))
    
    async def execute_graph_for_client(self, websocket: websockets.WebSocketServerProtocol, data: Dict[str, Any]):
        """Execute graph and stream results to client"""
        session_id = self.client_sessions[websocket]
        initial_state = AgentState(**data.get("initial_state", {}))
        
        config = {
            "configurable": {
                "thread_id": session_id
            }
        }
        
        try:
            async for state_update in self.graph_app.astream(initial_state, config=config):
                await websocket.send(json.dumps({
                    "type": "state_update",
                    "session_id": session_id,
                    "state": self.serialize_state(state_update),
                    "timestamp": datetime.now().isoformat()
                }))
        
        except Exception as e:
            await websocket.send(json.dumps({
                "type": "execution_error",
                "error": str(e)
            }))
    
    async def broadcast_to_all_clients(self, message: Dict[str, Any]):
        """Broadcast message to all connected clients"""
        if self.connected_clients:
            await asyncio.gather(
                *[client.send(json.dumps(message)) for client in self.connected_clients],
                return_exceptions=True
            )
    
    def serialize_state(self, state: AgentState) -> Dict[str, Any]:
        """Serialize state for JSON transmission"""
        # Convert state to JSON-serializable format
        return {
            key: value for key, value in state.items()
            if isinstance(value, (str, int, float, bool, list, dict, type(None)))
        }

# Start WebSocket server
async def start_websocket_server(graph_app, host="localhost", port=8765):
    server = WebSocketGraphServer(graph_app)
    
    async def handle_client(websocket, path):
        await server.register_client(websocket)
        try:
            async for message in websocket:
                await server.handle_client_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            await server.unregister_client(websocket)
    
    async with websockets.serve(handle_client, host, port):
        await asyncio.Future()  # run until cancelled

Production Deployment

LangGraph Platform Deployment

bash
# Install LangGraph CLI
pip install langgraph-cli

# Create a new LangGraph project from a template
# (CLI subcommands vary by version; run `langgraph --help` to confirm)
langgraph new my-graph-project

# Configure deployment
cat > langgraph.json << EOF
{
  "dependencies": [
    "langgraph",
    "langchain-openai",
    "langchain-community"
  ],
  "graphs": {
    "my_graph": "./src/graph.py:workflow"
  },
  "env": {
    "OPENAI_API_KEY": "your-api-key"
  }
}
EOF

# Deploy to LangGraph Platform
# (run locally first with `langgraph dev`; check `langgraph --help` for the
# deployment commands available in your CLI version — cloud deployments are
# often configured through the LangGraph Platform UI instead)
langgraph deploy --name my-production-graph

Docker Deployment

dockerfile
# Dockerfile for LangGraph application
FROM python:3.11-slim

WORKDIR /app

# Install system and Python dependencies (curl is needed for the HEALTHCHECK below)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# Set environment variables
ENV PYTHONPATH=/app
ENV LANGCHAIN_TRACING_V2=true

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Start application
CMD ["python", "-m", "uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

yaml
# docker-compose.yml
version: '3.8'

services:
  langgraph-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:password@postgres:5432/langgraph_db
    depends_on:
      - postgres
      - redis
    volumes:
      - ./checkpoints:/app/checkpoints
    restart: unless-stopped

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: langgraph_db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

Kubernetes Deployment

yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
  labels:
    app: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-app
  template:
    metadata:
      labels:
        app: langgraph-app
    spec:
      containers:
      - name: langgraph-app
        image: your-registry/langgraph-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-api-key
        - name: DATABASE_URL
          valueFrom:
            configMapKeyRef:
              name: app-config
              key: database-url
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

Monitoring and Observability

Comprehensive Monitoring Setup

python
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, Any

import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
from langsmith import Client

# Prometheus metrics
GRAPH_EXECUTIONS = Counter('langgraph_executions_total', 'Total graph executions', ['graph_name', 'status'])
EXECUTION_DURATION = Histogram('langgraph_execution_duration_seconds', 'Graph execution duration')
ACTIVE_THREADS = Gauge('langgraph_active_threads', 'Number of active threads')
NODE_EXECUTIONS = Counter('langgraph_node_executions_total', 'Total node executions', ['node_name', 'status'])

class MonitoringWrapper:
    """Comprehensive monitoring wrapper for LangGraph"""
    
    def __init__(self, graph_app, langsmith_client: Client = None):
        self.graph_app = graph_app
        self.langsmith_client = langsmith_client
        self.logger = self.setup_logging()
        self.active_executions = {}
    
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('langgraph.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    
    async def execute_with_monitoring(self, initial_state: AgentState, config: Dict[str, Any] = None):
        """Execute graph with comprehensive monitoring"""
        config = config or {}
        execution_id = str(uuid.uuid4())
        thread_id = config.get('configurable', {}).get('thread_id', 'default')
        
        # Start monitoring
        start_time = time.time()
        self.active_executions[execution_id] = {
            'thread_id': thread_id,
            'start_time': start_time,
            'status': 'running'
        }
        ACTIVE_THREADS.inc()
        
        try:
            self.logger.info(f"Starting graph execution {execution_id} for thread {thread_id}")
            
            # Execute graph with streaming
            final_state = None
            async for state_update in self.graph_app.astream(initial_state, config=config):
                # Log state updates
                self.log_state_update(execution_id, state_update)
                
                # Send to LangSmith if configured
                if self.langsmith_client:
                    await self.send_to_langsmith(execution_id, state_update)
                
                final_state = state_update
            
            # Success metrics
            execution_time = time.time() - start_time
            GRAPH_EXECUTIONS.labels(graph_name='main', status='success').inc()
            EXECUTION_DURATION.observe(execution_time)
            
            self.logger.info(f"Graph execution {execution_id} completed successfully in {execution_time:.2f}s")
            
            return final_state
            
        except Exception as e:
            # Error metrics
            execution_time = time.time() - start_time
            GRAPH_EXECUTIONS.labels(graph_name='main', status='error').inc()
            EXECUTION_DURATION.observe(execution_time)
            
            self.logger.error(f"Graph execution {execution_id} failed: {str(e)}")
            raise
            
        finally:
            # Cleanup
            ACTIVE_THREADS.dec()
            if execution_id in self.active_executions:
                del self.active_executions[execution_id]
    
    def log_state_update(self, execution_id: str, state: AgentState):
        """Log state updates with structured logging"""
        log_data = {
            'execution_id': execution_id,
            'timestamp': datetime.now().isoformat(),
            'completed_tasks': state.get('completed_tasks', []),
            'current_task': state.get('current_task', ''),
            'iteration_count': state.get('iteration_count', 0)
        }
        
        self.logger.info(f"State update: {json.dumps(log_data)}")
    
    async def send_to_langsmith(self, execution_id: str, state: AgentState):
        """Send execution data to LangSmith"""
        if not self.langsmith_client:
            return
        
        try:
            # Client.create_run() is synchronous in current langsmith SDKs
            self.langsmith_client.create_run(
                name=f"langgraph_execution_{execution_id}",
                run_type="chain",
                inputs={"state": state},
                extra={"execution_id": execution_id}
            )
        except Exception as e:
            self.logger.warning(f"Failed to send data to LangSmith: {str(e)}")

# Health check endpoints
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()
# Assumes a module-level instance: monitoring_wrapper = MonitoringWrapper(graph_app)

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return JSONResponse({
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_threads": len(monitoring_wrapper.active_executions)
    })

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return prometheus_client.generate_latest()

@app.get("/status")
async def status():
    """Detailed status information"""
    return JSONResponse({
        "active_executions": len(monitoring_wrapper.active_executions),
        "execution_details": monitoring_wrapper.active_executions,
        "system_info": {
            "memory_usage": get_memory_usage(),
            "cpu_usage": get_cpu_usage()
        }
    })

Best Practices and Patterns

Graph Design Principles

  • Single Responsibility: Each node should have a clear, focused purpose
  • State Management: Design state schema carefully to avoid bloat
  • Error Handling: Implement comprehensive error handling and recovery
  • Resource Management: Monitor and manage computational resources
  • Testing: Create unit tests for individual nodes and integration tests for workflows (see the sketch after this list)
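
To make the Testing bullet concrete, here is a minimal pytest-style sketch that exercises a node and the conditional router defined earlier; it assumes `analyze_node`, `should_continue`, `initial_state`, and `END` are importable from your graph module:

python
import copy

# Nodes and routers are plain functions over the state dict, so they can be
# unit-tested without compiling a graph or calling an LLM.
def fresh_state():
    """Deep-copy the starting state so tests never share mutable lists."""
    return copy.deepcopy(initial_state)

def test_analyze_node_flags_missing_research_data():
    state = fresh_state()
    result = analyze_node(state)          # this path returns before any LLM call
    assert "No research data available for analysis" in result["error_messages"]

def test_should_continue_stops_after_five_iterations():
    state = fresh_state()
    state["iteration_count"] = 6
    assert should_continue(state) == END

def test_should_continue_retries_research_on_errors():
    state = fresh_state()
    state["error_messages"] = ["timeout while searching"]
    assert should_continue(state) == "research"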

Performance Optimization

  • Parallel Processing: Leverage parallel execution where appropriate
  • Caching: Implement caching for expensive operations (a sketch follows this list)
  • Resource Pooling: Use connection pooling for external services
  • Batch Processing: Group similar operations for efficiency
  • Memory Management: Monitor and optimize memory usage
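
As a sketch of the Caching bullet, the decorator below memoizes an expensive LLM call by prompt hash with a simple TTL. It is in-process only; swap the dict for Redis or another shared store in multi-worker deployments. The `cached_llm_call` helper and the TTL value are illustrative assumptions, and `llm`/`HumanMessage` are the objects defined earlier in this sheet:

python
import hashlib
import time
from functools import wraps

def cache_llm_calls(ttl_seconds: int = 3600):
    """Memoize calls by prompt hash with a time-to-live (single-process cache)."""
    store = {}

    def decorator(func):
        @wraps(func)
        def wrapper(prompt: str, *args, **kwargs):
            key = hashlib.sha256(prompt.encode()).hexdigest()
            hit = store.get(key)
            if hit and time.time() - hit[0] < ttl_seconds:
                return hit[1]                       # cache hit: skip the LLM call
            result = func(prompt, *args, **kwargs)
            store[key] = (time.time(), result)      # cache miss: store fresh result
            return result
        return wrapper
    return decorator

@cache_llm_calls(ttl_seconds=600)
def cached_llm_call(prompt: str) -> str:
    return llm.invoke([HumanMessage(content=prompt)]).content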

Security Considerations

  • Input Validation: Validate all inputs to prevent injection attacks (see the sketch after this list)
  • Access Control: Implement proper authentication and authorization
  • Data Privacy: Handle sensitive data according to privacy regulations
  • API Security: Secure API endpoints and manage rate limiting
  • Audit Logging: Maintain comprehensive audit trails
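
For the Input Validation bullet, a small Pydantic (v2) guard in front of the graph can reject oversized or malformed requests before any node or LLM call runs; the field names and limits below are illustrative assumptions:

python
from pydantic import BaseModel, Field, ValidationError

class TaskRequest(BaseModel):
    """Validated payload accepted at the graph's entry point."""
    current_task: str = Field(min_length=3, max_length=2000)
    thread_id: str = Field(pattern=r"^[A-Za-z0-9_-]{1,64}$")

def safe_invoke(app, payload: dict):
    try:
        request = TaskRequest(**payload)
    except ValidationError as exc:
        # Reject bad input before it reaches any node or LLM
        return {"error": exc.errors()}
    config = {"configurable": {"thread_id": request.thread_id}}
    return app.invoke(
        {**initial_state, "current_task": request.current_task},
        config=config,
    )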

This comprehensive LangGraph cheat sheet provides everything needed to build sophisticated, stateful AI agent workflows. From basic graph construction to advanced production deployment patterns, use these examples and best practices to create powerful, scalable AI applications with LangGraph's graph-based approach.