
LangGraph Framework Cheat Sheet

Overview

LangGraph represents a paradigm shift in building AI agent workflows, offering a stateful orchestration framework that brings unprecedented control and flexibility to agent-based applications. Developed by the LangChain team, LangGraph addresses the limitations of traditional linear agent frameworks by introducing a graph-based approach in which complex workflows are structured as interconnected nodes and edges, enabling sophisticated multi-agent systems that can handle branching logic, conditional execution, and stateful interactions.

What sets LangGraph apart is its ability to model complex, real-world workflows that require decision points, parallel processing, and dynamic routing based on intermediate results. Unlike simple chain-based approaches, LangGraph lets developers create workflows in which agents collaborate, compete, or operate independently while maintaining shared state and context. This makes it particularly powerful for applications that require sophisticated reasoning, multi-step problem solving, and adaptive behavior under changing conditions.

The framework combines the flexibility of graph-based computation with the reliability needed for production systems, offering features such as persistence, streaming, debugging support, and seamless deployment through the LangGraph Platform. This positions LangGraph as the go-to solution for developers building next-generation AI applications that require more than simple request-response patterns.

Installation and Setup

Basic Installation

# Install LangGraph
pip install langgraph

# Install with additional dependencies
pip install "langgraph[all]"

# Install development version
pip install git+https://github.com/langchain-ai/langgraph.git

# Install LangGraph Platform CLI (for deployment)
pip install langgraph-cli

Environment Configuration

import os
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# Set up API keys
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-api-key"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# Initialize LLMs
openai_llm = ChatOpenAI(model="gpt-4", temperature=0)
anthropic_llm = ChatAnthropic(model="claude-3-sonnet-20240229")

Project Structure

langgraph_project/
├── graphs/
│   ├── __init__.py
│   ├── research_graph.py
│   └── analysis_graph.py
├── nodes/
│   ├── __init__.py
│   ├── agent_nodes.py
│   └── tool_nodes.py
├── state/
│   ├── __init__.py
│   └── state_schemas.py
├── tools/
│   ├── __init__.py
│   └── custom_tools.py
├── checkpoints/
│   └── memory.db
├── config/
│   ├── __init__.py
│   └── settings.py
└── main.py

Core Concepts

State Management

from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph

# Define state schema
class AgentState(TypedDict):
    messages: List[str]
    current_task: str
    completed_tasks: List[str]
    research_data: Optional[dict]
    analysis_results: Optional[dict]
    final_output: Optional[str]
    iteration_count: int
    error_messages: List[str]

# Initialize state
initial_state = AgentState(
    messages=[],
    current_task="",
    completed_tasks=[],
    research_data=None,
    analysis_results=None,
    final_output=None,
    iteration_count=0,
    error_messages=[]
)

Graph Construction

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Create graph with state management
workflow = StateGraph(AgentState)

# Add nodes (functions that process state)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("synthesize", synthesize_node)
workflow.add_node("review", review_node)

# Define edges (transitions between nodes)
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "synthesize")
workflow.add_edge("synthesize", "review")

# Conditional edges based on state
def should_continue(state: AgentState) -> str:
    if state["iteration_count"] > 5:
        return END
    elif state["error_messages"]:
        return "research"  # Retry research if errors
    else:
        return "review"

workflow.add_conditional_edges(
    "review",
    should_continue,
    {
        END: END,
        "research": "research",
        "review": "review"
    }
)

# Set entry point
workflow.set_entry_point("research")

# Compile graph with checkpointing
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
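
A minimal usage sketch (assuming the node functions referenced above are defined): because the graph was compiled with a checkpointer, each invocation is tied to a thread_id so state can be persisted and resumed per conversation.

# Run the compiled graph; the thread_id lets the checkpointer persist state per thread.
config = {"configurable": {"thread_id": "example-thread-1"}}
result = app.invoke(initial_state, config=config)
print(result["final_output"])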

Node Implementation

Basic Node Functions

from datetime import datetime

from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

def research_node(state: AgentState) -> AgentState:
    """Research node that gathers information"""
    current_task = state["current_task"]

    # Perform research using LLM
    research_prompt = f"""
    Research the following topic comprehensively:
    {current_task}

    Provide detailed findings including:
    1. Key facts and statistics
    2. Current trends and developments
    3. Expert opinions and analysis
    4. Relevant sources and references
    """

    response = llm.invoke([HumanMessage(content=research_prompt)])

    # Update state
    state["research_data"] = \\\\{
        "findings": response.content,
        "timestamp": datetime.now().isoformat(),
        "sources": extract_sources(response.content)
    \\\\}
    state["completed_tasks"].append("research")
    state["messages"].append(f"Research completed for: \\\\{current_task\\\\}")

    return state

def analyze_node(state: AgentState) -> AgentState:
    """Analysis node that processes research data"""
    research_data = state["research_data"]

    if not research_data:
        state["error_messages"].append("No research data available for analysis")
        return state

    analysis_prompt = f"""
    Analyze the following research findings:
    {research_data['findings']}

    Provide:
    1. Key insights and patterns
    2. Strengths and weaknesses
    3. Opportunities and threats
    4. Strategic recommendations
    """

    response = llm.invoke([HumanMessage(content=analysis_prompt)])

    state["analysis_results"] = \\\\{
        "insights": response.content,
        "timestamp": datetime.now().isoformat(),
        "confidence_score": calculate_confidence(response.content)
    \\\\}
    state["completed_tasks"].append("analysis")
    state["messages"].append("Analysis completed")

    return state

def synthesize_node(state: AgentState) -> AgentState:
    """Synthesis node that combines research and analysis"""
    research_data = state["research_data"]
    analysis_results = state["analysis_results"]

    synthesis_prompt = f"""
    Synthesize the following research and analysis into a comprehensive report:

    Research Findings:
    {research_data['findings']}

    Analysis Results:
    {analysis_results['insights']}

    Create a cohesive narrative that:
    1. Integrates all key findings
    2. Provides actionable recommendations
    3. Addresses potential concerns
    4. Outlines next steps
    """

    response = llm.invoke([HumanMessage(content=synthesis_prompt)])

    state["final_output"] = response.content
    state["completed_tasks"].append("synthesis")
    state["messages"].append("Synthesis completed")

    return state

Advanced Node Patterns

from typing import Dict, Any
from datetime import datetime
import asyncio

class AsyncResearchNode:
    """Async node for parallel research operations"""

    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools

    async def __call__(self, state: AgentState) -> AgentState:
        current_task = state["current_task"]

        # Parallel research tasks
        research_tasks = [
            self.web_research(current_task),
            self.academic_research(current_task),
            self.news_research(current_task)
        ]

        results = await asyncio.gather(*research_tasks, return_exceptions=True)

        # Combine results
        combined_research = {
            "web_findings": results[0] if not isinstance(results[0], Exception) else None,
            "academic_findings": results[1] if not isinstance(results[1], Exception) else None,
            "news_findings": results[2] if not isinstance(results[2], Exception) else None,
            "timestamp": datetime.now().isoformat()
        }

        state["research_data"] = combined_research
        state["completed_tasks"].append("parallel_research")

        return state

    async def web_research(self, topic: str) -> Dict[str, Any]:
        # Implement web research logic
        pass

    async def academic_research(self, topic: str) -> Dict[str, Any]:
        # Implement academic research logic
        pass

    async def news_research(self, topic: str) -> Dict[str, Any]:
        # Implement news research logic
        pass

# Multi-agent collaboration node
class CollaborativeNode:
    """Node that coordinates multiple agents"""

    def __init__(self, agents: Dict[str, Any], synthesizer_llm):
        self.agents = agents
        self.synthesizer_llm = synthesizer_llm

    def __call__(self, state: AgentState) -> AgentState:
        current_task = state["current_task"]

        # Assign tasks to different agents
        agent_results = {}

        for agent_name, agent in self.agents.items():
            agent_task = self.create_agent_task(current_task, agent_name)
            result = agent.invoke(agent_task)
            agent_results[agent_name] = result

        # Synthesize agent results
        synthesis_prompt = f"""
        Multiple agents have worked on the task: \\\\{current_task\\\\}

        Agent Results:
        \\\\{self.format_agent_results(agent_results)\\\\}

        Synthesize these perspectives into a cohesive solution.
        """

        final_result = self.synthesizer_llm.invoke(synthesis_prompt)

        state["collaborative_results"] = \\\\{
            "individual_results": agent_results,
            "synthesized_result": final_result.content,
            "participating_agents": list(self.agents.keys())
        \\\\}

        return state

Tool Integration Nodes

from langgraph.prebuilt import ToolExecutor
from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

# Create tool executor
tools = [DuckDuckGoSearchRun(), WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())]
tool_executor = ToolExecutor(tools)

def tool_calling_node(state: AgentState) -> AgentState:
    """Node that uses tools to gather information"""
    current_task = state["current_task"]

    # Determine which tools to use
    tool_selection_prompt = f"""
    For the task: \\\\{current_task\\\\}

    Which tools should be used and what queries should be made?
    Available tools: \\\\{[tool.name for tool in tools]\\\\}

    Respond with a JSON list of tool calls:
    [\\\\{"tool": "tool_name", "query": "search_query"\\\\}, ...]
    """

    tool_plan = llm.invoke([HumanMessage(content=tool_selection_prompt)])
    tool_calls = parse_tool_calls(tool_plan.content)

    # Execute tools
    tool_results = []
    for tool_call in tool_calls:
        try:
            result = tool_executor.invoke({
                "tool": tool_call["tool"],
                "tool_input": tool_call["query"]
            })
            tool_results.append({
                "tool": tool_call["tool"],
                "query": tool_call["query"],
                "result": result,
                "success": True
            })
        except Exception as e:
            tool_results.append({
                "tool": tool_call["tool"],
                "query": tool_call["query"],
                "error": str(e),
                "success": False
            })

    state["tool_results"] = tool_results
    state["completed_tasks"].append("tool_execution")

    return state

def parse_tool_calls(content: str) -> List[Dict[str, str]]:
    """Parse tool calls from LLM response"""
    import json
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Fallback parsing logic
        return [{"tool": "search", "query": content}]

Conditional Logic and Routing

Dynamic Routing

def route_based_on_content(state: AgentState) -> str:
    """Route based on content analysis"""
    current_task = state["current_task"].lower()

    if "research" in current_task or "analyze" in current_task:
        return "research_path"
    elif "create" in current_task or "generate" in current_task:
        return "creation_path"
    elif "review" in current_task or "evaluate" in current_task:
        return "review_path"
    else:
        return "general_path"

def quality_gate(state: AgentState) -> str:
    """Quality gate for output validation"""
    if not state["final_output"]:
        return "retry"

    # Quality assessment
    quality_score = assess_output_quality(state["final_output"])

    if quality_score > 0.8:
        return "approve"
    elif quality_score > 0.6:
        return "revise"
    else:
        return "retry"

# Add conditional routing to graph
workflow.add_conditional_edges(
    "initial_analysis",
    route_based_on_content,
    {
        "research_path": "research_node",
        "creation_path": "creation_node",
        "review_path": "review_node",
        "general_path": "general_node"
    }
)

workflow.add_conditional_edges(
    "final_review",
    quality_gate,
    {
        "approve": END,
        "revise": "revision_node",
        "retry": "research_node"
    }
)

Complex Decision Trees

def multi_criteria_router(state: AgentState) -> str:
    """Multi-criteria decision routing"""
    criteria = {
        "complexity": assess_task_complexity(state["current_task"]),
        "urgency": assess_task_urgency(state),
        "resources": assess_available_resources(state),
        "expertise_needed": assess_expertise_requirements(state["current_task"])
    }

    # Decision matrix
    if criteria["complexity"] > 0.8 and criteria["expertise_needed"] > 0.7:
        return "expert_collaboration"
    elif criteria["urgency"] > 0.9:
        return "fast_track"
    elif criteria["resources"] < 0.3:
        return "resource_optimization"
    else:
        return "standard_processing"

def adaptive_retry_logic(state: AgentState) -> str:
    """Adaptive retry with different strategies"""
    error_count = len(state["error_messages"])
    last_error = state["error_messages"][-1] if state["error_messages"] else ""

    if error_count == 0:
        return "proceed"
    elif error_count == 1:
        if "timeout" in last_error.lower():
            return "retry_with_timeout"
        else:
            return "retry_with_different_approach"
    elif error_count == 2:
        return "escalate_to_human"
    else:
        return "abort"

# Complex routing configuration
workflow.add_conditional_edges(
    "task_assessment",
    multi_criteria_router,
    {
        "expert_collaboration": "expert_node",
        "fast_track": "fast_track_node",
        "resource_optimization": "optimized_node",
        "standard_processing": "standard_node"
    }
)

Parallel Processing and Concurrency

Parallel Node Execution

from langgraph.graph import StateGraph
import asyncio

class ParallelProcessingGraph:
    """Graph with parallel processing capabilities"""

    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.setup_parallel_nodes()

    def setup_parallel_nodes(self):
        # Add parallel processing nodes
        self.workflow.add_node("parallel_research", self.parallel_research_node)
        self.workflow.add_node("parallel_analysis", self.parallel_analysis_node)
        self.workflow.add_node("synthesis", self.synthesis_node)

        # Parallel execution pattern
        self.workflow.add_edge("parallel_research", "synthesis")
        self.workflow.add_edge("parallel_analysis", "synthesis")

        self.workflow.set_entry_point("parallel_research")
        self.workflow.set_entry_point("parallel_analysis")

    async def parallel_research_node(self, state: AgentState) -> AgentState:
        """Execute multiple research tasks in parallel"""
        research_tasks = [
            self.web_search(state["current_task"]),
            self.database_query(state["current_task"]),
            self.expert_consultation(state["current_task"])
        ]

        results = await asyncio.gather(*research_tasks, return_exceptions=True)

        state["parallel_research_results"] = \\\\{
            "web_search": results[0] if not isinstance(results[0], Exception) else None,
            "database_query": results[1] if not isinstance(results[1], Exception) else None,
            "expert_consultation": results[2] if not isinstance(results[2], Exception) else None
        \\\\}

        return state

    async def parallel_analysis_node(self, state: AgentState) -> AgentState:
        """Execute multiple analysis tasks in parallel"""
        analysis_tasks = [
            self.sentiment_analysis(state["current_task"]),
            self.trend_analysis(state["current_task"]),
            self.competitive_analysis(state["current_task"])
        ]

        results = await asyncio.gather(*analysis_tasks, return_exceptions=True)

        state["parallel_analysis_results"] = \\\\{
            "sentiment": results[0] if not isinstance(results[0], Exception) else None,
            "trends": results[1] if not isinstance(results[1], Exception) else None,
            "competitive": results[2] if not isinstance(results[2], Exception) else None
        \\\\}

        return state

# Fan-out/Fan-in pattern
def fan_out_node(state: AgentState) -> AgentState:
    """Split work into parallel streams"""
    task = state["current_task"]

    # Create subtasks
    subtasks = split_task_into_subtasks(task)
    state["subtasks"] = subtasks
    state["parallel_streams"] = len(subtasks)

    return state

def fan_in_node(state: AgentState) -> AgentState:
    """Combine results from parallel streams"""
    subtask_results = state.get("subtask_results", {})

    if len(subtask_results) < state["parallel_streams"]:
        # Not all streams completed yet
        return state

    # Combine all results
    combined_result = combine_subtask_results(subtask_results)
    state["final_output"] = combined_result

    return state
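
A minimal wiring sketch for the fan-out/fan-in pattern above, under the assumption that split_task_into_subtasks and combine_subtask_results are defined elsewhere; worker_node is a hypothetical node that processes the pending subtasks and records their results.

def worker_node(state: AgentState) -> AgentState:
    # Hypothetical worker: process each pending subtask and record its result.
    results = state.get("subtask_results", {})
    for subtask in state.get("subtasks", []):
        results[subtask] = f"processed: {subtask}"
    state["subtask_results"] = results
    return state

fan_workflow = StateGraph(AgentState)
fan_workflow.add_node("fan_out", fan_out_node)
fan_workflow.add_node("worker", worker_node)
fan_workflow.add_node("fan_in", fan_in_node)
fan_workflow.add_edge("fan_out", "worker")
fan_workflow.add_edge("worker", "fan_in")
fan_workflow.set_entry_point("fan_out")
fan_app = fan_workflow.compile()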

Load Balancing and Resource Management

import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

class ResourceManagedGraph:
    """Graph with resource management and load balancing"""

    def __init__(self, max_workers=4, max_concurrent_llm_calls=2):
        self.max_workers = max_workers
        self.max_concurrent_llm_calls = max_concurrent_llm_calls
        self.llm_semaphore = threading.Semaphore(max_concurrent_llm_calls)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.task_queue = Queue()

    def resource_aware_node(self, state: AgentState) -> AgentState:
        """Node that manages resource usage"""
        with self.llm_semaphore:
            # LLM call with resource management
            result = self.make_llm_call(state["current_task"])

        state["resource_managed_result"] = result
        return state

    def load_balanced_processing(self, state: AgentState) -> AgentState:
        """Distribute work across available resources"""
        tasks = state.get("pending_tasks", [])

        if not tasks:
            return state

        # Submit tasks to thread pool
        futures = []
        for task in tasks:
            future = self.executor.submit(self.process_single_task, task)
            futures.append(future)

        # Collect results
        results = []
        for future in futures:
            try:
                result = future.result(timeout=30)
                results.append(result)
            except Exception as e:
                results.append({"error": str(e)})

        state["load_balanced_results"] = results
        return state

    def adaptive_scaling_node(self, state: AgentState) -> AgentState:
        """Adapt resource usage based on workload"""
        current_load = self.assess_current_load()

        if current_load > 0.8:
            # High load - reduce concurrency
            self.llm_semaphore = threading.Semaphore(1)
        elif current_load < 0.3:
            # Low load - increase concurrency
            self.llm_semaphore = threading.Semaphore(self.max_concurrent_llm_calls * 2)

        return state

Memory and Persistence

Checkpointing and State Persistence

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
from datetime import datetime
import sqlite3

# SQLite checkpointing
def setup_sqlite_checkpointing():
    conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
    memory = SqliteSaver(conn)
    return memory

# PostgreSQL checkpointing for production
def setup_postgres_checkpointing():
    connection_string = "postgresql://user:password@localhost/langgraph_db"
    memory = PostgresSaver.from_conn_string(connection_string)
    return memory

# Custom checkpointing implementation
class CustomCheckpointer:
    """Custom checkpointing with additional features"""

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.checkpoint_history = {}

    def save_checkpoint(self, thread_id: str, state: AgentState, step: int):
        checkpoint_data = {
            "thread_id": thread_id,
            "state": state,
            "step": step,
            "timestamp": datetime.now().isoformat(),
            "metadata": self.extract_metadata(state)
        }

        self.storage.save(f"{thread_id}_{step}", checkpoint_data)

        # Keep checkpoint history
        if thread_id not in self.checkpoint_history:
            self.checkpoint_history[thread_id] = []
        self.checkpoint_history[thread_id].append(step)

    def load_checkpoint(self, thread_id: str, step: int = None):
        if step is None:
            # Load latest checkpoint
            steps = self.checkpoint_history.get(thread_id, [])
            if not steps:
                return None
            step = max(steps)

        return self.storage.load(f"{thread_id}_{step}")

    def rollback_to_checkpoint(self, thread_id: str, step: int):
        checkpoint = self.load_checkpoint(thread_id, step)
        if checkpoint:
            return checkpoint["state"]
        return None

# Usage with custom checkpointing
custom_memory = CustomCheckpointer(storage_backend)
app = workflow.compile(checkpointer=custom_memory)
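
A short usage sketch, assuming the custom checkpointer satisfies LangGraph's checkpointer interface: runs that share a thread_id share persisted state, and the latest snapshot can be inspected afterwards.

config = {"configurable": {"thread_id": "report-123"}}

# First run: state is checkpointed as the graph progresses.
app.invoke(dict(initial_state, current_task="Quarterly market report"), config=config)

# Inspect the persisted state for that thread (StateSnapshot exposes .values).
snapshot = app.get_state(config)
print(snapshot.values.get("completed_tasks"))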

Thread Management and Session Handling

from typing import Dict, Any
from datetime import datetime, timedelta
import uuid

class ThreadManager:
    """Manage multiple conversation threads"""

    def __init__(self, checkpointer):
        self.checkpointer = checkpointer
        self.active_threads = {}
        self.thread_metadata = {}

    def create_thread(self, user_id: str, initial_state: AgentState) -> str:
        thread_id = str(uuid.uuid4())

        self.active_threads[thread_id] = {
            "user_id": user_id,
            "created_at": datetime.now(),
            "last_activity": datetime.now(),
            "state": initial_state
        }

        self.thread_metadata[thread_id] = {
            "user_id": user_id,
            "session_count": 0,
            "total_messages": 0
        }

        return thread_id

    def get_thread_state(self, thread_id: str) -> AgentState:
        if thread_id in self.active_threads:
            return self.active_threads[thread_id]["state"]

        # Try to load from checkpoint
        checkpoint = self.checkpointer.load_checkpoint(thread_id)
        if checkpoint:
            return checkpoint["state"]

        return None

    def update_thread_state(self, thread_id: str, new_state: AgentState):
        if thread_id in self.active_threads:
            self.active_threads[thread_id]["state"] = new_state
            self.active_threads[thread_id]["last_activity"] = datetime.now()

        # Save checkpoint
        self.checkpointer.save_checkpoint(thread_id, new_state,
                                         self.get_current_step(thread_id))

    def cleanup_inactive_threads(self, max_age_hours: int = 24):
        cutoff_time = datetime.now() - timedelta(hours=max_age_hours)

        inactive_threads = [
            thread_id for thread_id, data in self.active_threads.items()
            if data["last_activity"] < cutoff_time
        ]

        for thread_id in inactive_threads:
            del self.active_threads[thread_id]

# Session-aware graph execution
def execute_with_session(app, thread_id: str, input_data: Dict[str, Any]):
    config = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": f"session_{thread_id}"
        }
    }

    result = app.invoke(input_data, config=config)
    return result

Long-Term Memory Integration

from datetime import datetime

from langchain.memory import ConversationSummaryBufferMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

class LongTermMemoryGraph:
    """Graph with long-term memory capabilities"""

    def __init__(self, llm):
        self.llm = llm
        self.conversation_memory = ConversationSummaryBufferMemory(
            llm=llm,
            max_token_limit=2000,
            return_messages=True
        )
        self.episodic_memory = FAISS.from_texts(
            ["Initial memory"],
            OpenAIEmbeddings()
        )
        self.semantic_memory = {}

    def memory_enhanced_node(self, state: AgentState) -> AgentState:
        """Node that leverages long-term memory"""
        current_task = state["current_task"]

        # Retrieve relevant memories
        relevant_episodes = self.episodic_memory.similarity_search(
            current_task, k=3
        )

        conversation_context = self.conversation_memory.load_memory_variables({})

        # Enhance prompt with memory context
        enhanced_prompt = f"""
        Current Task: \\\\{current_task\\\\}

        Relevant Past Experiences:
        \\\\{self.format_episodes(relevant_episodes)\\\\}

        Conversation History:
        \\\\{conversation_context.get('history', '')\\\\}

        Semantic Knowledge:
        \\\\{self.get_relevant_semantic_knowledge(current_task)\\\\}

        Based on this context, proceed with the task.
        """

        response = self.llm.invoke([HumanMessage(content=enhanced_prompt)])

        # Update memories
        self.update_episodic_memory(current_task, response.content)
        self.conversation_memory.save_context(
            {"input": current_task},
            {"output": response.content}
        )

        state["memory_enhanced_result"] = response.content
        return state

    def update_episodic_memory(self, task: str, result: str):
        """Update episodic memory with new experience"""
        episode = f"Task: \\\\{task\\\\}\nResult: \\\\{result\\\\}\nTimestamp: \\\\{datetime.now()\\\\}"

        # Add to vector store
        self.episodic_memory.add_texts([episode])

    def update_semantic_memory(self, concept: str, knowledge: str):
        """Update semantic memory with new knowledge"""
        self.semantic_memory[concept] = knowledge

    def get_relevant_semantic_knowledge(self, query: str) -> str:
        """Retrieve relevant semantic knowledge"""
        relevant_concepts = []
        for concept, knowledge in self.semantic_memory.items():
            if any(word in query.lower() for word in concept.lower().split()):
                relevant_concepts.append(f"{concept}: {knowledge}")

        return "\n".join(relevant_concepts)

Streaming and Realtime Processing

Stream Processing

from langgraph.graph import StateGraph
from datetime import datetime
import asyncio
from typing import AsyncIterator, Dict, Any

class StreamingGraph:
    """Graph with streaming capabilities"""

    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.setup_streaming_nodes()

    def setup_streaming_nodes(self):
        self.workflow.add_node("streaming_research", self.streaming_research_node)
        self.workflow.add_node("streaming_analysis", self.streaming_analysis_node)
        self.workflow.set_entry_point("streaming_research")

    async def streaming_research_node(self, state: AgentState) -> AsyncIterator[AgentState]:
        """Node that yields intermediate results"""
        current_task = state["current_task"]

        # Yield initial status
        state["status"] = "Starting research..."
        yield state

        # Perform research in chunks
        research_steps = [
            "Gathering initial sources...",
            "Analyzing primary data...",
            "Cross-referencing information...",
            "Finalizing research findings..."
        ]

        for i, step in enumerate(research_steps):
            state["status"] = step
            state["progress"] = (i + 1) / len(research_steps)

            # Simulate research work
            await asyncio.sleep(1)

            # Yield intermediate state
            yield state

        # Final research results
        state["research_data"] = \\\\{
            "findings": f"Research completed for: \\\\{current_task\\\\}",
            "confidence": 0.85,
            "sources": ["source1", "source2", "source3"]
        \\\\}
        state["status"] = "Research completed"
        yield state

    async def stream_execution(self, initial_state: AgentState) -> AsyncIterator[AgentState]:
        """Execute graph with streaming output"""
        async for state_update in self.workflow.astream(initial_state):
            yield state_update

# Usage
async def main():
    streaming_graph = StreamingGraph()
    initial_state = AgentState(current_task="Research AI trends", messages=[])

    async for state in streaming_graph.stream_execution(initial_state):
        print(f"Status: \\\\{state.get('status', 'Processing...')\\\\}")
        print(f"Progress: \\\\{state.get('progress', 0) * 100:.1f\\\\}%")
        print("---")

# Real-time event processing
class EventDrivenGraph:
    """Graph that responds to real-time events"""

    def __init__(self):
        self.event_queue = asyncio.Queue()
        self.active_streams = {}

    async def event_listener(self):
        """Listen for incoming events"""
        while True:
            event = await self.event_queue.get()
            await self.process_event(event)

    async def process_event(self, event: Dict[str, Any]):
        """Process incoming event"""
        event_type = event.get("type")

        if event_type == "new_task":
            await self.handle_new_task(event)
        elif event_type == "update_request":
            await self.handle_update_request(event)
        elif event_type == "cancellation":
            await self.handle_cancellation(event)

    async def handle_new_task(self, event: Dict[str, Any]):
        """Handle new task event"""
        task_id = event["task_id"]
        task_data = event["data"]

        # Create new streaming execution
        initial_state = AgentState(
            current_task=task_data["description"],
            task_id=task_id
        )

        stream = self.workflow.astream(initial_state)
        self.active_streams[task_id] = stream

        # Process stream
        async for state_update in stream:
            await self.emit_update(task_id, state_update)

    async def emit_update(self, task_id: str, state: AgentState):
        """Emit state update to subscribers"""
        update_event = {
            "task_id": task_id,
            "state": state,
            "timestamp": datetime.now().isoformat()
        }

        # Send to subscribers (WebSocket, SSE, etc.)
        await self.notify_subscribers(update_event)

WebSocket Integration

import asyncio
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Set

import websockets

class WebSocketGraphServer:
    """WebSocket server for real-time graph execution"""

    def __init__(self, graph_app):
        self.graph_app = graph_app
        self.connected_clients: Set[websockets.WebSocketServerProtocol] = set()
        self.client_sessions = {}

    async def register_client(self, websocket: websockets.WebSocketServerProtocol):
        """Register new WebSocket client"""
        self.connected_clients.add(websocket)
        session_id = str(uuid.uuid4())
        self.client_sessions[websocket] = session_id

        await websocket.send(json.dumps({
            "type": "connection_established",
            "session_id": session_id
        }))

    async def unregister_client(self, websocket: websockets.WebSocketServerProtocol):
        """Unregister WebSocket client"""
        self.connected_clients.discard(websocket)
        if websocket in self.client_sessions:
            del self.client_sessions[websocket]

    async def handle_client_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
        """Handle message from WebSocket client"""
        try:
            data = json.loads(message)
            message_type = data.get("type")

            if message_type == "execute_graph":
                await self.execute_graph_for_client(websocket, data)
            elif message_type == "get_status":
                await self.send_status_to_client(websocket, data)

        except json.JSONDecodeError:
            await websocket.send(json.dumps({
                "type": "error",
                "message": "Invalid JSON format"
            }))

    async def execute_graph_for_client(self, websocket: websockets.WebSocketServerProtocol, data: Dict[str, Any]):
        """Execute graph and stream results to client"""
        session_id = self.client_sessions[websocket]
        initial_state = AgentState(**data.get("initial_state", {}))

        config = {
            "configurable": {
                "thread_id": session_id
            }
        }

        try:
            async for state_update in self.graph_app.astream(initial_state, config=config):
                await websocket.send(json.dumps({
                    "type": "state_update",
                    "session_id": session_id,
                    "state": self.serialize_state(state_update),
                    "timestamp": datetime.now().isoformat()
                }))

        except Exception as e:
            await websocket.send(json.dumps({
                "type": "execution_error",
                "error": str(e)
            }))

    async def broadcast_to_all_clients(self, message: Dict[str, Any]):
        """Broadcast message to all connected clients"""
        if self.connected_clients:
            await asyncio.gather(
                *[client.send(json.dumps(message)) for client in self.connected_clients],
                return_exceptions=True
            )

    def serialize_state(self, state: AgentState) -> Dict[str, Any]:
        """Serialize state for JSON transmission"""
        # Convert state to JSON-serializable format
        return {
            key: value for key, value in state.items()
            if isinstance(value, (str, int, float, bool, list, dict, type(None)))
        }

# Start WebSocket server
async def start_websocket_server(graph_app, host="localhost", port=8765):
    server = WebSocketGraphServer(graph_app)

    async def handle_client(websocket, path):
        await server.register_client(websocket)
        try:
            async for message in websocket:
                await server.handle_client_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            await server.unregister_client(websocket)

    start_server = websockets.serve(handle_client, host, port)
    await start_server

Production Deployment

LangGraph Platform Deployment

# Install LangGraph CLI
pip install langgraph-cli

# Initialize LangGraph project
langgraph init my-graph-project

# Configure deployment
cat > langgraph.json << EOF
{
  "dependencies": [
    "langgraph",
    "langchain-openai",
    "langchain-community"
  ],
  "graphs": {
    "my_graph": "./src/graph.py:workflow"
  },
  "env": {
    "OPENAI_API_KEY": "your-api-key"
  }
}
EOF

# Deploy to LangGraph Platform
langgraph deploy --name my-production-graph

Docker Deployment

# Dockerfile for LangGraph application
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# Set environment variables
ENV PYTHONPATH=/app
ENV LANGCHAIN_TRACING_V2=true

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Start application
CMD ["python", "-m", "uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  langgraph-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:password@postgres:5432/langgraph_db
    depends_on:
      - postgres
      - redis
    volumes:
      - ./checkpoints:/app/checkpoints
    restart: unless-stopped

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: langgraph_db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
  labels:
    app: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-app
  template:
    metadata:
      labels:
        app: langgraph-app
    spec:
      containers:
      - name: langgraph-app
        image: your-registry/langgraph-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-api-key
        - name: DATABASE_URL
          valueFrom:
            configMapKeyRef:
              name: app-config
              key: database-url
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

Monitoring and Observability

Comprehensive Monitoring Setup

from langsmith import Client
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, Any
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

# Prometheus metrics
GRAPH_EXECUTIONS = Counter('langgraph_executions_total', 'Total graph executions', ['graph_name', 'status'])
EXECUTION_DURATION = Histogram('langgraph_execution_duration_seconds', 'Graph execution duration')
ACTIVE_THREADS = Gauge('langgraph_active_threads', 'Number of active threads')
NODE_EXECUTIONS = Counter('langgraph_node_executions_total', 'Total node executions', ['node_name', 'status'])

class MonitoringWrapper:
    """Comprehensive monitoring wrapper for LangGraph"""

    def __init__(self, graph_app, langsmith_client: Client = None):
        self.graph_app = graph_app
        self.langsmith_client = langsmith_client
        self.logger = self.setup_logging()
        self.active_executions = {}

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('langgraph.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    async def execute_with_monitoring(self, initial_state: AgentState, config: Dict[str, Any] = None):
        """Execute graph with comprehensive monitoring"""
        execution_id = str(uuid.uuid4())
        thread_id = (config or {}).get('configurable', {}).get('thread_id', 'default')

        # Start monitoring
        start_time = time.time()
        self.active_executions[execution_id] = {
            'thread_id': thread_id,
            'start_time': start_time,
            'status': 'running'
        }
        ACTIVE_THREADS.inc()

        try:
            self.logger.info(f"Starting graph execution \\\\{execution_id\\\\} for thread \\\\{thread_id\\\\}")

            # Execute graph with streaming
            final_state = None
            async for state_update in self.graph_app.astream(initial_state, config=config):
                # Log state updates
                self.log_state_update(execution_id, state_update)

                # Send to LangSmith if configured
                if self.langsmith_client:
                    await self.send_to_langsmith(execution_id, state_update)

                final_state = state_update

            # Success metrics
            execution_time = time.time() - start_time
            GRAPH_EXECUTIONS.labels(graph_name='main', status='success').inc()
            EXECUTION_DURATION.observe(execution_time)

            self.logger.info(f"Graph execution \\\\{execution_id\\\\} completed successfully in \\\\{execution_time:.2f\\\\}s")

            return final_state

        except Exception as e:
            # Error metrics
            execution_time = time.time() - start_time
            GRAPH_EXECUTIONS.labels(graph_name='main', status='error').inc()
            EXECUTION_DURATION.observe(execution_time)

            self.logger.error(f"Graph execution {execution_id} failed: {str(e)}")
            raise

        finally:
            # Cleanup
            ACTIVE_THREADS.dec()
            if execution_id in self.active_executions:
                del self.active_executions[execution_id]

    def log_state_update(self, execution_id: str, state: AgentState):
        """Log state updates with structured logging"""
        log_data = {
            'execution_id': execution_id,
            'timestamp': datetime.now().isoformat(),
            'completed_tasks': state.get('completed_tasks', []),
            'current_task': state.get('current_task', ''),
            'iteration_count': state.get('iteration_count', 0)
        }

        self.logger.info(f"State update: {json.dumps(log_data)}")

    async def send_to_langsmith(self, execution_id: str, state: AgentState):
        """Send execution data to LangSmith"""
        if not self.langsmith_client:
            return

        try:
            # langsmith.Client.create_run is synchronous, so it is not awaited
            self.langsmith_client.create_run(
                name=f"langgraph_execution_{execution_id}",
                run_type="chain",
                inputs={"state": state},
                extra={"execution_id": execution_id}
            )
        except Exception as e:
            self.logger.warning(f"Failed to send data to LangSmith: {str(e)}")

# Health check endpoints
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return JSONResponse({
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_threads": len(monitoring_wrapper.active_executions)
    })

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return prometheus_client.generate_latest()

@app.get("/status")
async def status():
    """Detailed status information"""
    return JSONResponse({
        "active_executions": len(monitoring_wrapper.active_executions),
        "execution_details": monitoring_wrapper.active_executions,
        "system_info": {
            "memory_usage": get_memory_usage(),
            "cpu_usage": get_cpu_usage()
        }
    })

Best Practices and Patterns

Graph Design Principles

  • Single Responsibility: each node should have one clear, focused purpose
  • State Management: design the state schema carefully to avoid bloat
  • Error Handling: implement comprehensive error handling and recovery
  • Resource Management: monitor and manage computational resources
  • Testing: create unit tests for individual nodes and integration tests for workflows (see the sketch below)
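
A minimal pytest-style sketch for testing a single node in isolation, assuming research_node and AgentState from the earlier examples are importable and that the LLM call inside the node is mocked or stubbed in a real test suite:

def test_research_node_marks_task_complete():
    # Build a fresh state with only the fields the node needs.
    state = AgentState(
        messages=[], current_task="Test topic", completed_tasks=[],
        research_data=None, analysis_results=None, final_output=None,
        iteration_count=0, error_messages=[]
    )
    result = research_node(state)  # the LLM call should be mocked in real tests
    assert "research" in result["completed_tasks"]
    assert result["research_data"] is not None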

Performance Optimization

  • Parallel Processing: leverage parallel execution where appropriate
  • Caching: implement caching for expensive operations (see the sketch below)
  • Resource Pooling: use connection pooling for external services
  • Batch Processing: group similar operations for efficiency
  • Memory Management: monitor and optimize memory usage
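
For the caching point, a minimal sketch that memoizes an expensive, deterministic helper so repeated node executions with the same task reuse the earlier result; cached_summary is a hypothetical helper, and caching LLM output only makes sense when the call is deterministic (for example, temperature 0):

from functools import lru_cache

@lru_cache(maxsize=256)
def cached_summary(topic: str) -> str:
    # Hypothetical expensive operation; results are reused for repeated topics.
    response = llm.invoke([HumanMessage(content=f"Summarize: {topic}")])
    return response.content

def cached_research_node(state: AgentState) -> AgentState:
    # Identical current_task values hit the cache instead of calling the LLM again.
    state["research_data"] = {"findings": cached_summary(state["current_task"])}
    return state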

Security Considerations

  • Input Validation: validate all inputs to prevent injection attacks (see the sketch below)
  • Access Control: implement proper authentication and authorization
  • Data Privacy: handle sensitive data in accordance with privacy regulations
  • API Security: secure API endpoints and manage rate limiting
  • Audit Logging: maintain complete audit trails
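
For the input-validation point, a minimal sketch that rejects oversized or suspicious task strings before they reach any prompt; the length limit and patterns are illustrative assumptions, not an exhaustive filter:

import re

MAX_TASK_LENGTH = 2000
SUSPICIOUS_PATTERNS = [r"ignore (all|previous) instructions", r"<script", r";\s*drop\s+table"]

def validate_task_input(task: str) -> str:
    """Reject empty, oversized, or obviously malicious task strings (illustrative checks)."""
    if not task or not task.strip():
        raise ValueError("Task must not be empty")
    if len(task) > MAX_TASK_LENGTH:
        raise ValueError("Task exceeds maximum allowed length")
    for pattern in SUSPICIOUS_PATTERNS:
        if re.search(pattern, task, re.IGNORECASE):
            raise ValueError("Task contains disallowed content")
    return task.strip()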


*This comprehensive LangGraph cheat sheet provides everything needed to build sophisticated AI agent workflows. From basic graph construction to advanced production deployment patterns, use these examples and best practices to create powerful, scalable AI applications with LangGraph's graph-based approach.*