# LangGraph Framework Cheat Sheet

## Overview

LangGraph is a stateful orchestration framework for building AI agent workflows. Developed by the LangChain team, it addresses the limitations of linear, chain-based agent frameworks by modeling a workflow as a graph of interconnected nodes and edges. This makes it possible to build multi-agent systems that handle branching logic, conditional execution, and stateful interactions.

What sets LangGraph apart is its ability to model complex real-world workflows that require decision points, parallel processing, and dynamic routing based on intermediate results. Unlike simple chain-based approaches, LangGraph lets developers build workflows in which agents collaborate, compete, or work independently while sharing state and context. This makes it well suited to applications that need multi-step reasoning and problem solving, and behavior that adapts to changing conditions.

The framework combines the flexibility of graph-based computation with features that production systems need: persistence, streaming, debugging support, and deployment through the LangGraph Platform. It is aimed at developers whose AI applications need more than simple request-response patterns.

## Installation and Setup

### Basic Installation

```bash
# Install LangGraph
pip install langgraph

# Install with additional dependencies
pip install "langgraph[all]"

# Install development version
pip install git+https://github.com/langchain-ai/langgraph.git

# Install LangGraph Platform CLI (for deployment)
pip install langgraph-cli
```

### Environment Configuration
```python
import os
from langgraph.graph import StateGraph, END
# ToolExecutor is deprecated in recent LangGraph releases in favor of ToolNode;
# this import assumes an older version.
from langgraph.prebuilt import ToolExecutor
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# Set up API keys (placeholders; load real keys from a secrets store or .env file)
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-api-key"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# Initialize LLMs
openai_llm = ChatOpenAI(model="gpt-4", temperature=0)
anthropic_llm = ChatAnthropic(model="claude-3-sonnet-20240229")
```

### Project Structure

```
langgraph_project/
├── graphs/
│   ├── __init__.py
│   ├── research_graph.py
│   └── analysis_graph.py
├── nodes/
│   ├── __init__.py
│   ├── agent_nodes.py
│   └── tool_nodes.py
├── state/
│   ├── __init__.py
│   └── state_schemas.py
├── tools/
│   ├── __init__.py
│   └── custom_tools.py
├── checkpoints/
│   └── memory.db
├── config/
│   ├── __init__.py
│   └── settings.py
└── main.py
```

## Core Concepts

### State Management

```python
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph

# Define state schema
class AgentState(TypedDict):
    messages: List[str]
    current_task: str
    completed_tasks: List[str]
    research_data: Optional[dict]
    analysis_results: Optional[dict]
    final_output: Optional[str]
    iteration_count: int
    error_messages: List[str]

# Initialize state
initial_state = AgentState(
    messages=[],
    current_task="",
    completed_tasks=[],
    research_data=None,
    analysis_results=None,
    final_output=None,
    iteration_count=0,
    error_messages=[],
)
```

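### Graph Construction

```python
import sqlite3

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Create graph with state management
workflow = StateGraph(AgentState)

# Add nodes (functions that process state). The node functions are defined in
# the Node Implementation section; review_node is assumed to follow the same pattern.
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("synthesize", synthesize_node)
workflow.add_node("review", review_node)

# Define edges (transitions between nodes)
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "synthesize")
workflow.add_edge("synthesize", "review")

# Conditional edges based on state
def should_continue(state: AgentState) -> str:
    if state["iteration_count"] > 5:
        return END  # Give up after too many iterations
    elif state["error_messages"]:
        return "research"  # Retry research if errors
    else:
        # Finish. Routing "review" back to itself would loop until the
        # recursion limit, because nothing changes the state in between.
        return END

# For the retry path to terminate, a node must increment iteration_count
# and clear error_messages; the nodes in this cheat sheet do not do so yet.
workflow.add_conditional_edges(
    "review",
    should_continue,
    {
        END: END,
        "research": "research",
    },
)

# Set entry point
workflow.set_entry_point("research")

# Compile graph with checkpointing. Building the saver from a connection
# avoids from_conn_string(), whose return type differs between releases.
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))
app = workflow.compile(checkpointer=memory)
```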

## Node Implementation

### Basic Node Functions

```python
from datetime import datetime

from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")

# extract_sources() and calculate_confidence() are application-specific
# helpers that this cheat sheet does not define; supply your own.

def research_node(state: AgentState) -> AgentState:
    """Research node that gathers information"""
    current_task = state["current_task"]

    # Perform research using LLM
    research_prompt = f"""
    Research the following topic comprehensively:
    {current_task}

    Provide detailed findings including:
    1. Key facts and statistics
    2. Current trends and developments
    3. Expert opinions and analysis
    4. Relevant sources and references
    """

    response = llm.invoke([HumanMessage(content=research_prompt)])

    # Update state
    state["research_data"] = {
        "findings": response.content,
        "timestamp": datetime.now().isoformat(),
        "sources": extract_sources(response.content),
    }
    state["completed_tasks"].append("research")
    state["messages"].append(f"Research completed for: {current_task}")

    return state

def analyze_node(state: AgentState) -> AgentState:
    """Analysis node that processes research data"""
    research_data = state["research_data"]

    if not research_data:
        state["error_messages"].append("No research data available for analysis")
        return state

    analysis_prompt = f"""
    Analyze the following research findings:
    {research_data['findings']}

    Provide:
    1. Key insights and patterns
    2. Strengths and weaknesses
    3. Opportunities and threats
    4. Strategic recommendations
    """

    response = llm.invoke([HumanMessage(content=analysis_prompt)])

    state["analysis_results"] = {
        "insights": response.content,
        "timestamp": datetime.now().isoformat(),
        "confidence_score": calculate_confidence(response.content),
    }
    state["completed_tasks"].append("analysis")
    state["messages"].append("Analysis completed")

    return state

def synthesize_node(state: AgentState) -> AgentState:
    """Synthesis node that combines research and analysis"""
    research_data = state["research_data"]
    analysis_results = state["analysis_results"]

    synthesis_prompt = f"""
    Synthesize the following research and analysis into a comprehensive report:

    Research Findings:
    {research_data['findings']}

    Analysis Results:
    {analysis_results['insights']}

    Create a cohesive narrative that:
    1. Integrates all key findings
    2. Provides actionable recommendations
    3. Addresses potential concerns
    4. Outlines next steps
    """

    response = llm.invoke([HumanMessage(content=synthesis_prompt)])

    state["final_output"] = response.content
    state["completed_tasks"].append("synthesis")
    state["messages"].append("Synthesis completed")

    return state
```
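The node functions above call `extract_sources` and `calculate_confidence` without defining them. The following is a minimal sketch of what such helpers could look like, assuming that sources appear as plain URLs in the response text and that confidence can be roughly approximated by counting hedging words. Both implementations, the `URL_PATTERN` regex, and the `HEDGE_WORDS` list are hypothetical and not part of LangGraph.

```python
import re
from typing import List

# Assumption: sources are cited as bare http(s) URLs in the text.
URL_PATTERN = re.compile(r"https?://[^\s)\]>]+")

# Assumption: these words signal uncertainty in an LLM response.
HEDGE_WORDS = ("might", "may", "possibly", "unclear", "uncertain", "perhaps")


def extract_sources(text: str) -> List[str]:
    """Return the unique URLs found in text, in order of first appearance."""
    seen = set()
    sources = []
    for match in URL_PATTERN.findall(text):
        url = match.rstrip(".,;:")  # Drop trailing sentence punctuation
        if url not in seen:
            seen.add(url)
            sources.append(url)
    return sources


def calculate_confidence(text: str) -> float:
    """Start at 1.0 and subtract 0.1 per hedging word, never going below 0.0."""
    words = re.findall(r"[a-z']+", text.lower())
    hedges = sum(1 for word in words if word in HEDGE_WORDS)
    return max(0.0, round(1.0 - 0.1 * hedges, 2))
```

A real system would more likely ask the model for structured citations or use a separate grading prompt; the heuristics here only keep the example nodes runnable.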

### Advanced Node Patterns

```python
import asyncio
from datetime import datetime
from typing import Dict, Any

class AsyncResearchNode:
    """Async node for parallel research operations"""

    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools

    async def __call__(self, state: AgentState) -> AgentState:
        current_task = state["current_task"]

        # Parallel research tasks
        research_tasks = [
            self.web_research(current_task),
            self.academic_research(current_task),
            self.news_research(current_task),
        ]

        results = await asyncio.gather(*research_tasks, return_exceptions=True)

        # Combine results, replacing failed tasks with None
        combined_research = {
            "web_findings": results[0] if not isinstance(results[0], Exception) else None,
            "academic_findings": results[1] if not isinstance(results[1], Exception) else None,
            "news_findings": results[2] if not isinstance(results[2], Exception) else None,
            "timestamp": datetime.now().isoformat(),
        }

        state["research_data"] = combined_research
        state["completed_tasks"].append("parallel_research")

        return state

    async def web_research(self, topic: str) -> Dict[str, Any]:
        # Implement web research logic
        pass

    async def academic_research(self, topic: str) -> Dict[str, Any]:
        # Implement academic research logic
        pass

    async def news_research(self, topic: str) -> Dict[str, Any]:
        # Implement news research logic
        pass

# Multi-agent collaboration node
class CollaborativeNode:
    """Node that coordinates multiple agents"""

    def __init__(self, agents: Dict[str, Any], synthesizer_llm):
        self.agents = agents
        # LLM that merges the agents' answers (the original snippet used
        # self.synthesizer_llm without ever setting it)
        self.synthesizer_llm = synthesizer_llm

    def __call__(self, state: AgentState) -> AgentState:
        current_task = state["current_task"]

        # Assign tasks to different agents.
        # create_agent_task() and format_agent_results() are left to implement.
        agent_results = {}

        for agent_name, agent in self.agents.items():
            agent_task = self.create_agent_task(current_task, agent_name)
            result = agent.invoke(agent_task)
            agent_results[agent_name] = result

        # Synthesize agent results
        synthesis_prompt = f"""
        Multiple agents have worked on the task: {current_task}

        Agent Results:
        {self.format_agent_results(agent_results)}

        Synthesize these perspectives into a cohesive solution.
        """

        final_result = self.synthesizer_llm.invoke(synthesis_prompt)

        # "collaborative_results" must be declared in the state schema
        state["collaborative_results"] = {
            "individual_results": agent_results,
            "synthesized_result": final_result.content,
            "participating_agents": list(self.agents.keys()),
        }

        return state
```

### Tool Integration Nodes

```python
import json
from typing import Dict, List

from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_core.messages import HumanMessage

# Index the tools by name. Older LangGraph releases wrapped tools in
# langgraph.prebuilt.ToolExecutor, which is deprecated in favor of ToolNode;
# calling each tool directly works independently of that change.
tools = [DuckDuckGoSearchRun(), WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())]
tools_by_name = {tool.name: tool for tool in tools}

def tool_calling_node(state: AgentState) -> AgentState:
    """Node that uses tools to gather information"""
    current_task = state["current_task"]

    # Determine which tools to use (llm is the model created earlier)
    tool_selection_prompt = f"""
    For the task: {current_task}

    Which tools should be used and what queries should be made?
    Available tools: {[tool.name for tool in tools]}

    Respond with a JSON list of tool calls:
    [{{"tool": "tool_name", "query": "search_query"}}, ...]
    """

    tool_plan = llm.invoke([HumanMessage(content=tool_selection_prompt)])
    tool_calls = parse_tool_calls(tool_plan.content)

    # Execute tools; an unknown tool name raises KeyError and is recorded as a failure
    tool_results = []
    for tool_call in tool_calls:
        try:
            result = tools_by_name[tool_call["tool"]].invoke(tool_call["query"])
            tool_results.append({
                "tool": tool_call["tool"],
                "query": tool_call["query"],
                "result": result,
                "success": True,
            })
        except Exception as e:
            tool_results.append({
                "tool": tool_call["tool"],
                "query": tool_call["query"],
                "error": str(e),
                "success": False,
            })

    # "tool_results" must be declared in the state schema
    state["tool_results"] = tool_results
    state["completed_tasks"].append("tool_execution")

    return state

def parse_tool_calls(content: str) -> List[Dict[str, str]]:
    """Parse tool calls from LLM response"""
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Fallback: send the raw response to the first tool as a query
        return [{"tool": tools[0].name, "query": content}]
```

## Conditional Logic and Routing

### Dynamic Routing

```python
def route_based_on_content(state: AgentState) -> str:
    """Route based on content analysis"""
    current_task = state["current_task"].lower()

    if "research" in current_task or "analyze" in current_task:
        return "research_path"
    elif "create" in current_task or "generate" in current_task:
        return "creation_path"
    elif "review" in current_task or "evaluate" in current_task:
        return "review_path"
    else:
        return "general_path"

def quality_gate(state: AgentState) -> str:
    """Quality gate for output validation"""
    if not state["final_output"]:
        return "retry"

    # Quality assessment; assess_output_quality() is a user-supplied
    # scorer that returns a value between 0.0 and 1.0
    quality_score = assess_output_quality(state["final_output"])

    if quality_score > 0.8:
        return "approve"
    elif quality_score > 0.6:
        return "revise"
    else:
        return "retry"

# Add conditional routing to graph. The source and target nodes named
# here must already be registered with add_node().
workflow.add_conditional_edges(
    "initial_analysis",
    route_based_on_content,
    {
        "research_path": "research_node",
        "creation_path": "creation_node",
        "review_path": "review_node",
        "general_path": "general_node",
    },
)

workflow.add_conditional_edges(
    "final_review",
    quality_gate,
    {
        "approve": END,
        "revise": "revision_node",
        "retry": "research_node",
    },
)
```
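`quality_gate` depends on an `assess_output_quality` scorer that is not defined in this cheat sheet. As a rough sketch of the contract it has to satisfy (a float between 0.0 and 1.0), the hypothetical heuristic below scores an output by its length and by whether it has more than one paragraph. The `target_words` parameter and the 0.7/0.3 weighting are assumptions chosen for illustration; a production scorer would more likely use an LLM-based or task-specific evaluation.

```python
def assess_output_quality(output: str, target_words: int = 100) -> float:
    """Return a score in [0.0, 1.0] based on length and paragraph structure."""
    words = output.split()
    if not words:
        return 0.0

    # Length component: reaches 1.0 once the output has target_words words
    length_score = min(len(words) / target_words, 1.0)

    # Structure component: full credit for two or more paragraphs
    paragraphs = [p for p in output.split("\n\n") if p.strip()]
    structure_score = 1.0 if len(paragraphs) >= 2 else 0.5

    return round(0.7 * length_score + 0.3 * structure_score, 3)
```

With the thresholds used in `quality_gate`, a short single-paragraph answer scores below 0.6 and is routed to `"retry"`, while a long multi-paragraph answer scores above 0.8 and is approved.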

### Complex Decision Trees

```python
def multi_criteria_router(state: AgentState) -> str:
    """Multi-criteria decision routing"""
    # The assess_* helpers are user-supplied and return scores from 0.0 to 1.0
    criteria = {
        "complexity": assess_task_complexity(state["current_task"]),
        "urgency": assess_task_urgency(state),
        "resources": assess_available_resources(state),
        "expertise_needed": assess_expertise_requirements(state["current_task"]),
    }

    # Decision matrix
    if criteria["complexity"] > 0.8 and criteria["expertise_needed"] > 0.7:
        return "expert_collaboration"
    elif criteria["urgency"] > 0.9:
        return "fast_track"
    elif criteria["resources"] < 0.3:
        return "resource_optimization"
    else:
        return "standard_processing"

def adaptive_retry_logic(state: AgentState) -> str:
    """Adaptive retry with different strategies"""
    error_count = len(state["error_messages"])
    last_error = state["error_messages"][-1] if state["error_messages"] else ""

    if error_count == 0:
        return "proceed"
    elif error_count == 1:
        if "timeout" in last_error.lower():
            return "retry_with_timeout"
        else:
            return "retry_with_different_approach"
    elif error_count == 2:
        return "escalate_to_human"
    else:
        return "abort"

# Complex routing configuration
workflow.add_conditional_edges(
    "task_assessment",
    multi_criteria_router,
    {
        "expert_collaboration": "expert_node",
        "fast_track": "fast_track_node",
        "resource_optimization": "optimized_node",
        "standard_processing": "standard_node",
    },
)
```

## Parallel Processing and Concurrency

### Parallel Node Execution

```python
import asyncio

from langgraph.graph import StateGraph, START

class ParallelProcessingGraph:
    """Graph with parallel processing capabilities"""

    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.setup_parallel_nodes()

    def setup_parallel_nodes(self):
        # Add parallel processing nodes (synthesis_node and the individual
        # search/analysis coroutines used below are left to implement)
        self.workflow.add_node("parallel_research", self.parallel_research_node)
        self.workflow.add_node("parallel_analysis", self.parallel_analysis_node)
        self.workflow.add_node("synthesis", self.synthesis_node)

        # Fan out: both nodes start from START and run in the same step
        self.workflow.add_edge(START, "parallel_research")
        self.workflow.add_edge(START, "parallel_analysis")

        # Fan in: synthesis runs once both branches have finished
        self.workflow.add_edge("parallel_research", "synthesis")
        self.workflow.add_edge("parallel_analysis", "synthesis")

    async def parallel_research_node(self, state: AgentState) -> dict:
        """Execute multiple research tasks in parallel"""
        research_tasks = [
            self.web_search(state["current_task"]),
            self.database_query(state["current_task"]),
            self.expert_consultation(state["current_task"]),
        ]

        results = await asyncio.gather(*research_tasks, return_exceptions=True)

        # Return only the key this node owns. Two nodes running in the same
        # step must not both write the same state key, so returning the whole
        # state from each branch would conflict. The key must be declared
        # in the state schema.
        return {
            "parallel_research_results": {
                "web_search": results[0] if not isinstance(results[0], Exception) else None,
                "database_query": results[1] if not isinstance(results[1], Exception) else None,
                "expert_consultation": results[2] if not isinstance(results[2], Exception) else None,
            }
        }

    async def parallel_analysis_node(self, state: AgentState) -> dict:
        """Execute multiple analysis tasks in parallel"""
        analysis_tasks = [
            self.sentiment_analysis(state["current_task"]),
            self.trend_analysis(state["current_task"]),
            self.competitive_analysis(state["current_task"]),
        ]

        results = await asyncio.gather(*analysis_tasks, return_exceptions=True)

        return {
            "parallel_analysis_results": {
                "sentiment": results[0] if not isinstance(results[0], Exception) else None,
                "trends": results[1] if not isinstance(results[1], Exception) else None,
                "competitive": results[2] if not isinstance(results[2], Exception) else None,
            }
        }

# Fan-out/Fan-in pattern
def fan_out_node(state: AgentState) -> AgentState:
    """Split work into parallel streams"""
    task = state["current_task"]

    # Create subtasks
    subtasks = split_task_into_subtasks(task)
    state["subtasks"] = subtasks
    state["parallel_streams"] = len(subtasks)

    return state

def fan_in_node(state: AgentState) -> AgentState:
    """Combine results from parallel streams"""
    subtask_results = state.get("subtask_results", {})

    if len(subtask_results) < state["parallel_streams"]:
        # Not all streams completed yet
        return state

    # Combine all results
    combined_result = combine_subtask_results(subtask_results)
    state["final_output"] = combined_result

    return state
```
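The fan-out and fan-in nodes above rely on `split_task_into_subtasks` and `combine_subtask_results`, which are not defined here. A minimal sketch follows, assuming that a task description lists its parts separated by commas, semicolons, or the word "and", and that subtask results are stored as a dict keyed by subtask name. Both functions and these conventions are hypothetical; in practice an LLM call would often do the splitting.

```python
import re
from typing import Dict, List


def split_task_into_subtasks(task: str) -> List[str]:
    """Split a task description on ',', ';' or the word 'and'."""
    parts = re.split(r"\s*(?:;|,|\band\b)\s*", task)
    # Drop empty fragments, e.g. those produced by ", and"
    return [part.strip() for part in parts if part.strip()]


def combine_subtask_results(results: Dict[str, str]) -> str:
    """Join results into one report, ordered by subtask name for stable output."""
    return "\n".join(f"{name}: {results[name]}" for name in sorted(results))


if __name__ == "__main__":
    subtasks = split_task_into_subtasks("research pricing, analyze competitors and draft summary")
    print(subtasks)
    print(combine_subtask_results({name: "done" for name in subtasks}))
```

Sorting by name keeps the combined report deterministic regardless of the order in which the parallel streams finish.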

### Load Balancing and Resource Management

```python
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

class ResourceManagedGraph:
    """Graph with resource management and load balancing"""

    def __init__(self, max_workers=4, max_concurrent_llm_calls=2):
        self.max_workers = max_workers
        self.max_concurrent_llm_calls = max_concurrent_llm_calls
        self.llm_semaphore = threading.Semaphore(max_concurrent_llm_calls)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.task_queue = Queue()

    # make_llm_call(), process_single_task() and assess_current_load()
    # are left to implement.

    def resource_aware_node(self, state: AgentState) -> AgentState:
        """Node that manages resource usage"""
        with self.llm_semaphore:
            # LLM call with resource management
            result = self.make_llm_call(state["current_task"])

        state["resource_managed_result"] = result
        return state

    def load_balanced_processing(self, state: AgentState) -> AgentState:
        """Distribute work across available resources"""
        tasks = state.get("pending_tasks", [])

        if not tasks:
            return state

        # Submit tasks to thread pool
        futures = []
        for task in tasks:
            future = self.executor.submit(self.process_single_task, task)
            futures.append(future)

        # Collect results in submission order
        results = []
        for future in futures:
            try:
                result = future.result(timeout=30)
                results.append(result)
            except Exception as e:
                results.append({"error": str(e)})

        state["load_balanced_results"] = results
        return state

    def adaptive_scaling_node(self, state: AgentState) -> AgentState:
        """Adapt resource usage based on workload"""
        current_load = self.assess_current_load()

        # Swapping the semaphore only affects calls that start afterwards;
        # calls already holding the old semaphore are not counted by the new one.
        if current_load > 0.8:
            # High load - reduce concurrency
            self.llm_semaphore = threading.Semaphore(1)
        elif current_load < 0.3:
            # Low load - increase concurrency
            self.llm_semaphore = threading.Semaphore(self.max_concurrent_llm_calls * 2)

        return state
```

## Memory and Persistence

### Checkpointing and State Persistence

```python
import sqlite3
from datetime import datetime

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# SQLite checkpointing
def setup_sqlite_checkpointing():
    conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
    memory = SqliteSaver(conn)
    return memory

# PostgreSQL checkpointing for production
def setup_postgres_checkpointing():
    connection_string = "postgresql://user:password@localhost/langgraph_db"
    # In recent releases from_conn_string() may return a context manager;
    # if so, use it in a `with` block instead of returning it directly.
    memory = PostgresSaver.from_conn_string(connection_string)
    return memory

# Custom checkpointing implementation
class CustomCheckpointer:
    """Custom checkpointing with additional features"""

    def __init__(self, storage_backend):
        # storage_backend is any object exposing save(key, data) and load(key)
        self.storage = storage_backend
        self.checkpoint_history = {}

    def save_checkpoint(self, thread_id: str, state: AgentState, step: int):
        checkpoint_data = {
            "thread_id": thread_id,
            "state": state,
            "step": step,
            "timestamp": datetime.now().isoformat(),
            "metadata": self.extract_metadata(state),  # left to implement
        }

        self.storage.save(f"{thread_id}_{step}", checkpoint_data)

        # Keep checkpoint history
        if thread_id not in self.checkpoint_history:
            self.checkpoint_history[thread_id] = []
        self.checkpoint_history[thread_id].append(step)

    def load_checkpoint(self, thread_id: str, step: int = None):
        if step is None:
            # Load latest checkpoint
            steps = self.checkpoint_history.get(thread_id, [])
            if not steps:
                return None
            step = max(steps)

        return self.storage.load(f"{thread_id}_{step}")

    def rollback_to_checkpoint(self, thread_id: str, step: int):
        checkpoint = self.load_checkpoint(thread_id, step)
        if checkpoint:
            return checkpoint["state"]
        return None

# Usage with custom checkpointing
custom_memory = CustomCheckpointer(storage_backend)
# Note: workflow.compile(checkpointer=...) expects a BaseCheckpointSaver
# subclass. As written, CustomCheckpointer does not implement that interface,
# so either subclass BaseCheckpointSaver before passing it to compile() or
# use the class alongside the graph, for example through a thread manager.
```
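`CustomCheckpointer` is constructed with a `storage_backend` that is never defined; from the calls it makes, the backend only needs `save(key, data)` and `load(key)`. The following is a minimal in-memory sketch of such a backend, useful for tests and local experiments. The class name `InMemoryStorage` is hypothetical, and the deep copies are a design assumption: they keep later mutations of the live state from silently changing stored checkpoints.

```python
import copy
from typing import Any, Dict, Optional


class InMemoryStorage:
    """Dictionary-backed storage exposing the save/load calls used by CustomCheckpointer."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def save(self, key: str, value: Any) -> None:
        # Store a snapshot so later changes to `value` do not leak in
        self._data[key] = copy.deepcopy(value)

    def load(self, key: str) -> Optional[Any]:
        # Return a copy so callers cannot modify the stored snapshot
        if key not in self._data:
            return None
        return copy.deepcopy(self._data[key])


if __name__ == "__main__":
    storage = InMemoryStorage()
    state = {"messages": ["first"]}
    storage.save("thread-1_0", {"state": state, "step": 0})
    state["messages"].append("second")  # Does not affect the stored checkpoint
    print(storage.load("thread-1_0"))
```

Data in this backend is lost when the process exits, so a file- or database-backed implementation with the same two methods would be needed for anything durable.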

### Thread Management and Session Handling

```python
import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, Optional

class ThreadManager:
    """Manage multiple conversation threads"""

    def __init__(self, checkpointer):
        # checkpointer must expose save_checkpoint() and load_checkpoint(),
        # as CustomCheckpointer does
        self.checkpointer = checkpointer
        self.active_threads = {}
        self.thread_metadata = {}

    def create_thread(self, user_id: str, initial_state: AgentState) -> str:
        thread_id = str(uuid.uuid4())

        self.active_threads[thread_id] = {
            "user_id": user_id,
            "created_at": datetime.now(),
            "last_activity": datetime.now(),
            "state": initial_state,
        }

        self.thread_metadata[thread_id] = {
            "user_id": user_id,
            "session_count": 0,
            "total_messages": 0,
        }

        return thread_id

    def get_thread_state(self, thread_id: str) -> Optional[AgentState]:
        if thread_id in self.active_threads:
            return self.active_threads[thread_id]["state"]

        # Try to load from checkpoint
        checkpoint = self.checkpointer.load_checkpoint(thread_id)
        if checkpoint:
            return checkpoint["state"]

        return None

    def update_thread_state(self, thread_id: str, new_state: AgentState):
        if thread_id in self.active_threads:
            self.active_threads[thread_id]["state"] = new_state
            self.active_threads[thread_id]["last_activity"] = datetime.now()

        # Save checkpoint (get_current_step() is left to implement)
        self.checkpointer.save_checkpoint(
            thread_id, new_state, self.get_current_step(thread_id)
        )

    def cleanup_inactive_threads(self, max_age_hours: int = 24):
        cutoff_time = datetime.now() - timedelta(hours=max_age_hours)

        inactive_threads = [
            thread_id for thread_id, data in self.active_threads.items()
            if data["last_activity"] < cutoff_time
        ]

        for thread_id in inactive_threads:
            del self.active_threads[thread_id]

# Session-aware graph execution
def execute_with_session(app, thread_id: str, input_data: Dict[str, Any]):
    config = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": f"session_{thread_id}",
        }
    }

    result = app.invoke(input_data, config=config)
    return result
```

### Long-Term Memory Integration

```python
from datetime import datetime

from langchain.memory import ConversationSummaryBufferMemory
from langchain_community.vectorstores import FAISS
from langchain_core.messages import HumanMessage
from langchain_openai import OpenAIEmbeddings

class LongTermMemoryGraph:
    """Graph with long-term memory capabilities"""

    def __init__(self, llm):
        self.llm = llm
        self.conversation_memory = ConversationSummaryBufferMemory(
            llm=llm,
            max_token_limit=2000,
            return_messages=True,
        )
        self.episodic_memory = FAISS.from_texts(
            ["Initial memory"],
            OpenAIEmbeddings(),
        )
        self.semantic_memory = {}

    def memory_enhanced_node(self, state: AgentState) -> AgentState:
        """Node that leverages long-term memory"""
        current_task = state["current_task"]

        # Retrieve relevant memories
        relevant_episodes = self.episodic_memory.similarity_search(
            current_task, k=3
        )

        conversation_context = self.conversation_memory.load_memory_variables({})

        # Enhance prompt with memory context
        # (format_episodes() is left to implement)
        enhanced_prompt = f"""
        Current Task: {current_task}

        Relevant Past Experiences:
        {self.format_episodes(relevant_episodes)}

        Conversation History:
        {conversation_context.get('history', '')}

        Semantic Knowledge:
        {self.get_relevant_semantic_knowledge(current_task)}

        Based on this context, proceed with the task.
        """

        response = self.llm.invoke([HumanMessage(content=enhanced_prompt)])

        # Update memories
        self.update_episodic_memory(current_task, response.content)
        self.conversation_memory.save_context(
            {"input": current_task},
            {"output": response.content},
        )

        state["memory_enhanced_result"] = response.content
        return state

    def update_episodic_memory(self, task: str, result: str):
        """Update episodic memory with new experience"""
        episode = f"Task: {task}\nResult: {result}\nTimestamp: {datetime.now()}"

        # Add to vector store
        self.episodic_memory.add_texts([episode])

    def update_semantic_memory(self, concept: str, knowledge: str):
        """Update semantic memory with new knowledge"""
        self.semantic_memory[concept] = knowledge

    def get_relevant_semantic_knowledge(self, query: str) -> str:
        """Retrieve relevant semantic knowledge"""
        relevant_concepts = []
        for concept, knowledge in self.semantic_memory.items():
            # Match when any word of the concept name occurs in the query
            if any(word in query.lower() for word in concept.lower().split()):
                relevant_concepts.append(f"{concept}: {knowledge}")

        return "\n".join(relevant_concepts)
```

## Streaming and Real-Time Processing

### Stream Processing

```python
import asyncio
from datetime import datetime
from typing import Any, AsyncIterator, Dict

from langgraph.graph import StateGraph, END

class StreamingGraph:
    """Graph with streaming capabilities"""

    def __init__(self):
        self.workflow = StateGraph(AgentState)
        self.setup_streaming_nodes()
        # astream() lives on the compiled graph, not on StateGraph itself
        self.app = self.workflow.compile()

    def setup_streaming_nodes(self):
        # streaming_analysis_node is left to implement
        self.workflow.add_node("streaming_research", self.streaming_research_node)
        self.workflow.add_node("streaming_analysis", self.streaming_analysis_node)
        self.workflow.set_entry_point("streaming_research")
        self.workflow.add_edge("streaming_research", "streaming_analysis")
        self.workflow.add_edge("streaming_analysis", END)

    async def streaming_research_node(self, state: AgentState) -> AsyncIterator[AgentState]:
        """Node that yields intermediate results"""
        # Whether intermediate yields from a generator node surface in the
        # stream depends on the LangGraph version; verify against the release
        # you use. "status" and "progress" must be declared in the state schema.
        current_task = state["current_task"]

        # Yield initial status
        state["status"] = "Starting research..."
        yield state

        # Perform research in chunks
        research_steps = [
            "Gathering initial sources...",
            "Analyzing primary data...",
            "Cross-referencing information...",
            "Finalizing research findings...",
        ]

        for i, step in enumerate(research_steps):
            state["status"] = step
            state["progress"] = (i + 1) / len(research_steps)

            # Simulate research work
            await asyncio.sleep(1)

            # Yield intermediate state
            yield state

        # Final research results
        state["research_data"] = {
            "findings": f"Research completed for: {current_task}",
            "confidence": 0.85,
            "sources": ["source1", "source2", "source3"],
        }
        state["status"] = "Research completed"
        yield state

    async def stream_execution(self, initial_state: AgentState) -> AsyncIterator[AgentState]:
        """Execute graph with streaming output"""
        # stream_mode="values" yields the full state after each step; the
        # default mode yields per-node updates keyed by node name instead
        async for state_update in self.app.astream(initial_state, stream_mode="values"):
            yield state_update

# Usage
async def main():
    streaming_graph = StreamingGraph()
    initial_state = AgentState(current_task="Research AI trends", messages=[])

    async for state in streaming_graph.stream_execution(initial_state):
        print(f"Status: {state.get('status', 'Processing...')}")
        print(f"Progress: {state.get('progress', 0) * 100:.1f}%")
        print("---")

# Real-time event processing
class EventDrivenGraph:
    """Graph that responds to real-time events"""

    def __init__(self, graph_app):
        # graph_app is a compiled graph (the original snippet referenced
        # self.workflow without ever setting it)
        self.graph_app = graph_app
        self.event_queue = asyncio.Queue()
        self.active_streams = {}

    async def event_listener(self):
        """Listen for incoming events"""
        while True:
            event = await self.event_queue.get()
            await self.process_event(event)

    async def process_event(self, event: Dict[str, Any]):
        """Process incoming event"""
        # handle_update_request() and handle_cancellation() are left to implement
        event_type = event.get("type")

        if event_type == "new_task":
            await self.handle_new_task(event)
        elif event_type == "update_request":
            await self.handle_update_request(event)
        elif event_type == "cancellation":
            await self.handle_cancellation(event)

    async def handle_new_task(self, event: Dict[str, Any]):
        """Handle new task event"""
        task_id = event["task_id"]
        task_data = event["data"]

        # Create new streaming execution
        initial_state = AgentState(
            current_task=task_data["description"],
            task_id=task_id,
        )

        stream = self.graph_app.astream(initial_state)
        self.active_streams[task_id] = stream

        # Process stream
        async for state_update in stream:
            await self.emit_update(task_id, state_update)

    async def emit_update(self, task_id: str, state: AgentState):
        """Emit state update to subscribers"""
        update_event = {
            "task_id": task_id,
            "state": state,
            "timestamp": datetime.now().isoformat(),
        }

        # Send to subscribers (WebSocket, SSE, etc.);
        # notify_subscribers() is left to implement
        await self.notify_subscribers(update_event)
```

WebSocket Integration

```python import websockets import json from typing import Set

class WebSocketGraphServer:
    """WebSocket server for real-time graph execution"""

    def __init__(self, graph_app):
        self.graph_app = graph_app
        self.connected_clients: Set[websockets.WebSocketServerProtocol] = set()
        self.client_sessions = {}

    async def register_client(self, websocket: websockets.WebSocketServerProtocol):
        """Register new WebSocket client"""
        self.connected_clients.add(websocket)
        session_id = str(uuid.uuid4())
        self.client_sessions[websocket] = session_id

        await websocket.send(json.dumps({
            "type": "connection_established",
            "session_id": session_id
        }))

    async def unregister_client(self, websocket: websockets.WebSocketServerProtocol):
        """Unregister WebSocket client"""
        self.connected_clients.discard(websocket)
        if websocket in self.client_sessions:
            del self.client_sessions[websocket]

    async def handle_client_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
        """Handle message from WebSocket client"""
        try:
            data = json.loads(message)
            message_type = data.get("type")

            if message_type == "execute_graph":
                await self.execute_graph_for_client(websocket, data)
            elif message_type == "get_status":
                await self.send_status_to_client(websocket, data)

        except json.JSONDecodeError:
            await websocket.send(json.dumps({
                "type": "error",
                "message": "Invalid JSON format"
            }))

    async def send_status_to_client(self, websocket: websockets.WebSocketServerProtocol, data: Dict[str, Any]):
        """Send basic session status to the requesting client"""
        await websocket.send(json.dumps({
            "type": "status",
            "session_id": self.client_sessions.get(websocket),
            "connected_clients": len(self.connected_clients)
        }))

    async def execute_graph_for_client(self, websocket: websockets.WebSocketServerProtocol, data: Dict[str, Any]):
        """Execute graph and stream results to client"""
        session_id = self.client_sessions[websocket]
        initial_state = AgentState(**data.get("initial_state", {}))

        # Use the session ID as thread ID so each client gets its own thread
        config = {
            "configurable": {
                "thread_id": session_id
            }
        }

        try:
            async for state_update in self.graph_app.astream(initial_state, config=config):
                await websocket.send(json.dumps({
                    "type": "state_update",
                    "session_id": session_id,
                    "state": self.serialize_state(state_update),
                    "timestamp": datetime.now().isoformat()
                }))

        except Exception as e:
            await websocket.send(json.dumps({
                "type": "execution_error",
                "error": str(e)
            }))

    async def broadcast_to_all_clients(self, message: Dict[str, Any]):
        """Broadcast message to all connected clients"""
        if self.connected_clients:
            await asyncio.gather(
                *[client.send(json.dumps(message)) for client in self.connected_clients],
                return_exceptions=True
            )

    def serialize_state(self, state: AgentState) -> Dict[str, Any]:
        """Serialize state for JSON transmission"""
        # Keep only top-level values of JSON-compatible types
        return {
            key: value for key, value in state.items()
            if isinstance(value, (str, int, float, bool, list, dict, type(None)))
        }

# Start WebSocket server
async def start_websocket_server(graph_app, host="localhost", port=8765):
    server = WebSocketGraphServer(graph_app)

    # `path` is only passed by older websockets releases, so it is optional here
    async def handle_client(websocket, path=None):
        await server.register_client(websocket)
        try:
            async for message in websocket:
                await server.handle_client_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            await server.unregister_client(websocket)

    # Keep the server open until the surrounding task is cancelled
    async with websockets.serve(handle_client, host, port):
        await asyncio.Future()
```
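
On the client side, every frame sent by the server above is a JSON object with a `type` field. The following is a minimal sketch, using only the standard library, of how a client might build an `execute_graph` request and interpret the replies. The helper names `build_execute_request` and `describe_server_event` are hypothetical and are not part of LangGraph or `websockets`.

```python
import json
from typing import Any, Dict


def build_execute_request(initial_state: Dict[str, Any]) -> str:
    """Encode an execute_graph request in the shape handle_client_message expects."""
    return json.dumps({"type": "execute_graph", "initial_state": initial_state})


def describe_server_event(raw: str) -> str:
    """Turn one server frame into a short human-readable summary."""
    event = json.loads(raw)
    kind = event.get("type")
    if kind == "connection_established":
        return f"connected (session {event['session_id']})"
    if kind == "state_update":
        # The keys inside "state" depend on the graph and its stream mode
        return f"update with keys {sorted(event.get('state', {}))}"
    if kind in ("error", "execution_error"):
        return f"server error: {event.get('message') or event.get('error')}"
    return f"unhandled event type: {kind!r}"
```

A real client would send the output of `build_execute_request` over its WebSocket connection and pass each received frame to `describe_server_event` or a similar dispatcher.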

Production Deployment

LangGraph Platform Deployment

```bash
# Install LangGraph CLI
pip install langgraph-cli

# Create a new LangGraph project
langgraph new my-graph-project

# Configure deployment
cat > langgraph.json << EOF
{
  "dependencies": [
    "langgraph",
    "langchain-openai",
    "langchain-community"
  ],
  "graphs": {
    "my_graph": "./src/graph.py:workflow"
  },
  "env": {
    "OPENAI_API_KEY": "your-api-key"
  }
}
EOF

# Deploy to LangGraph Platform
langgraph deploy --name my-production-graph
```

Docker Deployment

```dockerfile
# Dockerfile for LangGraph application
FROM python:3.11-slim

WORKDIR /app

# Install curl for the health check (not included in the slim image)
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# Set environment variables
ENV PYTHONPATH=/app
ENV LANGCHAIN_TRACING_V2=true

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start application
CMD ["python", "-m", "uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  langgraph-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:password@postgres:5432/langgraph_db
    depends_on:
      - postgres
      - redis
    volumes:
      - ./checkpoints:/app/checkpoints
    restart: unless-stopped

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: langgraph_db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:
```
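
The Compose file passes `OPENAI_API_KEY` and `DATABASE_URL` to the container as environment variables. The following is a minimal sketch of reading them at startup and failing fast when one is missing. The `Settings` class and the `load_settings` function are illustrative names, not part of LangGraph.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    database_url: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read required settings from the environment (or from a supplied mapping)."""
    env = os.environ if env is None else env
    # Treat unset and empty variables the same way
    missing = [name for name in ("OPENAI_API_KEY", "DATABASE_URL") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return Settings(
        openai_api_key=env["OPENAI_API_KEY"],
        database_url=env["DATABASE_URL"],
    )
```

Calling `load_settings()` once when the application starts surfaces a misconfigured container immediately instead of at the first model or database call.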

Kubernetes Deployment

```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
  labels:
    app: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-app
  template:
    metadata:
      labels:
        app: langgraph-app
    spec:
      containers:
        - name: langgraph-app
          image: your-registry/langgraph-app:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: openai-api-key
            - name: DATABASE_URL
              valueFrom:
                configMapKeyRef:
                  name: app-config
                  key: database-url
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
```

Monitoring and Observability

Comprehensive Monitoring Setup

```python
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

import prometheus_client
from langsmith import Client
from prometheus_client import Counter, Histogram, Gauge

# Prometheus metrics
GRAPH_EXECUTIONS = Counter('langgraph_executions_total', 'Total graph executions', ['graph_name', 'status'])
EXECUTION_DURATION = Histogram('langgraph_execution_duration_seconds', 'Graph execution duration')
ACTIVE_THREADS = Gauge('langgraph_active_threads', 'Number of active threads')
NODE_EXECUTIONS = Counter('langgraph_node_executions_total', 'Total node executions', ['node_name', 'status'])


class MonitoringWrapper:
    """Comprehensive monitoring wrapper for LangGraph"""

    def __init__(self, graph_app, langsmith_client: Optional[Client] = None):
        self.graph_app = graph_app
        self.langsmith_client = langsmith_client
        self.logger = self.setup_logging()
        self.active_executions = {}

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('langgraph.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    async def execute_with_monitoring(self, initial_state: AgentState, config: Optional[Dict[str, Any]] = None):
        """Execute graph with comprehensive monitoring"""
        config = config or {}
        execution_id = str(uuid.uuid4())
        thread_id = config.get('configurable', {}).get('thread_id', 'default')

        # Start monitoring
        start_time = time.time()
        self.active_executions[execution_id] = {
            'thread_id': thread_id,
            'start_time': start_time,
            'status': 'running'
        }
        ACTIVE_THREADS.inc()

        try:
            self.logger.info(f"Starting graph execution {execution_id} for thread {thread_id}")

            # Execute graph with streaming
            final_state = None
            async for state_update in self.graph_app.astream(initial_state, config=config):
                # Log state updates
                self.log_state_update(execution_id, state_update)

                # Send to LangSmith if configured
                if self.langsmith_client:
                    await self.send_to_langsmith(execution_id, state_update)

                final_state = state_update

            # Success metrics
            execution_time = time.time() - start_time
            GRAPH_EXECUTIONS.labels(graph_name='main', status='success').inc()
            EXECUTION_DURATION.observe(execution_time)

            self.logger.info(f"Graph execution {execution_id} completed successfully in {execution_time:.2f}s")

            return final_state

        except Exception as e:
            # Error metrics
            execution_time = time.time() - start_time
            GRAPH_EXECUTIONS.labels(graph_name='main', status='error').inc()
            EXECUTION_DURATION.observe(execution_time)

            self.logger.error(f"Graph execution {execution_id} failed: {str(e)}")
            raise

        finally:
            # Cleanup
            ACTIVE_THREADS.dec()
            if execution_id in self.active_executions:
                del self.active_executions[execution_id]

    def log_state_update(self, execution_id: str, state: AgentState):
        """Log state updates with structured logging"""
        log_data = {
            'execution_id': execution_id,
            'timestamp': datetime.now().isoformat(),
            'completed_tasks': state.get('completed_tasks', []),
            'current_task': state.get('current_task', ''),
            'iteration_count': state.get('iteration_count', 0)
        }

        # default=str keeps logging from failing on non-JSON values
        self.logger.info(f"State update: {json.dumps(log_data, default=str)}")

    async def send_to_langsmith(self, execution_id: str, state: AgentState):
        """Send execution data to LangSmith"""
        if not self.langsmith_client:
            return

        try:
            # Client.create_run is a synchronous call, so it is not awaited
            self.langsmith_client.create_run(
                name=f"langgraph_execution_{execution_id}",
                run_type="chain",
                inputs={"state": state},
                extra={"execution_id": execution_id}
            )
        except Exception as e:
            self.logger.warning(f"Failed to send data to LangSmith: {str(e)}")


# Health check endpoints
import psutil  # assumed dependency for the resource figures in /status
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()

# Assumes graph_app is the compiled graph built earlier
monitoring_wrapper = MonitoringWrapper(graph_app)


def get_memory_usage() -> float:
    """Resident memory of this process in MiB"""
    return psutil.Process().memory_info().rss / (1024 * 1024)


def get_cpu_usage() -> float:
    """System-wide CPU utilization in percent since the previous call"""
    return psutil.cpu_percent(interval=None)


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return JSONResponse({
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_threads": len(monitoring_wrapper.active_executions)
    })


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # Return the raw exposition format instead of a JSON-encoded body
    return Response(
        content=prometheus_client.generate_latest(),
        media_type=prometheus_client.CONTENT_TYPE_LATEST
    )


@app.get("/status")
async def status():
    """Detailed status information"""
    return JSONResponse({
        "active_executions": len(monitoring_wrapper.active_executions),
        "execution_details": monitoring_wrapper.active_executions,
        "system_info": {
            "memory_usage": get_memory_usage(),
            "cpu_usage": get_cpu_usage()
        }
    })
```

Best Practices and Patterns

Graph Design Principles

  • Single Responsibility: Each node should have one clear, focused purpose
  • State Management: Design the state schema carefully to avoid bloat
  • Error Handling: Implement comprehensive error handling and recovery
  • Resource Management: Monitor and manage compute resources
  • Testing: Write unit tests for individual nodes and integration tests for workflows
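
Because a node is just a function that takes the state and returns a partial update, it can be unit-tested without compiling a graph. The following is a minimal sketch of that idea. The node `mark_task_done` is a hypothetical example, and its state keys are chosen to resemble the fields logged in the monitoring code above.

```python
import unittest
from typing import Any, Dict


def mark_task_done(state: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical node: move the current task into completed_tasks."""
    # Copy the list so the caller's state is never mutated
    completed = list(state.get("completed_tasks", []))
    if state.get("current_task"):
        completed.append(state["current_task"])
    # Return only the keys this node changes
    return {
        "completed_tasks": completed,
        "current_task": "",
        "iteration_count": state.get("iteration_count", 0) + 1,
    }


class MarkTaskDoneTest(unittest.TestCase):
    def test_moves_current_task(self):
        update = mark_task_done({"current_task": "research", "completed_tasks": ["plan"]})
        self.assertEqual(update["completed_tasks"], ["plan", "research"])
        self.assertEqual(update["current_task"], "")
        self.assertEqual(update["iteration_count"], 1)

    def test_does_not_mutate_input(self):
        state = {"current_task": "research", "completed_tasks": []}
        mark_task_done(state)
        self.assertEqual(state["completed_tasks"], [])
```

Tests like these run with `python -m unittest`. Integration tests for whole workflows would additionally compile the graph and invoke it with a known initial state.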

Performance Optimization

  • Parallel Processing: Use parallel execution where appropriate
  • Caching: Cache the results of expensive operations
  • Resource Pooling: Use connection pooling for external services
  • Batch Processing: Group similar operations for efficiency
  • Memory Management: Monitor and optimize memory usage
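
Two of the points above, caching and parallel execution, can be sketched with the standard library alone. `expensive_lookup` and `fetch_source` are hypothetical stand-ins for a slow deterministic computation and an I/O-bound call; neither is part of LangGraph.

```python
import asyncio
from functools import lru_cache
from typing import List


@lru_cache(maxsize=256)
def expensive_lookup(key: str) -> str:
    """Stand-in for a slow, deterministic computation; results are memoized per key."""
    return key.upper()


async def fetch_source(name: str) -> str:
    """Stand-in for an I/O-bound call such as an API request."""
    await asyncio.sleep(0.01)
    return f"result from {name}"


async def fetch_all(names: List[str]) -> List[str]:
    """Run independent calls concurrently; results keep the input order."""
    return await asyncio.gather(*(fetch_source(n) for n in names))
```

`lru_cache` suits pure functions with hashable arguments. It should not wrap a coroutine function directly, because it would cache the coroutine object rather than its result.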

Security Considerations

  • Input Validation: Validate all inputs to prevent injection attacks
  • Access Control: Implement proper authentication and authorization
  • Data Protection: Handle sensitive data in accordance with privacy regulations
  • API Security: Secure API endpoints and apply rate limiting
  • Audit Logging: Maintain comprehensive audit trails
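
As an illustration of input validation, the sketch below checks an incoming client message before it reaches the graph. The allowed message types match the WebSocket handler shown earlier. The size limit, the allowed state keys, and the function name `validate_client_message` are assumptions made for this example.

```python
import json
from typing import Any, Dict

MAX_MESSAGE_BYTES = 64 * 1024  # assumed limit
ALLOWED_TYPES = {"execute_graph", "get_status"}
ALLOWED_STATE_KEYS = {"current_task", "completed_tasks", "iteration_count"}  # assumed schema


def validate_client_message(raw: str) -> Dict[str, Any]:
    """Parse and validate a client message, raising ValueError on bad input."""
    if len(raw.encode("utf-8")) > MAX_MESSAGE_BYTES:
        raise ValueError("message too large")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("invalid JSON") from exc
    if not isinstance(data, dict):
        raise ValueError("message must be a JSON object")
    message_type = data.get("type")
    # Check the Python type first: an unhashable value cannot be tested against a set
    if not isinstance(message_type, str) or message_type not in ALLOWED_TYPES:
        raise ValueError("unknown message type")
    initial_state = data.get("initial_state", {})
    if not isinstance(initial_state, dict):
        raise ValueError("initial_state must be an object")
    unexpected = set(initial_state) - ALLOWED_STATE_KEYS
    if unexpected:
        raise ValueError(f"unexpected state keys: {sorted(unexpected)}")
    return data
```

Rejecting unknown keys up front keeps arbitrary client data out of the graph state, which is easier to reason about than filtering it later inside individual nodes.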

---

*This LangGraph cheat sheet covers what is needed to build sophisticated, stateful AI agent workflows. From basic graph construction to advanced production patterns, use these examples and best practices to build powerful, scalable AI applications with LangGraph's graph-based approach.*