Zum Inhalt

Stormspotter Cheat Sheet

generieren

Überblick

Stormspotter ist ein Azure Red Team-Tool zur Graphik von Azure und Azure AD Objekten. Ursprünglich vom Microsoft Azure Red Team entwickelt, erstellt Stormspotter eine umfassende Angriffs-Oberflächenkarte von Azure-Umgebungen, indem er Beziehungen zwischen Azure-Ressourcen, Identitäten und Berechtigungen sammelt und visualisiert. Es hilft Sicherheitsexperten, potenzielle Angriffspfade und Privileg Eskalationsmöglichkeiten in Azure-Umgebungen zu identifizieren.

ZEIT Warning: Verwenden Sie Stormspotter nur in Umgebungen, die Sie besitzen oder eine ausdrückliche Erlaubnis zum Testen haben. Unberechtigte Nutzung kann gegen Nutzungsbedingungen oder lokale Gesetze verstoßen.

Installation

Voraussetzungen

```bash

Install Python 3.7+

python3 --version

Install pip

sudo apt update sudo apt install python3-pip

Install Git

sudo apt install git

Install Neo4j (required for graph database)

wget -O - https://debian.neo4j.com/neotechnology.gpg.key|sudo apt-key add - echo 'deb https://debian.neo4j.com stable 4.4'|sudo tee /etc/apt/sources.list.d/neo4j.list sudo apt update sudo apt install neo4j

Install Java (required for Neo4j)

sudo apt install openjdk-11-jdk ```_

Clone und Install Stormspotter

```bash

Clone the repository

git clone https://github.com/Azure/Stormspotter.git cd Stormspotter

Install Python dependencies

pip3 install -r requirements.txt

Alternative: Install with pipenv

pip3 install pipenv pipenv install pipenv shell ```_

Docker Installation

```bash

Clone repository

git clone https://github.com/Azure/Stormspotter.git cd Stormspotter

Build Docker containers

docker-compose build

Start services

docker-compose up -d

Check status

docker-compose ps ```_

Bedienungsanleitung Neo4j Setup

```bash

Start Neo4j service

sudo systemctl start neo4j sudo systemctl enable neo4j

Check Neo4j status

sudo systemctl status neo4j

Access Neo4j browser (default: http://localhost:7474)

Default credentials: neo4j/neo4j (change on first login)

Configure Neo4j for Stormspotter

sudo nano /etc/neo4j/neo4j.conf

Uncomment and modify these lines:

dbms.default_listen_address=0.0.0.0

dbms.connector.bolt.listen_address=0.0.0.0:7687

dbms.connector.http.listen_address=0.0.0.0:7474

Restart Neo4j

sudo systemctl restart neo4j ```_

Konfiguration

Azure Authentication Setup

```bash

Install Azure CLI

curl -sL https://aka.ms/InstallAzureCLIDeb|sudo bash

Login to Azure

az login

List available subscriptions

az account list --output table

Set specific subscription

az account set --subscription "subscription-id"

Verify current context

az account show ```_

Service Hauptauthentifizierung

```bash

Create service principal for Stormspotter

az ad sp create-for-rbac --name "Stormspotter-SP" --role "Reader" --scopes "/subscriptions/your-subscription-id"

Note the output:

\\{

"appId": "app-id",

"displayName": "Stormspotter-SP",

"name": "app-id",

"password": "password",

"tenant": "tenant-id"

\\}

Set environment variables

export AZURE_CLIENT_ID="app-id" export AZURE_CLIENT_SECRET="password" export AZURE_TENANT_ID="tenant-id" export AZURE_SUBSCRIPTION_ID="subscription-id" ```_

Stormspotter Konfiguration

```bash

Create configuration file

cd Stormspotter cp config/config.json.example config/config.json

Edit configuration

nano config/config.json _json \\{ "neo4j": \\{ "uri": "bolt://localhost:7687", "username": "neo4j", "password": "your-neo4j-password" \\}, "azure": \\{ "tenant_id": "your-tenant-id", "client_id": "your-client-id", "client_secret": "your-client-secret", "subscription_id": "your-subscription-id" \\}, "logging": \\{ "level": "INFO", "file": "logs/stormspotter.log" \\} \\} ```_

Basisnutzung

Datenerhebung

