يوفر Agent Governance Toolkit مفتوح المصدر من Microsoft حوكمة أمان وقت التشغيل لوكلاء الذكاء الاصطناعي. فرض السياسة الحتمية وهوية الثقة الصفرية وحلقات التنفيذ والتحقق من الامتثال عبر جميع مخاطر OWASP Agentic العشرة. يعمل مع LangChain و CrewAI و AutoGen و Anthropic والمزيد.
GitHub: https://github.com/microsoft/agent-governance-toolkit
License: MIT
Languages: Python, TypeScript, .NET, Rust, Go
التثبيت
Python (مجموعة الأدوات الكاملة)
# التثبيت الكامل مع جميع المكونات
pip install agent-governance-toolkit[full]
# مكونات فردية (اختيارية)
pip install agent-os-kernel # Policy engine
pip install agentmesh-platform # Zero-trust identity & trust mesh
pip install agentmesh-runtime # Runtime supervisor & privilege rings
pip install agent-sre # SRE toolkit (SLOs, error budgets)
pip install agent-governance-toolkit # Compliance & attestation
pip install agentmesh-marketplace # Plugin lifecycle management
pip install agentmesh-lightning # RL training governance
TypeScript/Node.js
npm install @agentmesh/sdk
.NET
dotnet add package Microsoft.AgentGovernance
Rust
cargo add agentmesh
Go
go get github.com/microsoft/agent-governance-toolkit/packages/agent-mesh/sdks/go
البدء السريع — محرك السياسة
Python: إنشاء سياستك الأولى
from agent_os.policy import PolicyEngine, CapabilityModel
# تحديد قدرات الوكيل
capabilities = CapabilityModel(
allowed_tools=["web_search", "read_file", "send_email"],
denied_tools=["delete_file", "execute_shell"],
blocked_patterns=[r"\b\d{3}-\d{2}-\d{4}\b"], # Block SSN
max_tool_calls=10,
max_tokens_per_call=4096,
require_human_approval=True
)
# إنشاء محرك السياسة
engine = PolicyEngine(capabilities=capabilities)
# تقييم الإجراءات
result = engine.evaluate(
agent_id="researcher-1",
action="web_search",
input_text="latest security news"
)
print(f"Allowed: {result.allowed}")
print(f"Reason: {result.reason}")
TypeScript: فرض السياسة الأساسي
import { PolicyEngine, AgentIdentity, AuditLogger } from "@agentmesh/sdk";
// إنشاء هوية التشفير
const identity = AgentIdentity.generate("my-agent", ["web_search", "read_file"]);
// إنشاء محرك السياسة
const engine = new PolicyEngine([
{ action: "web_search", effect: "allow" },
{ action: "read_file", effect: "allow" },
{ action: "delete_file", effect: "deny" },
{ action: "shell_exec", effect: "deny" },
]);
const decision = engine.evaluate("web_search"); // "allow"
const denied = engine.evaluate("delete_file"); // "deny"
المكونات الأساسية
| Component | Purpose | Key Features |
|---|
| Agent OS | محرك السياسة وأداة الربط للإطارات | Sub-millisecond evaluation, regex/semantic detection |
| Agent Mesh | هوية الثقة الصفرية وتسجيل الثقة | Ed25519 signatures, SPIFFE/SVID, 0–1000 trust scale |
| Agent Runtime | المراقب والعزل | 4-tier privilege rings, kill switch, saga orchestration |
| Agent SRE | هندسة الموثوقية | SLOs, error budgets, circuit breakers, replay debugging |
| Agent Compliance | التحقق والشهادة OWASP | Badge generation, JSON audit trails, integrity checks |
| Agent Marketplace | إدارة دورة حياة المكون الإضافي | MCP security scanning, rug-pull detection |
| Agent Lightning | حوكمة تدريب RL | Training data validation, model drift detection |
تكوين السياسة (YAML)
تحديد السياسات في YAML
policies:
researcher_agent:
allowed_tools:
- web_search
- read_file
- database_query
denied_tools:
- execute_code
- delete_database
- modify_system
# حجب الأنماط الحساسة (SSN، مفاتيح API، رسائل البريد الإلكتروني)
blocked_patterns:
- "\\b\\d{3}-\\d{2}-\\d{4}\\b" # SSN
- "[Aa][Pp][Ii][_-]?[Kk][Ee][Yy]" # API key
- "\\S+@\\S+\\.\\S+" # Email (PII)
# حدود المورد
max_tool_calls: 20
max_tokens_per_call: 8192
max_concurrent_calls: 5
# تعيين حلقة الامتياز
execution_ring: 2 # 0=kernel, 1=system, 2=user, 3=sandbox
# يتطلب موافقة لإجراءات معينة
require_human_approval_for:
- send_email
- modify_database
- external_api_call
# عتبات الثقة
min_trust_score: 500 # 0–1000 scale
# التحقق من الإخراج
enable_prompt_injection_detection: true
enable_sensitive_data_detection: true
enable_code_validation: true
تحميل وفرض السياسة (Python)
from agent_os.policy import PolicyEngine, CapabilityModel
# تحميل السياسة من YAML
engine = PolicyEngine.from_yaml("policies.yaml")
# تقييم استدعاءات الأداة
decision = engine.evaluate(
agent_id="researcher-1",
action="tool_call",
tool="web_search",
params={"query": "security trends"}
)
if not decision.allowed:
print(f"Blocked: {decision.reason}")
else:
print("Proceeding with tool call...")
Agent OS — محرك السياسة
اعتراض الإجراءات (Python)
from agent_os import PolicyEngine, CapabilityModel
from agent_os.integrations import LangChainIntegration
# تحديد القدرات
capabilities = CapabilityModel(
allowed_tools=["web_search", "calculator"],
max_tool_calls=10
)
# إنشاء محرك السياسة
engine = PolicyEngine(capabilities=capabilities)
# لعملاء LangChain
from langchain.agents import initialize_agent
governed_agent = LangChainIntegration(
agent=your_langchain_agent,
policy_engine=engine,
audit_log=True
)
# يتم الآن اعتراض واعتراض كل استدعاء أداة وتقييمها
result = governed_agent.run("What is 2+2?")
قائمة التحقق من تقييم استدعاء الأداة
# 1. فحص قائمة السماح/الحجب للأداة
✓ Is tool in allowed_tools list?
✓ Is tool NOT in denied_tools list?
# 2. مطابقة الأنماط (الحقن، PII، الأسرار)
✓ No prompt injection patterns detected
✓ No SSN, API keys, or PII in parameters
✓ No SQL injection or code execution payloads
# 3. قيود المورد
✓ Token usage within limits
✓ Concurrent call limit not exceeded
✓ Rate limiting not triggered
# 4. التحقق من حلقة الامتياز
✓ Agent has required privilege level
✓ Tool operates within agent's ring tier
# 5. تسجيل الثقة
✓ Agent meets minimum trust score
✓ No anomalous behavior detected
# 6. الموافقة البشرية (إن لزم الأمر)
✓ Sensitive action approved by human
Agent Mesh — هوية الثقة الصفرية
إنشاء هوية الوكيل (TypeScript)
import { AgentIdentity, TrustCard } from "@agentmesh/sdk";
// إنشاء هوية التشفير
const identity = AgentIdentity.generate(
"researcher-agent",
["web_search", "read_file"]
);
console.log(identity.did); // did:mesh:agent-xxxxx
console.log(identity.publicKeyPEM); // Ed25519 public key
console.log(identity.allowedCapabilities); // ["web_search", "read_file"]
// توقيع الاتصالات الصادرة
const signature = identity.sign("outgoing message");
// التحقق من هوية الوكيل النظير
const isValid = identity.verify(peerPublicKey, message, signature);
تسجيل الثقة (مقياس 0–1000)
from agentmesh.trust import TrustScorer
scorer = TrustScorer()
# مكونات درجة الثقة:
trust_score = scorer.compute(
agent_id="agent-1",
factors={
"success_rate": 0.95, # 95% of tasks succeed
"error_budget_remaining": 0.8, # 80% error budget left
"last_violation_age_hours": 72, # Last violation 3 days ago
"api_key_rotation_days_ago": 30, # Keys rotated 30 days ago
"audit_log_completeness": 1.0, # Full audit trail
}
)
print(f"Trust score: {trust_score} / 1000")
# طبقات الثقة
# 0–200: Untrusted (sandbox only)
# 200–400: Low trust (limited tools)
# 400–600: Medium trust (standard tools)
# 600–800: High trust (privileged tools)
# 800–1000: Maximum trust (admin capabilities)
Agent Runtime — حلقات التنفيذ
معمارية حلقة الامتياز
┌─────────────────────────────────┐
│ Ring 0: Kernel │ Filesystem, system calls, process control
├─────────────────────────────────┤
│ Ring 1: System │ Database, API gateways, secrets manager
├─────────────────────────────────┤
│ Ring 2: User │ Web search, internal APIs, file read
├─────────────────────────────────┤
│ Ring 3: Sandbox │ No outbound access, isolated execution
└─────────────────────────────────┘
تعيين الوكيل إلى حلقة (Python)
from agent_os.runtime import ExecutionRing, AgentRuntime
runtime = AgentRuntime()
# تعيين الوكيل إلى حلقة بناءً على الثقة
runtime.assign_ring(agent_id="trusted-agent", ring=ExecutionRing.SYSTEM)
runtime.assign_ring(agent_id="untrusted-agent", ring=ExecutionRing.SANDBOX)
# فرض الحلقة أثناء التنفيذ
@runtime.enforce_ring
def execute_tool(agent_id, tool_name, params):
# تنفذ هذه الدالة فقط إذا كان للوكيل امتياز للحلقة
return tool_name(params)
# مفتاح القتل للوكلاء الخاطئين
runtime.terminate_agent("agent-123", reason="Excessive tool calls")
Agent Compliance — التحقق والشهادة
CLI: التحقق من تغطية OWASP
# إنشاء تقرير التحقق
agent-compliance verify
# الإخراج كـ JSON (لـ CI/CD)
agent-compliance verify --json
# إنشاء شارة لـ README
agent-compliance verify --badge
# التحقق من سلامة الوحدة (توقيعات Ed25519)
agent-compliance integrity --verify
مثال على إخراج التحقق
{
"version": "1.0",
"timestamp": "2026-04-04T12:00:00Z",
"coverage": {
"ASI-01": {"status": "covered", "mechanism": "PolicyEngine"},
"ASI-02": {"status": "covered", "mechanism": "MCPGateway"},
"ASI-03": {"status": "covered", "mechanism": "MemoryGuard"},
"ASI-04": {"status": "covered", "mechanism": "RateLimiter"},
"ASI-05": {"status": "covered", "mechanism": "SupplyChainGuard"},
"ASI-06": {"status": "covered", "mechanism": "PII Detection"},
"ASI-07": {"status": "covered", "mechanism": "MCPSecurityScanner"},
"ASI-08": {"status": "covered", "mechanism": "ExecutionRings"},
"ASI-09": {"status": "covered", "mechanism": "DriftDetector"},
"ASI-10": {"status": "out_of_scope", "reason": "Model-level, not agent-level"}
},
"overall_score": "9/10"
}
التحقق من السلامة
# إنشاء بصمات سلامة للوحدات
agent-compliance integrity --generate
# التحقق من عدم اكتشاف أي عبث
agent-compliance integrity --verify
# الإخراج كشارة
agent-compliance integrity --badge
تكامل الإطار
LangChain + PolicyEngine
from agent_os.integrations import LangChainIntegration
from agent_os.policy import PolicyEngine, CapabilityModel
from langchain.agents import initialize_agent
from langchain.tools import Tool
# إنشاء الأدوات
tools = [
Tool(name="web_search", func=search_fn, description="Search web"),
Tool(name="calculator", func=calc_fn, description="Calculate"),
]
# تهيئة الوكيل المحكوم
agent = initialize_agent(tools, llm, agent="zero-shot-react-description")
# الالتفاف مع الحوكمة
capabilities = CapabilityModel(allowed_tools=["web_search"])
governed = LangChainIntegration(agent, PolicyEngine(capabilities))
# يتم الآن فرض استدعاءات الأداة
result = governed.run("What is 2+2?")
CrewAI + مزخرفات المهام
from crewai import Agent, Task, Crew
from agent_os.integrations import CrewAIDecorator
from agent_os.policy import PolicyEngine, CapabilityModel
@CrewAIDecorator(
policy_engine=PolicyEngine(
CapabilityModel(allowed_tools=["web_search", "file_read"])
)
)
def create_crew():
agent = Agent(
role="Researcher",
tools=[web_search_tool, file_read_tool],
)
task = Task(description="Research AI trends", agent=agent)
return Crew(agents=[agent], tasks=[task])
crew = create_crew()
crew.kickoff()
Microsoft Agent Framework + FunctionMiddleware
using Microsoft.Agent.Framework;
using AgentGovernance.Policy;
var kernel = new GovernanceKernel(new GovernanceOptions
{
PolicyPaths = new() { "policies/default.yaml" },
EnablePromptInjectionDetection = true,
EnableSensitiveDataDetection = true,
});
var middleware = new FunctionMiddleware(kernel);
// التسجيل مع Semantic Kernel
kernel.ImportPlugin(middleware);
// تمر جميع استدعاءات الدالة الآن عبر الحوكمة
var result = await kernel.InvokeAsync("web_search", "AI trends");
تكامل خط أنابيب CI/CD
GitHub Actions: فحص الأمان
name: Agent Security Scan
on: [push, pull_request]
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install governance toolkit
run: pip install agent-governance-toolkit[full]
- name: Scan agent code
run: |
agent-compliance verify --json > report.json
- name: Generate badge
run: agent-compliance verify --badge > GOVERNANCE_BADGE.md
- name: Check OWASP coverage
run: |
COVERAGE=$(jq '.overall_score' report.json)
if [[ "$COVERAGE" != "10/10" && "$COVERAGE" != "9/10" ]]; then
echo "OWASP coverage below threshold: $COVERAGE"
exit 1
fi
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: governance-report
path: report.json
خطاف ما قبل الالتزام: التحقق من السياسات
#!/bin/bash
# .git/hooks/pre-commit
FILES=$(git diff --cached --name-only | grep -E "\.yaml$")
for file in $FILES; do
echo "Validating policy: $file"
agent-compliance verify "$file" || exit 1
done
exit 0
تغطية OWASP Agentic Top 10 (ASI)
| # | Risk | Mechanism | Status |
|---|
| ASI-01 | حقن الفوري | PolicyEngine + PromptInjectionDetector + MCP scanning | 9/10 |
| ASI-02 | معالجة الإخراج غير الآمنة | CodeValidator + DriftDetector | 9/10 |
| ASI-03 | تسميم بيانات التدريب | MemoryGuard + ContentHashInterceptor | 9/10 |
| ASI-04 | رفض الخدمة للنموذج | TokenBudgetTracker + RateLimiter + circuit breakers | 9/10 |
| ASI-05 | ثغرات سلسلة التوريد | SBOM + Ed25519 signing + MCPFingerprinting | 9/10 |
| ASI-06 | الكشف عن المعلومات الحساسة | PII patterns + secret detection + egress policy | 9/10 |
| ASI-07 | تصميم المكون الإضافي غير الآمن | MCPGateway + schema validation + rug-pull detection | 9/10 |
| ASI-08 | الوكالة المفرطة | ExecutionRings + kill switch + rogue detection | 9/10 |
| ASI-09 | الاعتماد الزائد على إخراج الوكيل | DriftDetector + confidence threshold | 9/10 |
| ASI-10 | سرقة النموذج | خارج النطاق (model-level, not agent-level) | 0/10 |
| TOTAL | | يغطي 9 من 10 مخاطر OWASP | 9/10 |
فحص أمان MCP
كشف تسمم الأداة وسحب الخصم
from agent_os.marketplace import MCPSecurityScanner
scanner = MCPSecurityScanner()
# مسح تعريفات الأدوات بحثاً عن التهديدات
findings = scanner.scan(
tool_name="suspicious_tool",
schema={
"type": "object",
"properties": {
"command": {"type": "string"},
"api_key": {"type": "string", "description": "Never ask for this"}
}
}
)
if findings.has_rug_pull_patterns:
print("WARNING: Rug-pull detection triggered!")
print(f"Issues: {findings.issues}")
if findings.has_typosquatting:
print("Tool name matches known typosquat target")
if findings.has_hidden_instructions:
print("Detected hidden instructions in schema")
أفضل الممارسات
1. الدفاع بعمق
# طبقة عناصر التحكم المتعددة
governance = PolicyEngine(
capabilities=CapabilityModel(
allowed_tools=["web_search"],
blocked_patterns=[r"password", r"api.?key"],
max_tool_calls=10,
require_human_approval=True,
),
enable_injection_detection=True,
enable_pii_detection=True,
enable_code_validation=True,
)
2. امتياز أقل
# تعيين الوكلاء إلى أقل حلقة مطلوبة
policies:
untrusted_agent:
execution_ring: 3 # Sandbox: no filesystem, no network
allowed_tools: [] # No tools
researcher_agent:
execution_ring: 2 # User: limited tools
allowed_tools: [web_search, read_file]
system_agent:
execution_ring: 1 # System: database, APIs
allowed_tools: [database_query, api_call]
3. تدقيق كل شيء
from agent_os.audit import AuditLogger
audit = AuditLogger(
storage="elasticsearch", # Persistent audit trail
include_params=False, # Don't log sensitive data
structured_logging=True,
)
engine = PolicyEngine(
capabilities=capabilities,
audit_logger=audit,
)
# يتم تسجيل كل قرار برطابع زمني معرف الوكيل والإجراء
4. مراقبة درجات الثقة
from agentmesh.trust import TrustMonitor
monitor = TrustMonitor()
# التنبيه إذا انخفضت ثقة الوكيل
monitor.watch(
agent_id="researcher-1",
min_trust_score=400,
alert_webhook="https://slack.com/hooks/...",
)
# المعالجة التلقائية: خفض إلى الحمض إذا كانت الثقة <200
@monitor.on_low_trust
def demote_to_sandbox(agent_id):
runtime.assign_ring(agent_id, ExecutionRing.SANDBOX)
5. فحوصات السلامة المنتظمة
# جدولة التحقق من السلامة الأسبوعي
0 0 * * 0 agent-compliance integrity --verify
# تدوير بيانات اعتماد الوكيل شهرياً
0 0 1 * * for agent in $(agent-mesh list --all); do
agentmesh rotate-credentials "$agent"
done
# إنشاء تقرير الحوكمة الشهري
0 0 1 * * agent-compliance verify --json > reports/$(date +%Y-%m).json
استكشاف الأخطاء والإصلاح
| Issue | Solution |
|---|
| السياسة لم يتم فرضها | التأكد من توصيل PolicyEngine بتكامل الإطار (استدعاء LangChain، مزخرف CrewAI) |
| انتهاء مهلة استدعاءات الأداة | التحقق من إعدادات max_tokens_per_call و rate_limiter؛ زيادة الحد الأقصى إذا كان صحيحاً |
| نسبة عالية من الإيجابيات الكاذبة بشأن PII | ضبط الأنماط في blocked_patterns؛ استخدام قائمة السماح للأنماط المعروفة بأنها آمنة |
| الوكيل مخفض إلى الحمض | التحقق من درجة الثقة: agentmesh trust report ؛ استعادة بيانات الاعتماد إذا انتهت صلاحيتها |
| فشل سلامة الوحدة | التحقق من إصدار Python (3.10+); إعادة تشغيل scripts/patch-datastore.mjs إذا كنت تستخدم بناء Astro |
الموارد