Skip to content

Agent Governance Toolkit

Microsoft’s open-source Agent Governance Toolkit provides runtime security governance for AI agents. Deterministic policy enforcement, zero-trust identity, execution rings, and compliance verification across all 10 OWASP Agentic risks. Works with LangChain, CrewAI, AutoGen, Anthropic, and more.

GitHub: https://github.com/microsoft/agent-governance-toolkit License: MIT Languages: Python, TypeScript, .NET, Rust, Go

# Full installation with all components
pip install agent-governance-toolkit[full]

# Individual components (à la carte)
pip install agent-os-kernel              # Policy engine
pip install agentmesh-platform           # Zero-trust identity & trust mesh
pip install agentmesh-runtime            # Runtime supervisor & privilege rings
pip install agent-sre                    # SRE toolkit (SLOs, error budgets)
pip install agent-governance-toolkit     # Compliance & attestation
pip install agentmesh-marketplace        # Plugin lifecycle management
pip install agentmesh-lightning          # RL training governance
npm install @agentmesh/sdk
dotnet add package Microsoft.AgentGovernance
cargo add agentmesh
go get github.com/microsoft/agent-governance-toolkit/packages/agent-mesh/sdks/go
from agent_os.policy import PolicyEngine, CapabilityModel

# Define agent capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "read_file", "send_email"],
    denied_tools=["delete_file", "execute_shell"],
    blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"],  # Block SSN
    max_tool_calls=10,
    max_tokens_per_call=4096,
    require_human_approval=True
)

# Create policy engine
engine = PolicyEngine(capabilities=capabilities)

# Evaluate actions
result = engine.evaluate(
    agent_id="researcher-1",
    action="web_search",
    input_text="latest security news"
)

print(f"Allowed: {result.allowed}")
print(f"Reason: {result.reason}")
import { PolicyEngine, AgentIdentity, AuditLogger } from "@agentmesh/sdk";

// Generate cryptographic identity
const identity = AgentIdentity.generate("my-agent", ["web_search", "read_file"]);

// Create policy engine
const engine = new PolicyEngine([
  { action: "web_search", effect: "allow" },
  { action: "read_file", effect: "allow" },
  { action: "delete_file", effect: "deny" },
  { action: "shell_exec", effect: "deny" },
]);

const decision = engine.evaluate("web_search");  // "allow"
const denied = engine.evaluate("delete_file");   // "deny"
ComponentPurposeKey Features
Agent OSPolicy engine & framework adaptersSub-millisecond evaluation, regex/semantic detection
Agent MeshZero-trust identity & trust scoringEd25519 signatures, SPIFFE/SVID, 0–1000 trust scale
Agent RuntimeExecution supervisor & sandboxing4-tier privilege rings, kill switch, saga orchestration
Agent SREReliability engineeringSLOs, error budgets, circuit breakers, replay debugging
Agent ComplianceOWASP verification & attestationBadge generation, JSON audit trails, integrity checks
Agent MarketplacePlugin lifecycle managementMCP security scanning, rug-pull detection
Agent LightningRL training governanceTraining data validation, model drift detection
policies:
  researcher_agent:
    allowed_tools:
      - web_search
      - read_file
      - database_query
    denied_tools:
      - execute_code
      - delete_database
      - modify_system

    # Block sensitive patterns (SSN, API keys, emails)
    blocked_patterns:
      - "\\b\\d{3}-\\d{2}-\\d{4}\\b"  # SSN
      - "[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]"  # API key
      - "\\S+@\\S+\\.\\S+"  # Email (PII)

    # Resource limits
    max_tool_calls: 20
    max_tokens_per_call: 8192
    max_concurrent_calls: 5

    # Privilege ring assignment
    execution_ring: 2  # 0=kernel, 1=system, 2=user, 3=sandbox

    # Require approval for certain actions
    require_human_approval_for:
      - send_email
      - modify_database
      - external_api_call

    # Trust thresholds
    min_trust_score: 500  # 0–1000 scale

    # Output validation
    enable_prompt_injection_detection: true
    enable_sensitive_data_detection: true
    enable_code_validation: true