```bash

Basic collection from current subscription

python3 stormspotter.py collect

Collect from specific subscription

python3 stormspotter.py collect --subscription-id "subscription-id"

Collect from specific tenant

python3 stormspotter.py collect --tenant-id "tenant-id"

Collect with specific credentials

python3 stormspotter.py collect \ --client-id "app-id" \ --client-secret "password" \ --tenant-id "tenant-id"

Verbose collection

python3 stormspotter.py collect --verbose

Collect specific resource types

python3 stormspotter.py collect --resource-types "VirtualMachines,StorageAccounts" ```_

Datenimport nach Neo4j

```bash

Import collected data to Neo4j

python3 stormspotter.py import --data-file "output/azure_data.json"

Import with custom Neo4j connection

python3 stormspotter.py import \ --neo4j-uri "bolt://localhost:7687" \ --neo4j-user "neo4j" \ --neo4j-password "password"

Clear existing data before import

python3 stormspotter.py import --clear-database

Import multiple files

python3 stormspotter.py import --data-dir "output/" ```_

Web Interface

```bash

Start Stormspotter web interface

python3 stormspotter.py web

Start on specific port

python3 stormspotter.py web --port 8080

Start with custom host

python3 stormspotter.py web --host 0.0.0.0 --port 8080

Access web interface

Default: http://localhost:5000

```_

Erweiterte Sammlung

Multi-Tenant Sammlung

```bash

Collect from multiple tenants

python3 stormspotter.py collect-multi \ --tenants "tenant1-id,tenant2-id,tenant3-id" \ --output-dir "multi-tenant-data"

Collect with different credentials per tenant

python3 stormspotter.py collect-multi \ --config-file "multi-tenant-config.json" ```_

Umfassende Sammlung

```bash

Collect all available data

python3 stormspotter.py collect \ --comprehensive \ --include-rbac \ --include-resources \ --include-identities \ --include-policies

Collect with custom scope

python3 stormspotter.py collect \ --scope "management-groups,subscriptions,resource-groups" \ --depth 5

Collect with filters

python3 stormspotter.py collect \ --exclude-resource-types "NetworkSecurityGroups" \ --include-only-active-resources ```_

Geplante Erhebung

```bash

Create collection script

cat > collect_azure.sh << 'EOF'

!/bin/bash

DATE=$(date +%Y%m%d_%H%M%S) OUTPUT_DIR="/opt/stormspotter/data/$DATE"

mkdir -p "$OUTPUT_DIR"

python3 /opt/stormspotter/stormspotter.py collect \ --output-dir "$OUTPUT_DIR" \ --comprehensive \ --verbose

Import to Neo4j

python3 /opt/stormspotter/stormspotter.py import \ --data-dir "$OUTPUT_DIR" \ --clear-database

echo "Collection completed: $DATE" EOF

chmod +x collect_azure.sh

Schedule with cron (daily at 2 AM)

echo "0 2 * * * /opt/stormspotter/collect_azure.sh"|crontab - ```_

Datenanalyse

Neo4j Cypher Abfragen

```cypher -- Find all Azure AD users MATCH (u:AzureADUser) RETURN u.displayName, u.userPrincipalName, u.enabled

-- Find privileged users MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole) WHERE r.displayName CONTAINS "Admin" RETURN u.displayName, r.displayName

-- Find virtual machines and their permissions MATCH (vm:VirtualMachine) OPTIONAL MATCH (vm)<-[:HAS_PERMISSION]-(p:Principal) RETURN vm.name, vm.location, collect(p.displayName) as permissions

-- Find storage accounts with public access MATCH (sa:StorageAccount) WHERE sa.allowBlobPublicAccess = true RETURN sa.name, sa.resourceGroup, sa.location

-- Find attack paths to high-value resources MATCH path = (u:AzureADUser)-[*1..5]->(vm:VirtualMachine) WHERE vm.name CONTAINS "prod" OR vm.name CONTAINS "critical" RETURN path

-- Find users with multiple high-privilege roles MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole) WHERE r.displayName IN ["Global Administrator", "Privileged Role Administrator", "Security Administrator"] WITH u, collect(r.displayName) as roles WHERE size(roles) > 1 RETURN u.displayName, roles

-- Find resources accessible from internet MATCH (r:Resource)-[:ALLOWS_ACCESS]->(nsg:NetworkSecurityGroup) WHERE nsg.rules CONTAINS "0.0.0.0/0" RETURN r.name, r.type, r.location ```_

