Stormspotter 치트 시트
개요
Stormspotter는 Azure 및 Azure AD 객체를 그래핑하기 위한 Azure Red Team 도구입니다. 원래 Microsoft의 Azure Red Team에서 개발한 Stormspotter는 Azure 리소스, 자격 증명 및 권한 간의 관계를 수집하고 시각화하여 Azure 환경의 포괄적인 공격 표면 맵을 생성합니다. 이를 통해 보안 전문가는 Azure 환경에서 잠재적인 공격 경로와 권한 상승 기회를 식별할 수 있습니다.
⚠️ 경고: Stormspotter는 소유하거나 명시적인 허가를 받은 환경에서만 사용하세요. 무단 사용은 서비스 약관이나 현지 법률을 위반할 수 있습니다.
설치
필수 조건
# Install Python 3.7+
python3 --version
# Install pip
sudo apt update
sudo apt install python3-pip
# Install Git
sudo apt install git
# Install Neo4j (required for graph database)
wget -O - https://debian.neo4j.com/neotechnology.gpg.key|sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4'|sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j
# Install Java (required for Neo4j)
sudo apt install openjdk-11-jdk
Stormspotter 클론 및 설치
# Clone the repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter
# Install Python dependencies
pip3 install -r requirements.txt
# Alternative: Install with pipenv
pip3 install pipenv
pipenv install
pipenv shell
Docker 설치
# Clone repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter
# Build Docker containers
docker-compose build
# Start services
docker-compose up -d
# Check status
docker-compose ps
수동 Neo4j 설정
# Start Neo4j service
sudo systemctl start neo4j
sudo systemctl enable neo4j
# Check Neo4j status
sudo systemctl status neo4j
# Access Neo4j browser (default: http://localhost:7474)
# Default credentials: neo4j/neo4j (change on first login)
# Configure Neo4j for Stormspotter
sudo nano /etc/neo4j/neo4j.conf
# Uncomment and modify these lines:
# dbms.default_listen_address=0.0.0.0
# dbms.connector.bolt.listen_address=0.0.0.0:7687
# dbms.connector.http.listen_address=0.0.0.0:7474
# Restart Neo4j
sudo systemctl restart neo4j
구성
Azure 인증 설정
# Install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb|sudo bash
# Login to Azure
az login
# List available subscriptions
az account list --output table
# Set specific subscription
az account set --subscription "subscription-id"
# Verify current context
az account show
서비스 주체 인증
# Create service principal for Stormspotter
az ad sp create-for-rbac --name "Stormspotter-SP" --role "Reader" --scopes "/subscriptions/your-subscription-id"
# Note the output:
# \\\\{
# "appId": "app-id",
# "displayName": "Stormspotter-SP",
# "name": "app-id",
# "password": "password",
# "tenant": "tenant-id"
# \\\\}
# Set environment variables
export AZURE_CLIENT_ID="app-id"
export AZURE_CLIENT_SECRET="password"
export AZURE_TENANT_ID="tenant-id"
export AZURE_SUBSCRIPTION_ID="subscription-id"
Stormspotter 구성
# Create configuration file
cd Stormspotter
cp config/config.json.example config/config.json
# Edit configuration
nano config/config.json
\\\\{
"neo4j": \\\\{
"uri": "bolt://localhost:7687",
"username": "neo4j",
"password": "your-neo4j-password"
\\\\},
"azure": \\\\{
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"subscription_id": "your-subscription-id"
\\\\},
"logging": \\\\{
"level": "INFO",
"file": "logs/stormspotter.log"
\\\\}
\\\\}
기본 사용법
데이터 수집
# Basic collection from current subscription
python3 stormspotter.py collect
# Collect from specific subscription
python3 stormspotter.py collect --subscription-id "subscription-id"
# Collect from specific tenant
python3 stormspotter.py collect --tenant-id "tenant-id"
# Collect with specific credentials
python3 stormspotter.py collect \
--client-id "app-id" \
--client-secret "password" \
--tenant-id "tenant-id"
# Verbose collection
python3 stormspotter.py collect --verbose
# Collect specific resource types
python3 stormspotter.py collect --resource-types "VirtualMachines,StorageAccounts"
Neo4j로 데이터 가져오기
# Import collected data to Neo4j
python3 stormspotter.py import --data-file "output/azure_data.json"
# Import with custom Neo4j connection
python3 stormspotter.py import \
--neo4j-uri "bolt://localhost:7687" \
--neo4j-user "neo4j" \
--neo4j-password "password"
# Clear existing data before import
python3 stormspotter.py import --clear-database
# Import multiple files
python3 stormspotter.py import --data-dir "output/"
웹 인터페이스
# Start Stormspotter web interface
python3 stormspotter.py web
# Start on specific port
python3 stormspotter.py web --port 8080
# Start with custom host
python3 stormspotter.py web --host 0.0.0.0 --port 8080
# Access web interface
# Default: http://localhost:5000
고급 수집
다중 테넌트 수집
# Collect from multiple tenants
python3 stormspotter.py collect-multi \
--tenants "tenant1-id,tenant2-id,tenant3-id" \
--output-dir "multi-tenant-data"
# Collect with different credentials per tenant
python3 stormspotter.py collect-multi \
--config-file "multi-tenant-config.json"
포괄적 수집
# Collect all available data
python3 stormspotter.py collect \
--comprehensive \
--include-rbac \
--include-resources \
--include-identities \
--include-policies
# Collect with custom scope
python3 stormspotter.py collect \
--scope "management-groups,subscriptions,resource-groups" \
--depth 5
# Collect with filters
python3 stormspotter.py collect \
--exclude-resource-types "NetworkSecurityGroups" \
--include-only-active-resources
예약된 수집
# Create collection script
cat > collect_azure.sh << 'EOF'
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
OUTPUT_DIR="/opt/stormspotter/data/$DATE"
mkdir -p "$OUTPUT_DIR"
python3 /opt/stormspotter/stormspotter.py collect \
--output-dir "$OUTPUT_DIR" \
--comprehensive \
--verbose
# Import to Neo4j
python3 /opt/stormspotter/stormspotter.py import \
--data-dir "$OUTPUT_DIR" \
--clear-database
echo "Collection completed: $DATE"
EOF
chmod +x collect_azure.sh
# Schedule with cron (daily at 2 AM)
echo "0 2 * * * /opt/stormspotter/collect_azure.sh"|crontab -
데이터 분석
Neo4j Cypher 쿼리
-- Find all Azure AD users
MATCH (u:AzureADUser)
RETURN u.displayName, u.userPrincipalName, u.enabled
-- Find privileged users
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName, r.displayName
-- Find virtual machines and their permissions
MATCH (vm:VirtualMachine)
OPTIONAL MATCH (vm)<-[:HAS_PERMISSION]-(p:Principal)
RETURN vm.name, vm.location, collect(p.displayName) as permissions
-- Find storage accounts with public access
MATCH (sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN sa.name, sa.resourceGroup, sa.location
-- Find attack paths to high-value resources
MATCH path = (u:AzureADUser)-[*1..5]->(vm:VirtualMachine)
WHERE vm.name CONTAINS "prod" OR vm.name CONTAINS "critical"
RETURN path
-- Find users with multiple high-privilege roles
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName IN ["Global Administrator", "Privileged Role Administrator", "Security Administrator"]
WITH u, collect(r.displayName) as roles
WHERE size(roles) > 1
RETURN u.displayName, roles
-- Find resources accessible from internet
MATCH (r:Resource)-[:ALLOWS_ACCESS]->(nsg:NetworkSecurityGroup)
WHERE nsg.rules CONTAINS "0.0.0.0/0"
RETURN r.name, r.type, r.location
Python 분석 스크립트
#!/usr/bin/env python3
import json
from neo4j import GraphDatabase
class StormspotterAnalyzer:
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
self.driver.close()
def find_privileged_users(self):
"""Find users with administrative privileges"""
with self.driver.session() as session:
result = session.run("""
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName as user, collect(r.displayName) as roles
""")
privileged_users = []
for record in result:
privileged_users.append(\\\\{
'user': record['user'],
'roles': record['roles']
\\\\})
return privileged_users
def find_attack_paths(self, target_resource_type="VirtualMachine"):
"""Find potential attack paths to target resources"""
with self.driver.session() as session:
result = session.run(f"""
MATCH path = (u:AzureADUser)-[*1..5]->(r:\\\\{target_resource_type\\\\})
RETURN path, length(path) as path_length
ORDER BY path_length
LIMIT 50
""")
attack_paths = []
for record in result:
attack_paths.append(\\\\{
'path': record['path'],
'length': record['path_length']
\\\\})
return attack_paths
def find_exposed_resources(self):
"""Find resources with potential internet exposure"""
with self.driver.session() as session:
result = session.run("""
MATCH (r:Resource)
WHERE r.publicIPAddress IS NOT NULL
OR r.allowBlobPublicAccess = true
OR r.publicNetworkAccess = "Enabled"
RETURN r.name, r.type, r.location, r.resourceGroup
""")
exposed_resources = []
for record in result:
exposed_resources.append(\\\\{
'name': record['r.name'],
'type': record['r.type'],
'location': record['r.location'],
'resource_group': record['r.resourceGroup']
\\\\})
return exposed_resources
def generate_report(self):
"""Generate comprehensive security report"""
report = \\\\{
'privileged_users': self.find_privileged_users(),
'attack_paths': self.find_attack_paths(),
'exposed_resources': self.find_exposed_resources()
\\\\}
return report
# Usage
if __name__ == "__main__":
analyzer = StormspotterAnalyzer("bolt://localhost:7687", "neo4j", "password")
try:
report = analyzer.generate_report()
print("=== STORMSPOTTER SECURITY ANALYSIS ===")
print(f"Privileged Users: \\\\{len(report['privileged_users'])\\\\}")
print(f"Attack Paths Found: \\\\{len(report['attack_paths'])\\\\}")
print(f"Exposed Resources: \\\\{len(report['exposed_resources'])\\\\}")
# Save detailed report
with open('stormspotter_analysis.json', 'w') as f:
json.dump(report, f, indent=2, default=str)
print("Detailed report saved to stormspotter_analysis.json")
finally:
analyzer.close()
공격 경로 분석
일반적인 공격 시나리오
-- Scenario 1: User to VM via role assignments
MATCH path = (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)-[:APPLIES_TO]->(rg:ResourceGroup)-[:CONTAINS]->(vm:VirtualMachine)
WHERE r.displayName IN ["Contributor", "Owner", "Virtual Machine Contributor"]
RETURN path
-- Scenario 2: Service Principal privilege escalation
MATCH path = (sp:ServicePrincipal)-[:HAS_PERMISSION]->(sub:Subscription)
WHERE sp.appRoles CONTAINS "Application.ReadWrite.All"
RETURN path
-- Scenario 3: Storage account access via managed identity
MATCH path = (mi:ManagedIdentity)-[:HAS_ACCESS]->(sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN path
-- Scenario 4: Cross-tenant access
MATCH path = (u:AzureADUser)-[:GUEST_IN]->(t:Tenant)-[:CONTAINS]->(r:Resource)
RETURN path
-- Scenario 5: Conditional access bypass
MATCH (u:AzureADUser)-[:SUBJECT_TO]->(ca:ConditionalAccessPolicy)
WHERE ca.state = "disabled" OR ca.conditions CONTAINS "trusted"
RETURN u, ca
위험 평가 쿼리
-- High-risk users (multiple admin roles)
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
WITH u, collect(r.displayName) as adminRoles
WHERE size(adminRoles) >= 2
RETURN u.displayName, adminRoles, size(adminRoles) as riskScore
ORDER BY riskScore DESC
-- Overprivileged service principals
MATCH (sp:ServicePrincipal)-[:HAS_PERMISSION]->(scope)
WITH sp, collect(scope) as scopes
WHERE size(scopes) > 10
RETURN sp.displayName, size(scopes) as permissionCount
ORDER BY permissionCount DESC
-- Resources without proper RBAC
MATCH (r:Resource)
WHERE NOT (r)<-[:HAS_PERMISSION]-(:Principal)
RETURN r.name, r.type, r.resourceGroup
-- Stale accounts with access
MATCH (u:AzureADUser)-[:HAS_ACCESS]->(r:Resource)
WHERE u.lastSignInDateTime < datetime() - duration(\\\\{days: 90\\\\})
RETURN u.displayName, u.lastSignInDateTime, r.name
자동화 및 통합
PowerShell 통합
# Stormspotter PowerShell wrapper
function Invoke-StormspotterCollection \\\\{
param(
[string]$SubscriptionId,
[string]$TenantId,
[string]$OutputPath = "C:\StormspotterData",
[switch]$Comprehensive
)
# Ensure output directory exists
if (!(Test-Path $OutputPath)) \\\\{
New-Item -ItemType Directory -Path $OutputPath -Force
\\\\}
# Set timestamp
$timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
$outputDir = Join-Path $OutputPath $timestamp
# Build command
$cmd = "python3 stormspotter.py collect"
if ($SubscriptionId) \\\\{
$cmd += " --subscription-id `"$SubscriptionId`""
\\\\}
if ($TenantId) \\\\{
$cmd += " --tenant-id `"$TenantId`""
\\\\}
if ($Comprehensive) \\\\{
$cmd += " --comprehensive"
\\\\}
$cmd += " --output-dir `"$outputDir`""
try \\\\{
Write-Host "[+] Starting Stormspotter collection..."
Invoke-Expression $cmd
Write-Host "[+] Collection completed: $outputDir"
# Import to Neo4j
Write-Host "[+] Importing to Neo4j..."
$importCmd = "python3 stormspotter.py import --data-dir `"$outputDir`""
Invoke-Expression $importCmd
Write-Host "[+] Import completed successfully"
return $outputDir
\\\\} catch \\\\{
Write-Error "[-] Collection failed: $($_.Exception.Message)"
return $null
\\\\}
\\\\}
# Usage
$result = Invoke-StormspotterCollection -SubscriptionId "your-sub-id" -Comprehensive
SIEM 통합```python
#!/usr/bin/env python3 import json import requests from datetime import datetime
class StormspotterSIEMIntegration: def init(self, siem_endpoint, api_key): self.siem_endpoint = siem_endpoint self.api_key = api_key
def export_findings_to_siem(self, neo4j_uri, neo4j_user, neo4j_password):
"""Export Stormspotter findings to SIEM"""
from neo4j import GraphDatabase
driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
try:
with driver.session() as session:
# Query for security findings
findings = []
# High-privilege users
result = session.run("""
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName as user, r.displayName as role
""")
for record in result:
findings.append(\\\\{
'timestamp': datetime.utcnow().isoformat(),
'source': 'Stormspotter',
'type': 'PrivilegedUser',
'severity': 'Medium',
'user': record['user'],
'role': record['role'],
'description': f"User \\\\{record['user']\\\\} has administrative role \\\\{record['role']\\\\}"
\\\\})
# Exposed resources
result = session.run("""
MATCH (r:Resource)
WHERE r.publicIPAddress IS NOT NULL
RETURN r.name as resource, r.type as type, r.publicIPAddress as ip
""")
for record in result:
findings.append(\\\\{
'timestamp': datetime.utcnow().isoformat(),
'source': 'Stormspotter',
'type': 'ExposedResource',
'severity': 'High',
'resource': record['resource'],
'resource_type': record['type'],
'public_ip': record['ip'],
'description': f"Resource \\\\{record['resource']\\\\} is exposed to internet"
\\\\})
# Send to SIEM
self.send_to_siem(findings)
finally:
driver.close()
def send_to_siem(self, findings):
"""Send findings to SIEM platform"""
headers = \\\\{
'Authorization': f'Bearer \\\\{self.api_key\\\\}',
'Content-Type': 'application/json'
\\\\}
for finding in findings:
try:
response = requests.post(
f"\\\\{self.siem_endpoint\\\\}/api/events",
headers=headers,
json=finding,
timeout=30
)
if response.status_code == 200:
print(f"[+] Sent finding: \\\\{finding['type']\\\\}")
else:
print(f"[-] Failed to send finding: \\\\{response.status_code\\\\}")
except Exception as e:
print(f"[-] Error sending finding: \\\\{e\\\\}")
Usage
siem = StormspotterSIEMIntegration(“https://your-siem.com”, “your-api-key”) siem.export_findings_to_siem(“bolt://localhost:7687”, “neo4j”, “password”)
```bash
# Check Azure CLI authentication
az account show
# Re-authenticate if needed
az login --tenant "tenant-id"
# Verify service principal
az ad sp show --id "client-id"
# Test permissions
az role assignment list --assignee "client-id"
```#### Neo4j 연결 문제
```bash
# Check Neo4j status
sudo systemctl status neo4j
# Check Neo4j logs
sudo journalctl -u neo4j -f
# Test connection
cypher-shell -u neo4j -p password
# Reset Neo4j password
sudo neo4j-admin set-initial-password newpassword
```#### 수집 실패
```bash
# Enable debug logging
export STORMSPOTTER_LOG_LEVEL=DEBUG
# Check API rate limits
az rest --method get --url "https://management.azure.com/subscriptions/your-sub-id/providers/Microsoft.Resources/resources?api-version=2021-04-01" --query "value[0]"
# Verify permissions
az role assignment list --scope "/subscriptions/your-sub-id" --assignee "your-principal-id"
# Test specific resource collection
python3 stormspotter.py collect --resource-types "VirtualMachines" --verbose
```#### 성능 문제
```bash
# Increase Neo4j memory
sudo nano /etc/neo4j/neo4j.conf
# Add/modify these settings:
dbms.memory.heap.initial_size=2G
dbms.memory.heap.max_size=4G
dbms.memory.pagecache.size=1G
# Restart Neo4j
sudo systemctl restart neo4j
# Optimize collection
python3 stormspotter.py collect --batch-size 100 --parallel-workers 5
```### 데이터 검증
```python
#!/usr/bin/env python3
def validate_stormspotter_data():
"""Validate collected Stormspotter data"""
from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
try:
with driver.session() as session:
# Check data completeness
checks = \\\\{
'users': session.run("MATCH (u:AzureADUser) RETURN count(u) as count").single()['count'],
'resources': session.run("MATCH (r:Resource) RETURN count(r) as count").single()['count'],
'roles': session.run("MATCH (r:AzureADRole) RETURN count(r) as count").single()['count'],
'relationships': session.run("MATCH ()-[r]->() RETURN count(r) as count").single()['count']
\\\\}
print("=== STORMSPOTTER DATA VALIDATION ===")
for check_type, count in checks.items():
status = "✓" if count > 0 else "✗"
print(f"\\\\{status\\\\} \\\\{check_type.capitalize()\\\\}: \\\\{count\\\\}")
# Check for orphaned nodes
orphaned = session.run("""
MATCH (n)
WHERE NOT (n)--()
RETURN labels(n) as labels, count(n) as count
""")
print("\n=== ORPHANED NODES ===")
for record in orphaned:
print(f"- \\\\{record['labels']\\\\}: \\\\{record['count']\\\\}")
return checks
finally:
driver.close()
# Run validation
validate_stormspotter_data()
```## 리소스
https://github.com/Azure/Stormspotter- [공식 Stormspotter 저장소](
https://docs.microsoft.com/en-us/azure/security/- [Azure Red Team 문서](
https://neo4j.com/docs/cypher-manual/current/- [Neo4j Cypher 문서](
https://docs.microsoft.com/en-us/azure/role-based-access-control/- [Azure RBAC 문서](
https://docs.microsoft.com/en-us/azure/security/fundamentals/best-practices-and-patterns- [Azure 보안 모범 사례](
https://github.com/BloodHoundAD/AzureHound- [Azure AD용 BloodHound](
https://cloudbrothers.info/en/azure-attack-paths/- [Azure 공격 경로](
### 공통 문제
#### 인증 문제
---
*이 치트 시트는 Azure 보안 평가를 위해 Stormspotter를 사용하는 포괄적인 참조를 제공합니다. 항상 이 도구를 모든 환경에서 사용하기 전에 적절한 권한이 있는지 확인하세요.*