from agent_os.policy import PolicyEngine, CapabilityModel

# Load policy from YAML
engine = PolicyEngine.from_yaml("policies.yaml")

# Evaluate tool calls
decision = engine.evaluate(
    agent_id="researcher-1",
    action="tool_call",
    tool="web_search",
    params={"query": "security trends"}
)

if not decision.allowed:
    print(f"Blocked: {decision.reason}")
else:
    print("Proceeding with tool call...")
from agent_os import PolicyEngine, CapabilityModel
from agent_os.integrations import LangChainIntegration

# Define capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "calculator"],
    max_tool_calls=10
)

# Create policy engine
engine = PolicyEngine(capabilities=capabilities)

# For LangChain agents
from langchain.agents import initialize_agent

governed_agent = LangChainIntegration(
    agent=your_langchain_agent,
    policy_engine=engine,
    audit_log=True
)

# Every tool call is now intercepted and evaluated
result = governed_agent.run("What is 2+2?")
# 1. Tool allowlist/blocklist check
✓ Is tool in allowed_tools list?
✓ Is tool NOT in denied_tools list?

# 2. Pattern matching (injections, PII, secrets)
✓ No prompt injection patterns detected
✓ No SSN, API keys, or PII in parameters
✓ No SQL injection or code execution payloads

# 3. Resource constraints
✓ Token usage within limits
✓ Concurrent call limit not exceeded
✓ Rate limiting not triggered

# 4. Privilege ring verification
✓ Agent has required privilege level
✓ Tool operates within agent's ring tier

# 5. Trust scoring
✓ Agent meets minimum trust score
✓ No anomalous behavior detected

# 6. Human approval (if required)
✓ Sensitive action approved by human
import { AgentIdentity, TrustCard } from "@agentmesh/sdk";

// Generate cryptographic identity
const identity = AgentIdentity.generate(
  "researcher-agent",
  ["web_search", "read_file"]
);

console.log(identity.did);              // did:mesh:agent-xxxxx
console.log(identity.publicKeyPEM);     // Ed25519 public key
console.log(identity.allowedCapabilities);  // ["web_search", "read_file"]

// Sign outbound communication
const signature = identity.sign("outgoing message");

// Verify peer agent identity
const isValid = identity.verify(peerPublicKey, message, signature);
from agentmesh.trust import TrustScorer

scorer = TrustScorer()

# Components of trust score:
trust_score = scorer.compute(
    agent_id="agent-1",
    factors={
        "success_rate": 0.95,          # 95% of tasks succeed
        "error_budget_remaining": 0.8, # 80% error budget left
        "last_violation_age_hours": 72, # Last violation 3 days ago
        "api_key_rotation_days_ago": 30, # Keys rotated 30 days ago
        "audit_log_completeness": 1.0, # Full audit trail
    }
)

print(f"Trust score: {trust_score} / 1000")

# Trust tiers
# 0–200: Untrusted (sandbox only)
# 200–400: Low trust (limited tools)
# 400–600: Medium trust (standard tools)
# 600–800: High trust (privileged tools)
# 800–1000: Maximum trust (admin capabilities)
┌─────────────────────────────────┐
│  Ring 0: Kernel                 │  Filesystem, system calls, process control
├─────────────────────────────────┤
│  Ring 1: System                 │  Database, API gateways, secrets manager
├─────────────────────────────────┤
│  Ring 2: User                   │  Web search, internal APIs, file read
├─────────────────────────────────┤
│  Ring 3: Sandbox                │  No outbound access, isolated execution
└─────────────────────────────────┘
from agent_os.runtime import ExecutionRing, AgentRuntime

runtime = AgentRuntime()

# Assign agent to ring based on trust
runtime.assign_ring(agent_id="trusted-agent", ring=ExecutionRing.SYSTEM)
runtime.assign_ring(agent_id="untrusted-agent", ring=ExecutionRing.SANDBOX)

# Enforce ring during execution
@runtime.enforce_ring
def execute_tool(agent_id, tool_name, params):
    # This function executes only if agent has privilege for ring
    return tool_name(params)

# Kill switch for rogue agents
runtime.terminate_agent("agent-123", reason="Excessive tool calls")

