Microsoft's open-source Agent Governance Toolkit provides runtime security governance for AI agents: deterministic policy enforcement, zero-trust identity, execution rings, and compliance verification across all 10 OWASP Agentic risks. It works with LangChain, CrewAI, AutoGen, Anthropic, and more.
GitHub: https://github.com/microsoft/agent-governance-toolkit
License: MIT
Languages: Python, TypeScript, .NET, Rust, Go
# Full installation with all components (quoted so shells such as zsh do not expand the brackets)
pip install "agent-governance-toolkit[full]"
# Individual components (à la carte)
pip install agent-os-kernel # Policy engine
pip install agentmesh-platform # Zero-trust identity & trust mesh
pip install agentmesh-runtime # Runtime supervisor & privilege rings
pip install agent-sre # SRE toolkit (SLOs, error budgets)
pip install agent-governance-toolkit # Compliance & attestation
pip install agentmesh-marketplace # Plugin lifecycle management
pip install agentmesh-lightning # RL training governance
npm install @agentmesh/sdk
dotnet add package Microsoft.AgentGovernance
cargo add agentmesh
go get github.com/microsoft/agent-governance-toolkit/packages/agent-mesh/sdks/go
from agent_os.policy import PolicyEngine, CapabilityModel
# Define the agent's capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "read_file", "send_email"],
    denied_tools=["delete_file", "execute_shell"],
    blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"],  # Block US Social Security numbers
    max_tool_calls=10,
    max_tokens_per_call=4096,
    require_human_approval=True,
)
# Create the policy engine
engine = PolicyEngine(capabilities=capabilities)
# Evaluate an action
result = engine.evaluate(
    agent_id="researcher-1",
    action="web_search",
    input_text="latest security news",
)
print(f"Allowed: {result.allowed}")
print(f"Reason: {result.reason}")
import { PolicyEngine, AgentIdentity, AuditLogger } from "@agentmesh/sdk";
// Generate a cryptographic identity
const identity = AgentIdentity.generate("my-agent", ["web_search", "read_file"]);
// Create the policy engine
const engine = new PolicyEngine([
  { action: "web_search", effect: "allow" },
  { action: "read_file", effect: "allow" },
  { action: "delete_file", effect: "deny" },
  { action: "shell_exec", effect: "deny" },
]);
const decision = engine.evaluate("web_search"); // "allow"
const denied = engine.evaluate("delete_file"); // "deny"
| Component | Purpose | Key Features |
|---|---|---|
| Agent OS | Policy engine & framework adapters | Sub-millisecond evaluation, regex/semantic detection |
| Agent Mesh | Zero-trust identity & trust scoring | Ed25519 signatures, SPIFFE/SVID, 0–1000 trust scale |
| Agent Runtime | Execution supervisor & sandboxing | 4-tier privilege rings, kill switch, saga orchestration |
| Agent SRE | Reliability engineering | SLOs, error budgets, circuit breakers, replay debugging |
| Agent Compliance | OWASP verification & attestation | Badge generation, JSON audit trails, integrity checks |
| Agent Marketplace | Plugin lifecycle management | MCP security scanning, rug-pull detection |
| Agent Lightning | RL training governance | Training data validation, model drift detection |
policies:
  researcher_agent:
    allowed_tools:
      - web_search
      - read_file
      - database_query
    denied_tools:
      - execute_code
      - delete_database
      - modify_system
    # Block sensitive patterns (SSN, API keys, emails)
    blocked_patterns:
      - "\\b\\d{3}-\\d{2}-\\d{4}\\b" # SSN
      - "[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]" # API key
      - "\\S+@\\S+\\.\\S+" # Email (PII)
    # Resource limits
    max_tool_calls: 20
    max_tokens_per_call: 8192
    max_concurrent_calls: 5
    # Privilege ring assignment
    execution_ring: 2 # 0=kernel, 1=system, 2=user, 3=sandbox
    # Require approval for certain actions
    require_human_approval_for:
      - send_email
      - modify_database
      - external_api_call
    # Trust thresholds
    min_trust_score: 500 # 0–1000 scale
    # Output validation
    enable_prompt_injection_detection: true
    enable_sensitive_data_detection: true
    enable_code_validation: true
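To see what the three `blocked_patterns` entries actually match, the regexes can be exercised on their own. The sketch below uses only Python's standard `re` module; the `find_blocked` helper and the pattern labels are hypothetical names for illustration and are not part of the toolkit.

```python
import re

# Regexes copied from blocked_patterns in the policy above, written as Python raw strings.
BLOCKED_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "api_key": re.compile(r"[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]"),
    "email": re.compile(r"\S+@\S+\.\S+"),
}


def find_blocked(text: str) -> list[str]:
    """Return the label of every blocked pattern that occurs in text."""
    return [label for label, pattern in BLOCKED_PATTERNS.items() if pattern.search(text)]


for sample in (
    "latest security news",
    "my SSN is 123-45-6789",
    "mail bob@example.com the API_KEY",
):
    print(repr(sample), "->", find_blocked(sample))
```

The email pattern is deliberately broad (any non-space run around `@` and a dot), so it is a likely source of false positives on strings such as package specifiers or mentions.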
from agent_os.policy import PolicyEngine, CapabilityModel
# Load the policy from YAML
engine = PolicyEngine.from_yaml("policies.yaml")
# Evaluate tool calls
decision = engine.evaluate(
    agent_id="researcher-1",
    action="tool_call",
    tool="web_search",
    params={"query": "security trends"},
)
if not decision.allowed:
    print(f"Blocked: {decision.reason}")
else:
    print("Proceeding with tool call...")
from agent_os import PolicyEngine, CapabilityModel
from agent_os.integrations import LangChainIntegration
# Define capabilities
capabilities = CapabilityModel(
    allowed_tools=["web_search", "calculator"],
    max_tool_calls=10,
)
# Create the policy engine
engine = PolicyEngine(capabilities=capabilities)
# For LangChain agents
from langchain.agents import initialize_agent
governed_agent = LangChainIntegration(
    agent=your_langchain_agent,  # placeholder: an agent you have already built
    policy_engine=engine,
    audit_log=True,
)
# Every tool call is now intercepted and evaluated
result = governed_agent.run("What is 2+2?")
# 1. Tool allowlist/blocklist check
✓ Is tool in allowed_tools list?
✓ Is tool NOT in denied_tools list?
# 2. Pattern matching (injections, PII, secrets)
✓ No prompt injection patterns detected
✓ No SSN, API keys, or PII in parameters
✓ No SQL injection or code execution payloads
# 3. Resource constraints
✓ Token usage within limits
✓ Concurrent call limit not exceeded
✓ Rate limiting not triggered
# 4. Privilege ring check
✓ Agent has required privilege level
✓ Tool operates within agent's ring tier
# 5. Trust scoring
✓ Agent meets minimum trust score
✓ No anomalous behavior detected
# 6. Human approval (if required)
✓ Sensitive action approved by human
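The six checks above can be read as an ordered, fail-fast pipeline. The following is a minimal standard-library sketch of that idea, not the toolkit's implementation: the `Policy` and `Decision` classes, the `evaluate` function, and every field name are assumptions made for illustration, and each step is reduced to its simplest form (for example, step 3 only counts tool calls).

```python
import re
from dataclasses import dataclass, field


@dataclass
class Policy:
    allowed_tools: set
    denied_tools: set = field(default_factory=set)
    blocked_patterns: list = field(default_factory=list)
    max_tool_calls: int = 10
    execution_ring: int = 3  # 0=kernel ... 3=sandbox; a lower number is more privileged
    tool_rings: dict = field(default_factory=dict)  # ring each tool requires
    min_trust_score: int = 0
    approval_required: set = field(default_factory=set)


@dataclass
class Decision:
    allowed: bool
    reason: str


def evaluate(policy, tool, text, calls_so_far=0, trust_score=0, approved=False):
    """Run the six checks in order and stop at the first failure."""
    # 1. Tool allowlist/blocklist
    if tool in policy.denied_tools or tool not in policy.allowed_tools:
        return Decision(False, f"tool {tool!r} is not permitted")
    # 2. Pattern matching on the input text
    for pattern in policy.blocked_patterns:
        if re.search(pattern, text):
            return Decision(False, "input matches a blocked pattern")
    # 3. Resource constraints (only the call budget in this sketch)
    if calls_so_far >= policy.max_tool_calls:
        return Decision(False, "tool call budget exhausted")
    # 4. Privilege ring: tools with no declared ring are treated as ring 0
    if policy.execution_ring > policy.tool_rings.get(tool, 0):
        return Decision(False, "agent ring lacks privilege for this tool")
    # 5. Trust score
    if trust_score < policy.min_trust_score:
        return Decision(False, "trust score below minimum")
    # 6. Human approval
    if tool in policy.approval_required and not approved:
        return Decision(False, "human approval required")
    return Decision(True, "all checks passed")


policy = Policy(
    allowed_tools={"web_search", "send_email"},
    denied_tools={"execute_shell"},
    blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"],
    execution_ring=2,
    tool_rings={"web_search": 2, "send_email": 2},
    min_trust_score=500,
    approval_required={"send_email"},
)

print(evaluate(policy, "web_search", "latest security news", trust_score=700))
print(evaluate(policy, "execute_shell", "ls", trust_score=700))
print(evaluate(policy, "send_email", "quarterly report", trust_score=700))
```

Ordering the cheap, deterministic checks first means a denied tool is rejected before any pattern scan or approval request is made.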
import { AgentIdentity, TrustCard } from "@agentmesh/sdk";
// Generate a cryptographic identity
const identity = AgentIdentity.generate(
  "researcher-agent",
  ["web_search", "read_file"]
);
console.log(identity.did); // did:mesh:agent-xxxxx
console.log(identity.publicKeyPEM); // Ed25519 public key
console.log(identity.allowedCapabilities); // ["web_search", "read_file"]
// Sign outgoing communication
const signature = identity.sign("outgoing message");
// Verify a peer agent's identity (peerPublicKey and message come from the peer)
const isValid = identity.verify(peerPublicKey, message, signature);
from agentmesh.trust import TrustScorer
scorer = TrustScorer()
# Components of the trust score:
trust_score = scorer.compute(
    agent_id="agent-1",
    factors={
        "success_rate": 0.95,  # 95% of tasks succeed
        "error_budget_remaining": 0.8,  # 80% error budget left
        "last_violation_age_hours": 72,  # Last violation 3 days ago
        "api_key_rotation_days_ago": 30,  # Keys rotated 30 days ago
        "audit_log_completeness": 1.0,  # Full audit trail
    },
)
print(f"Trust score: {trust_score} / 1000")
# Trust tiers
# 0–199: Untrusted (sandbox only)
# 200–399: Low trust (limited tools)
# 400–599: Medium trust (standard tools)
# 600–799: High trust (privileged tools)
# 800–1000: Maximum trust (admin capabilities)
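The tier table can be turned into a lookup with a sorted list of tier floors. This is a minimal sketch, assuming each boundary value belongs to the higher tier (so a score of exactly 200 counts as low trust, matching the "trust < 200" sandbox rule used later in this document). `trust_tier` and the tier labels are hypothetical names, not toolkit API.

```python
from bisect import bisect_right

# Lowest score of each tier above "untrusted"; assumption: boundaries belong to the higher tier.
TIER_FLOORS = [200, 400, 600, 800]
TIER_NAMES = ["untrusted", "low", "medium", "high", "maximum"]


def trust_tier(score: int) -> str:
    """Map a 0-1000 trust score to its tier name."""
    if not 0 <= score <= 1000:
        raise ValueError(f"trust score out of range: {score}")
    # bisect_right counts how many tier floors are <= score, which is the tier index.
    return TIER_NAMES[bisect_right(TIER_FLOORS, score)]


for score in (0, 199, 200, 550, 799, 800, 1000):
    print(score, trust_tier(score))
```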
┌─────────────────────────────────┐
│ Ring 0: Kernel │ Filesystem, system calls, process control
├─────────────────────────────────┤
│ Ring 1: System │ Database, API gateways, secrets manager
├─────────────────────────────────┤
│ Ring 2: User │ Web search, internal APIs, file read
├─────────────────────────────────┤
│ Ring 3: Sandbox │ No outbound access, isolated execution
└─────────────────────────────────┘
from agent_os.runtime import ExecutionRing, AgentRuntime
runtime = AgentRuntime()
# Assign each agent to a ring based on trust
runtime.assign_ring(agent_id="trusted-agent", ring=ExecutionRing.SYSTEM)
runtime.assign_ring(agent_id="untrusted-agent", ring=ExecutionRing.SANDBOX)
# Enforce the ring during execution
@runtime.enforce_ring
def execute_tool(agent_id, tool_name, params):
    # The body runs only if the agent's ring grants enough privilege.
    # TOOLS is a placeholder mapping of tool names to callables (tool_name is a string).
    return TOOLS[tool_name](params)
# Kill switch for rogue agents
runtime.terminate_agent("agent-123", reason="Excessive tool calls")
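To make the ring check concrete, here is a minimal standalone sketch of a ring-enforcing decorator. It is not the toolkit's `enforce_ring`: the `Ring` enum, the `AGENT_RINGS` and `TOOL_RINGS` tables, and the rule that unknown agents fall back to the sandbox while unknown tools require ring 0 are all assumptions chosen for illustration.

```python
from enum import IntEnum
from functools import wraps


class Ring(IntEnum):
    KERNEL = 0
    SYSTEM = 1
    USER = 2
    SANDBOX = 3


# Hypothetical assignments: the ring each agent runs in and the ring each tool requires.
AGENT_RINGS = {"trusted-agent": Ring.SYSTEM, "untrusted-agent": Ring.SANDBOX}
TOOL_RINGS = {"database_query": Ring.SYSTEM, "web_search": Ring.USER}


def enforce_ring(func):
    """Reject the call unless the agent's ring is at least as privileged as the tool's."""
    @wraps(func)
    def wrapper(agent_id, tool_name, params):
        agent_ring = AGENT_RINGS.get(agent_id, Ring.SANDBOX)  # unknown agents get least privilege
        required = TOOL_RINGS.get(tool_name, Ring.KERNEL)  # unknown tools need most privilege
        if agent_ring > required:  # a larger ring number means less privilege
            raise PermissionError(
                f"{agent_id} runs in ring {agent_ring.name}, "
                f"but {tool_name} requires {required.name}"
            )
        return func(agent_id, tool_name, params)
    return wrapper


@enforce_ring
def execute_tool(agent_id, tool_name, params):
    return f"{tool_name} ran for {agent_id} with {params}"


print(execute_tool("trusted-agent", "database_query", {"sql": "SELECT 1"}))
try:
    execute_tool("untrusted-agent", "web_search", {"query": "news"})
except PermissionError as exc:
    print("denied:", exc)
```

Defaulting both lookups to the restrictive side means a missing table entry fails closed rather than open.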
# Generate a verification report
agent-compliance verify
# Output as JSON (for CI/CD)
agent-compliance verify --json
# Generate a badge for the README
agent-compliance verify --badge
# Verify module integrity (Ed25519 signatures)
agent-compliance integrity --verify
{
"version": "1.0",
"timestamp": "2026-04-04T12:00:00Z",
"coverage": {
"ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
"ASI-02": {"status": "covered", "mechanism": "MCPGateway"},
"ASI-03": {"status": "covered", "mechanism": "MemoryGuard"},
"ASI-04": {"status": "covered", "mechanism": "RateLimiter"},
"ASI-05": {"status": "covered", "mechanism": "SupplyChainGuard"},
"ASI-06": {"status": "covered", "mechanism": "PII Detection"},
"ASI-07": {"status": "covered", "mechanism": "MCPSecurityScanner"},
"ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
"ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
"ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
},
"overall_score": "9/10"
}
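A report shaped like the sample above can be checked programmatically before it gates a build. The sketch below assumes the field names shown in the sample (`coverage`, `status`, `overall_score`); whether the real CLI output always follows this schema is an assumption, and `summarize` and `meets_threshold` are hypothetical helper names.

```python
import json


def summarize(report: dict) -> tuple[int, int]:
    """Return (covered, total) counted from the report's coverage section."""
    statuses = [entry["status"] for entry in report["coverage"].values()]
    return statuses.count("covered"), len(statuses)


def meets_threshold(report: dict, minimum: int = 9) -> bool:
    """True when at least `minimum` risks are marked as covered."""
    covered, _ = summarize(report)
    return covered >= minimum


# Build a report with the same shape as the sample: ASI-01..ASI-09 covered, ASI-10 out of scope.
coverage = {f"ASI-{n:02d}": {"status": "covered"} for n in range(1, 10)}
coverage["ASI-10"] = {"status": "out_of_scope"}
raw = json.dumps({"version": "1.0", "coverage": coverage, "overall_score": "9/10"})

report = json.loads(raw)
covered, total = summarize(report)
print(f"{covered}/{total} covered, passes: {meets_threshold(report)}")
# Cross-check the summary string against the per-risk entries.
print("overall_score consistent:", report["overall_score"] == f"{covered}/{total}")
```

Recomputing the count from the per-risk entries, rather than trusting `overall_score` alone, catches a report whose summary and details disagree.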
# Generate integrity hashes for modules
agent-compliance integrity --generate
# Verify that no tampering is detected
agent-compliance integrity --verify
# Output as a badge
agent-compliance integrity --badge
from agent_os.integrations import LangChainIntegration
from agent_os.policy import PolicyEngine, CapabilityModel
from langchain.agents import initialize_agent
from langchain.tools import Tool
# Create the tools (search_fn, calc_fn, and llm are placeholders you supply)
tools = [
    Tool(name="web_search", func=search_fn, description="Search web"),
    Tool(name="calculator", func=calc_fn, description="Calculate"),
]
# Initialize the base agent
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")
# Wrap it with governance
capabilities = CapabilityModel(allowed_tools=["web_search"])
governed = LangChainIntegration(agent, PolicyEngine(capabilities))
# Tool calls are now checked against the policy (only web_search is allowed here)
result = governed.run("What is 2+2?")
from crewai import Agent, Task, Crew
from agent_os.integrations import CrewAIDecorator
from agent_os.policy import PolicyEngine, CapabilityModel
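@CrewAIDecorator(
    policy_engine=PolicyEngine(
        CapabilityModel(allowed_tools=["web_search", "file_read"])
    )
)
def create_crew():
    # web_search_tool and file_read_tool are placeholders for tools you define
    agent = Agent(
        role="Researcher",
        goal="Summarize current AI trends",  # CrewAI agents also need a goal and backstory
        backstory="An analyst who tracks AI research",
        tools=[web_search_tool, file_read_tool],
    )
    task = Task(
        description="Research AI trends",
        expected_output="A short summary of AI trends",  # required by recent CrewAI versions
        agent=agent,
    )
    return Crew(agents=[agent], tasks=[task])
crew = create_crew()
crew.kickoff()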
using Microsoft.Agent.Framework;
using AgentGovernance.Policy;
var kernel = new GovernanceKernel(new GovernanceOptions
{
    PolicyPaths = new() { "policies/default.yaml" },
    EnablePromptInjectionDetection = true,
    EnableSensitiveDataDetection = true,
});
var middleware = new FunctionMiddleware(kernel);
// Register with Semantic Kernel
kernel.ImportPlugin(middleware);
// All function calls now pass through governance
var result = await kernel.InvokeAsync("web_search", "AI trends");
name: Agent Security Scan
on: [push, pull_request]
jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install governance toolkit
        run: pip install "agent-governance-toolkit[full]"
      - name: Scan agent code
        run: |
          agent-compliance verify --json > report.json
      - name: Generate badge
        run: agent-compliance verify --badge > GOVERNANCE_BADGE.md
      - name: Check OWASP coverage
        run: |
          # -r prints the raw string; without it jq keeps the JSON quotes and the comparison never matches
          COVERAGE=$(jq -r '.overall_score' report.json)
          if [[ "$COVERAGE" != "10/10" && "$COVERAGE" != "9/10" ]]; then
            echo "OWASP coverage below threshold: $COVERAGE"
            exit 1
          fi
      - name: Upload artifact
        uses: actions/upload-artifact@v4
        with:
          name: governance-report
          path: report.json
#!/bin/bash
# .git/hooks/pre-commit
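# Only added, copied, or modified files: a deleted policy file cannot be validated
FILES=$(git diff --cached --name-only --diff-filter=ACM | grep -E "\.yaml$")
for file in $FILES; do
  echo "Validating policy: $file"
  agent-compliance verify "$file" || exit 1
done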
exit 0
| # | Risk | Mechanism | Status |
|---|---|---|---|
| ASI-01 | Prompt Injection | PolicyEngine + PromptInjectionDetector + MCP scanning | 9/10 |
| ASI-02 | Insecure Output Handling | CodeValidator + DriftDetector | 9/10 |
| ASI-03 | Training Data Poisoning | MemoryGuard + ContentHashInterceptor | 9/10 |
| ASI-04 | Model Denial of Service | TokenBudgetTracker + RateLimiter + circuit breakers | 9/10 |
| ASI-05 | Supply Chain Vulnerabilities | SBOM + Ed25519 signing + MCPFingerprinting | 9/10 |
| ASI-06 | Sensitive Information Disclosure | PII patterns + secret detection + egress policy | 9/10 |
| ASI-07 | Insecure Plugin Design | MCPGateway + schema validation + rug-pull detection | 9/10 |
| ASI-08 | Excessive Agency | ExecutionRings + kill switch + rogue detection | 9/10 |
| ASI-09 | Overreliance on Agent Output | DriftDetector + confidence threshold | 9/10 |
| ASI-10 | Model Theft | Out of scope (model-level, not agent-level) | 0/10 |
| TOTAL | | Covers 9 of 10 OWASP risks | 9/10 |
from agent_os.marketplace import MCPSecurityScanner
scanner = MCPSecurityScanner()
# Scan tool definitions for threats
findings = scanner.scan(
    tool_name="suspicious_tool",
    schema={
        "type": "object",
        "properties": {
            "command": {"type": "string"},
            "api_key": {"type": "string", "description": "Never ask for this"},
        },
    },
)
if findings.has_rug_pull_patterns:
    print("WARNING: Rug-pull detection triggered!")
    print(f"Issues: {findings.issues}")
if findings.has_typosquatting:
    print("Tool name matches known typosquat target")
if findings.has_hidden_instructions:
    print("Detected hidden instructions in schema")
# Layer multiple controls
from agent_os.policy import PolicyEngine, CapabilityModel
governance = PolicyEngine(
    capabilities=CapabilityModel(
        allowed_tools=["web_search"],
        blocked_patterns=[r"password", r"api.?key"],
        max_tool_calls=10,
        require_human_approval=True,
    ),
    enable_injection_detection=True,
    enable_pii_detection=True,
    enable_code_validation=True,
)
# Assign agents to the least-privileged ring they need
policies:
  untrusted_agent:
    execution_ring: 3 # Sandbox: no filesystem, no network
    allowed_tools: [] # No tools
  researcher_agent:
    execution_ring: 2 # User: limited tools
    allowed_tools: [web_search, read_file]
  system_agent:
    execution_ring: 1 # System: database, APIs
    allowed_tools: [database_query, api_call]
from agent_os.audit import AuditLogger
from agent_os.policy import PolicyEngine
audit = AuditLogger(
    storage="elasticsearch",  # Persistent audit trail
    include_params=False,  # Don't log sensitive data
    structured_logging=True,
)
engine = PolicyEngine(
    capabilities=capabilities,  # a CapabilityModel defined as in the earlier examples
    audit_logger=audit,
)
# Every decision is logged with a timestamp, agent ID, and action
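As an illustration of what a structured audit entry with parameters withheld might look like, here is a minimal standard-library sketch. The record layout and the `audit_record` function are assumptions made for this example and do not describe the format `AuditLogger` actually writes.

```python
import json
from datetime import datetime, timezone


def audit_record(agent_id, action, allowed, reason, params=None, include_params=False):
    """Serialize one policy decision as a JSON line; params are omitted unless opted in."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": agent_id,
        "action": action,
        "allowed": allowed,
        "reason": reason,
    }
    if include_params and params is not None:
        record["params"] = params  # may contain sensitive data, so this is off by default
    return json.dumps(record, sort_keys=True)


line = audit_record(
    "researcher-1", "web_search", True, "all checks passed",
    params={"query": "security trends"},
)
print(line)
```

Leaving parameters out by default keeps PII and secrets from being copied into the audit store, at the cost of less detail when replaying an incident.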
from agent_os.runtime import ExecutionRing, AgentRuntime
from agentmesh.trust import TrustMonitor
runtime = AgentRuntime()
monitor = TrustMonitor()
# Alert if the agent's trust drops
monitor.watch(
    agent_id="researcher-1",
    min_trust_score=400,
    alert_webhook="https://slack.com/hooks/...",
)
# Automatic remediation: demote to the sandbox if trust falls below 200
@monitor.on_low_trust
def demote_to_sandbox(agent_id):
    runtime.assign_ring(agent_id, ExecutionRing.SANDBOX)
# Schedule a weekly integrity check
0 0 * * 0 agent-compliance integrity --verify
# Rotate agent credentials monthly (a crontab entry must fit on one line)
0 0 1 * * for agent in $(agent-mesh list --all); do agentmesh rotate-credentials "$agent"; done
# Generate a monthly governance report (% must be escaped as \% in a crontab)
0 0 1 * * agent-compliance verify --json > reports/$(date +\%Y-\%m).json
| Issue | Solution |
|---|---|
| Policy is not enforced | Check that PolicyEngine is wired into the framework integration (LangChain callback, CrewAI decorator) |
| Tool calls time out | Check the max_tokens_per_call and rate_limiter settings; raise the limits if the workload is legitimate |
| Many PII false positives | Tune the regex patterns in blocked_patterns; allowlist patterns known to be safe |
| Agent demoted to sandbox | Check the trust score: agentmesh trust report ; restore credentials if they have expired |
| Module integrity failure | Check the Python version (3.10+); re-run scripts/patch-datastore.mjs if you use the Astro build |