Python Analyse Scripts

```python

!/usr/bin/env python3

import json from neo4j import GraphDatabase

class StormspotterAnalyzer: def init(self, uri, user, password): self.driver = GraphDatabase.driver(uri, auth=(user, password))

def close(self):
    self.driver.close()

def find_privileged_users(self):
    """Find users with administrative privileges"""
    with self.driver.session() as session:
        result = session.run("""
            MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
            WHERE r.displayName CONTAINS "Admin"
            RETURN u.displayName as user, collect(r.displayName) as roles
        """)

        privileged_users = []
        for record in result:
            privileged_users.append(\\\\{
                'user': record['user'],
                'roles': record['roles']
            \\\\})

        return privileged_users

def find_attack_paths(self, target_resource_type="VirtualMachine"):
    """Find potential attack paths to target resources"""
    with self.driver.session() as session:
        result = session.run(f"""
            MATCH path = (u:AzureADUser)-[*1..5]->(r:\\\\{target_resource_type\\\\})
            RETURN path, length(path) as path_length
            ORDER BY path_length
            LIMIT 50
        """)

        attack_paths = []
        for record in result:
            attack_paths.append(\\\\{
                'path': record['path'],
                'length': record['path_length']
            \\\\})

        return attack_paths

def find_exposed_resources(self):
    """Find resources with potential internet exposure"""
    with self.driver.session() as session:
        result = session.run("""
            MATCH (r:Resource)
            WHERE r.publicIPAddress IS NOT NULL
               OR r.allowBlobPublicAccess = true
               OR r.publicNetworkAccess = "Enabled"
            RETURN r.name, r.type, r.location, r.resourceGroup
        """)

        exposed_resources = []
        for record in result:
            exposed_resources.append(\\\\{
                'name': record['r.name'],
                'type': record['r.type'],
                'location': record['r.location'],
                'resource_group': record['r.resourceGroup']
            \\\\})

        return exposed_resources

def generate_report(self):
    """Generate comprehensive security report"""
    report = \\\\{
        'privileged_users': self.find_privileged_users(),
        'attack_paths': self.find_attack_paths(),
        'exposed_resources': self.find_exposed_resources()
    \\\\}

    return report

Usage

if name == "main": analyzer = StormspotterAnalyzer("bolt://localhost:7687", "neo4j", "password")

try:
    report = analyzer.generate_report()

    print("=== STORMSPOTTER SECURITY ANALYSIS ===")
    print(f"Privileged Users: \\\\{len(report['privileged_users'])\\\\}")
    print(f"Attack Paths Found: \\\\{len(report['attack_paths'])\\\\}")
    print(f"Exposed Resources: \\\\{len(report['exposed_resources'])\\\\}")

    # Save detailed report
    with open('stormspotter_analysis.json', 'w') as f:
        json.dump(report, f, indent=2, default=str)

    print("Detailed report saved to stormspotter_analysis.json")

finally:
    analyzer.close()

```_

Analyse von Pfaden

Gemeinsame Angriffsszenarien

```cypher -- Scenario 1: User to VM via role assignments MATCH path = (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)-[:APPLIES_TO]->(rg:ResourceGroup)-[:CONTAINS]->(vm:VirtualMachine) WHERE r.displayName IN ["Contributor", "Owner", "Virtual Machine Contributor"] RETURN path

-- Scenario 2: Service Principal privilege escalation MATCH path = (sp:ServicePrincipal)-[:HAS_PERMISSION]->(sub:Subscription) WHERE sp.appRoles CONTAINS "Application.ReadWrite.All" RETURN path

-- Scenario 3: Storage account access via managed identity MATCH path = (mi:ManagedIdentity)-[:HAS_ACCESS]->(sa:StorageAccount) WHERE sa.allowBlobPublicAccess = true RETURN path

-- Scenario 4: Cross-tenant access MATCH path = (u:AzureADUser)-[:GUEST_IN]->(t:Tenant)-[:CONTAINS]->(r:Resource) RETURN path

-- Scenario 5: Conditional access bypass MATCH (u:AzureADUser)-[:SUBJECT_TO]->(ca:ConditionalAccessPolicy) WHERE ca.state = "disabled" OR ca.conditions CONTAINS "trusted" RETURN u, ca ```_

Risikobewertungsanfragen

