Stormspotter Cheat Sheet

Overview

Stormspotter is an Azure Red Team tool for graphing Azure and Azure AD objects. Originally developed by Microsoft's Azure Red Team, it builds an attack-surface map of an Azure environment by collecting and visualizing the relationships between Azure resources, identities, and permissions. It helps security professionals identify potential attack paths and privilege-escalation opportunities in Azure environments.

Warning: Only use Stormspotter in environments that you own or have explicit permission to test. Unauthorized use may violate terms of service or local laws.
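
To make the idea of an attack path concrete, the following minimal sketch models a handful of objects as a directed graph and searches it breadth-first. The node names, relationship names, and the `shortest_path` helper are hypothetical and only illustrate the concept; they are not Stormspotter's data model or code.

```python
from collections import deque

# Hypothetical toy graph: node -> list of (relationship, neighbor).
# Names are illustrative only, not Stormspotter's actual schema.
GRAPH = {
    "user:alice": [("MEMBER_OF", "group:ops")],
    "group:ops": [("HAS_ROLE", "role:contributor")],
    "role:contributor": [("APPLIES_TO", "rg:prod")],
    "rg:prod": [("CONTAINS", "vm:prod-db")],
    "user:bob": [],
}


def shortest_path(graph, start, target):
    """Breadth-first search; returns the shortest node path or None."""
    queue = deque([[start]])
    seen = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == target:
            return path
        for _relationship, neighbor in graph.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                queue.append(path + [neighbor])
    return None


print(shortest_path(GRAPH, "user:alice", "vm:prod-db"))
print(shortest_path(GRAPH, "user:bob", "vm:prod-db"))
```

In a real assessment the graph lives in Neo4j and the search is expressed in Cypher; the sketch only shows why chained relationships matter more than any single permission.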

Installation

Prerequisites

# Check the installed Python version (3.7+ required)
python3 --version

# Install pip
sudo apt update
sudo apt install python3-pip

# Install Git
sudo apt install git

# Install Neo4j (required for graph database)
wget -O - https://debian.neo4j.com/neotechnology.gpg.key|sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4'|sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j

# Install Java (required for Neo4j)
sudo apt install openjdk-11-jdk

Clone and install Stormspotter

```bash
# Clone the repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Install Python dependencies
pip3 install -r requirements.txt

# Alternative: Install with pipenv
pip3 install pipenv
pipenv install
pipenv shell
```

### Docker Installation
```bash
# Clone repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Build Docker containers
docker-compose build

# Start services
docker-compose up -d

# Check status
docker-compose ps
```

### Manual Neo4j Configuration
```bash
# Start Neo4j service
sudo systemctl start neo4j
sudo systemctl enable neo4j

# Check Neo4j status
sudo systemctl status neo4j

# Access Neo4j browser (default: http://localhost:7474)
# Default credentials: neo4j/neo4j (change on first login)

# Configure Neo4j for Stormspotter
sudo nano /etc/neo4j/neo4j.conf

# Uncomment and modify these lines:
# dbms.default_listen_address=0.0.0.0
# dbms.connector.bolt.listen_address=0.0.0.0:7687
# dbms.connector.http.listen_address=0.0.0.0:7474

# Restart Neo4j
sudo systemctl restart neo4j
```

Configuration

Azure Authentication Setup

# Install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb|sudo bash

# Login to Azure
az login

# List available subscriptions
az account list --output table

# Set specific subscription
az account set --subscription "subscription-id"

# Verify current context
az account show

Service Principal Authentication

# Create service principal for Stormspotter
az ad sp create-for-rbac --name "Stormspotter-SP" --role "Reader" --scopes "/subscriptions/your-subscription-id"

# Note the output:
# {
#   "appId": "app-id",
#   "displayName": "Stormspotter-SP",
#   "name": "app-id",
#   "password": "password",
#   "tenant": "tenant-id"
# }

# Set environment variables
export AZURE_CLIENT_ID="app-id"
export AZURE_CLIENT_SECRET="password"
export AZURE_TENANT_ID="tenant-id"
export AZURE_SUBSCRIPTION_ID="subscription-id"
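
Before starting a collection it can help to confirm that all four variables are actually set in the current shell. The following is a minimal sketch; the `missing_azure_vars` helper is hypothetical and simply checks the variable names exported above.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = (
    "AZURE_CLIENT_ID",
    "AZURE_CLIENT_SECRET",
    "AZURE_TENANT_ID",
    "AZURE_SUBSCRIPTION_ID",
)


def missing_azure_vars(environ=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_azure_vars()
    if missing:
        print("Missing: " + ", ".join(missing))
    else:
        print("All Azure credential variables are set.")
```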

Stormspotter Configuration

# Create configuration file
cd Stormspotter
cp config/config.json.example config/config.json

# Edit configuration
nano config/config.json

Example config/config.json:

{
  "neo4j": {
    "uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "your-neo4j-password"
  },
  "azure": {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "subscription_id": "your-subscription-id"
  },
  "logging": {
    "level": "INFO",
    "file": "logs/stormspotter.log"
  }
}
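
A quick sanity check can catch a config file that still contains placeholders or is missing a key. This is a minimal sketch that assumes the section and key names shown in the example above; `find_config_problems` is a hypothetical helper, not part of Stormspotter.

```python
import json

# Section and key names mirror the example config.json above.
REQUIRED_KEYS = {
    "neo4j": ("uri", "username", "password"),
    "azure": ("tenant_id", "client_id", "client_secret", "subscription_id"),
}


def find_config_problems(config):
    """Return a list of problems; an empty list means the config looks complete."""
    problems = []
    for section, keys in REQUIRED_KEYS.items():
        values = config.get(section)
        if not isinstance(values, dict):
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            value = values.get(key)
            if not value:
                problems.append(f"missing value: {section}.{key}")
            elif isinstance(value, str) and value.startswith("your-"):
                problems.append(f"placeholder left in: {section}.{key}")
    return problems


# Inline sample instead of reading config/config.json, so the sketch runs anywhere.
sample = json.loads("""
{
  "neo4j": {"uri": "bolt://localhost:7687", "username": "neo4j", "password": "your-neo4j-password"},
  "azure": {"tenant_id": "t", "client_id": "c", "client_secret": "s"}
}
""")
for problem in find_config_problems(sample):
    print(problem)
```

To check a real file, replace the inline sample with `json.load(open("config/config.json"))`.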

Basic Usage

Data Collection

# Basic collection from current subscription
python3 stormspotter.py collect

# Collect from specific subscription
python3 stormspotter.py collect --subscription-id "subscription-id"

# Collect from specific tenant
python3 stormspotter.py collect --tenant-id "tenant-id"

# Collect with specific credentials
python3 stormspotter.py collect \
  --client-id "app-id" \
  --client-secret "password" \
  --tenant-id "tenant-id"

# Verbose collection
python3 stormspotter.py collect --verbose

# Collect specific resource types
python3 stormspotter.py collect --resource-types "VirtualMachines,StorageAccounts"

Importing Data into Neo4j

# Import collected data to Neo4j
python3 stormspotter.py import --data-file "output/azure_data.json"

# Import with custom Neo4j connection
python3 stormspotter.py import \
  --neo4j-uri "bolt://localhost:7687" \
  --neo4j-user "neo4j" \
  --neo4j-password "password"

# Clear existing data before import
python3 stormspotter.py import --clear-database

# Import multiple files
python3 stormspotter.py import --data-dir "output/"

Web Interface

# Start Stormspotter web interface
python3 stormspotter.py web

# Start on specific port
python3 stormspotter.py web --port 8080

# Start with custom host
python3 stormspotter.py web --host 0.0.0.0 --port 8080

# Access web interface
# Default: http://localhost:5000

Advanced Collection

Multi-Tenant Collection

# Collect from multiple tenants
python3 stormspotter.py collect-multi \
  --tenants "tenant1-id,tenant2-id,tenant3-id" \
  --output-dir "multi-tenant-data"

# Collect with different credentials per tenant
python3 stormspotter.py collect-multi \
  --config-file "multi-tenant-config.json"

Comprehensive Collection

# Collect all available data
python3 stormspotter.py collect \
  --comprehensive \
  --include-rbac \
  --include-resources \
  --include-identities \
  --include-policies

# Collect with custom scope
python3 stormspotter.py collect \
  --scope "management-groups,subscriptions,resource-groups" \
  --depth 5

# Collect with filters
python3 stormspotter.py collect \
  --exclude-resource-types "NetworkSecurityGroups" \
  --include-only-active-resources

Scheduled Collection

# Create collection script
cat > collect_azure.sh << 'EOF'
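#!/bin/bash
# Abort on the first error so a failed collection never reaches the
# import step, which clears the database.
set -euo pipefail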
DATE=$(date +%Y%m%d_%H%M%S)
OUTPUT_DIR="/opt/stormspotter/data/$DATE"

mkdir -p "$OUTPUT_DIR"

python3 /opt/stormspotter/stormspotter.py collect \
  --output-dir "$OUTPUT_DIR" \
  --comprehensive \
  --verbose

# Import to Neo4j
python3 /opt/stormspotter/stormspotter.py import \
  --data-dir "$OUTPUT_DIR" \
  --clear-database

echo "Collection completed: $DATE"
EOF

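chmod +x collect_azure.sh

# Schedule with cron (daily at 2 AM). Assumes the script was saved as
# /opt/stormspotter/collect_azure.sh. The entry is appended to the
# existing crontab rather than replacing it.
(crontab -l 2>/dev/null; echo "0 2 * * * /opt/stormspotter/collect_azure.sh") | crontab -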

Data Analysis

Neo4j Cypher Queries

// Find all Azure AD users
MATCH (u:AzureADUser)
RETURN u.displayName, u.userPrincipalName, u.enabled

// Find privileged users
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName, r.displayName

// Find virtual machines and their permissions
MATCH (vm:VirtualMachine)
OPTIONAL MATCH (vm)<-[:HAS_PERMISSION]-(p:Principal)
RETURN vm.name, vm.location, collect(p.displayName) as permissions

// Find storage accounts with public access
MATCH (sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN sa.name, sa.resourceGroup, sa.location

// Find attack paths to high-value resources
MATCH path = (u:AzureADUser)-[*1..5]->(vm:VirtualMachine)
WHERE vm.name CONTAINS "prod" OR vm.name CONTAINS "critical"
RETURN path

// Find users with multiple high-privilege roles
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName IN ["Global Administrator", "Privileged Role Administrator", "Security Administrator"]
WITH u, collect(r.displayName) as roles
WHERE size(roles) > 1
RETURN u.displayName, roles

// Find resources accessible from internet
MATCH (r:Resource)-[:ALLOWS_ACCESS]->(nsg:NetworkSecurityGroup)
WHERE nsg.rules CONTAINS "0.0.0.0/0"
RETURN r.name, r.type, r.location
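
When these queries are run from a script, values such as the "Admin" keyword are better passed as Cypher parameters than pasted into the query text. The sketch below only builds the query and its parameter dictionary; `privileged_user_query` is a hypothetical helper, and the labels are the ones used in the queries above. The resulting pair can be handed to the Neo4j Python driver as `session.run(query, parameters)`.

```python
def privileged_user_query(role_keyword="Admin"):
    """Build a parameterized version of the 'privileged users' query.

    Returns (query, parameters); the keyword travels as the $role_keyword
    parameter instead of being embedded in the query string.
    """
    query = (
        "MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole) "
        "WHERE r.displayName CONTAINS $role_keyword "
        "RETURN u.displayName AS user, r.displayName AS role"
    )
    return query, {"role_keyword": role_keyword}


query, parameters = privileged_user_query("Owner")
print(query)
print(parameters)
```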

Python Analysis Scripts

#!/usr/bin/env python3
import json
from neo4j import GraphDatabase

class StormspotterAnalyzer:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def find_privileged_users(self):
        """Find users with administrative privileges"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                WHERE r.displayName CONTAINS "Admin"
                RETURN u.displayName as user, collect(r.displayName) as roles
            """)

            privileged_users = []
            for record in result:
                privileged_users.append({
                    'user': record['user'],
                    'roles': record['roles']
                })

            return privileged_users

    def find_attack_paths(self, target_resource_type="VirtualMachine"):
        """Find potential attack paths to target resources"""
        with self.driver.session() as session:
            result = session.run(f"""
                MATCH path = (u:AzureADUser)-[*1..5]->(r:{target_resource_type})
                RETURN path, length(path) as path_length
                ORDER BY path_length
                LIMIT 50
            """)

            attack_paths = []
            for record in result:
                attack_paths.append({
                    'path': record['path'],
                    'length': record['path_length']
                })

            return attack_paths

    def find_exposed_resources(self):
        """Find resources with potential internet exposure"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (r:Resource)
                WHERE r.publicIPAddress IS NOT NULL
                   OR r.allowBlobPublicAccess = true
                   OR r.publicNetworkAccess = "Enabled"
                RETURN r.name, r.type, r.location, r.resourceGroup
            """)

            exposed_resources = []
            for record in result:
                exposed_resources.append({
                    'name': record['r.name'],
                    'type': record['r.type'],
                    'location': record['r.location'],
                    'resource_group': record['r.resourceGroup']
                })

            return exposed_resources

    def generate_report(self):
        """Generate comprehensive security report"""
        report = {
            'privileged_users': self.find_privileged_users(),
            'attack_paths': self.find_attack_paths(),
            'exposed_resources': self.find_exposed_resources()
        }

        return report

# Usage
if __name__ == "__main__":
    analyzer = StormspotterAnalyzer("bolt://localhost:7687", "neo4j", "password")

    try:
        report = analyzer.generate_report()

        print("=== STORMSPOTTER SECURITY ANALYSIS ===")
        print(f"Privileged Users: {len(report['privileged_users'])}")
        print(f"Attack Paths Found: {len(report['attack_paths'])}")
        print(f"Exposed Resources: {len(report['exposed_resources'])}")

        # Save detailed report
        with open('stormspotter_analysis.json', 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print("Detailed report saved to stormspotter_analysis.json")

    finally:
        analyzer.close()
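
The `find_attack_paths` method interpolates the target label into the query with an f-string, because Cypher does not accept node labels as query parameters. If that label can ever come from user input, it is worth validating it first. The following is a minimal sketch under that assumption; the allowlist contents and the `attack_path_query` helper are hypothetical.

```python
import re

# Hypothetical allowlist; extend it to match the labels in your own graph.
ALLOWED_LABELS = frozenset({"VirtualMachine", "StorageAccount", "Resource"})
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def attack_path_query(target_label="VirtualMachine", max_hops=5):
    """Build the attack-path query after validating the interpolated parts."""
    if target_label not in ALLOWED_LABELS or not _IDENTIFIER.fullmatch(target_label):
        raise ValueError(f"unsupported node label: {target_label!r}")
    if isinstance(max_hops, bool) or not isinstance(max_hops, int) or not 1 <= max_hops <= 10:
        raise ValueError("max_hops must be an integer between 1 and 10")
    return (
        f"MATCH path = (u:AzureADUser)-[*1..{max_hops}]->(r:{target_label}) "
        "RETURN path, length(path) AS path_length "
        "ORDER BY path_length LIMIT 50"
    )


print(attack_path_query("StorageAccount", max_hops=3))
```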

Attack Path Analysis

Common Attack Scenarios

// Scenario 1: User to VM via role assignments
MATCH path = (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)-[:APPLIES_TO]->(rg:ResourceGroup)-[:CONTAINS]->(vm:VirtualMachine)
WHERE r.displayName IN ["Contributor", "Owner", "Virtual Machine Contributor"]
RETURN path

// Scenario 2: Service Principal privilege escalation
MATCH path = (sp:ServicePrincipal)-[:HAS_PERMISSION]->(sub:Subscription)
WHERE sp.appRoles CONTAINS "Application.ReadWrite.All"
RETURN path

// Scenario 3: Storage account access via managed identity
MATCH path = (mi:ManagedIdentity)-[:HAS_ACCESS]->(sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN path

// Scenario 4: Cross-tenant access
MATCH path = (u:AzureADUser)-[:GUEST_IN]->(t:Tenant)-[:CONTAINS]->(r:Resource)
RETURN path

// Scenario 5: Conditional access bypass
MATCH (u:AzureADUser)-[:SUBJECT_TO]->(ca:ConditionalAccessPolicy)
WHERE ca.state = "disabled" OR ca.conditions CONTAINS "trusted"
RETURN u, ca

Risk Assessment Queries

// High-risk users (multiple admin roles)
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
WITH u, collect(r.displayName) as adminRoles
WHERE size(adminRoles) >= 2
RETURN u.displayName, adminRoles, size(adminRoles) as riskScore
ORDER BY riskScore DESC

// Overprivileged service principals
MATCH (sp:ServicePrincipal)-[:HAS_PERMISSION]->(scope)
WITH sp, collect(scope) as scopes
WHERE size(scopes) > 10
RETURN sp.displayName, size(scopes) as permissionCount
ORDER BY permissionCount DESC

// Resources without proper RBAC
MATCH (r:Resource)
WHERE NOT (r)<-[:HAS_PERMISSION]-(:Principal)
RETURN r.name, r.type, r.resourceGroup

// Stale accounts with access
MATCH (u:AzureADUser)-[:HAS_ACCESS]->(r:Resource)
WHERE u.lastSignInDateTime < datetime() - duration({days: 90})
RETURN u.displayName, u.lastSignInDateTime, r.name
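
The same "multiple admin roles" ranking can be applied to results that have already been pulled into Python, for example the list returned by `find_privileged_users()` in the analysis script earlier in this sheet. This is a minimal sketch; `rank_by_admin_roles` is a hypothetical helper and the role count is only a rough proxy for risk.

```python
def rank_by_admin_roles(privileged_users, minimum_roles=2):
    """Rank users by how many distinct admin roles they hold (most first).

    privileged_users: list of {'user': str, 'roles': [str, ...]} dicts.
    """
    ranked = []
    for entry in privileged_users:
        roles = sorted(set(entry["roles"]))  # ignore duplicate role names
        if len(roles) >= minimum_roles:
            ranked.append({"user": entry["user"], "roles": roles, "risk_score": len(roles)})
    # Highest score first; ties are broken alphabetically by user name.
    ranked.sort(key=lambda item: (-item["risk_score"], item["user"]))
    return ranked


sample = [
    {"user": "alice", "roles": ["Global Administrator", "Security Administrator"]},
    {"user": "bob", "roles": ["User Administrator"]},
    {"user": "carol", "roles": ["Global Administrator", "Security Administrator",
                                "Privileged Role Administrator"]},
]
for row in rank_by_admin_roles(sample):
    print(row["user"], row["risk_score"])
```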

Automation and Integration

PowerShell Integration

# Stormspotter PowerShell wrapper
function Invoke-StormspotterCollection {
    param(
        [string]$SubscriptionId,
        [string]$TenantId,
        [string]$OutputPath = "C:\StormspotterData",
        [switch]$Comprehensive
    )

    # Ensure output directory exists (Out-Null keeps it out of the return value)
    if (!(Test-Path $OutputPath)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    # Set timestamp
    $timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
    $outputDir = Join-Path $OutputPath $timestamp

    # Build the argument list (an array avoids Invoke-Expression quoting issues)
    $collectArgs = @("stormspotter.py", "collect")

    if ($SubscriptionId) {
        $collectArgs += @("--subscription-id", $SubscriptionId)
    }

    if ($TenantId) {
        $collectArgs += @("--tenant-id", $TenantId)
    }

    if ($Comprehensive) {
        $collectArgs += "--comprehensive"
    }

    $collectArgs += @("--output-dir", $outputDir)

    try {
        Write-Host "[+] Starting Stormspotter collection..."
        & python3 @collectArgs | Out-Host
        # External programs do not throw on failure, so check the exit code
        if ($LASTEXITCODE -ne 0) { throw "collect exited with code $LASTEXITCODE" }

        Write-Host "[+] Collection completed: $outputDir"

        # Import to Neo4j
        Write-Host "[+] Importing to Neo4j..."
        & python3 stormspotter.py import --data-dir $outputDir | Out-Host
        if ($LASTEXITCODE -ne 0) { throw "import exited with code $LASTEXITCODE" }

        Write-Host "[+] Import completed successfully"

        return $outputDir

    } catch {
        Write-Error "[-] Collection failed: $($_.Exception.Message)"
        return $null
    }
}

# Usage
$result = Invoke-StormspotterCollection -SubscriptionId "your-sub-id" -Comprehensive

SIEM Integration

#!/usr/bin/env python3
import json
import requests
from datetime import datetime

class StormspotterSIEMIntegration:
    def __init__(self, siem_endpoint, api_key):
        self.siem_endpoint = siem_endpoint
        self.api_key = api_key

    def export_findings_to_siem(self, neo4j_uri, neo4j_user, neo4j_password):
        """Export Stormspotter findings to SIEM"""
        from neo4j import GraphDatabase

        driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

        try:
            with driver.session() as session:
                # Query for security findings
                findings = []

                # High-privilege users
                result = session.run("""
                    MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                    WHERE r.displayName CONTAINS "Admin"
                    RETURN u.displayName as user, r.displayName as role
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'Stormspotter',
                        'type': 'PrivilegedUser',
                        'severity': 'Medium',
                        'user': record['user'],
                        'role': record['role'],
                        'description': f"User {record['user']} has administrative role {record['role']}"
                    })

                # Exposed resources
                result = session.run("""
                    MATCH (r:Resource)
                    WHERE r.publicIPAddress IS NOT NULL
                    RETURN r.name as resource, r.type as type, r.publicIPAddress as ip
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'Stormspotter',
                        'type': 'ExposedResource',
                        'severity': 'High',
                        'resource': record['resource'],
                        'resource_type': record['type'],
                        'public_ip': record['ip'],
                        'description': f"Resource {record['resource']} is exposed to internet"
                    })

                # Send to SIEM
                self.send_to_siem(findings)

        finally:
            driver.close()

    def send_to_siem(self, findings):
        """Send findings to SIEM platform"""
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        for finding in findings:
            try:
                response = requests.post(
                    f"{self.siem_endpoint}/api/events",
                    headers=headers,
                    json=finding,
                    timeout=30
                )

                if response.status_code == 200:
                    print(f"[+] Sent finding: {finding['type']}")
                else:
                    print(f"[-] Failed to send finding: {response.status_code}")

            except Exception as e:
                print(f"[-] Error sending finding: {e}")

# Usage
siem = StormspotterSIEMIntegration("https://your-siem.com", "your-api-key")
siem.export_findings_to_siem("bolt://localhost:7687", "neo4j", "password")
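
The two finding dictionaries in the script share most of their fields, and `datetime.utcnow()` returns a naive timestamp with no UTC offset (it is also deprecated since Python 3.12). One possible refactoring, sketched here with a hypothetical `build_finding` helper, centralizes the common fields and emits timezone-aware timestamps instead.

```python
from datetime import datetime, timezone


def build_finding(finding_type, severity, description, **details):
    """Assemble one SIEM event with a timezone-aware UTC timestamp.

    Extra keyword arguments (user, role, resource, ...) are copied in first,
    so they can never overwrite the core fields set below.
    """
    finding = dict(details)
    finding.update({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "Stormspotter",
        "type": finding_type,
        "severity": severity,
        "description": description,
    })
    return finding


event = build_finding(
    "PrivilegedUser",
    "Medium",
    "User alice has administrative role Global Administrator",
    user="alice",
    role="Global Administrator",
)
print(event["type"], event["user"], event["timestamp"])
```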

Troubleshooting

Common Issues

Authentication Issues

# Check Azure CLI authentication
az account show

# Re-authenticate if needed
az login --tenant "tenant-id"

# Verify service principal
az ad sp show --id "client-id"

# Test permissions
az role assignment list --assignee "client-id"

Neo4j Connection Issues

# Check Neo4j status
sudo systemctl status neo4j

# Check Neo4j logs
sudo journalctl -u neo4j -f

# Test connection
cypher-shell -u neo4j -p password

# Set the initial Neo4j password (only takes effect before the database is first started)
sudo neo4j-admin set-initial-password newpassword

Collection Failures

# Enable debug logging
export STORMSPOTTER_LOG_LEVEL=DEBUG

# Check API rate limits
az rest --method get --url "https://management.azure.com/subscriptions/your-sub-id/providers/Microsoft.Resources/resources?api-version=2021-04-01" --query "value[0]"

# Verify permissions
az role assignment list --scope "/subscriptions/your-sub-id" --assignee "your-principal-id"

# Test specific resource collection
python3 stormspotter.py collect --resource-types "VirtualMachines" --verbose

Performance Issues

# Increase Neo4j memory
sudo nano /etc/neo4j/neo4j.conf

# Add/modify these settings:
dbms.memory.heap.initial_size=2G
dbms.memory.heap.max_size=4G
dbms.memory.pagecache.size=1G

# Restart Neo4j
sudo systemctl restart neo4j

# Optimize collection
python3 stormspotter.py collect --batch-size 100 --parallel-workers 5

Data Validation

#!/usr/bin/env python3
def validate_stormspotter_data():
    """Validate collected Stormspotter data"""
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

    try:
        with driver.session() as session:
            # Check data completeness
            checks = {
                'users': session.run("MATCH (u:AzureADUser) RETURN count(u) as count").single()['count'],
                'resources': session.run("MATCH (r:Resource) RETURN count(r) as count").single()['count'],
                'roles': session.run("MATCH (r:AzureADRole) RETURN count(r) as count").single()['count'],
                'relationships': session.run("MATCH ()-[r]->() RETURN count(r) as count").single()['count']
            }

            print("=== STORMSPOTTER DATA VALIDATION ===")
            for check_type, count in checks.items():
                status = "✓" if count > 0 else "✗"
                print(f"{status} {check_type.capitalize()}: {count}")

            # Check for orphaned nodes
            orphaned = session.run("""
                MATCH (n)
                WHERE NOT (n)--()
                RETURN labels(n) as labels, count(n) as count
            """)

            print("\n=== ORPHANED NODES ===")
            for record in orphaned:
                print(f"- {record['labels']}: {record['count']}")

            return checks

    finally:
        driver.close()

# Run validation
validate_stormspotter_data()
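
The dictionary returned by `validate_stormspotter_data()` can also drive an exit code, which is convenient when validation runs from cron or CI. This is a minimal sketch operating on a sample dictionary of the same shape; `empty_checks` and `exit_code_for` are hypothetical helpers.

```python
def empty_checks(checks):
    """Return the names of checks whose count is zero, in sorted order."""
    return sorted(name for name, count in checks.items() if count == 0)


def exit_code_for(checks):
    """0 when every category has data, 1 otherwise."""
    return 1 if empty_checks(checks) else 0


# Sample counts in the same shape as the validation function's return value.
example = {"users": 120, "resources": 0, "roles": 14, "relationships": 0}
print(empty_checks(example))
print(exit_code_for(example))
```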

Resources

  • Official Stormspotter repository (https://github.com/Azure/Stormspotter)
  • Azure Red Team documentation
  • Neo4j Cypher documentation
  • Azure RBAC documentation
  • Azure security best practices
  • BloodHound for Azure AD
  • Azure attack paths

*This cheat sheet provides a comprehensive reference for using Stormspotter in Azure security assessments. Always make sure you have proper authorization before using this tool in any environment.*