The open-source Microsoft Agent Governance Toolkit provides runtime security governance for AI agents: deterministic policy enforcement, zero-trust identity, execution rings, and compliance verification across all 10 OWASP Agentic risks. It works with LangChain, CrewAI, AutoGen, Anthropic, and others.
GitHub: https://github.com/microsoft/agent-governance-toolkit
License: MIT
Languages: Python, TypeScript, .NET, Rust, Go
# Full installation with all components
pip install "agent-governance-toolkit[full]"  # quotes keep shells such as zsh from globbing the brackets
# Individual components (à la carte)
pip install agent-os-kernel # Policy engine
pip install agentmesh-platform # Zero-trust identity & trust mesh
pip install agentmesh-runtime # Runtime supervisor & privilege rings
pip install agent-sre # SRE toolkit (SLOs, error budgets)
pip install agent-governance-toolkit # Compliance & attestation
pip install agentmesh-marketplace # Plugin lifecycle management
pip install agentmesh-lightning # RL training governance
npm install @agentmesh/sdk
dotnet add package Microsoft.AgentGovernance
cargo add agentmesh
go get github.com/microsoft/agent-governance-toolkit/packages/agent-mesh/sdks/go
from agent_os.policy import PolicyEngine, CapabilityModel
# Define agent capabilities
capabilities = CapabilityModel(
allowed_tools=["web_search", "read_file", "send_email"],
denied_tools=["delete_file", "execute_shell"],
blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"], # Block SSN
max_tool_calls=10,
max_tokens_per_call=4096,
require_human_approval=True
)
# Create policy engine
engine = PolicyEngine(capabilities=capabilities)
# Evaluate actions
result = engine.evaluate(
agent_id="researcher-1",
action="web_search",
input_text="latest security news"
)
print(f"Allowed: {result.allowed}")
print(f"Reason: {result.reason}")
import { PolicyEngine, AgentIdentity, AuditLogger } from "@agentmesh/sdk";
// Generate cryptographic identity
const identity = AgentIdentity.generate("my-agent", ["web_search", "read_file"]);
// Create policy engine
const engine = new PolicyEngine([
  { action: "web_search", effect: "allow" },
  { action: "read_file", effect: "allow" },
  { action: "delete_file", effect: "deny" },
  { action: "shell_exec", effect: "deny" },
]);
const decision = engine.evaluate("web_search"); // "allow"
const denied = engine.evaluate("delete_file"); // "deny"
| Component | Purpose | Key features |
|---|---|---|
| Agent OS | Policy Engine & Framework Adapter | Sub-millisecond evaluation, regex/semantic detection |
| Agent Mesh | Zero-trust identity & Trust Scoring | Ed25519 signatures, SPIFFE/SVID, 0–1000 trust scale |
| Agent Runtime | Execution Supervisor & Sandboxing | 4-level privilege rings, kill switch, saga orchestration |
| Agent SRE | Reliability engineering | SLOs, error budgets, circuit breakers, replay debugging |
| Agent Compliance | OWASP verification and attestation | Badge generation, JSON audit trails, integrity checks |
| Agent Marketplace | Plugin lifecycle management | MCP security scanning, rug-pull detection |
| Agent Lightning | RL training governance | Training data validation, model drift detection |
policies:
  researcher_agent:
    allowed_tools:
      - web_search
      - read_file
      - database_query
    denied_tools:
      - execute_code
      - delete_database
      - modify_system
    # Block sensitive patterns (SSN, API keys, emails)
    blocked_patterns:
      - "\\b\\d{3}-\\d{2}-\\d{4}\\b"       # SSN
      - "[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]"    # API key
      - "\\S+@\\S+\\.\\S+"                 # Email (PII)
    # Resource limits
    max_tool_calls: 20
    max_tokens_per_call: 8192
    max_concurrent_calls: 5
    # Privilege ring assignment
    execution_ring: 2  # 0=kernel, 1=system, 2=user, 3=sandbox
    # Require approval for certain actions
    require_human_approval_for:
      - send_email
      - modify_database
      - external_api_call
    # Trust thresholds
    min_trust_score: 500  # 0–1000 scale
    # Output validation
    enable_prompt_injection_detection: true
    enable_sensitive_data_detection: true
    enable_code_validation: true
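To see what the `blocked_patterns` entries above actually match, here is a minimal sketch using Python's standard `re` module. It is not the toolkit's implementation; `BLOCKED_PATTERNS` and `find_blocked` are hypothetical names used only for illustration.

```python
import re

# Patterns copied from the policy above, with the YAML double-escaping removed.
BLOCKED_PATTERNS = {
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "api_key": r"[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]",
    "email": r"\S+@\S+\.\S+",
}


def find_blocked(text: str) -> list[str]:
    """Return the label of every blocked pattern found in text (hypothetical helper)."""
    return [
        label
        for label, pattern in BLOCKED_PATTERNS.items()
        if re.search(pattern, text)
    ]


print(find_blocked("my ssn is 123-45-6789"))
print(find_blocked("latest security news"))
```

Note that the email pattern is deliberately broad: any token containing `@` followed by a dot will match, which is one source of the false positives mentioned in the troubleshooting table.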
from agent_os.policy import PolicyEngine, CapabilityModel
# Load policy from YAML
engine = PolicyEngine.from_yaml("policies.yaml")
# Evaluate tool calls
decision = engine.evaluate(
    agent_id="researcher-1",
    action="tool_call",
    tool="web_search",
    params={"query": "security trends"},
)
if not decision.allowed:
    print(f"Blocked: {decision.reason}")
else:
    print("Proceeding with tool call...")
from agent_os.policy import PolicyEngine, CapabilityModel
from agent_os.integrations import LangChainIntegration
# Define capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "calculator"],
    max_tool_calls=10,
)
# Create policy engine
engine = PolicyEngine(capabilities=capabilities)
# For LangChain agents
from langchain.agents import initialize_agent
# your_langchain_agent is a placeholder for an agent you have already built,
# for example with initialize_agent(...)
governed_agent = LangChainIntegration(
    agent=your_langchain_agent,
    policy_engine=engine,
    audit_log=True,
)
# Every tool call is now intercepted and evaluated
result = governed_agent.run("What is 2+2?")
# 1. Tool allowlist/blocklist check
✓ Is tool in allowed_tools list?
✓ Is tool NOT in denied_tools list?
# 2. Pattern matching (injections, PII, secrets)
✓ No prompt injection patterns detected
✓ No SSN, API keys, or PII in parameters
✓ No SQL injection or code execution payloads
# 3. Resource constraints
✓ Token usage within limits
✓ Concurrent call limit not exceeded
✓ Rate limiting not triggered
# 4. Privilege ring verification
✓ Agent has required privilege level
✓ Tool operates within agent's ring tier
# 5. Trust scoring
✓ Agent meets minimum trust score
✓ No anomalous behavior detected
# 6. Human approval (if required)
✓ Sensitive action approved by human
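The checklist above can be read as an ordered pipeline that stops at the first failing check. The sketch below illustrates that idea in plain Python. It is an assumption about how such a pipeline could be structured, not the toolkit's code: `Policy`, `Request`, `evaluate`, and the `tool_rings` mapping are hypothetical, and anomaly detection and rate limiting are left out for brevity.

```python
import re
from dataclasses import dataclass, field


@dataclass
class Policy:
    allowed_tools: set
    denied_tools: set
    blocked_patterns: list
    max_tool_calls: int
    tool_rings: dict  # tool -> least privileged ring allowed to call it
    min_trust_score: int
    approval_required: set = field(default_factory=set)


@dataclass
class Request:
    tool: str
    text: str
    calls_so_far: int
    ring: int  # 0 = kernel ... 3 = sandbox
    trust_score: int  # 0-1000
    approved: bool = False


def evaluate(req: Request, policy: Policy) -> tuple[bool, str]:
    """Run the checks in order and return (allowed, reason) at the first failure."""
    # 1. Tool allowlist/blocklist
    if req.tool in policy.denied_tools or req.tool not in policy.allowed_tools:
        return False, "tool not permitted"
    # 2. Pattern matching
    if any(re.search(p, req.text) for p in policy.blocked_patterns):
        return False, "blocked pattern in input"
    # 3. Resource constraints
    if req.calls_so_far >= policy.max_tool_calls:
        return False, "tool call limit reached"
    # 4. Privilege ring (a higher ring number means less privilege)
    if req.ring > policy.tool_rings.get(req.tool, 0):
        return False, "insufficient privilege ring"
    # 5. Trust scoring
    if req.trust_score < policy.min_trust_score:
        return False, "trust score below minimum"
    # 6. Human approval
    if req.tool in policy.approval_required and not req.approved:
        return False, "human approval required"
    return True, "all checks passed"


policy = Policy(
    allowed_tools={"web_search", "send_email"},
    denied_tools={"execute_shell"},
    blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"],
    max_tool_calls=10,
    tool_rings={"web_search": 2, "send_email": 2},
    min_trust_score=500,
    approval_required={"send_email"},
)
print(evaluate(Request("web_search", "latest security news", 0, 2, 650), policy))
```

Ordering the cheap, deterministic checks first means most denials never reach the slower steps such as human approval.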
import { AgentIdentity, TrustCard } from "@agentmesh/sdk";
// Generate cryptographic identity
const identity = AgentIdentity.generate(
  "researcher-agent",
  ["web_search", "read_file"]
);
console.log(identity.did); // did:mesh:agent-xxxxx
console.log(identity.publicKeyPEM); // Ed25519 public key
console.log(identity.allowedCapabilities); // ["web_search", "read_file"]
// Sign outbound communication
const signature = identity.sign("outgoing message");
// Verify peer agent identity
const isValid = identity.verify(peerPublicKey, message, signature);
from agentmesh.trust import TrustScorer
scorer = TrustScorer()
# Components of trust score:
trust_score = scorer.compute(
    agent_id="agent-1",
    factors={
        "success_rate": 0.95,             # 95% of tasks succeed
        "error_budget_remaining": 0.8,    # 80% error budget left
        "last_violation_age_hours": 72,   # Last violation 3 days ago
        "api_key_rotation_days_ago": 30,  # Keys rotated 30 days ago
        "audit_log_completeness": 1.0,    # Full audit trail
    },
)
print(f"Trust score: {trust_score} / 1000")
# Trust tiers
# 0–200: Untrusted (sandbox only)
# 200–400: Low trust (limited tools)
# 400–600: Medium trust (standard tools)
# 600–800: High trust (privileged tools)
# 800–1000: Maximum trust (admin capabilities)
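The tier ranges above share their boundary values (200 appears in two tiers), so any lookup has to pick a side. The following sketch assumes each lower bound is inclusive, so a score of exactly 200 counts as "Low trust". `trust_tier` is a hypothetical helper, not part of the toolkit.

```python
from bisect import bisect_right

# Lower bounds of tiers 2-5; assumption: a boundary value belongs to the higher tier.
TIER_BOUNDS = [200, 400, 600, 800]
TIER_NAMES = ["Untrusted", "Low trust", "Medium trust", "High trust", "Maximum trust"]


def trust_tier(score: int) -> str:
    """Map a 0-1000 trust score to its tier name."""
    if not 0 <= score <= 1000:
        raise ValueError(f"trust score out of range: {score}")
    return TIER_NAMES[bisect_right(TIER_BOUNDS, score)]


for score in (150, 200, 500, 1000):
    print(score, trust_tier(score))
```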
┌─────────────────────────────────┐
│ Ring 0: Kernel │ Filesystem, system calls, process control
├─────────────────────────────────┤
│ Ring 1: System │ Database, API gateways, secrets manager
├─────────────────────────────────┤
│ Ring 2: User │ Web search, internal APIs, file read
├─────────────────────────────────┤
│ Ring 3: Sandbox │ No outbound access, isolated execution
└─────────────────────────────────┘
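One way to reason about the ring diagram is as an ordered enum in which a lower number means more privilege. The sketch below assumes conventional protection-ring semantics, where an agent may use resources in its own ring or in any less privileged ring. `Ring` and `can_access` are hypothetical names and do not describe the toolkit's `ExecutionRing` API.

```python
from enum import IntEnum


class Ring(IntEnum):
    KERNEL = 0   # filesystem, system calls, process control
    SYSTEM = 1   # database, API gateways, secrets manager
    USER = 2     # web search, internal APIs, file read
    SANDBOX = 3  # no outbound access, isolated execution


def can_access(agent_ring: Ring, resource_ring: Ring) -> bool:
    """True if the agent's ring is at least as privileged as the resource's ring."""
    return agent_ring <= resource_ring


print(can_access(Ring.USER, Ring.USER))      # a user-ring agent using a user-ring tool
print(can_access(Ring.SANDBOX, Ring.SYSTEM)) # a sandboxed agent reaching for a database
```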
from agent_os.runtime import ExecutionRing, AgentRuntime
runtime = AgentRuntime()
# Assign agent to ring based on trust
runtime.assign_ring(agent_id="trusted-agent", ring=ExecutionRing.SYSTEM)
runtime.assign_ring(agent_id="untrusted-agent", ring=ExecutionRing.SANDBOX)
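# Enforce ring during execution
@runtime.enforce_ring
def execute_tool(agent_id, tool_name, params):
    # Runs only if the agent's ring grants the required privilege.
    # tool_registry is a placeholder dict mapping tool names to callables;
    # tool_name is a string and cannot be called directly.
    return tool_registry[tool_name](**params)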
# Kill switch for rogue agents
runtime.terminate_agent("agent-123", reason="Excessive tool calls")
# Generate verification report
agent-compliance verify
# Output as JSON (for CI/CD)
agent-compliance verify --json
# Generate badge for README
agent-compliance verify --badge
# Verify module integrity (Ed25519 signatures)
agent-compliance integrity --verify
{
"version": "1.0",
"timestamp": "2026-04-04T12:00:00Z",
"coverage": {
"ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
"ASI-02": {"status": "covered", "mechanism": "MCPGateway"},
"ASI-03": {"status": "covered", "mechanism": "MemoryGuard"},
"ASI-04": {"status": "covered", "mechanism": "RateLimiter"},
"ASI-05": {"status": "covered", "mechanism": "SupplyChainGuard"},
"ASI-06": {"status": "covered", "mechanism": "PII Detection"},
"ASI-07": {"status": "covered", "mechanism": "MCPSecurityScanner"},
"ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
"ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
"ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
},
"overall_score": "9/10"
}
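A report in this shape is easy to post-process. The sketch below counts covered risks directly from the `coverage` map instead of trusting the `overall_score` string. It assumes only the JSON structure shown above; `summarize_coverage` is a hypothetical helper and the sample is a shortened copy of that report.

```python
import json


def summarize_coverage(report_json: str) -> tuple[int, int, list[str]]:
    """Return (covered count, total count, sorted IDs of risks not marked covered)."""
    coverage = json.loads(report_json)["coverage"]
    not_covered = sorted(
        risk for risk, entry in coverage.items() if entry.get("status") != "covered"
    )
    return len(coverage) - len(not_covered), len(coverage), not_covered


sample = """{
  "coverage": {
    "ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
    "ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
    "ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
  }
}"""
covered, total, gaps = summarize_coverage(sample)
print(f"{covered}/{total} covered; not covered: {gaps}")
```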
# Generate integrity hashes for modules
agent-compliance integrity --generate
# Verify no tampering detected
agent-compliance integrity --verify
# Output as badge
agent-compliance integrity --badge
from agent_os.integrations import LangChainIntegration
from agent_os.policy import PolicyEngine, CapabilityModel
from langchain.agents import initialize_agent
from langchain.tools import Tool
# Create tools (search_fn, calc_fn, and llm are placeholders defined elsewhere)
tools = [
    Tool(name="web_search", func=search_fn, description="Search web"),
    Tool(name="calculator", func=calc_fn, description="Calculate"),
]
# Initialize the agent
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")
# Wrap with governance; only web_search is allowed, so calculator calls should be blocked
capabilities = CapabilityModel(allowed_tools=["web_search"])
governed = LangChainIntegration(agent, PolicyEngine(capabilities))
# Tool calls now enforced
result = governed.run("What is 2+2?")
from crewai import Agent, Task, Crew
from agent_os.integrations import CrewAIDecorator
from agent_os.policy import PolicyEngine, CapabilityModel
@CrewAIDecorator(
    policy_engine=PolicyEngine(
        CapabilityModel(allowed_tools=["web_search", "file_read"])
    )
)
def create_crew():
    agent = Agent(
        role="Researcher",
        tools=[web_search_tool, file_read_tool],
    )
    task = Task(description="Research AI trends", agent=agent)
    return Crew(agents=[agent], tasks=[task])
crew = create_crew()
crew.kickoff()
using Microsoft.Agent.Framework;
using AgentGovernance.Policy;
var kernel = new GovernanceKernel(new GovernanceOptions
{
    PolicyPaths = new() { "policies/default.yaml" },
    EnablePromptInjectionDetection = true,
    EnableSensitiveDataDetection = true,
});
var middleware = new FunctionMiddleware(kernel);
// Register with Semantic Kernel
kernel.ImportPlugin(middleware);
// All function calls now pass through governance
var result = await kernel.InvokeAsync("web_search", "AI trends");
name: Agent Security Scan
on: [push, pull_request]
jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install governance toolkit
        run: pip install "agent-governance-toolkit[full]"
      - name: Scan agent code
        run: |
          agent-compliance verify --json > report.json
      - name: Generate badge
        run: agent-compliance verify --badge > GOVERNANCE_BADGE.md
      - name: Check OWASP coverage
        run: |
          # -r prints the raw string; without it jq keeps the JSON quotes
          # and the comparison below never matches
          COVERAGE=$(jq -r '.overall_score' report.json)
          if [[ "$COVERAGE" != "10/10" && "$COVERAGE" != "9/10" ]]; then
            echo "OWASP coverage below threshold: $COVERAGE"
            exit 1
          fi
      - name: Upload artifact
        uses: actions/upload-artifact@v4
        with:
          name: governance-report
          path: report.json
#!/bin/bash
# .git/hooks/pre-commit
FILES=$(git diff --cached --name-only | grep -E "\.yaml$")
for file in $FILES; do
  echo "Validating policy: $file"
  agent-compliance verify "$file" || exit 1
done
exit 0
| # | Risk | Mechanism | Status |
|---|---|---|---|
| ASI-01 | Prompt Injection | PolicyEngine + PromptInjectionDetector + MCP scanning | 9/10 |
| ASI-02 | Insecure Output Handling | CodeValidator + DriftDetector | 9/10 |
| ASI-03 | Training Data Poisoning | MemoryGuard + ContentHashInterceptor | 9/10 |
| ASI-04 | Model Denial of Service | TokenBudgetTracker + RateLimiter + circuit breakers | 9/10 |
| ASI-05 | Supply Chain Vulnerabilities | SBOM + Ed25519 signing + MCPFingerprinting | 9/10 |
| ASI-06 | Sensitive Information Disclosure | PII patterns + secret detection + egress policy | 9/10 |
| ASI-07 | Insecure Plugin Design | MCPGateway + schema validation + rug-pull detection | 9/10 |
| ASI-08 | Excessive Agency | ExecutionRings + kill switch + rogue detection | 9/10 |
| ASI-09 | Overreliance on Agent Output | DriftDetector + confidence threshold | 9/10 |
| ASI-10 | Model Theft | Out of scope (model-level, not agent-level) | 0/10 |
| TOTAL | | Covers 9 of 10 OWASP risks | 9/10 |
from agent_os.marketplace import MCPSecurityScanner
scanner = MCPSecurityScanner()
# Scan tool definitions for threats
findings = scanner.scan(
    tool_name="suspicious_tool",
    schema={
        "type": "object",
        "properties": {
            "command": {"type": "string"},
            "api_key": {"type": "string", "description": "Never ask for this"},
        },
    },
)
if findings.has_rug_pull_patterns:
    print("WARNING: Rug-pull detection triggered!")
    print(f"Issues: {findings.issues}")
if findings.has_typosquatting:
    print("Tool name matches known typosquat target")
if findings.has_hidden_instructions:
    print("Detected hidden instructions in schema")
# Layer multiple controls
from agent_os.policy import PolicyEngine, CapabilityModel
governance = PolicyEngine(
    capabilities=CapabilityModel(
        allowed_tools=["web_search"],
        blocked_patterns=[r"password", r"api.?key"],
        max_tool_calls=10,
        require_human_approval=True,
    ),
    enable_injection_detection=True,
    enable_pii_detection=True,
    enable_code_validation=True,
)
# Assign agents to lowest required ring
policies:
  untrusted_agent:
    execution_ring: 3  # Sandbox: no filesystem, no network
    allowed_tools: []  # No tools
  researcher_agent:
    execution_ring: 2  # User: limited tools
    allowed_tools: [web_search, read_file]
  system_agent:
    execution_ring: 1  # System: database, APIs
    allowed_tools: [database_query, api_call]
from agent_os.audit import AuditLogger
audit = AuditLogger(
    storage="elasticsearch",  # Persistent audit trail
    include_params=False,     # Don't log sensitive data
    structured_logging=True,
)
engine = PolicyEngine(
    capabilities=capabilities,
    audit_logger=audit,
)
# Every decision logged with timestamp, agent ID, action
from agentmesh.trust import TrustMonitor
monitor = TrustMonitor()
# Alert if agent's trust drops
monitor.watch(
    agent_id="researcher-1",
    min_trust_score=400,
    alert_webhook="https://slack.com/hooks/...",
)
# Automatic remediation: demote to sandbox if trust < 200
# (runtime and ExecutionRing are the objects from the execution ring example)
@monitor.on_low_trust
def demote_to_sandbox(agent_id):
    runtime.assign_ring(agent_id, ExecutionRing.SANDBOX)
# Schedule weekly integrity verification
0 0 * * 0 agent-compliance integrity --verify
# Rotate agent credentials monthly (a crontab entry must fit on a single line)
0 0 1 * * for agent in $(agentmesh list --all); do agentmesh rotate-credentials "$agent"; done
# Generate monthly governance report (% must be escaped as \% inside a crontab)
0 0 1 * * agent-compliance verify --json > reports/$(date +\%Y-\%m).json
| Problem | Solution |
|---|---|
| Policy not enforced | Ensure PolicyEngine is wired into the framework integration (LangChain callback, CrewAI decorator) |
| Tool calls time out | Check max_tokens_per_call and rate_limiter settings; increase limits if the workload is legitimate |
| High false positives on PII | Tune regex patterns in blocked_patterns; use an allowlist for known safe patterns |
| Agent demoted to sandbox | Check trust score: agentmesh trust report <agent-id>; restore credentials if expired |
| Module integrity failure | Verify Python version (3.10+); re-run scripts/patch-datastore.mjs if using Astro build |
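For the PII false-positive row, one option is to filter regex matches through an allowlist before treating them as violations. This is a minimal sketch of that idea using the broad email pattern from the policy example. `SAFE_EMAILS` and `flagged_emails` are hypothetical names, and whether the toolkit supports allowlists natively is not confirmed here.

```python
import re

EMAIL = re.compile(r"\S+@\S+\.\S+")
# Hypothetical allowlist of addresses that are safe to pass through.
SAFE_EMAILS = {"support@example.com"}


def flagged_emails(text: str) -> list[str]:
    """Return email-like matches that are not on the allowlist."""
    # \S+ is greedy, so strip trailing punctuation before comparing.
    matches = (m.rstrip(".,;:") for m in EMAIL.findall(text))
    return [m for m in matches if m not in SAFE_EMAILS]


print(flagged_emails("Contact support@example.com, or bob@example.org."))
```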