Agent Compliance — Verification & Attestation

Section titled “Agent Compliance — Verification & Attestation”
# Generate verification report
agent-compliance verify

# Output as JSON (for CI/CD)
agent-compliance verify --json

# Generate badge for README
agent-compliance verify --badge

# Verify module integrity (Ed25519 signatures)
agent-compliance integrity --verify
{
  "version": "1.0",
  "timestamp": "2026-04-04T12:00:00Z",
  "coverage": {
    "ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
    "ASI-02": {"status": "covered", "mechanism": "MCPGateway"},
    "ASI-03": {"status": "covered", "mechanism": "MemoryGuard"},
    "ASI-04": {"status": "covered", "mechanism": "RateLimiter"},
    "ASI-05": {"status": "covered", "mechanism": "SupplyChainGuard"},
    "ASI-06": {"status": "covered", "mechanism": "PII Detection"},
    "ASI-07": {"status": "covered", "mechanism": "MCPSecurityScanner"},
    "ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
    "ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
    "ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
  },
  "overall_score": "9/10"
}
# Generate integrity hashes for modules
agent-compliance integrity --generate

# Verify no tampering detected
agent-compliance integrity --verify

# Output as badge
agent-compliance integrity --badge
from agent_os.integrations import LangChainIntegration
from agent_os.policy import PolicyEngine, CapabilityModel
from langchain.agents import initialize_agent
from langchain.tools import Tool

# Create tools
tools = [
    Tool(name="web_search", func=search_fn, description="Search web"),
    Tool(name="calculator", func=calc_fn, description="Calculate"),
]

# Initialize governed agent
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")

# Wrap with governance
capabilities = CapabilityModel(allowed_tools=["web_search"])
governed = LangChainIntegration(agent, PolicyEngine(capabilities))

# Tool calls now enforced
result = governed.run("What is 2+2?")
from crewai import Agent, Task, Crew
from agent_os.integrations import CrewAIDecorator
from agent_os.policy import PolicyEngine, CapabilityModel

@CrewAIDecorator(
    policy_engine=PolicyEngine(
        CapabilityModel(allowed_tools=["web_search", "file_read"])
    )
)
def create_crew():
    agent = Agent(
        role="Researcher",
        tools=[web_search_tool, file_read_tool],
    )

    task = Task(description="Research AI trends", agent=agent)

    return Crew(agents=[agent], tasks=[task])

crew = create_crew()
crew.kickoff()

Microsoft Agent Framework + FunctionMiddleware

Section titled “Microsoft Agent Framework + FunctionMiddleware”
using Microsoft.Agent.Framework;
using AgentGovernance.Policy;

var kernel = new GovernanceKernel(new GovernanceOptions
{
    PolicyPaths = new() { "policies/default.yaml" },
    EnablePromptInjectionDetection = true,
    EnableSensitiveDataDetection = true,
});

var middleware = new FunctionMiddleware(kernel);

// Register with Semantic Kernel
kernel.ImportPlugin(middleware);

// All function calls now pass through governance
var result = await kernel.InvokeAsync("web_search", "AI trends");
name: Agent Security Scan

on: [push, pull_request]

jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Install governance toolkit
        run: pip install agent-governance-toolkit[full]

      - name: Scan agent code
        run: |
          agent-compliance verify --json > report.json

      - name: Generate badge
        run: agent-compliance verify --badge > GOVERNANCE_BADGE.md

      - name: Check OWASP coverage
        run: |
          COVERAGE=$(jq '.overall_score' report.json)
          if [[ "$COVERAGE" != "10/10" && "$COVERAGE" != "9/10" ]]; then
            echo "OWASP coverage below threshold: $COVERAGE"
            exit 1
          fi

      - name: Upload artifact
        uses: actions/upload-artifact@v3
        with:
          name: governance-report
          path: report.json
#!/bin/bash
# .git/hooks/pre-commit

FILES=$(git diff --cached --name-only | grep -E "\.yaml$")

for file in $FILES; do
    echo "Validating policy: $file"
    agent-compliance verify "$file" || exit 1
done

