
Stormspotter Cheat Sheet

Overview

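Stormspotter is an Azure Red Team tool for graphing Azure and Azure AD objects. Originally developed by Microsoft's Azure Red Team, it collects and visualizes the relationships between Azure resources, credentials, and permissions to build a comprehensive attack surface map of an Azure environment. This lets security professionals identify potential attack paths and privilege escalation opportunities in that environment.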

⚠️ Warning: Use Stormspotter only in environments that you own or have explicit permission to test. Unauthorized use may violate terms of service or local laws.
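
To make the idea of an attack path concrete, here is a minimal sketch that treats identities and resources as nodes in a directed graph and searches for the shortest chain of relationships between two of them. The node names and edges are invented for illustration and are not Stormspotter's actual data model.

```python
from collections import deque

def shortest_path(graph, start, goal):
    """Breadth-first search: return the shortest node list from start to goal, or None."""
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == goal:
            return path
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(path + [neighbor])
    return None

# Hypothetical relationships: "A": ["B"] means A has some form of access to B.
toy_graph = {
    "user:alice": ["group:devops"],
    "group:devops": ["rg:prod"],
    "rg:prod": ["vm:prod-web-01"],
    "user:bob": [],
}

print(shortest_path(toy_graph, "user:alice", "vm:prod-web-01"))
print(shortest_path(toy_graph, "user:bob", "vm:prod-web-01"))
```

In this toy graph the first call finds a three-hop chain from the user to the virtual machine, while the second finds none. The graph queries later in this sheet ask the same kind of question of the collected data.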

Installation

Prerequisites

# Install Python 3.7+
python3 --version

# Install pip
sudo apt update
sudo apt install python3-pip

# Install Git
sudo apt install git

# Install Neo4j (required for graph database)
wget -O - https://debian.neo4j.com/neotechnology.gpg.key|sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4'|sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j

# Install Java (required for Neo4j)
sudo apt install openjdk-11-jdk

Clone and Install Stormspotter

# Clone the repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Install Python dependencies
pip3 install -r requirements.txt

# Alternative: Install with pipenv
pip3 install pipenv
pipenv install
pipenv shell

Docker Installation

# Clone repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Build Docker containers
docker-compose build

# Start services
docker-compose up -d

# Check status
docker-compose ps

Manual Neo4j Setup

# Start Neo4j service
sudo systemctl start neo4j
sudo systemctl enable neo4j

# Check Neo4j status
sudo systemctl status neo4j

# Access Neo4j browser (default: http://localhost:7474)
# Default credentials: neo4j/neo4j (change on first login)

# Configure Neo4j for Stormspotter
sudo nano /etc/neo4j/neo4j.conf

# Uncomment and modify these lines:
# dbms.default_listen_address=0.0.0.0
# dbms.connector.bolt.listen_address=0.0.0.0:7687
# dbms.connector.http.listen_address=0.0.0.0:7474

# Restart Neo4j
sudo systemctl restart neo4j
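
After the restart, it can help to confirm that the two listeners configured above are actually accepting connections. The following is a small sketch using only the standard library. The helper name `port_is_open` is hypothetical, and `localhost` is an assumption about where Neo4j runs.

```python
import socket

def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # 7474 (HTTP browser) and 7687 (Bolt) are the ports configured above.
    for name, port in (("HTTP", 7474), ("Bolt", 7687)):
        state = "reachable" if port_is_open("localhost", port) else "not reachable"
        print(f"Neo4j {name} port {port}: {state}")
```

A reachable port only shows that something is listening. It does not verify credentials or that the database is ready for queries.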

Configuration

Azure Authentication Setup

# Install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb|sudo bash

# Login to Azure
az login

# List available subscriptions
az account list --output table

# Set specific subscription
az account set --subscription "subscription-id"

# Verify current context
az account show

Service Principal Authentication

# Create service principal for Stormspotter
az ad sp create-for-rbac --name "Stormspotter-SP" --role "Reader" --scopes "/subscriptions/your-subscription-id"

# Note the output:
# {
#   "appId": "app-id",
#   "displayName": "Stormspotter-SP",
#   "name": "app-id",
#   "password": "password",
#   "tenant": "tenant-id"
# }

# Set environment variables
export AZURE_CLIENT_ID="app-id"
export AZURE_CLIENT_SECRET="password"
export AZURE_TENANT_ID="tenant-id"
export AZURE_SUBSCRIPTION_ID="subscription-id"
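
Before starting a collection, a quick check that all four variables are present can save a failed run. This is a minimal sketch. The function name `missing_azure_vars` is hypothetical, and it checks only that the variables are non-empty, not that the values are valid.

```python
import os

REQUIRED_VARS = (
    "AZURE_CLIENT_ID",
    "AZURE_CLIENT_SECRET",
    "AZURE_TENANT_ID",
    "AZURE_SUBSCRIPTION_ID",
)

def missing_azure_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]

missing = missing_azure_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All service principal variables are set.")
```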

Stormspotter Configuration

# Create configuration file
cd Stormspotter
cp config/config.json.example config/config.json

# Edit configuration
nano config/config.json

{
  "neo4j": {
    "uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "your-neo4j-password"
  },
  "azure": {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "subscription_id": "your-subscription-id"
  },
  "logging": {
    "level": "INFO",
    "file": "logs/stormspotter.log"
  }
}
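
A common mistake is to leave one of the `your-...` placeholders in place. The sketch below checks a parsed configuration for the sections and keys shown above. The function names are hypothetical, and the key layout is assumed to match the example file in this sheet.

```python
import json

# Sections and keys taken from the example configuration above.
REQUIRED_KEYS = {
    "neo4j": ("uri", "username", "password"),
    "azure": ("tenant_id", "client_id", "client_secret", "subscription_id"),
}

def find_config_problems(config):
    """Return a list of human-readable problems found in the parsed config."""
    problems = []
    for section, keys in REQUIRED_KEYS.items():
        values = config.get(section)
        if not isinstance(values, dict):
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            value = values.get(key, "")
            if not value or str(value).startswith("your-"):
                problems.append(f"{section}.{key} is empty or still a placeholder")
    return problems

def load_config(path="config/config.json"):
    """Read and parse the JSON configuration file."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)

example = {
    "neo4j": {"uri": "bolt://localhost:7687", "username": "neo4j", "password": "s3cret"},
    "azure": {"tenant_id": "your-tenant-id", "client_id": "abc", "client_secret": "def"},
}
for problem in find_config_problems(example):
    print(problem)
```

For a real file, `find_config_problems(load_config())` would run the same checks against `config/config.json`.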

Basic Usage

Data Collection

# Basic collection from current subscription
python3 stormspotter.py collect

# Collect from specific subscription
python3 stormspotter.py collect --subscription-id "subscription-id"

# Collect from specific tenant
python3 stormspotter.py collect --tenant-id "tenant-id"

# Collect with specific credentials
python3 stormspotter.py collect \
  --client-id "app-id" \
  --client-secret "password" \
  --tenant-id "tenant-id"

# Verbose collection
python3 stormspotter.py collect --verbose

# Collect specific resource types
python3 stormspotter.py collect --resource-types "VirtualMachines,StorageAccounts"

Importing Data into Neo4j

# Import collected data to Neo4j
python3 stormspotter.py import --data-file "output/azure_data.json"

# Import with custom Neo4j connection
python3 stormspotter.py import \
  --neo4j-uri "bolt://localhost:7687" \
  --neo4j-user "neo4j" \
  --neo4j-password "password"

# Clear existing data before import
python3 stormspotter.py import --clear-database

# Import multiple files
python3 stormspotter.py import --data-dir "output/"

Web Interface

# Start Stormspotter web interface
python3 stormspotter.py web

# Start on specific port
python3 stormspotter.py web --port 8080

# Start with custom host
python3 stormspotter.py web --host 0.0.0.0 --port 8080

# Access web interface
# Default: http://localhost:5000

Advanced Collection

Multi-Tenant Collection

# Collect from multiple tenants
python3 stormspotter.py collect-multi \
  --tenants "tenant1-id,tenant2-id,tenant3-id" \
  --output-dir "multi-tenant-data"

# Collect with different credentials per tenant
python3 stormspotter.py collect-multi \
  --config-file "multi-tenant-config.json"

Comprehensive Collection

# Collect all available data
python3 stormspotter.py collect \
  --comprehensive \
  --include-rbac \
  --include-resources \
  --include-identities \
  --include-policies

# Collect with custom scope
python3 stormspotter.py collect \
  --scope "management-groups,subscriptions,resource-groups" \
  --depth 5

# Collect with filters
python3 stormspotter.py collect \
  --exclude-resource-types "NetworkSecurityGroups" \
  --include-only-active-resources

Scheduled Collection

# Create collection script
cat > collect_azure.sh << 'EOF'
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
OUTPUT_DIR="/opt/stormspotter/data/$DATE"

mkdir -p "$OUTPUT_DIR"

python3 /opt/stormspotter/stormspotter.py collect \
  --output-dir "$OUTPUT_DIR" \
  --comprehensive \
  --verbose

# Import to Neo4j
python3 /opt/stormspotter/stormspotter.py import \
  --data-dir "$OUTPUT_DIR" \
  --clear-database

echo "Collection completed: $DATE"
EOF

chmod +x collect_azure.sh

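# Schedule with cron (daily at 2 AM)
# Appends to the current user's crontab instead of overwriting it.
# Assumes collect_azure.sh has been placed in /opt/stormspotter/.
(crontab -l 2>/dev/null; echo "0 2 * * * /opt/stormspotter/collect_azure.sh") | crontab -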
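
Because each scheduled run writes to a directory named after its timestamp, later steps often need the most recent one. The following sketch picks it by parsing directory names with the same format the script uses. The function name `latest_collection_dir` is hypothetical.

```python
from datetime import datetime
from pathlib import Path

STAMP_FORMAT = "%Y%m%d_%H%M%S"  # matches DATE=$(date +%Y%m%d_%H%M%S) in the script

def latest_collection_dir(base_dir):
    """Return the newest timestamp-named subdirectory of base_dir, or None."""
    newest = None
    for entry in Path(base_dir).iterdir():
        if not entry.is_dir():
            continue
        try:
            stamp = datetime.strptime(entry.name, STAMP_FORMAT)
        except ValueError:
            continue  # ignore directories that are not collection runs
        if newest is None or stamp > newest[0]:
            newest = (stamp, entry)
    return newest[1] if newest else None
```

For example, `latest_collection_dir("/opt/stormspotter/data")` would return the path of the last completed run's output directory, or `None` if no run has been recorded yet.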

Data Analysis

Neo4j Cypher Queries

// Find all Azure AD users
MATCH (u:AzureADUser)
RETURN u.displayName, u.userPrincipalName, u.enabled

// Find privileged users
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName, r.displayName

// Find virtual machines and their permissions
MATCH (vm:VirtualMachine)
OPTIONAL MATCH (vm)<-[:HAS_PERMISSION]-(p:Principal)
RETURN vm.name, vm.location, collect(p.displayName) as permissions

// Find storage accounts with public access
MATCH (sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN sa.name, sa.resourceGroup, sa.location

// Find attack paths to high-value resources
MATCH path = (u:AzureADUser)-[*1..5]->(vm:VirtualMachine)
WHERE vm.name CONTAINS "prod" OR vm.name CONTAINS "critical"
RETURN path

// Find users with multiple high-privilege roles
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName IN ["Global Administrator", "Privileged Role Administrator", "Security Administrator"]
WITH u, collect(r.displayName) as roles
WHERE size(roles) > 1
RETURN u.displayName, roles

// Find resources accessible from internet
MATCH (r:Resource)-[:ALLOWS_ACCESS]->(nsg:NetworkSecurityGroup)
WHERE nsg.rules CONTAINS "0.0.0.0/0"
RETURN r.name, r.type, r.location

Python Analysis Script

#!/usr/bin/env python3
import json
from neo4j import GraphDatabase

class StormspotterAnalyzer:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def find_privileged_users(self):
        """Find users with administrative privileges"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                WHERE r.displayName CONTAINS "Admin"
                RETURN u.displayName as user, collect(r.displayName) as roles
            """)

            privileged_users = []
            for record in result:
                privileged_users.append({
                    'user': record['user'],
                    'roles': record['roles']
                })

            return privileged_users

    def find_attack_paths(self, target_resource_type="VirtualMachine"):
        """Find potential attack paths to target resources"""
        with self.driver.session() as session:
            result = session.run(f"""
                MATCH path = (u:AzureADUser)-[*1..5]->(r:{target_resource_type})
                RETURN path, length(path) as path_length
                ORDER BY path_length
                LIMIT 50
            """)

            attack_paths = []
            for record in result:
                attack_paths.append({
                    'path': record['path'],
                    'length': record['path_length']
                })

            return attack_paths

    def find_exposed_resources(self):
        """Find resources with potential internet exposure"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (r:Resource)
                WHERE r.publicIPAddress IS NOT NULL
                   OR r.allowBlobPublicAccess = true
                   OR r.publicNetworkAccess = "Enabled"
                RETURN r.name, r.type, r.location, r.resourceGroup
            """)

            exposed_resources = []
            for record in result:
                exposed_resources.append({
                    'name': record['r.name'],
                    'type': record['r.type'],
                    'location': record['r.location'],
                    'resource_group': record['r.resourceGroup']
                })

            return exposed_resources

    def generate_report(self):
        """Generate comprehensive security report"""
        report = {
            'privileged_users': self.find_privileged_users(),
            'attack_paths': self.find_attack_paths(),
            'exposed_resources': self.find_exposed_resources()
        }

        return report

# Usage
if __name__ == "__main__":
    analyzer = StormspotterAnalyzer("bolt://localhost:7687", "neo4j", "password")

    try:
        report = analyzer.generate_report()

        print("=== STORMSPOTTER SECURITY ANALYSIS ===")
        print(f"Privileged Users: {len(report['privileged_users'])}")
        print(f"Attack Paths Found: {len(report['attack_paths'])}")
        print(f"Exposed Resources: {len(report['exposed_resources'])}")

        # Save detailed report
        with open('stormspotter_analysis.json', 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print("Detailed report saved to stormspotter_analysis.json")

    finally:
        analyzer.close()
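
The `find_attack_paths` method above interpolates the target label directly into the query text, because Cypher does not accept parameters in label positions. If the label ever comes from user input, one defensive option is to check it against an allowlist before building the query. The sketch below shows that idea. The function name and the allowlist contents are assumptions based on the labels used in this sheet.

```python
# Labels that appear in this cheat sheet; extend to match the labels in your graph.
ALLOWED_LABELS = {"VirtualMachine", "StorageAccount", "Resource"}

def build_attack_path_query(target_label, max_hops=5, limit=50):
    """Build the attack-path query text, rejecting labels outside the allowlist."""
    if target_label not in ALLOWED_LABELS:
        raise ValueError(f"unsupported target label: {target_label!r}")
    max_hops = int(max_hops)
    limit = int(limit)
    if not 1 <= max_hops <= 10:
        raise ValueError("max_hops must be between 1 and 10")
    if limit < 1:
        raise ValueError("limit must be positive")
    return (
        f"MATCH path = (u:AzureADUser)-[*1..{max_hops}]->(r:{target_label}) "
        "RETURN path, length(path) AS path_length "
        f"ORDER BY path_length LIMIT {limit}"
    )

print(build_attack_path_query("StorageAccount", max_hops=3))
```

The returned string can be passed to `session.run()` in place of the f-string in `find_attack_paths`.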

Attack Path Analysis

Common Attack Scenarios

// Scenario 1: User to VM via role assignments
MATCH path = (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)-[:APPLIES_TO]->(rg:ResourceGroup)-[:CONTAINS]->(vm:VirtualMachine)
WHERE r.displayName IN ["Contributor", "Owner", "Virtual Machine Contributor"]
RETURN path

// Scenario 2: Service Principal privilege escalation
MATCH path = (sp:ServicePrincipal)-[:HAS_PERMISSION]->(sub:Subscription)
WHERE sp.appRoles CONTAINS "Application.ReadWrite.All"
RETURN path

// Scenario 3: Storage account access via managed identity
MATCH path = (mi:ManagedIdentity)-[:HAS_ACCESS]->(sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN path

// Scenario 4: Cross-tenant access
MATCH path = (u:AzureADUser)-[:GUEST_IN]->(t:Tenant)-[:CONTAINS]->(r:Resource)
RETURN path

// Scenario 5: Conditional access bypass
MATCH (u:AzureADUser)-[:SUBJECT_TO]->(ca:ConditionalAccessPolicy)
WHERE ca.state = "disabled" OR ca.conditions CONTAINS "trusted"
RETURN u, ca

Risk Assessment Queries

// High-risk users (multiple admin roles)
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
WITH u, collect(r.displayName) as adminRoles
WHERE size(adminRoles) >= 2
RETURN u.displayName, adminRoles, size(adminRoles) as riskScore
ORDER BY riskScore DESC

// Overprivileged service principals
MATCH (sp:ServicePrincipal)-[:HAS_PERMISSION]->(scope)
WITH sp, collect(scope) as scopes
WHERE size(scopes) > 10
RETURN sp.displayName, size(scopes) as permissionCount
ORDER BY permissionCount DESC

// Resources without proper RBAC
MATCH (r:Resource)
WHERE NOT (r)<-[:HAS_PERMISSION]-(:Principal)
RETURN r.name, r.type, r.resourceGroup

// Stale accounts with access
MATCH (u:AzureADUser)-[:HAS_ACCESS]->(r:Resource)
WHERE u.lastSignInDateTime < datetime() - duration({days: 90})
RETURN u.displayName, u.lastSignInDateTime, r.name
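
The stale-account query compares `lastSignInDateTime` against a Neo4j datetime, which only works if the property is stored as a temporal value. If it was imported as an ISO 8601 string instead, one alternative is to return the raw value and apply the 90-day rule on the client. The sketch below is one way to do that. The function name `is_stale` is hypothetical, and treating a missing timestamp as stale is a design assumption.

```python
from datetime import datetime, timedelta, timezone

def is_stale(last_sign_in, now=None, max_age_days=90):
    """Return True if the ISO 8601 timestamp is older than max_age_days, or missing."""
    if not last_sign_in:
        return True  # never signed in, or the property was not collected
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
    parsed = datetime.fromisoformat(last_sign_in.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assume UTC when no offset is given
    now = now or datetime.now(timezone.utc)
    return now - parsed > timedelta(days=max_age_days)

reference = datetime(2024, 6, 1, tzinfo=timezone.utc)
print(is_stale("2024-01-01T00:00:00Z", now=reference))
print(is_stale("2024-05-01T00:00:00Z", now=reference))
```

With the fixed reference date, the January sign-in is more than 90 days old and is reported as stale, while the May sign-in is not.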

Automation and Integration

PowerShell Integration

# Stormspotter PowerShell wrapper
function Invoke-StormspotterCollection {
    param(
        [string]$SubscriptionId,
        [string]$TenantId,
        [string]$OutputPath = "C:\StormspotterData",
        [switch]$Comprehensive
    )

    # Ensure output directory exists (Out-Null keeps it out of the return value)
    if (!(Test-Path $OutputPath)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    # Set timestamp
    $timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
    $outputDir = Join-Path $OutputPath $timestamp

    # Build command
    $cmd = "python3 stormspotter.py collect"

    if ($SubscriptionId) {
        $cmd += " --subscription-id `"$SubscriptionId`""
    }

    if ($TenantId) {
        $cmd += " --tenant-id `"$TenantId`""
    }

    if ($Comprehensive) {
        $cmd += " --comprehensive"
    }

    $cmd += " --output-dir `"$outputDir`""

    try {
        Write-Host "[+] Starting Stormspotter collection..."
        Invoke-Expression $cmd
        # Native commands do not throw on failure, so check the exit code explicitly
        if ($LASTEXITCODE -ne 0) { throw "collect exited with code $LASTEXITCODE" }

        Write-Host "[+] Collection completed: $outputDir"

        # Import to Neo4j
        Write-Host "[+] Importing to Neo4j..."
        $importCmd = "python3 stormspotter.py import --data-dir `"$outputDir`""
        Invoke-Expression $importCmd
        if ($LASTEXITCODE -ne 0) { throw "import exited with code $LASTEXITCODE" }

        Write-Host "[+] Import completed successfully"

        return $outputDir

    } catch {
        Write-Error "[-] Collection failed: $($_.Exception.Message)"
        return $null
    }
}

# Usage
$result = Invoke-StormspotterCollection -SubscriptionId "your-sub-id" -Comprehensive
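
The same wrapper can be sketched in Python. Passing the command as an argument list to `subprocess.run` avoids the quoting concerns that come with building a command string. The function names are hypothetical, and the flags are the ones shown in this sheet rather than a verified CLI reference.

```python
import subprocess
from datetime import datetime
from pathlib import Path

def build_collect_command(output_dir, subscription_id=None, tenant_id=None, comprehensive=False):
    """Build the collect command as an argument list (no shell quoting needed)."""
    cmd = ["python3", "stormspotter.py", "collect"]
    if subscription_id:
        cmd += ["--subscription-id", subscription_id]
    if tenant_id:
        cmd += ["--tenant-id", tenant_id]
    if comprehensive:
        cmd.append("--comprehensive")
    cmd += ["--output-dir", str(output_dir)]
    return cmd

def run_collection(base_dir, **options):
    """Collect into a timestamped directory, then import it; raises on a non-zero exit."""
    output_dir = Path(base_dir) / datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    subprocess.run(build_collect_command(output_dir, **options), check=True)
    subprocess.run(
        ["python3", "stormspotter.py", "import", "--data-dir", str(output_dir)],
        check=True,
    )
    return output_dir

print(build_collect_command("out", subscription_id="your-sub-id", comprehensive=True))
```

With `check=True`, a failed collection raises `subprocess.CalledProcessError`, so the import step is skipped rather than run against incomplete data.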

SIEM Integration

```python
#!/usr/bin/env python3
from datetime import datetime

import requests
from neo4j import GraphDatabase


class StormspotterSIEMIntegration:
    def __init__(self, siem_endpoint, api_key):
        self.siem_endpoint = siem_endpoint
        self.api_key = api_key

    def export_findings_to_siem(self, neo4j_uri, neo4j_user, neo4j_password):
        """Export Stormspotter findings to SIEM"""
        driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

        try:
            with driver.session() as session:
                # Query for security findings
                findings = []

                # High-privilege users
                result = session.run("""
                    MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                    WHERE r.displayName CONTAINS "Admin"
                    RETURN u.displayName as user, r.displayName as role
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'Stormspotter',
                        'type': 'PrivilegedUser',
                        'severity': 'Medium',
                        'user': record['user'],
                        'role': record['role'],
                        'description': f"User {record['user']} has administrative role {record['role']}"
                    })

                # Exposed resources
                result = session.run("""
                    MATCH (r:Resource)
                    WHERE r.publicIPAddress IS NOT NULL
                    RETURN r.name as resource, r.type as type, r.publicIPAddress as ip
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'Stormspotter',
                        'type': 'ExposedResource',
                        'severity': 'High',
                        'resource': record['resource'],
                        'resource_type': record['type'],
                        'public_ip': record['ip'],
                        'description': f"Resource {record['resource']} is exposed to internet"
                    })

                # Send to SIEM
                self.send_to_siem(findings)

        finally:
            driver.close()

    def send_to_siem(self, findings):
        """Send findings to SIEM platform"""
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        for finding in findings:
            try:
                response = requests.post(
                    f"{self.siem_endpoint}/api/events",
                    headers=headers,
                    json=finding,
                    timeout=30
                )

                if response.status_code == 200:
                    print(f"[+] Sent finding: {finding['type']}")
                else:
                    print(f"[-] Failed to send finding: {response.status_code}")

            except Exception as e:
                print(f"[-] Error sending finding: {e}")


# Usage
siem = StormspotterSIEMIntegration("https://your-siem.com", "your-api-key")
siem.export_findings_to_siem("bolt://localhost:7687", "neo4j", "password")
```

### Common Issues

#### Authentication Issues

```bash
# Check Azure CLI authentication
az account show

# Re-authenticate if needed
az login --tenant "tenant-id"

# Verify service principal
az ad sp show --id "client-id"

# Test permissions
az role assignment list --assignee "client-id"
```

#### Neo4j Connection Issues

```bash
# Check Neo4j status
sudo systemctl status neo4j

# Check Neo4j logs
sudo journalctl -u neo4j -f

# Test connection
cypher-shell -u neo4j -p password

# Reset Neo4j password
sudo neo4j-admin set-initial-password newpassword
```

#### Collection Failures

```bash
# Enable debug logging
export STORMSPOTTER_LOG_LEVEL=DEBUG

# Check API rate limits
az rest --method get --url "https://management.azure.com/subscriptions/your-sub-id/providers/Microsoft.Resources/resources?api-version=2021-04-01" --query "value[0]"

# Verify permissions
az role assignment list --scope "/subscriptions/your-sub-id" --assignee "your-principal-id"

# Test specific resource collection
python3 stormspotter.py collect --resource-types "VirtualMachines" --verbose
```

#### Performance Issues

```bash
# Increase Neo4j memory
sudo nano /etc/neo4j/neo4j.conf

# Add/modify these settings in neo4j.conf (they are config entries, not shell commands):
# dbms.memory.heap.initial_size=2G
# dbms.memory.heap.max_size=4G
# dbms.memory.pagecache.size=1G

# Restart Neo4j
sudo systemctl restart neo4j

# Optimize collection
python3 stormspotter.py collect --batch-size 100 --parallel-workers 5
```

### Data Validation

```python
#!/usr/bin/env python3
def validate_stormspotter_data():
    """Validate collected Stormspotter data"""
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

    try:
        with driver.session() as session:
            # Check data completeness
            checks = {
                'users': session.run("MATCH (u:AzureADUser) RETURN count(u) as count").single()['count'],
                'resources': session.run("MATCH (r:Resource) RETURN count(r) as count").single()['count'],
                'roles': session.run("MATCH (r:AzureADRole) RETURN count(r) as count").single()['count'],
                'relationships': session.run("MATCH ()-[r]->() RETURN count(r) as count").single()['count']
            }

            print("=== STORMSPOTTER DATA VALIDATION ===")
            for check_type, count in checks.items():
                status = "✓" if count > 0 else "✗"
                print(f"{status} {check_type.capitalize()}: {count}")

            # Check for orphaned nodes
            orphaned = session.run("""
                MATCH (n)
                WHERE NOT (n)--()
                RETURN labels(n) as labels, count(n) as count
            """)

            print("\n=== ORPHANED NODES ===")
            for record in orphaned:
                print(f"- {record['labels']}: {record['count']}")

            return checks

    finally:
        driver.close()

# Run validation
validate_stormspotter_data()
```

## Resources

- [Official Stormspotter Repository](https://github.com/Azure/Stormspotter)
- [Azure Red Team Documentation](https://docs.microsoft.com/en-us/azure/security/)
- [Neo4j Cypher Documentation](https://neo4j.com/docs/cypher-manual/current/)
- [Azure RBAC Documentation](https://docs.microsoft.com/en-us/azure/role-based-access-control/)
- [Azure Security Best Practices](https://docs.microsoft.com/en-us/azure/security/fundamentals/best-practices-and-patterns)
- [BloodHound for Azure AD](https://github.com/BloodHoundAD/AzureHound)
- [Azure Attack Paths](https://cloudbrothers.info/en/azure-attack-paths/)


---

*This cheat sheet provides a comprehensive reference for using Stormspotter in Azure security assessments. Always make sure you have proper authorization before using this tool in any environment.*