```cypher -- High-risk users (multiple admin roles) MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole) WHERE r.displayName CONTAINS "Admin" WITH u, collect(r.displayName) as adminRoles WHERE size(adminRoles) >= 2 RETURN u.displayName, adminRoles, size(adminRoles) as riskScore ORDER BY riskScore DESC

-- Overprivileged service principals MATCH (sp:ServicePrincipal)-[:HAS_PERMISSION]->(scope) WITH sp, collect(scope) as scopes WHERE size(scopes) > 10 RETURN sp.displayName, size(scopes) as permissionCount ORDER BY permissionCount DESC

-- Resources without proper RBAC MATCH (r:Resource) WHERE NOT (r)<-[:HAS_PERMISSION]-(:Principal) RETURN r.name, r.type, r.resourceGroup

-- Stale accounts with access MATCH (u:AzureADUser)-[:HAS_ACCESS]->(r:Resource) WHERE u.lastSignInDateTime < datetime() - duration(\\{days: 90\\}) RETURN u.displayName, u.lastSignInDateTime, r.name ```_

Automatisierung und Integration

Integration von PowerShell

```powershell

Stormspotter PowerShell wrapper

function Invoke-StormspotterCollection \\{ param( [string]$SubscriptionId, [string]$TenantId, [string]$OutputPath = "C:\StormspotterData", [switch]$Comprehensive )

# Ensure output directory exists
if (!(Test-Path $OutputPath)) \\\\{
    New-Item -ItemType Directory -Path $OutputPath -Force
\\\\}

# Set timestamp
$timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
$outputDir = Join-Path $OutputPath $timestamp

# Build command
$cmd = "python3 stormspotter.py collect"

if ($SubscriptionId) \\\\{
    $cmd += " --subscription-id `"$SubscriptionId`""
\\\\}