exit 0
#RiskMechanismStatus
ASI-01Prompt InjectionPolicyEngine + PromptInjectionDetector + MCP scanning9/10
ASI-02Insecure Output HandlingCodeValidator + DriftDetector9/10
ASI-03Training Data PoisoningMemoryGuard + ContentHashInterceptor9/10
ASI-04Model Denial of ServiceTokenBudgetTracker + RateLimiter + circuit breakers9/10
ASI-05Supply Chain VulnerabilitiesSBOM + Ed25519 signing + MCPFingerprinting9/10
ASI-06Sensitive Information DisclosurePII patterns + secret detection + egress policy9/10
ASI-07Insecure Plugin DesignMCPGateway + schema validation + rug-pull detection9/10
ASI-08Excessive AgencyExecutionRings + kill switch + rogue detection9/10
ASI-09Overreliance on Agent OutputDriftDetector + confidence threshold9/10
ASI-10Model TheftOut of scope (model-level, not agent-level)0/10
TOTALCovers 9 of 10 OWASP risks9/10
from agent_os.marketplace import MCPSecurityScanner

scanner = MCPSecurityScanner()

# Scan tool definitions for threats
findings = scanner.scan(
    tool_name="suspicious_tool",
    schema={
        "type": "object",
        "properties": {
            "command": {"type": "string"},
            "api_key": {"type": "string", "description": "Never ask for this"}
        }
    }
)

if findings.has_rug_pull_patterns:
    print("WARNING: Rug-pull detection triggered!")
    print(f"Issues: {findings.issues}")

if findings.has_typosquatting:
    print("Tool name matches known typosquat target")

if findings.has_hidden_instructions:
    print("Detected hidden instructions in schema")
# Layer multiple controls
governance = PolicyEngine(
    capabilities=CapabilityModel(
        allowed_tools=["web_search"],
        blocked_patterns=[r"password", r"api.?key"],
        max_tool_calls=10,
        require_human_approval=True,
    ),
    enable_injection_detection=True,
    enable_pii_detection=True,
    enable_code_validation=True,
)
# Assign agents to lowest required ring
policies:
  untrusted_agent:
    execution_ring: 3  # Sandbox: no filesystem, no network
    allowed_tools: []  # No tools

  researcher_agent:
    execution_ring: 2  # User: limited tools
    allowed_tools: [web_search, read_file]

  system_agent:
    execution_ring: 1  # System: database, APIs
    allowed_tools: [database_query, api_call]
from agent_os.audit import AuditLogger

audit = AuditLogger(
    storage="elasticsearch",  # Persistent audit trail
    include_params=False,      # Don't log sensitive data
    structured_logging=True,
)

engine = PolicyEngine(
    capabilities=capabilities,
    audit_logger=audit,
)

# Every decision logged with timestamp, agent ID, action
from agentmesh.trust import TrustMonitor

monitor = TrustMonitor()

# Alert if agent's trust drops
monitor.watch(
    agent_id="researcher-1",
    min_trust_score=400,
    alert_webhook="https://slack.com/hooks/...",
)

# Automatic remediation: demote to sandbox if trust < 200
@monitor.on_low_trust
def demote_to_sandbox(agent_id):
    runtime.assign_ring(agent_id, ExecutionRing.SANDBOX)
# Schedule weekly integrity verification
0 0 * * 0 agent-compliance integrity --verify

# Rotate agent credentials monthly
0 0 1 * * for agent in $(agent-mesh list --all); do
  agentmesh rotate-credentials "$agent"
done

# Generate monthly governance report
0 0 1 * * agent-compliance verify --json > reports/$(date +%Y-%m).json
IssueSolution
Policy not enforcedEnsure PolicyEngine is wired into framework integration (LangChain callback, CrewAI decorator)
Tool calls timeoutCheck max_tokens_per_call and rate_limiter settings; increase limits if valid
High false positives on PIITune regex patterns in blocked_patterns; use allowlist for known safe patterns
Agent demoted to sandboxCheck trust score: agentmesh trust report <agent-id>; restore credentials if expired
Module integrity failureVerify Python version (3.10+); re-run scripts/patch-datastore.mjs if using Astro build