Microsoft's open-source Agent Governance Toolkit provides runtime security governance for AI agents: deterministic policy enforcement, zero-trust identity, execution rings, and compliance verification against the OWASP Agentic Top 10 risks. It works with LangChain, CrewAI, AutoGen, Anthropic, and more.
GitHub: https://github.com/microsoft/agent-governance-toolkit
License: MIT
Languages: Python, TypeScript, .NET, Rust, Go
# Full installation with all components
pip install "agent-governance-toolkit[full]"  # quotes keep the shell from globbing the extras
# Individual components (à la carte)
pip install agent-os-kernel # Policy engine
pip install agentmesh-platform # Zero-trust identity & trust mesh
pip install agentmesh-runtime # Runtime supervisor & privilege rings
pip install agent-sre # SRE toolkit (SLOs, error budgets)
pip install agent-governance-toolkit # Compliance & attestation
pip install agentmesh-marketplace # Plugin lifecycle management
pip install agentmesh-lightning # RL training governance
npm install @agentmesh/sdk
dotnet add package Microsoft.AgentGovernance
cargo add agentmesh
go get github.com/microsoft/agent-governance-toolkit/packages/agent-mesh/sdks/go
from agent_os.policy import PolicyEngine, CapabilityModel
# Define agent capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "read_file", "send_email"],
    denied_tools=["delete_file", "execute_shell"],
    blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"],  # Block SSNs
    max_tool_calls=10,
    max_tokens_per_call=4096,
    require_human_approval=True
)
# Create policy engine
engine = PolicyEngine(capabilities=capabilities)
# Evaluate actions
result = engine.evaluate(
    agent_id="researcher-1",
    action="web_search",
    input_text="latest security news"
)
print(f"Allowed: {result.allowed}")
print(f"Reason: {result.reason}")
import { PolicyEngine, AgentIdentity } from "@agentmesh/sdk";
// Generate cryptographic identity
const identity = AgentIdentity.generate("my-agent", ["web_search", "read_file"]);
// Create policy engine
const engine = new PolicyEngine([
  { action: "web_search", effect: "allow" },
  { action: "read_file", effect: "allow" },
  { action: "delete_file", effect: "deny" },
  { action: "shell_exec", effect: "deny" },
]);
const decision = engine.evaluate("web_search"); // "allow"
const denied = engine.evaluate("delete_file"); // "deny"
| Component | Purpose | Key Features |
|---|---|---|
| Agent OS | Policy engine & framework adapters | Sub-millisecond evaluation, regex/semantic detection |
| Agent Mesh | Zero-trust identity & trust scoring | Ed25519 signatures, SPIFFE/SVID, 0–1000 trust scale |
| Agent Runtime | Execution supervisor & sandboxing | 4-tier privilege rings, kill switch, saga orchestration |
| Agent SRE | Reliability engineering | SLOs, error budgets, circuit breakers, replay debugging |
| Agent Compliance | OWASP verification & attestation | Badge generation, JSON audit trails, integrity checks |
| Agent Marketplace | Plugin lifecycle management | MCP security scanning, rug-pull detection |
| Agent Lightning | RL training governance | Training data validation, model drift detection |
policies:
  researcher_agent:
    allowed_tools:
      - web_search
      - read_file
      - database_query
    denied_tools:
      - execute_code
      - delete_database
      - modify_system

    # Block sensitive patterns (SSN, API keys, emails)
    blocked_patterns:
      - "\\b\\d{3}-\\d{2}-\\d{4}\\b"     # SSN
      - "[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]"  # API key
      - "\\S+@\\S+\\.\\S+"               # Email (PII)

    # Resource limits
    max_tool_calls: 20
    max_tokens_per_call: 8192
    max_concurrent_calls: 5

    # Privilege ring assignment
    execution_ring: 2  # 0=kernel, 1=system, 2=user, 3=sandbox

    # Require approval for certain actions
    require_human_approval_for:
      - send_email
      - modify_database
      - external_api_call

    # Trust thresholds
    min_trust_score: 500  # 0–1000 scale

    # Output validation
    enable_prompt_injection_detection: true
    enable_sensitive_data_detection: true
    enable_code_validation: true
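The `blocked_patterns` entries are ordinary regular expressions. As an illustration (plain Python, independent of the toolkit's own API), screening parameters against them can look like this:

```python
import re

# Patterns mirrored from the policy above (SSN, API-key reference, email)
BLOCKED_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),          # SSN
    re.compile(r"[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]"),  # API key
    re.compile(r"\S+@\S+\.\S+"),                   # Email (PII)
]

def screen(text: str) -> list[str]:
    """Return every substring that matches a blocked pattern."""
    hits: list[str] = []
    for pattern in BLOCKED_PATTERNS:
        hits.extend(pattern.findall(text))
    return hits

print(screen("reach alice@example.com or SSN 123-45-6789"))
# ['123-45-6789', 'alice@example.com']
```

Note that the email pattern is deliberately broad; in practice you would tune it against your own false-positive rate (see Troubleshooting below).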
from agent_os.policy import PolicyEngine, CapabilityModel
# Load policy from YAML
engine = PolicyEngine.from_yaml("policies.yaml")
# Evaluate tool calls
decision = engine.evaluate(
    agent_id="researcher-1",
    action="tool_call",
    tool="web_search",
    params={"query": "security trends"}
)

if not decision.allowed:
    print(f"Blocked: {decision.reason}")
else:
    print("Proceeding with tool call...")
from agent_os import PolicyEngine, CapabilityModel
from agent_os.integrations import LangChainIntegration
# Define capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "calculator"],
    max_tool_calls=10
)
# Create policy engine
engine = PolicyEngine(capabilities=capabilities)
# For LangChain agents
from langchain.agents import initialize_agent
governed_agent = LangChainIntegration(
    agent=your_langchain_agent,
    policy_engine=engine,
    audit_log=True
)
# Every tool call is now intercepted and evaluated
result = governed_agent.run("What is 2+2?")
# 1. Tool allowlist/blocklist check
✓ Is tool in allowed_tools list?
✓ Is tool NOT in denied_tools list?
# 2. Pattern matching (injections, PII, secrets)
✓ No prompt injection patterns detected
✓ No SSN, API keys, or PII in parameters
✓ No SQL injection or code execution payloads
# 3. Resource constraints
✓ Token usage within limits
✓ Concurrent call limit not exceeded
✓ Rate limiting not triggered
# 4. Privilege ring verification
✓ Agent has required privilege level
✓ Tool operates within agent's ring tier
# 5. Trust scoring
✓ Agent meets minimum trust score
✓ No anomalous behavior detected
# 6. Human approval (if required)
✓ Sensitive action approved by human
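Conceptually, the pipeline is a short-circuit chain: each layer can deny with a reason, and only an action that clears every layer proceeds. A minimal plain-Python sketch of the first three layers (all names here are ours, not the toolkit's API):

```python
# Illustrative short-circuit evaluation chain; names are hypothetical.
import re

POLICY = {
    "allowed_tools": {"web_search", "read_file"},
    "denied_tools": {"delete_file"},
    "blocked_patterns": [re.compile(r"\b\d{3}-\d{2}-\d{4}\b")],  # SSN
    "max_tool_calls": 10,
}

def evaluate(tool: str, params: str, calls_so_far: int) -> tuple[bool, str]:
    # 1. Allowlist/blocklist check
    if tool in POLICY["denied_tools"]:
        return False, f"{tool} is explicitly denied"
    if tool not in POLICY["allowed_tools"]:
        return False, f"{tool} is not on the allowlist"
    # 2. Pattern screening (PII, secrets, injections)
    for pattern in POLICY["blocked_patterns"]:
        if pattern.search(params):
            return False, "blocked pattern in parameters"
    # 3. Resource constraints
    if calls_so_far >= POLICY["max_tool_calls"]:
        return False, "tool-call budget exhausted"
    # Layers 4-6 (rings, trust, human approval) would follow the same shape.
    return True, "allowed"

print(evaluate("web_search", "security news", calls_so_far=3))  # (True, 'allowed')
print(evaluate("delete_file", "", calls_so_far=0)[0])           # False
```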
import { AgentIdentity, TrustCard } from "@agentmesh/sdk";
// Generate cryptographic identity
const identity = AgentIdentity.generate(
  "researcher-agent",
  ["web_search", "read_file"]
);
console.log(identity.did); // did:mesh:agent-xxxxx
console.log(identity.publicKeyPEM); // Ed25519 public key
console.log(identity.allowedCapabilities); // ["web_search", "read_file"]
// Sign outbound communication
const signature = identity.sign("outgoing message");
// Verify peer agent identity
const isValid = identity.verify(peerPublicKey, message, signature);
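The sign/verify flow above is the core of message authentication between agents. As a stand-in sketch, the same flow using only the Python standard library (HMAC-SHA256 here purely to avoid a third-party dependency; the mesh itself uses asymmetric Ed25519 signatures, where verification needs only the public key):

```python
# Stand-in for the Ed25519 sign/verify flow, using stdlib HMAC.
# Unlike Ed25519, HMAC is symmetric: both sides hold the same secret.
import hashlib
import hmac
import secrets

class AgentKey:
    def __init__(self) -> None:
        self._secret = secrets.token_bytes(32)

    def sign(self, message: bytes) -> bytes:
        return hmac.new(self._secret, message, hashlib.sha256).digest()

    def verify(self, message: bytes, signature: bytes) -> bool:
        expected = hmac.new(self._secret, message, hashlib.sha256).digest()
        return hmac.compare_digest(expected, signature)  # constant-time compare

key = AgentKey()
sig = key.sign(b"outgoing message")
print(key.verify(b"outgoing message", sig))  # True
print(key.verify(b"tampered message", sig))  # False
```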
from agentmesh.trust import TrustScorer
scorer = TrustScorer()
# Components of trust score:
trust_score = scorer.compute(
    agent_id="agent-1",
    factors={
        "success_rate": 0.95,             # 95% of tasks succeed
        "error_budget_remaining": 0.8,    # 80% error budget left
        "last_violation_age_hours": 72,   # Last violation 3 days ago
        "api_key_rotation_days_ago": 30,  # Keys rotated 30 days ago
        "audit_log_completeness": 1.0,    # Full audit trail
    }
)
print(f"Trust score: {trust_score} / 1000")
# Trust tiers
# 0–200: Untrusted (sandbox only)
# 200–400: Low trust (limited tools)
# 400–600: Medium trust (standard tools)
# 600–800: High trust (privileged tools)
# 800–1000: Maximum trust (admin capabilities)
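The tier boundaries above map directly onto a lookup. A minimal sketch (tier names are ours; we treat each boundary value as the start of the next tier, which is an assumption):

```python
# Map a 0-1000 trust score to the tiers listed above.
def trust_tier(score: int) -> str:
    if not 0 <= score <= 1000:
        raise ValueError("trust score must be in 0..1000")
    if score < 200:
        return "untrusted"   # sandbox only
    if score < 400:
        return "low"         # limited tools
    if score < 600:
        return "medium"      # standard tools
    if score < 800:
        return "high"        # privileged tools
    return "maximum"         # admin capabilities

print(trust_tier(150))  # untrusted
print(trust_tier(650))  # high
```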
┌─────────────────────────────────┐
│ Ring 0: Kernel │ Filesystem, system calls, process control
├─────────────────────────────────┤
│ Ring 1: System │ Database, API gateways, secrets manager
├─────────────────────────────────┤
│ Ring 2: User │ Web search, internal APIs, file read
├─────────────────────────────────┤
│ Ring 3: Sandbox │ No outbound access, isolated execution
└─────────────────────────────────┘
from agent_os.runtime import ExecutionRing, AgentRuntime
runtime = AgentRuntime()
# Assign agent to ring based on trust
runtime.assign_ring(agent_id="trusted-agent", ring=ExecutionRing.SYSTEM)
runtime.assign_ring(agent_id="untrusted-agent", ring=ExecutionRing.SANDBOX)
# Enforce ring during execution
@runtime.enforce_ring
def execute_tool(agent_id, tool, params):
    # Runs only if the agent's ring grants the required privilege
    return tool(params)  # `tool` is the callable itself, not its name
# Kill switch for rogue agents
runtime.terminate_agent("agent-123", reason="Excessive tool calls")
# Generate verification report
agent-compliance verify
# Output as JSON (for CI/CD)
agent-compliance verify --json
# Generate badge for README
agent-compliance verify --badge
# Verify module integrity (Ed25519 signatures)
agent-compliance integrity --verify
{
  "version": "1.0",
  "timestamp": "2026-04-04T12:00:00Z",
  "coverage": {
    "ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
    "ASI-02": {"status": "covered", "mechanism": "MCPGateway"},
    "ASI-03": {"status": "covered", "mechanism": "MemoryGuard"},
    "ASI-04": {"status": "covered", "mechanism": "RateLimiter"},
    "ASI-05": {"status": "covered", "mechanism": "SupplyChainGuard"},
    "ASI-06": {"status": "covered", "mechanism": "PII Detection"},
    "ASI-07": {"status": "covered", "mechanism": "MCPSecurityScanner"},
    "ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
    "ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
    "ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
  },
  "overall_score": "9/10"
}
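Because the report is plain JSON, downstream tooling can consume it directly. For example, a gate that recomputes the score from the `coverage` map (field names as in the report above; the three-entry sample is abridged for illustration):

```python
import json

def overall_score(report: dict) -> str:
    """Count 'covered' entries in the report's coverage map."""
    statuses = [entry["status"] for entry in report["coverage"].values()]
    return f"{statuses.count('covered')}/{len(statuses)}"

sample = json.loads("""{
  "coverage": {
    "ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
    "ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
    "ASI-10": {"status": "out_of_scope", "reason": "Model-level"}
  }
}""")
print(overall_score(sample))  # 2/3
```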
# Generate integrity hashes for modules
agent-compliance integrity --generate
# Verify no tampering detected
agent-compliance integrity --verify
# Output as badge
agent-compliance integrity --badge
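Integrity checks of this kind generally reduce to hashing module files and comparing the digests against a stored manifest. A minimal stdlib sketch of that hash-and-compare step (the toolkit additionally signs its manifest with Ed25519, which this sketch omits):

```python
# Minimal integrity-manifest sketch: hash files, store digests, re-verify.
import hashlib
from pathlib import Path

def digest(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()

def generate_manifest(paths: list[Path]) -> dict[str, str]:
    return {str(p): digest(p) for p in paths}

def verify_manifest(manifest: dict[str, str]) -> list[str]:
    """Return the paths whose current digest no longer matches."""
    return [p for p, d in manifest.items() if digest(Path(p)) != d]

# Example usage:
# manifest = generate_manifest(list(Path("agent_os").rglob("*.py")))
# tampered = verify_manifest(manifest)  # [] means no tampering detected
```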
from agent_os.integrations import LangChainIntegration
from agent_os.policy import PolicyEngine, CapabilityModel
from langchain.agents import initialize_agent
from langchain.tools import Tool
# Create tools
tools = [
    Tool(name="web_search", func=search_fn, description="Search web"),
    Tool(name="calculator", func=calc_fn, description="Calculate"),
]
# Initialize governed agent
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")
# Wrap with governance
capabilities = CapabilityModel(allowed_tools=["web_search"])
governed = LangChainIntegration(agent, PolicyEngine(capabilities))
# Tool calls now enforced
result = governed.run("What is 2+2?")
from crewai import Agent, Task, Crew
from agent_os.integrations import CrewAIDecorator
from agent_os.policy import PolicyEngine, CapabilityModel
@CrewAIDecorator(
    policy_engine=PolicyEngine(
        CapabilityModel(allowed_tools=["web_search", "file_read"])
    )
)
def create_crew():
    agent = Agent(
        role="Researcher",
        tools=[web_search_tool, file_read_tool],
    )
    task = Task(description="Research AI trends", agent=agent)
    return Crew(agents=[agent], tasks=[task])
crew = create_crew()
crew.kickoff()
using Microsoft.Agent.Framework;
using AgentGovernance.Policy;
var kernel = new GovernanceKernel(new GovernanceOptions
{
    PolicyPaths = new() { "policies/default.yaml" },
    EnablePromptInjectionDetection = true,
    EnableSensitiveDataDetection = true,
});
var middleware = new FunctionMiddleware(kernel);
// Register with Semantic Kernel
kernel.ImportPlugin(middleware);
// All function calls now pass through governance
var result = await kernel.InvokeAsync("web_search", "AI trends");
name: Agent Security Scan

on: [push, pull_request]

jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Install governance toolkit
        run: pip install "agent-governance-toolkit[full]"

      - name: Scan agent code
        run: |
          agent-compliance verify --json > report.json

      - name: Generate badge
        run: agent-compliance verify --badge > GOVERNANCE_BADGE.md

      - name: Check OWASP coverage
        run: |
          COVERAGE=$(jq -r '.overall_score' report.json)  # -r strips the JSON quotes
          if [[ "$COVERAGE" != "10/10" && "$COVERAGE" != "9/10" ]]; then
            echo "OWASP coverage below threshold: $COVERAGE"
            exit 1
          fi

      - name: Upload artifact
        uses: actions/upload-artifact@v3
        with:
          name: governance-report
          path: report.json
#!/bin/bash
# .git/hooks/pre-commit
FILES=$(git diff --cached --name-only | grep -E "\.ya?ml$")

for file in $FILES; do
  echo "Validating policy: $file"
  agent-compliance verify "$file" || exit 1
done

exit 0
| # | Risk | Mechanism | Status |
|---|---|---|---|
| ASI-01 | Prompt Injection | PolicyEngine + PromptInjectionDetector + MCP scanning | 9/10 |
| ASI-02 | Insecure Output Handling | CodeValidator + DriftDetector | 9/10 |
| ASI-03 | Training Data Poisoning | MemoryGuard + ContentHashInterceptor | 9/10 |
| ASI-04 | Model Denial of Service | TokenBudgetTracker + RateLimiter + circuit breakers | 9/10 |
| ASI-05 | Supply Chain Vulnerabilities | SBOM + Ed25519 signing + MCPFingerprinting | 9/10 |
| ASI-06 | Sensitive Information Disclosure | PII patterns + secret detection + egress policy | 9/10 |
| ASI-07 | Insecure Plugin Design | MCPGateway + schema validation + rug-pull detection | 9/10 |
| ASI-08 | Excessive Agency | ExecutionRings + kill switch + rogue detection | 9/10 |
| ASI-09 | Overreliance on Agent Output | DriftDetector + confidence threshold | 9/10 |
| ASI-10 | Model Theft | Out of scope (model-level, not agent-level) | 0/10 |
| TOTAL | | Covers 9 of 10 OWASP risks | 9/10 |
from agent_os.marketplace import MCPSecurityScanner
scanner = MCPSecurityScanner()
# Scan tool definitions for threats
findings = scanner.scan(
    tool_name="suspicious_tool",
    schema={
        "type": "object",
        "properties": {
            "command": {"type": "string"},
            "api_key": {"type": "string", "description": "Never ask for this"}
        }
    }
)
if findings.has_rug_pull_patterns:
    print("WARNING: Rug-pull detection triggered!")
    print(f"Issues: {findings.issues}")

if findings.has_typosquatting:
    print("Tool name matches known typosquat target")

if findings.has_hidden_instructions:
    print("Detected hidden instructions in schema")
# Layer multiple controls
governance = PolicyEngine(
    capabilities=CapabilityModel(
        allowed_tools=["web_search"],
        blocked_patterns=[r"password", r"api.?key"],
        max_tool_calls=10,
        require_human_approval=True,
    ),
    enable_injection_detection=True,
    enable_pii_detection=True,
    enable_code_validation=True,
)
# Assign agents to lowest required ring
policies:
  untrusted_agent:
    execution_ring: 3  # Sandbox: no filesystem, no network
    allowed_tools: []  # No tools

  researcher_agent:
    execution_ring: 2  # User: limited tools
    allowed_tools: [web_search, read_file]

  system_agent:
    execution_ring: 1  # System: database, APIs
    allowed_tools: [database_query, api_call]
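Least privilege here means an agent's ring must be numerically no higher than the ring a tool requires (lower ring = more privilege, as in the diagram above). A sketch of that comparison; the tool-to-ring assignments are illustrative:

```python
# Ring-based access check: lower ring number = more privilege.
TOOL_RING = {
    "filesystem_write": 0,  # kernel
    "database_query": 1,    # system
    "web_search": 2,        # user
}

def ring_allows(agent_ring: int, tool: str) -> bool:
    # Unknown tools are treated as sandbox-tier (ring 3) in this sketch.
    required = TOOL_RING.get(tool, 3)
    return agent_ring <= required

print(ring_allows(agent_ring=2, tool="web_search"))      # True
print(ring_allows(agent_ring=2, tool="database_query"))  # False
```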
from agent_os.audit import AuditLogger
audit = AuditLogger(
    storage="elasticsearch",  # Persistent audit trail
    include_params=False,     # Don't log sensitive data
    structured_logging=True,
)

engine = PolicyEngine(
    capabilities=capabilities,
    audit_logger=audit,
)
# Every decision logged with timestamp, agent ID, action
from agentmesh.trust import TrustMonitor
monitor = TrustMonitor()
# Alert if agent's trust drops
monitor.watch(
    agent_id="researcher-1",
    min_trust_score=400,
    alert_webhook="https://slack.com/hooks/...",
)

# Automatic remediation: demote to sandbox if trust < 200
@monitor.on_low_trust
def demote_to_sandbox(agent_id):
    runtime.assign_ring(agent_id, ExecutionRing.SANDBOX)
# Schedule weekly integrity verification
0 0 * * 0 agent-compliance integrity --verify
# Rotate agent credentials monthly
0 0 1 * * for agent in $(agent-mesh list --all); do agentmesh rotate-credentials "$agent"; done
# Generate monthly governance report
0 0 1 * * agent-compliance verify --json > reports/$(date +%Y-%m).json
| Issue | Solution |
|---|---|
| Policy not enforced | Ensure PolicyEngine is wired into framework integration (LangChain callback, CrewAI decorator) |
| Tool calls time out | Check max_tokens_per_call and rate_limiter settings; increase limits if the calls are legitimate |
| High false positives on PII | Tune regex patterns in blocked_patterns; use allowlist for known safe patterns |
| Agent demoted to sandbox | Check trust score: agentmesh trust report <agent-id>; restore credentials if expired |
| Module integrity failure | Verify Python version (3.10+); re-run scripts/patch-datastore.mjs if using Astro build |