if ($TenantId) \\\\{
    $cmd += " --tenant-id `"$TenantId`""
\\\\}

if ($Comprehensive) \\\\{
    $cmd += " --comprehensive"
\\\\}

$cmd += " --output-dir `"$outputDir`""

try \\\\{
    Write-Host "[+] Starting Stormspotter collection..."
    Invoke-Expression $cmd

    Write-Host "[+] Collection completed: $outputDir"

    # Import to Neo4j
    Write-Host "[+] Importing to Neo4j..."
    $importCmd = "python3 stormspotter.py import --data-dir `"$outputDir`""
    Invoke-Expression $importCmd

    Write-Host "[+] Import completed successfully"

    return $outputDir

\\\\} catch \\\\{
    Write-Error "[-] Collection failed: $($_.Exception.Message)"
    return $null
\\\\}

\\}

Usage

$result = Invoke-StormspotterCollection -SubscriptionId "your-sub-id" -Comprehensive ```_

SIEM Integration

```python

!/usr/bin/env python3

import json import requests from datetime import datetime

class StormspotterSIEMIntegration: def init(self, siem_endpoint, api_key): self.siem_endpoint = siem_endpoint self.api_key = api_key

def export_findings_to_siem(self, neo4j_uri, neo4j_user, neo4j_password):
    """Export Stormspotter findings to SIEM"""
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

    try:
        with driver.session() as session:
            # Query for security findings
            findings = []

            # High-privilege users
            result = session.run("""
                MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                WHERE r.displayName CONTAINS "Admin"
                RETURN u.displayName as user, r.displayName as role
            """)

            for record in result:
                findings.append(\\\\{
                    'timestamp': datetime.utcnow().isoformat(),
                    'source': 'Stormspotter',
                    'type': 'PrivilegedUser',
                    'severity': 'Medium',
                    'user': record['user'],
                    'role': record['role'],
                    'description': f"User \\\\{record['user']\\\\} has administrative role \\\\{record['role']\\\\}"
                \\\\})

            # Exposed resources
            result = session.run("""
                MATCH (r:Resource)
                WHERE r.publicIPAddress IS NOT NULL
                RETURN r.name as resource, r.type as type, r.publicIPAddress as ip
            """)

            for record in result:
                findings.append(\\\\{
                    'timestamp': datetime.utcnow().isoformat(),
                    'source': 'Stormspotter',
                    'type': 'ExposedResource',
                    'severity': 'High',
                    'resource': record['resource'],
                    'resource_type': record['type'],
                    'public_ip': record['ip'],
                    'description': f"Resource \\\\{record['resource']\\\\} is exposed to internet"
                \\\\})

            # Send to SIEM
            self.send_to_siem(findings)

    finally:
        driver.close()

def send_to_siem(self, findings):
    """Send findings to SIEM platform"""
    headers = \\\\{
        'Authorization': f'Bearer \\\\{self.api_key\\\\}',
        'Content-Type': 'application/json'
    \\\\}

    for finding in findings:
        try:
            response = requests.post(
                f"\\\\{self.siem_endpoint\\\\}/api/events",
                headers=headers,
                json=finding,
                timeout=30
            )

            if response.status_code == 200:
                print(f"[+] Sent finding: \\\\{finding['type']\\\\}")
            else:
                print(f"[-] Failed to send finding: \\\\{response.status_code\\\\}")

        except Exception as e:
            print(f"[-] Error sending finding: \\\\{e\\\\}")

Usage

siem = StormspotterSIEMIntegration("https://your-siem.com", "your-api-key") siem.export_findings_to_siem("bolt://localhost:7687", "neo4j", "password") ```_

Fehlerbehebung

Gemeinsame Themen

Authentication Probleme

```bash

Check Azure CLI authentication

az account show

Re-authenticate if needed

az login --tenant "tenant-id"

Verify service principal

az ad sp show --id "client-id"

Test permissions

az role assignment list --assignee "client-id" ```_

Neo4j Verbindungsprobleme

```bash

Check Neo4j status

sudo systemctl status neo4j

Check Neo4j logs

sudo journalctl -u neo4j -f

Test connection

cypher-shell -u neo4j -p password

Reset Neo4j password

sudo neo4j-admin set-initial-password newpassword ```_

Sammlungsfehler

```bash

Enable debug logging

export STORMSPOTTER_LOG_LEVEL=DEBUG

Check API rate limits

az rest --method get --url "https://management.azure.com/subscriptions/your-sub-id/providers/Microsoft.Resources/resources?api-version=2021-04-01" --query "value[0]"

Verify permissions

az role assignment list --scope "/subscriptions/your-sub-id" --assignee "your-principal-id"

Test specific resource collection

python3 stormspotter.py collect --resource-types "VirtualMachines" --verbose ```_

Leistungsfragen

```bash

Increase Neo4j memory

sudo nano /etc/neo4j/neo4j.conf

Add/modify these settings:

dbms.memory.heap.initial_size=2G dbms.memory.heap.max_size=4G dbms.memory.pagecache.size=1G

Restart Neo4j

sudo systemctl restart neo4j

Optimize collection

python3 stormspotter.py collect --batch-size 100 --parallel-workers 5 ```_

Datenvalidierung

```python

!/usr/bin/env python3

def validate_stormspotter_data(): """Validate collected Stormspotter data""" from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

try:
    with driver.session() as session:
        # Check data completeness
        checks = \\\\{
            'users': session.run("MATCH (u:AzureADUser) RETURN count(u) as count").single()['count'],
            'resources': session.run("MATCH (r:Resource) RETURN count(r) as count").single()['count'],
            'roles': session.run("MATCH (r:AzureADRole) RETURN count(r) as count").single()['count'],
            'relationships': session.run("MATCH ()-[r]->() RETURN count(r) as count").single()['count']
        \\\\}

        print("=== STORMSPOTTER DATA VALIDATION ===")
        for check_type, count in checks.items():
            status = "✓" if count > 0 else "✗"
            print(f"\\\\{status\\\\} \\\\{check_type.capitalize()\\\\}: \\\\{count\\\\}")

        # Check for orphaned nodes
        orphaned = session.run("""
            MATCH (n)
            WHERE NOT (n)--()
            RETURN labels(n) as labels, count(n) as count
        """)

        print("\n=== ORPHANED NODES ===")
        for record in orphaned:
            print(f"- \\\\{record['labels']\\\\}: \\\\{record['count']\\\\}")

        return checks

finally:
    driver.close()

Run validation

validate_stormspotter_data() ```_

Ressourcen

--

*Dieses Betrügereiblatt bietet eine umfassende Referenz für die Verwendung von Stormspotter für die Sicherheitsbewertung von Azure. Stellen Sie immer sicher, dass Sie eine richtige Berechtigung haben, bevor Sie dieses Tool in jeder Umgebung verwenden. *