يوفر Agent Governance Toolkit مفتوح المصدر من Microsoft حوكمة أمان وقت التشغيل لوكلاء الذكاء الاصطناعي. فرض السياسة الحتمية وهوية الثقة الصفرية وحلقات التنفيذ والتحقق من الامتثال عبر جميع مخاطر OWASP Agentic العشرة. يعمل مع LangChain و CrewAI و AutoGen و Anthropic والمزيد.
GitHub: https://github.com/microsoft/agent-governance-toolkit
License: MIT
Languages: Python, TypeScript, .NET, Rust, Go
# التثبيت الكامل مع جميع المكونات
pip install agent-governance-toolkit[full]
# مكونات فردية (اختيارية)
pip install agent-os-kernel # Policy engine
pip install agentmesh-platform # Zero-trust identity & trust mesh
pip install agentmesh-runtime # Runtime supervisor & privilege rings
pip install agent-sre # SRE toolkit (SLOs, error budgets)
pip install agent-governance-toolkit # Compliance & attestation
pip install agentmesh-marketplace # Plugin lifecycle management
pip install agentmesh-lightning # RL training governance
npm install @agentmesh/sdk
dotnet add package Microsoft.AgentGovernance
cargo add agentmesh
go get github.com/microsoft/agent-governance-toolkit/packages/agent-mesh/sdks/go
from agent_os.policy import PolicyEngine, CapabilityModel
# تحديد قدرات الوكيل
capabilities = CapabilityModel(
allowed_tools=["web_search", "read_file", "send_email"],
denied_tools=["delete_file", "execute_shell"],
blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"], # Block SSN
max_tool_calls=10,
max_tokens_per_call=4096,
require_human_approval=True
)
# إنشاء محرك السياسة
engine = PolicyEngine(capabilities=capabilities)
# تقييم الإجراءات
result = engine.evaluate(
agent_id="researcher-1",
action="web_search",
input_text="latest security news"
)
print(f"Allowed: {result.allowed}")
print(f"Reason: {result.reason}")
import { PolicyEngine, AgentIdentity, AuditLogger } from "@agentmesh/sdk";
// إنشاء هوية التشفير
const identity = AgentIdentity.generate("my-agent", ["web_search", "read_file"]);
// إنشاء محرك السياسة
const engine = new PolicyEngine([
{ action: "web_search", effect: "allow" },
{ action: "read_file", effect: "allow" },
{ action: "delete_file", effect: "deny" },
{ action: "shell_exec", effect: "deny" },
]);
const decision = engine.evaluate("web_search"); // "allow"
const denied = engine.evaluate("delete_file"); // "deny"
| Component | Purpose | Key Features |
|---|
| Agent OS | محرك السياسة وأداة الربط للإطارات | Sub-millisecond evaluation, regex/semantic detection |
| Agent Mesh | هوية الثقة الصفرية وتسجيل الثقة | Ed25519 signatures, SPIFFE/SVID, 0–1000 trust scale |
| Agent Runtime | المراقب والعزل | 4-tier privilege rings, kill switch, saga orchestration |
| Agent SRE | هندسة الموثوقية | SLOs, error budgets, circuit breakers, replay debugging |
| Agent Compliance | التحقق والشهادة OWASP | Badge generation, JSON audit trails, integrity checks |
| Agent Marketplace | إدارة دورة حياة المكون الإضافي | MCP security scanning, rug-pull detection |
| Agent Lightning | حوكمة تدريب RL | Training data validation, model drift detection |
policies:
researcher_agent:
allowed_tools:
- web_search
- read_file
- database_query
denied_tools:
- execute_code
- delete_database
- modify_system
# حجب الأنماط الحساسة (SSN، مفاتيح API، رسائل البريد الإلكتروني)
blocked_patterns:
- "\\b\\d{3}-\\d{2}-\\d{4}\\b" # SSN
- "[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]" # API key
- "\\S+@\\S+\\.\\S+" # Email (PII)
# حدود المورد
max_tool_calls: 20
max_tokens_per_call: 8192
max_concurrent_calls: 5
# تعيين حلقة الامتياز
execution_ring: 2 # 0=kernel, 1=system, 2=user, 3=sandbox
# يتطلب موافقة لإجراءات معينة
require_human_approval_for:
- send_email
- modify_database
- external_api_call
# عتبات الثقة
min_trust_score: 500 # 0–1000 scale
# التحقق من الإخراج
enable_prompt_injection_detection: true
enable_sensitive_data_detection: true
enable_code_validation: true
from agent_os.policy import PolicyEngine, CapabilityModel
# تحميل السياسة من YAML
engine = PolicyEngine.from_yaml("policies.yaml")
# تقييم استدعاءات الأداة
decision = engine.evaluate(
agent_id="researcher-1",
action="tool_call",
tool="web_search",
params={"query": "security trends"}
)
if not decision.allowed:
print(f"Blocked: {decision.reason}")
else:
print("Proceeding with tool call...")
from agent_os import PolicyEngine, CapabilityModel
from agent_os.integrations import LangChainIntegration
# تحديد القدرات
capabilities = CapabilityModel(
allowed_tools=["web_search", "calculator"],
max_tool_calls=10
)
# إنشاء محرك السياسة
engine = PolicyEngine(capabilities=capabilities)
# لعملاء LangChain
from langchain.agents import initialize_agent
governed_agent = LangChainIntegration(
agent=your_langchain_agent,
policy_engine=engine,
audit_log=True
)
# يتم الآن اعتراض واعتراض كل استدعاء أداة وتقييمها
result = governed_agent.run("What is 2+2?")
# 1. فحص قائمة السماح/الحجب للأداة
✓ Is tool in allowed_tools list?
✓ Is tool NOT in denied_tools list?
# 2. مطابقة الأنماط (الحقن، PII، الأسرار)
✓ No prompt injection patterns detected
✓ No SSN, API keys, or PII in parameters
✓ No SQL injection or code execution payloads
# 3. قيود المورد
✓ Token usage within limits
✓ Concurrent call limit not exceeded
✓ Rate limiting not triggered
# 4. التحقق من حلقة الامتياز
✓ Agent has required privilege level
✓ Tool operates within agent's ring tier
# 5. تسجيل الثقة
✓ Agent meets minimum trust score
✓ No anomalous behavior detected
# 6. الموافقة البشرية (إن لزم الأمر)
✓ Sensitive action approved by human
import { AgentIdentity, TrustCard } from "@agentmesh/sdk";
// إنشاء هوية التشفير
const identity = AgentIdentity.generate(
"researcher-agent",
["web_search", "read_file"]
);
console.log(identity.did); // did:mesh:agent-xxxxx
console.log(identity.publicKeyPEM); // Ed25519 public key
console.log(identity.allowedCapabilities); // ["web_search", "read_file"]
// توقيع الاتصالات الصادرة
const signature = identity.sign("outgoing message");
// التحقق من هوية الوكيل النظير
const isValid = identity.verify(peerPublicKey, message, signature);
from agentmesh.trust import TrustScorer
scorer = TrustScorer()
# مكونات درجة الثقة:
trust_score = scorer.compute(
agent_id="agent-1",
factors={
"success_rate": 0.95, # 95% of tasks succeed
"error_budget_remaining": 0.8, # 80% error budget left
"last_violation_age_hours": 72, # Last violation 3 days ago
"api_key_rotation_days_ago": 30, # Keys rotated 30 days ago
"audit_log_completeness": 1.0, # Full audit trail
}
)
print(f"Trust score: {trust_score} / 1000")
# طبقات الثقة
# 0–200: Untrusted (sandbox only)
# 200–400: Low trust (limited tools)
# 400–600: Medium trust (standard tools)
# 600–800: High trust (privileged tools)
# 800–1000: Maximum trust (admin capabilities)
┌─────────────────────────────────┐
│ Ring 0: Kernel │ Filesystem, system calls, process control
├─────────────────────────────────┤
│ Ring 1: System │ Database, API gateways, secrets manager
├─────────────────────────────────┤
│ Ring 2: User │ Web search, internal APIs, file read
├─────────────────────────────────┤
│ Ring 3: Sandbox │ No outbound access, isolated execution
└─────────────────────────────────┘
from agent_os.runtime import ExecutionRing, AgentRuntime
runtime = AgentRuntime()
# تعيين الوكيل إلى حلقة بناءً على الثقة
runtime.assign_ring(agent_id="trusted-agent", ring=ExecutionRing.SYSTEM)
runtime.assign_ring(agent_id="untrusted-agent", ring=ExecutionRing.SANDBOX)
# فرض الحلقة أثناء التنفيذ
@runtime.enforce_ring
def execute_tool(agent_id, tool_name, params):
# تنفذ هذه الدالة فقط إذا كان للوكيل امتياز للحلقة
return tool_name(params)
# مفتاح القتل للوكلاء الخاطئين
runtime.terminate_agent("agent-123", reason="Excessive tool calls")
# إنشاء تقرير التحقق
agent-compliance verify
# الإخراج كـ JSON (لـ CI/CD)
agent-compliance verify --json
# إنشاء شارة لـ README
agent-compliance verify --badge
# التحقق من سلامة الوحدة (توقيعات Ed25519)
agent-compliance integrity --verify
{
"version": "1.0",
"timestamp": "2026-04-04T12:00:00Z",
"coverage": {
"ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
"ASI-02": {"status": "covered", "mechanism": "MCPGateway"},
"ASI-03": {"status": "covered", "mechanism": "MemoryGuard"},
"ASI-04": {"status": "covered", "mechanism": "RateLimiter"},
"ASI-05": {"status": "covered", "mechanism": "SupplyChainGuard"},
"ASI-06": {"status": "covered", "mechanism": "PII Detection"},
"ASI-07": {"status": "covered", "mechanism": "MCPSecurityScanner"},
"ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
"ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
"ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
},
"overall_score": "9/10"
}
# إنشاء بصمات سلامة للوحدات
agent-compliance integrity --generate
# التحقق من عدم اكتشاف أي عبث
agent-compliance integrity --verify
# الإخراج كشارة
agent-compliance integrity --badge
from agent_os.integrations import LangChainIntegration
from agent_os.policy import PolicyEngine, CapabilityModel
from langchain.agents import initialize_agent
from langchain.tools import Tool
# إنشاء الأدوات
tools = [
Tool(name="web_search", func=search_fn, description="Search web"),
Tool(name="calculator", func=calc_fn, description="Calculate"),
]
# تهيئة الوكيل المحكوم
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")
# الالتفاف مع الحوكمة
capabilities = CapabilityModel(allowed_tools=["web_search"])
governed = LangChainIntegration(agent, PolicyEngine(capabilities))
# يتم الآن فرض استدعاءات الأداة
result = governed.run("What is 2+2?")
from crewai import Agent, Task, Crew
from agent_os.integrations import CrewAIDecorator
from agent_os.policy import PolicyEngine, CapabilityModel
@CrewAIDecorator(
policy_engine=PolicyEngine(
CapabilityModel(allowed_tools=["web_search", "file_read"])
)
)
def create_crew():
agent = Agent(
role="Researcher",
tools=[web_search_tool, file_read_tool],
)
task = Task(description="Research AI trends", agent=agent)
return Crew(agents=[agent], tasks=[task])
crew = create_crew()
crew.kickoff()
using Microsoft.Agent.Framework;
using AgentGovernance.Policy;
var kernel = new GovernanceKernel(new GovernanceOptions
{
PolicyPaths = new() { "policies/default.yaml" },
EnablePromptInjectionDetection = true,
EnableSensitiveDataDetection = true,
});
var middleware = new FunctionMiddleware(kernel);
// التسجيل مع Semantic Kernel
kernel.ImportPlugin(middleware);
// تمر جميع استدعاءات الدالة الآن عبر الحوكمة
var result = await kernel.InvokeAsync("web_search", "AI trends");
name: Agent Security Scan
on: [push, pull_request]
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install governance toolkit
run: pip install agent-governance-toolkit[full]
- name: Scan agent code
run: |
agent-compliance verify --json > report.json
- name: Generate badge
run: agent-compliance verify --badge > GOVERNANCE_BADGE.md
- name: Check OWASP coverage
run: |
COVERAGE=$(jq '.overall_score' report.json)
if [[ "$COVERAGE" != "10/10" && "$COVERAGE" != "9/10" ]]; then
echo "OWASP coverage below threshold: $COVERAGE"
exit 1
fi
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: governance-report
path: report.json
#!/bin/bash
# .git/hooks/pre-commit
FILES=$(git diff --cached --name-only | grep -E "\.yaml$")
for file in $FILES; do
echo "Validating policy: $file"
agent-compliance verify "$file" || exit 1
done
exit 0
| # | Risk | Mechanism | Status |
|---|
| ASI-01 | حقن الفوري | PolicyEngine + PromptInjectionDetector + MCP scanning | 9/10 |
| ASI-02 | معالجة الإخراج غير الآمنة | CodeValidator + DriftDetector | 9/10 |
| ASI-03 | تسميم بيانات التدريب | MemoryGuard + ContentHashInterceptor | 9/10 |
| ASI-04 | رفض الخدمة للنموذج | TokenBudgetTracker + RateLimiter + circuit breakers | 9/10 |
| ASI-05 | ثغرات سلسلة التوريد | SBOM + Ed25519 signing + MCPFingerprinting | 9/10 |
| ASI-06 | الكشف عن المعلومات الحساسة | PII patterns + secret detection + egress policy | 9/10 |
| ASI-07 | تصميم المكون الإضافي غير الآمن | MCPGateway + schema validation + rug-pull detection | 9/10 |
| ASI-08 | الوكالة المفرطة | ExecutionRings + kill switch + rogue detection | 9/10 |
| ASI-09 | الاعتماد الزائد على إخراج الوكيل | DriftDetector + confidence threshold | 9/10 |
| ASI-10 | سرقة النموذج | خارج النطاق (model-level, not agent-level) | 0/10 |
| TOTAL | | يغطي 9 من 10 مخاطر OWASP | 9/10 |
from agent_os.marketplace import MCPSecurityScanner
scanner = MCPSecurityScanner()
# مسح تعريفات الأدوات بحثاً عن التهديدات
findings = scanner.scan(
tool_name="suspicious_tool",
schema={
"type": "object",
"properties": {
"command": {"type": "string"},
"api_key": {"type": "string", "description": "Never ask for this"}
}
}
)
if findings.has_rug_pull_patterns:
print("WARNING: Rug-pull detection triggered!")
print(f"Issues: {findings.issues}")
if findings.has_typosquatting:
print("Tool name matches known typosquat target")
if findings.has_hidden_instructions:
print("Detected hidden instructions in schema")
# طبقة عناصر التحكم المتعددة
governance = PolicyEngine(
capabilities=CapabilityModel(
allowed_tools=["web_search"],
blocked_patterns=[r"password", r"api.?key"],
max_tool_calls=10,
require_human_approval=True,
),
enable_injection_detection=True,
enable_pii_detection=True,
enable_code_validation=True,
)
# تعيين الوكلاء إلى أقل حلقة مطلوبة
policies:
untrusted_agent:
execution_ring: 3 # Sandbox: no filesystem, no network
allowed_tools: [] # No tools
researcher_agent:
execution_ring: 2 # User: limited tools
allowed_tools: [web_search, read_file]
system_agent:
execution_ring: 1 # System: database, APIs
allowed_tools: [database_query, api_call]
from agent_os.audit import AuditLogger
audit = AuditLogger(
storage="elasticsearch", # Persistent audit trail
include_params=False, # Don't log sensitive data
structured_logging=True,
)
engine = PolicyEngine(
capabilities=capabilities,
audit_logger=audit,
)
# يتم تسجيل كل قرار برطابع زمني معرف الوكيل والإجراء
from agentmesh.trust import TrustMonitor
monitor = TrustMonitor()
# التنبيه إذا انخفضت ثقة الوكيل
monitor.watch(
agent_id="researcher-1",
min_trust_score=400,
alert_webhook="https://slack.com/hooks/...",
)
# المعالجة التلقائية: خفض إلى الحمض إذا كانت الثقة <200
@monitor.on_low_trust
def demote_to_sandbox(agent_id):
runtime.assign_ring(agent_id, ExecutionRing.SANDBOX)
# جدولة التحقق من السلامة الأسبوعي
0 0 * * 0 agent-compliance integrity --verify
# تدوير بيانات اعتماد الوكيل شهرياً
0 0 1 * * for agent in $(agent-mesh list --all); do
agentmesh rotate-credentials "$agent"
done
# إنشاء تقرير الحوكمة الشهري
0 0 1 * * agent-compliance verify --json > reports/$(date +%Y-%m).json
| Issue | Solution |
|---|
| السياسة لم يتم فرضها | التأكد من توصيل PolicyEngine بتكامل الإطار (استدعاء LangChain، مزخرف CrewAI) |
| انتهاء مهلة استدعاءات الأداة | التحقق من إعدادات max_tokens_per_call و rate_limiter؛ زيادة الحد الأقصى إذا كان صحيحاً |
| نسبة عالية من الإيجابيات الكاذبة بشأن PII | ضبط الأنماط في blocked_patterns؛ استخدام قائمة السماح للأنماط المعروفة بأنها آمنة |
| الوكيل مخفض إلى الحمض | التحقق من درجة الثقة: agentmesh trust report ؛ استعادة بيانات الاعتماد إذا انتهت صلاحيتها |
| فشل سلامة الوحدة | التحقق من إصدار Python (3.10+); إعادة تشغيل scripts/patch-datastore.mjs إذا كنت تستخدم بناء Astro |