Stormspotter Cheat Sheet
Overview
Stormspotter is an Azure red team tool for graphing Azure and Azure AD objects. Originally developed by the Microsoft Azure Red Team, it builds an attack-surface map of an Azure environment by collecting and visualizing the relationships between Azure resources, identities, and permissions. This helps security professionals identify potential attack paths and privilege escalation opportunities.
Warning: Use Stormspotter only in environments that you own or have explicit permission to test. Unauthorized use may violate terms of service or local laws.
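To make the idea of an attack-surface graph concrete, the sketch below models a few Azure objects as a directed graph and searches it for the shortest chain of relationships from a user to a virtual machine. All node names and relationship labels here are invented for illustration and are not Stormspotter's actual schema; Stormspotter stores its graph in Neo4j rather than in memory.

```python
from collections import deque

# Hypothetical edges: (source, relationship, target). Names are illustrative only.
EDGES = [
    ("user:alice", "MEMBER_OF", "group:devops"),
    ("group:devops", "CONTRIBUTOR_ON", "rg:prod"),
    ("rg:prod", "CONTAINS", "vm:prod-web-01"),
    ("user:bob", "READER_ON", "rg:test"),
]


def build_adjacency(edges):
    """Map each node to its outgoing (relationship, target) pairs."""
    adjacency = {}
    for source, relationship, target in edges:
        adjacency.setdefault(source, []).append((relationship, target))
    return adjacency


def shortest_path(edges, start, goal):
    """Breadth-first search; returns the edges on a shortest path, or None."""
    adjacency = build_adjacency(edges)
    queue = deque([(start, [])])
    visited = {start}
    while queue:
        node, path = queue.popleft()
        if node == goal:
            return path
        for relationship, target in adjacency.get(node, []):
            if target not in visited:
                visited.add(target)
                queue.append((target, path + [(node, relationship, target)]))
    return None


if __name__ == "__main__":
    # Print each hop of the path from the user to the VM, if one exists.
    for source, relationship, target in shortest_path(EDGES, "user:alice", "vm:prod-web-01") or []:
        print(f"{source} -[{relationship}]-> {target}")
```

The bounded path queries later in this cheat sheet (for example `-[*1..5]->`) express the same kind of search in Cypher, with Neo4j doing the traversal.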
Installation
Prerequisites
```bash
# Check that Python 3.7+ is installed
python3 --version

# Install pip
sudo apt update
sudo apt install python3-pip

# Install Git
sudo apt install git

# Install Neo4j (required for graph database)
wget -O - https://debian.neo4j.com/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j

# Install Java (required for Neo4j)
sudo apt install openjdk-11-jdk
```
Clone and Install Stormspotter
```bash
# Clone the repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Install Python dependencies
pip3 install -r requirements.txt

# Alternative: install with pipenv
pip3 install pipenv
pipenv install
pipenv shell
```
Docker Installation
```bash
# Clone repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Build Docker containers
docker-compose build

# Start services
docker-compose up -d

# Check status
docker-compose ps
```
Manual Neo4j Setup
```bash
# Start the Neo4j service and enable it at boot
sudo systemctl start neo4j
sudo systemctl enable neo4j

# Check Neo4j status
sudo systemctl status neo4j

# Access Neo4j browser (default: http://localhost:7474)
# Default credentials: neo4j/neo4j (change on first login)

# Configure Neo4j for Stormspotter
sudo nano /etc/neo4j/neo4j.conf

# Uncomment and modify these lines
# (0.0.0.0 listens on all interfaces; use 127.0.0.1 for local-only access):
# dbms.default_listen_address=0.0.0.0
# dbms.connector.bolt.listen_address=0.0.0.0:7687
# dbms.connector.http.listen_address=0.0.0.0:7474

# Restart Neo4j
sudo systemctl restart neo4j
```
Configuration
Azure Authentication Setup
```bash
# Install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash

# Log in to Azure
az login

# List available subscriptions
az account list --output table

# Set specific subscription
az account set --subscription "subscription-id"

# Verify current context
az account show
```
Service Principal Authentication
```bash
# Create service principal for Stormspotter
az ad sp create-for-rbac --name "Stormspotter-SP" --role "Reader" --scopes "/subscriptions/your-subscription-id"

# Note the output:
# {
#   "appId": "app-id",
#   "displayName": "Stormspotter-SP",
#   "name": "app-id",
#   "password": "password",
#   "tenant": "tenant-id"
# }

# Set environment variables
export AZURE_CLIENT_ID="app-id"
export AZURE_CLIENT_SECRET="password"
export AZURE_TENANT_ID="tenant-id"
export AZURE_SUBSCRIPTION_ID="subscription-id"
```
Stormspotter Configuration
```bash
# Create configuration file
cd Stormspotter
cp config/config.json.example config/config.json

# Edit configuration
nano config/config.json
```
```json
{
  "neo4j": {
    "uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "your-neo4j-password"
  },
  "azure": {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "subscription_id": "your-subscription-id"
  },
  "logging": {
    "level": "INFO",
    "file": "logs/stormspotter.log"
  }
}
```
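Before starting a long collection run, it can be useful to confirm that the configuration file is complete. The sketch below assumes the `config.json` layout shown above and treats any value still starting with `your-` as an unfilled placeholder; the helper names `load_config` and `find_config_problems` are hypothetical and not part of Stormspotter.

```python
import json

# Required keys per section, mirroring the example config.json above.
REQUIRED_KEYS = {
    "neo4j": ("uri", "username", "password"),
    "azure": ("tenant_id", "client_id", "client_secret", "subscription_id"),
}


def find_config_problems(config):
    """Return a list of problems; an empty list means the config looks complete."""
    problems = []
    for section, keys in REQUIRED_KEYS.items():
        values = config.get(section)
        if not isinstance(values, dict):
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            value = values.get(key)
            # Empty values and untouched "your-..." placeholders both count as unset.
            if not value or str(value).startswith("your-"):
                problems.append(f"unset value: {section}.{key}")
    return problems


def load_config(path):
    """Read a JSON config file and return it as a dictionary."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)
```

A typical call would be `find_config_problems(load_config("config/config.json"))`, printing the returned list before running a collection.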
Basic Usage
Data Collection
```bash
# Basic collection from current subscription
python3 stormspotter.py collect

# Collect from specific subscription
python3 stormspotter.py collect --subscription-id "subscription-id"

# Collect from specific tenant
python3 stormspotter.py collect --tenant-id "tenant-id"

# Collect with specific credentials
python3 stormspotter.py collect \
  --client-id "app-id" \
  --client-secret "password" \
  --tenant-id "tenant-id"

# Verbose collection
python3 stormspotter.py collect --verbose

# Collect specific resource types
python3 stormspotter.py collect --resource-types "VirtualMachines,StorageAccounts"
```
Data Import into Neo4j
```bash
# Import collected data to Neo4j
python3 stormspotter.py import --data-file "output/azure_data.json"

# Import with custom Neo4j connection
python3 stormspotter.py import \
  --neo4j-uri "bolt://localhost:7687" \
  --neo4j-user "neo4j" \
  --neo4j-password "password"

# Clear existing data before import
python3 stormspotter.py import --clear-database

# Import multiple files
python3 stormspotter.py import --data-dir "output/"
```
Web Interface
```bash
# Start Stormspotter web interface
python3 stormspotter.py web

# Start on specific port
python3 stormspotter.py web --port 8080

# Start with custom host
python3 stormspotter.py web --host 0.0.0.0 --port 8080

# Access web interface
# Default: http://localhost:5000
```
Advanced Collection
Multi-Tenant Collection
```bash
# Collect from multiple tenants
python3 stormspotter.py collect-multi \
  --tenants "tenant1-id,tenant2-id,tenant3-id" \
  --output-dir "multi-tenant-data"

# Collect with different credentials per tenant
python3 stormspotter.py collect-multi \
  --config-file "multi-tenant-config.json"
```
Comprehensive Collection
```bash
# Collect all available data
python3 stormspotter.py collect \
  --comprehensive \
  --include-rbac \
  --include-resources \
  --include-identities \
  --include-policies

# Collect with custom scope
python3 stormspotter.py collect \
  --scope "management-groups,subscriptions,resource-groups" \
  --depth 5

# Collect with filters
python3 stormspotter.py collect \
  --exclude-resource-types "NetworkSecurityGroups" \
  --include-only-active-resources
```
Scheduled Collection
```bash
# Create collection script
cat > collect_azure.sh << 'EOF'
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
OUTPUT_DIR="/opt/stormspotter/data/$DATE"

mkdir -p "$OUTPUT_DIR"

python3 /opt/stormspotter/stormspotter.py collect \
  --output-dir "$OUTPUT_DIR" \
  --comprehensive \
  --verbose

# Import to Neo4j
python3 /opt/stormspotter/stormspotter.py import \
  --data-dir "$OUTPUT_DIR" \
  --clear-database

echo "Collection completed: $DATE"
EOF

chmod +x collect_azure.sh

# Schedule with cron (daily at 2 AM); the path must match where the script is stored.
# This appends to the existing crontab instead of overwriting it.
(crontab -l 2>/dev/null; echo "0 2 * * * /opt/stormspotter/collect_azure.sh") | crontab -
```
Data Analysis
Neo4j Cypher Queries
```cypher
// Find all Azure AD users
MATCH (u:AzureADUser)
RETURN u.displayName, u.userPrincipalName, u.enabled;

// Find privileged users
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName, r.displayName;

// Find virtual machines and their permissions
MATCH (vm:VirtualMachine)
OPTIONAL MATCH (vm)<-[:HAS_PERMISSION]-(p:Principal)
RETURN vm.name, vm.location, collect(p.displayName) AS permissions;

// Find storage accounts with public access
MATCH (sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN sa.name, sa.resourceGroup, sa.location;

// Find attack paths to high-value resources
MATCH path = (u:AzureADUser)-[*1..5]->(vm:VirtualMachine)
WHERE vm.name CONTAINS "prod" OR vm.name CONTAINS "critical"
RETURN path;

// Find users with multiple high-privilege roles
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName IN ["Global Administrator", "Privileged Role Administrator", "Security Administrator"]
WITH u, collect(r.displayName) AS roles
WHERE size(roles) > 1
RETURN u.displayName, roles;

// Find resources accessible from the internet
MATCH (r:Resource)-[:ALLOWS_ACCESS]->(nsg:NetworkSecurityGroup)
WHERE nsg.rules CONTAINS "0.0.0.0/0"
RETURN r.name, r.type, r.location;
```
Python Analysis Scripts
```python
#!/usr/bin/env python3

import json

from neo4j import GraphDatabase


class StormspotterAnalyzer:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def find_privileged_users(self):
        """Find users with administrative privileges"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                WHERE r.displayName CONTAINS "Admin"
                RETURN u.displayName as user, collect(r.displayName) as roles
            """)

            privileged_users = []
            for record in result:
                privileged_users.append({
                    'user': record['user'],
                    'roles': record['roles']
                })

            return privileged_users

    def find_attack_paths(self, target_resource_type="VirtualMachine"):
        """Find potential attack paths to target resources"""
        with self.driver.session() as session:
            # The label is interpolated into the query text, so pass only trusted values.
            result = session.run(f"""
                MATCH path = (u:AzureADUser)-[*1..5]->(r:{target_resource_type})
                RETURN path, length(path) as path_length
                ORDER BY path_length
                LIMIT 50
            """)

            attack_paths = []
            for record in result:
                attack_paths.append({
                    'path': record['path'],
                    'length': record['path_length']
                })

            return attack_paths

    def find_exposed_resources(self):
        """Find resources with potential internet exposure"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (r:Resource)
                WHERE r.publicIPAddress IS NOT NULL
                   OR r.allowBlobPublicAccess = true
                   OR r.publicNetworkAccess = "Enabled"
                RETURN r.name, r.type, r.location, r.resourceGroup
            """)

            exposed_resources = []
            for record in result:
                exposed_resources.append({
                    'name': record['r.name'],
                    'type': record['r.type'],
                    'location': record['r.location'],
                    'resource_group': record['r.resourceGroup']
                })

            return exposed_resources

    def generate_report(self):
        """Generate comprehensive security report"""
        report = {
            'privileged_users': self.find_privileged_users(),
            'attack_paths': self.find_attack_paths(),
            'exposed_resources': self.find_exposed_resources()
        }
        return report


# Usage
if __name__ == "__main__":
    analyzer = StormspotterAnalyzer("bolt://localhost:7687", "neo4j", "password")

    try:
        report = analyzer.generate_report()

        print("=== STORMSPOTTER SECURITY ANALYSIS ===")
        print(f"Privileged Users: {len(report['privileged_users'])}")
        print(f"Attack Paths Found: {len(report['attack_paths'])}")
        print(f"Exposed Resources: {len(report['exposed_resources'])}")

        # Save detailed report (default=str serializes Neo4j path objects as text)
        with open('stormspotter_analysis.json', 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print("Detailed report saved to stormspotter_analysis.json")

    finally:
        analyzer.close()
```
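The `find_attack_paths` method above places the target label directly into the query text, because Neo4j 4.x does not accept node labels as query parameters. One way to guard that interpolation is an allowlist, sketched below. The label set is collected from queries in this cheat sheet and is an assumption about the graph schema, and `build_attack_path_query` is a hypothetical helper, not part of Stormspotter or the Neo4j driver.

```python
# Labels that appear in this cheat sheet's queries; an assumed, non-exhaustive set.
ALLOWED_LABELS = frozenset({
    "VirtualMachine",
    "StorageAccount",
    "ResourceGroup",
    "Subscription",
    "Resource",
})


def build_attack_path_query(target_label, max_hops=5, limit=50):
    """Return (query_text, parameters) for paths from users to target_label nodes."""
    if target_label not in ALLOWED_LABELS:
        raise ValueError(f"unexpected label: {target_label!r}")
    if not isinstance(max_hops, int) or not 1 <= max_hops <= 10:
        raise ValueError("max_hops must be an integer between 1 and 10")
    # Label and hop count are validated above; the row limit travels as a real parameter.
    query = (
        f"MATCH path = (u:AzureADUser)-[*1..{max_hops}]->(r:{target_label}) "
        "RETURN path, length(path) AS path_length "
        "ORDER BY path_length LIMIT $limit"
    )
    return query, {"limit": limit}
```

The returned pair can be passed to the driver as `session.run(query, parameters)`, so only validated values ever reach the query text.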
Attack Path Analysis
Common Attack Scenarios
```cypher
// Scenario 1: User to VM via role assignments
MATCH path = (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)-[:APPLIES_TO]->(rg:ResourceGroup)-[:CONTAINS]->(vm:VirtualMachine)
WHERE r.displayName IN ["Contributor", "Owner", "Virtual Machine Contributor"]
RETURN path;

// Scenario 2: Service principal privilege escalation
MATCH path = (sp:ServicePrincipal)-[:HAS_PERMISSION]->(sub:Subscription)
WHERE sp.appRoles CONTAINS "Application.ReadWrite.All"
RETURN path;

// Scenario 3: Storage account access via managed identity
MATCH path = (mi:ManagedIdentity)-[:HAS_ACCESS]->(sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN path;

// Scenario 4: Cross-tenant access
MATCH path = (u:AzureADUser)-[:GUEST_IN]->(t:Tenant)-[:CONTAINS]->(r:Resource)
RETURN path;

// Scenario 5: Conditional access bypass
MATCH (u:AzureADUser)-[:SUBJECT_TO]->(ca:ConditionalAccessPolicy)
WHERE ca.state = "disabled" OR ca.conditions CONTAINS "trusted"
RETURN u, ca;
```
Risk Assessment Queries
```cypher
// High-risk users (multiple admin roles)
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
WITH u, collect(r.displayName) AS adminRoles
WHERE size(adminRoles) >= 2
RETURN u.displayName, adminRoles, size(adminRoles) AS riskScore
ORDER BY riskScore DESC;

// Overprivileged service principals
MATCH (sp:ServicePrincipal)-[:HAS_PERMISSION]->(scope)
WITH sp, collect(scope) AS scopes
WHERE size(scopes) > 10
RETURN sp.displayName, size(scopes) AS permissionCount
ORDER BY permissionCount DESC;

// Resources without proper RBAC
MATCH (r:Resource)
WHERE NOT (r)<-[:HAS_PERMISSION]-(:Principal)
RETURN r.name, r.type, r.resourceGroup;

// Stale accounts with access
MATCH (u:AzureADUser)-[:HAS_ACCESS]->(r:Resource)
WHERE u.lastSignInDateTime < datetime() - duration({days: 90})
RETURN u.displayName, u.lastSignInDateTime, r.name;
```
Automation and Integration
PowerShell Integration
```powershell
# Stormspotter PowerShell wrapper
function Invoke-StormspotterCollection {
    param(
        [string]$SubscriptionId,
        [string]$TenantId,
        [string]$OutputPath = "C:\StormspotterData",
        [switch]$Comprehensive
    )

    # Ensure output directory exists (Out-Null keeps it out of the return value)
    if (!(Test-Path $OutputPath)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    # Set timestamp
    $timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
    $outputDir = Join-Path $OutputPath $timestamp

    # Build the argument list instead of a command string to avoid quoting problems
    $collectArgs = @("stormspotter.py", "collect")

    if ($SubscriptionId) {
        $collectArgs += @("--subscription-id", $SubscriptionId)
    }

    if ($TenantId) {
        $collectArgs += @("--tenant-id", $TenantId)
    }

    if ($Comprehensive) {
        $collectArgs += "--comprehensive"
    }

    $collectArgs += @("--output-dir", $outputDir)

    try {
        Write-Host "[+] Starting Stormspotter collection..."
        & python3 @collectArgs | Out-Host
        if ($LASTEXITCODE -ne 0) { throw "collect exited with code $LASTEXITCODE" }
        Write-Host "[+] Collection completed: $outputDir"

        # Import to Neo4j
        Write-Host "[+] Importing to Neo4j..."
        & python3 stormspotter.py import --data-dir $outputDir | Out-Host
        if ($LASTEXITCODE -ne 0) { throw "import exited with code $LASTEXITCODE" }
        Write-Host "[+] Import completed successfully"

        return $outputDir
    } catch {
        Write-Error "[-] Collection failed: $($_.Exception.Message)"
        return $null
    }
}

# Usage
$result = Invoke-StormspotterCollection -SubscriptionId "your-sub-id" -Comprehensive
```
SIEM Integration
```python
#!/usr/bin/env python3

from datetime import datetime, timezone

import requests
from neo4j import GraphDatabase


class StormspotterSIEMIntegration:
    def __init__(self, siem_endpoint, api_key):
        self.siem_endpoint = siem_endpoint
        self.api_key = api_key

    def export_findings_to_siem(self, neo4j_uri, neo4j_user, neo4j_password):
        """Export Stormspotter findings to SIEM"""
        driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

        try:
            with driver.session() as session:
                # Query for security findings
                findings = []

                # High-privilege users
                result = session.run("""
                    MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                    WHERE r.displayName CONTAINS "Admin"
                    RETURN u.displayName as user, r.displayName as role
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.now(timezone.utc).isoformat(),
                        'source': 'Stormspotter',
                        'type': 'PrivilegedUser',
                        'severity': 'Medium',
                        'user': record['user'],
                        'role': record['role'],
                        'description': f"User {record['user']} has administrative role {record['role']}"
                    })

                # Exposed resources
                result = session.run("""
                    MATCH (r:Resource)
                    WHERE r.publicIPAddress IS NOT NULL
                    RETURN r.name as resource, r.type as type, r.publicIPAddress as ip
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.now(timezone.utc).isoformat(),
                        'source': 'Stormspotter',
                        'type': 'ExposedResource',
                        'severity': 'High',
                        'resource': record['resource'],
                        'resource_type': record['type'],
                        'public_ip': record['ip'],
                        'description': f"Resource {record['resource']} is exposed to internet"
                    })

                # Send to SIEM
                self.send_to_siem(findings)

        finally:
            driver.close()

    def send_to_siem(self, findings):
        """Send findings to SIEM platform"""
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        for finding in findings:
            try:
                response = requests.post(
                    f"{self.siem_endpoint}/api/events",
                    headers=headers,
                    json=finding,
                    timeout=30
                )

                if response.status_code == 200:
                    print(f"[+] Sent finding: {finding['type']}")
                else:
                    print(f"[-] Failed to send finding: {response.status_code}")

            except Exception as e:
                print(f"[-] Error sending finding: {e}")


# Usage
siem = StormspotterSIEMIntegration("https://your-siem.com", "your-api-key")
siem.export_findings_to_siem("bolt://localhost:7687", "neo4j", "password")
```
Troubleshooting
Common Issues
Authentication Issues
```bash
# Check Azure CLI authentication
az account show

# Re-authenticate if needed
az login --tenant "tenant-id"

# Verify service principal
az ad sp show --id "client-id"

# Test permissions
az role assignment list --assignee "client-id"
```
Neo4j Connection Issues
```bash
# Check Neo4j status
sudo systemctl status neo4j

# Check Neo4j logs
sudo journalctl -u neo4j -f

# Test connection
cypher-shell -u neo4j -p password

# Set the initial Neo4j password (only takes effect before the database is first started)
sudo neo4j-admin set-initial-password newpassword
```
Collection Errors
```bash
# Enable debug logging
export STORMSPOTTER_LOG_LEVEL=DEBUG

# Check API rate limits
az rest --method get --url "https://management.azure.com/subscriptions/your-sub-id/providers/Microsoft.Resources/resources?api-version=2021-04-01" --query "value[0]"

# Verify permissions
az role assignment list --scope "/subscriptions/your-sub-id" --assignee "your-principal-id"

# Test specific resource collection
python3 stormspotter.py collect --resource-types "VirtualMachines" --verbose
```
Performance Issues
```bash
# Increase Neo4j memory
sudo nano /etc/neo4j/neo4j.conf

# Add/modify these settings in neo4j.conf:
# dbms.memory.heap.initial_size=2G
# dbms.memory.heap.max_size=4G
# dbms.memory.pagecache.size=1G

# Restart Neo4j
sudo systemctl restart neo4j

# Optimize collection
python3 stormspotter.py collect --batch-size 100 --parallel-workers 5
```
Data Validation
```python
#!/usr/bin/env python3

from neo4j import GraphDatabase


def validate_stormspotter_data():
    """Validate collected Stormspotter data"""
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

    try:
        with driver.session() as session:
            # Check data completeness
            checks = {
                'users': session.run("MATCH (u:AzureADUser) RETURN count(u) as count").single()['count'],
                'resources': session.run("MATCH (r:Resource) RETURN count(r) as count").single()['count'],
                'roles': session.run("MATCH (r:AzureADRole) RETURN count(r) as count").single()['count'],
                'relationships': session.run("MATCH ()-[r]->() RETURN count(r) as count").single()['count']
            }

            print("=== STORMSPOTTER DATA VALIDATION ===")
            for check_type, count in checks.items():
                status = "✓" if count > 0 else "✗"
                print(f"{status} {check_type.capitalize()}: {count}")

            # Check for orphaned nodes (nodes without any relationship)
            orphaned = session.run("""
                MATCH (n)
                WHERE NOT (n)--()
                RETURN labels(n) as labels, count(n) as count
            """)

            print("\n=== ORPHANED NODES ===")
            for record in orphaned:
                print(f"- {record['labels']}: {record['count']}")

            return checks

    finally:
        driver.close()


# Run validation
validate_stormspotter_data()
```
Resources
- Official Stormspotter Repository: https://github.com/Azure/Stormspotter
- Neo4j Cypher Documentation
- BloodHound for Azure AD
---
*This cheat sheet provides a comprehensive reference for using Stormspotter in Azure security assessments. Always make sure you have proper authorization before using this tool in any environment.*