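# Stormspotter Cheat Sheet

## Overview

Stormspotter is an Azure Red Team tool for graphing Azure and Azure AD objects. Originally developed by Microsoft's Azure Red Team, it builds a comprehensive attack-surface map of Azure by collecting and visualizing the relationships between Azure resources, identities, and permissions. It helps security professionals identify potential attack paths and privilege-escalation opportunities in Azure environments.

⚠️ Warning: Use Stormspotter only in environments that you own or have explicit permission to test. Unauthorized use may violate terms of service or local laws.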
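
To make the idea of an attack path concrete, here is a minimal Python sketch that treats identities and resources as nodes and permissions as edges, then searches for the shortest chain between two nodes. The graph, node names, and relationship names are invented for illustration; they are not Stormspotter's schema or code.

```python
from collections import deque

# Hypothetical toy graph: node -> list of (relationship, neighbour).
# All names here are illustrative assumptions, not Stormspotter output.
GRAPH = {
    "user:alice": [("MemberOf", "group:ops")],
    "group:ops": [("Contributor", "rg:prod")],
    "rg:prod": [("Contains", "vm:web01")],
    "user:bob": [("Reader", "rg:dev")],
    "rg:dev": [],
    "vm:web01": [],
}


def shortest_path(graph, start, target):
    """Breadth-first search; returns the shortest list of hops, or None."""
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        node, path = queue.popleft()
        if node == target:
            return path
        for rel, nxt in graph.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, path + [(node, rel, nxt)]))
    return None


if __name__ == "__main__":
    for src, rel, dst in shortest_path(GRAPH, "user:alice", "vm:web01") or []:
        print(f"{src} -[{rel}]-> {dst}")
```

A graph database such as Neo4j answers the same kind of question with a path query, which is why the collected data is loaded into it.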

## Installation

### Prerequisites

```bash
# Check that Python 3.7+ is installed
python3 --version

# Install pip
sudo apt update
sudo apt install python3-pip

# Install Git
sudo apt install git

# Install Neo4j (required for the graph database)
wget -O - https://debian.neo4j.com/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j

# Install Java (required for Neo4j)
sudo apt install openjdk-11-jdk
```

### Clone and Install Stormspotter
```bash
# Clone the repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Install Python dependencies
pip3 install -r requirements.txt

# Alternative: Install with pipenv
pip3 install pipenv
pipenv install
pipenv shell
```

### Docker Installation

```bash
# Clone repository
git clone https://github.com/Azure/Stormspotter.git
cd Stormspotter

# Build Docker containers
docker-compose build

# Start services
docker-compose up -d

# Check status
docker-compose ps
```

### Manual Neo4j Setup

```bash
# Start Neo4j service
sudo systemctl start neo4j
sudo systemctl enable neo4j

# Check Neo4j status
sudo systemctl status neo4j

# Access Neo4j browser (default: http://localhost:7474)
# Default credentials: neo4j/neo4j (change on first login)

# Configure Neo4j for Stormspotter
sudo nano /etc/neo4j/neo4j.conf

# Uncomment and modify these lines:
# dbms.default_listen_address=0.0.0.0
# dbms.connector.bolt.listen_address=0.0.0.0:7687
# dbms.connector.http.listen_address=0.0.0.0:7474

# Restart Neo4j
sudo systemctl restart neo4j
```

## Configuration

### Azure Authentication Setup

```bash
# Install Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb|sudo bash

# Login to Azure
az login

# List available subscriptions
az account list --output table

# Set specific subscription
az account set --subscription "subscription-id"

# Verify current context
az account show
```

### Service Principal Authentication

```bash
# Create service principal for Stormspotter
az ad sp create-for-rbac --name "Stormspotter-SP" --role "Reader" --scopes "/subscriptions/your-subscription-id"

# Note the output:
# {
#   "appId": "app-id",
#   "displayName": "Stormspotter-SP",
#   "name": "app-id",
#   "password": "password",
#   "tenant": "tenant-id"
# }

# Set environment variables
export AZURE_CLIENT_ID="app-id"
export AZURE_CLIENT_SECRET="password"
export AZURE_TENANT_ID="tenant-id"
export AZURE_SUBSCRIPTION_ID="subscription-id"
```
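
Before starting a collection, it can help to confirm that the variables exported above are actually present in the current shell. The following is a small sketch and not part of Stormspotter: the helper name `missing_azure_vars` is hypothetical, and whether the collector itself reads these variables is an assumption.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = (
    "AZURE_CLIENT_ID",
    "AZURE_CLIENT_SECRET",
    "AZURE_TENANT_ID",
    "AZURE_SUBSCRIPTION_ID",
)


def missing_azure_vars(environ=None):
    """Return the required variable names that are unset or blank."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_azure_vars()
    if missing:
        print("Missing variables: " + ", ".join(missing))
    else:
        print("All service principal variables are set.")
```

Passing a dictionary instead of relying on `os.environ` keeps the check easy to test without touching the real environment.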

### Stormspotter Configuration

```bash
# Create configuration file
cd Stormspotter
cp config/config.json.example config/config.json

# Edit configuration
nano config/config.json
```

```json
{
  "neo4j": {
    "uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "your-neo4j-password"
  },
  "azure": {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "subscription_id": "your-subscription-id"
  },
  "logging": {
    "level": "INFO",
    "file": "logs/stormspotter.log"
  }
}
```
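
A configuration file like the sample above is easy to leave half-edited. Here is a minimal sketch of a pre-flight check in Python. The required keys are copied from the sample config.json; the function names `config_problems` and `load_config`, and the rule that values starting with `your-` are unfilled placeholders, are assumptions made for illustration.

```python
import json

# Required keys, copied from the sample config.json above.
REQUIRED = {
    "neo4j": ("uri", "username", "password"),
    "azure": ("tenant_id", "client_id", "client_secret", "subscription_id"),
}


def config_problems(config):
    """Return a list of problems; an empty list means the config looks complete."""
    problems = []
    for section, keys in REQUIRED.items():
        values = config.get(section)
        if not isinstance(values, dict):
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            value = values.get(key)
            # Assumption: sample values such as "your-tenant-id" are placeholders.
            if not value or str(value).startswith("your-"):
                problems.append(f"{section}.{key} is empty or still a placeholder")
    return problems


def load_config(path="config/config.json"):
    """Read the JSON configuration file from disk."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


if __name__ == "__main__":
    sample = {"neo4j": {"uri": "bolt://localhost:7687", "username": "neo4j",
                        "password": "your-neo4j-password"}}
    for problem in config_problems(sample):
        print(problem)
```

In practice you would call `config_problems(load_config())` and stop if the returned list is not empty.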

## Basic Usage

### Data Collection

```bash
# Basic collection from current subscription
python3 stormspotter.py collect

# Collect from specific subscription
python3 stormspotter.py collect --subscription-id "subscription-id"

# Collect from specific tenant
python3 stormspotter.py collect --tenant-id "tenant-id"

# Collect with specific credentials
python3 stormspotter.py collect \
  --client-id "app-id" \
  --client-secret "password" \
  --tenant-id "tenant-id"

# Verbose collection
python3 stormspotter.py collect --verbose

# Collect specific resource types
python3 stormspotter.py collect --resource-types "VirtualMachines,StorageAccounts"
```

### Data Import to Neo4j

```bash
# Import collected data to Neo4j
python3 stormspotter.py import --data-file "output/azure_data.json"

# Import with custom Neo4j connection
python3 stormspotter.py import \
  --neo4j-uri "bolt://localhost:7687" \
  --neo4j-user "neo4j" \
  --neo4j-password "password"

# Clear existing data before import
python3 stormspotter.py import --clear-database

# Import multiple files
python3 stormspotter.py import --data-dir "output/"
```

### Web Interface

```bash
# Start Stormspotter web interface
python3 stormspotter.py web

# Start on specific port
python3 stormspotter.py web --port 8080

# Start with custom host
python3 stormspotter.py web --host 0.0.0.0 --port 8080

# Access web interface
# Default: http://localhost:5000
```

## Advanced Collection

### Multi-Tenant Collection

```bash
# Collect from multiple tenants
python3 stormspotter.py collect-multi \
  --tenants "tenant1-id,tenant2-id,tenant3-id" \
  --output-dir "multi-tenant-data"

# Collect with different credentials per tenant
python3 stormspotter.py collect-multi \
  --config-file "multi-tenant-config.json"
```

### Comprehensive Collection

```bash
# Collect all available data
python3 stormspotter.py collect \
  --comprehensive \
  --include-rbac \
  --include-resources \
  --include-identities \
  --include-policies

# Collect with custom scope
python3 stormspotter.py collect \
  --scope "management-groups,subscriptions,resource-groups" \
  --depth 5

# Collect with filters
python3 stormspotter.py collect \
  --exclude-resource-types "NetworkSecurityGroups" \
  --include-only-active-resources
```

### Scheduled Collection

```bash
# Create collection script
cat > collect_azure.sh << 'EOF'
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
OUTPUT_DIR="/opt/stormspotter/data/$DATE"

mkdir -p "$OUTPUT_DIR"

python3 /opt/stormspotter/stormspotter.py collect \
  --output-dir "$OUTPUT_DIR" \
  --comprehensive \
  --verbose

# Import to Neo4j
python3 /opt/stormspotter/stormspotter.py import \
  --data-dir "$OUTPUT_DIR" \
  --clear-database

echo "Collection completed: $DATE"
EOF

chmod +x collect_azure.sh

# Schedule with cron (daily at 2 AM), keeping any existing crontab entries
(crontab -l 2>/dev/null; echo "0 2 * * * /opt/stormspotter/collect_azure.sh") | crontab -
```

## Data Analysis

### Neo4j Cypher Queries

```cypher
// Find all Azure AD users
MATCH (u:AzureADUser)
RETURN u.displayName, u.userPrincipalName, u.enabled

// Find privileged users
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
RETURN u.displayName, r.displayName

// Find virtual machines and their permissions
MATCH (vm:VirtualMachine)
OPTIONAL MATCH (vm)<-[:HAS_PERMISSION]-(p:Principal)
RETURN vm.name, vm.location, collect(p.displayName) as permissions

// Find storage accounts with public access
MATCH (sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN sa.name, sa.resourceGroup, sa.location

// Find attack paths to high-value resources
MATCH path = (u:AzureADUser)-[*1..5]->(vm:VirtualMachine)
WHERE vm.name CONTAINS "prod" OR vm.name CONTAINS "critical"
RETURN path

// Find users with multiple high-privilege roles
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName IN ["Global Administrator", "Privileged Role Administrator", "Security Administrator"]
WITH u, collect(r.displayName) as roles
WHERE size(roles) > 1
RETURN u.displayName, roles

// Find resources accessible from internet
MATCH (r:Resource)-[:ALLOWS_ACCESS]->(nsg:NetworkSecurityGroup)
WHERE nsg.rules CONTAINS "0.0.0.0/0"
RETURN r.name, r.type, r.location
```

### Python Analysis Scripts

```python
#!/usr/bin/env python3
import json
from neo4j import GraphDatabase

class StormspotterAnalyzer:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def find_privileged_users(self):
        """Find users with administrative privileges"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                WHERE r.displayName CONTAINS "Admin"
                RETURN u.displayName as user, collect(r.displayName) as roles
            """)

            privileged_users = []
            for record in result:
                privileged_users.append({
                    'user': record['user'],
                    'roles': record['roles']
                })

            return privileged_users

    def find_attack_paths(self, target_resource_type="VirtualMachine"):
        """Find potential attack paths to target resources"""
        with self.driver.session() as session:
            result = session.run(f"""
                MATCH path = (u:AzureADUser)-[*1..5]->(r:{target_resource_type})
                RETURN path, length(path) as path_length
                ORDER BY path_length
                LIMIT 50
            """)

            attack_paths = []
            for record in result:
                attack_paths.append({
                    'path': record['path'],
                    'length': record['path_length']
                })

            return attack_paths

    def find_exposed_resources(self):
        """Find resources with potential internet exposure"""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (r:Resource)
                WHERE r.publicIPAddress IS NOT NULL
                   OR r.allowBlobPublicAccess = true
                   OR r.publicNetworkAccess = "Enabled"
                RETURN r.name, r.type, r.location, r.resourceGroup
            """)

            exposed_resources = []
            for record in result:
                exposed_resources.append({
                    'name': record['r.name'],
                    'type': record['r.type'],
                    'location': record['r.location'],
                    'resource_group': record['r.resourceGroup']
                })

            return exposed_resources

    def generate_report(self):
        """Generate comprehensive security report"""
        report = {
            'privileged_users': self.find_privileged_users(),
            'attack_paths': self.find_attack_paths(),
            'exposed_resources': self.find_exposed_resources()
        }

        return report

# Usage
if __name__ == "__main__":
    analyzer = StormspotterAnalyzer("bolt://localhost:7687", "neo4j", "password")

    try:
        report = analyzer.generate_report()

        print("=== STORMSPOTTER SECURITY ANALYSIS ===")
        print(f"Privileged Users: {len(report['privileged_users'])}")
        print(f"Attack Paths Found: {len(report['attack_paths'])}")
        print(f"Exposed Resources: {len(report['exposed_resources'])}")

        # Save detailed report
        with open('stormspotter_analysis.json', 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print("Detailed report saved to stormspotter_analysis.json")

    finally:
        analyzer.close()
```
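
The `find_attack_paths` method above interpolates `target_resource_type` directly into the query text, since Neo4j 4.x does not accept a node label as a query parameter. If that value can ever come from user input, one option is to check it against an allow-list before building the query. This is a minimal sketch: `safe_label` and `attack_path_query` are hypothetical helpers, and the allowed labels are simply those used in the queries in this cheat sheet, so adjust them to whatever exists in your graph.

```python
# Assumed allow-list, based on labels used elsewhere in this cheat sheet.
ALLOWED_LABELS = {"VirtualMachine", "StorageAccount", "ResourceGroup", "Subscription"}


def safe_label(label):
    """Return the label unchanged if it is allow-listed, else raise ValueError."""
    if label not in ALLOWED_LABELS:
        raise ValueError(f"unsupported node label: {label!r}")
    return label


def attack_path_query(label, max_hops=5, limit=50):
    """Build the attack-path query text for a validated label and bounded hop count."""
    label = safe_label(label)
    max_hops = int(max_hops)
    limit = int(limit)
    if not 1 <= max_hops <= 10:
        raise ValueError("max_hops must be between 1 and 10")
    if limit < 1:
        raise ValueError("limit must be positive")
    return (
        f"MATCH path = (u:AzureADUser)-[*1..{max_hops}]->(r:{label}) "
        "RETURN path, length(path) AS path_length "
        f"ORDER BY path_length LIMIT {limit}"
    )


if __name__ == "__main__":
    print(attack_path_query("StorageAccount", max_hops=3))
```

The returned string can be passed to `session.run()` in place of the inline f-string.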

## Attack Path Analysis

### Common Attack Scenarios

```cypher
// Scenario 1: User to VM via role assignments
MATCH path = (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)-[:APPLIES_TO]->(rg:ResourceGroup)-[:CONTAINS]->(vm:VirtualMachine)
WHERE r.displayName IN ["Contributor", "Owner", "Virtual Machine Contributor"]
RETURN path

// Scenario 2: Service Principal privilege escalation
MATCH path = (sp:ServicePrincipal)-[:HAS_PERMISSION]->(sub:Subscription)
WHERE sp.appRoles CONTAINS "Application.ReadWrite.All"
RETURN path

// Scenario 3: Storage account access via managed identity
MATCH path = (mi:ManagedIdentity)-[:HAS_ACCESS]->(sa:StorageAccount)
WHERE sa.allowBlobPublicAccess = true
RETURN path

// Scenario 4: Cross-tenant access
MATCH path = (u:AzureADUser)-[:GUEST_IN]->(t:Tenant)-[:CONTAINS]->(r:Resource)
RETURN path

// Scenario 5: Conditional access bypass
MATCH (u:AzureADUser)-[:SUBJECT_TO]->(ca:ConditionalAccessPolicy)
WHERE ca.state = "disabled" OR ca.conditions CONTAINS "trusted"
RETURN u, ca
```

### Risk Assessment Queries

```cypher
// High-risk users (multiple admin roles)
MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
WHERE r.displayName CONTAINS "Admin"
WITH u, collect(r.displayName) as adminRoles
WHERE size(adminRoles) >= 2
RETURN u.displayName, adminRoles, size(adminRoles) as riskScore
ORDER BY riskScore DESC

// Overprivileged service principals
MATCH (sp:ServicePrincipal)-[:HAS_PERMISSION]->(scope)
WITH sp, collect(scope) as scopes
WHERE size(scopes) > 10
RETURN sp.displayName, size(scopes) as permissionCount
ORDER BY permissionCount DESC

// Resources without proper RBAC
MATCH (r:Resource)
WHERE NOT (r)<-[:HAS_PERMISSION]-(:Principal)
RETURN r.name, r.type, r.resourceGroup

// Stale accounts with access
MATCH (u:AzureADUser)-[:HAS_ACCESS]->(r:Resource)
WHERE u.lastSignInDateTime < datetime() - duration({days: 90})
RETURN u.displayName, u.lastSignInDateTime, r.name
```

## Automation and Integration

### PowerShell Integration

```powershell
# Stormspotter PowerShell wrapper
function Invoke-StormspotterCollection {
    param(
        [string]$SubscriptionId,
        [string]$TenantId,
        [string]$OutputPath = "C:\StormspotterData",
        [switch]$Comprehensive
    )

    # Ensure output directory exists (Out-Null keeps it out of the return value)
    if (!(Test-Path $OutputPath)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    # Set timestamp
    $timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
    $outputDir = Join-Path $OutputPath $timestamp

    # Build the argument list (avoids quoting problems with Invoke-Expression)
    $collectArgs = @("stormspotter.py", "collect")

    if ($SubscriptionId) {
        $collectArgs += @("--subscription-id", $SubscriptionId)
    }

    if ($TenantId) {
        $collectArgs += @("--tenant-id", $TenantId)
    }

    if ($Comprehensive) {
        $collectArgs += "--comprehensive"
    }

    $collectArgs += @("--output-dir", $outputDir)

    try {
        Write-Host "[+] Starting Stormspotter collection..."
        & python3 @collectArgs | Out-Host
        # Native commands do not throw on failure, so check the exit code
        if ($LASTEXITCODE -ne 0) { throw "collect exited with code $LASTEXITCODE" }

        Write-Host "[+] Collection completed: $outputDir"

        # Import to Neo4j
        Write-Host "[+] Importing to Neo4j..."
        & python3 stormspotter.py import --data-dir $outputDir | Out-Host
        if ($LASTEXITCODE -ne 0) { throw "import exited with code $LASTEXITCODE" }

        Write-Host "[+] Import completed successfully"

        return $outputDir

    } catch {
        Write-Error "[-] Collection failed: $($_.Exception.Message)"
        return $null
    }
}

# Usage
$result = Invoke-StormspotterCollection -SubscriptionId "your-sub-id" -Comprehensive
```

### SIEM Integration

```python
#!/usr/bin/env python3
import json
import requests
from datetime import datetime

class StormspotterSIEMIntegration:
    def __init__(self, siem_endpoint, api_key):
        self.siem_endpoint = siem_endpoint
        self.api_key = api_key

    def export_findings_to_siem(self, neo4j_uri, neo4j_user, neo4j_password):
        """Export Stormspotter findings to SIEM"""
        from neo4j import GraphDatabase

        driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

        try:
            with driver.session() as session:
                # Query for security findings
                findings = []

                # High-privilege users
                result = session.run("""
                    MATCH (u:AzureADUser)-[:HAS_ROLE]->(r:AzureADRole)
                    WHERE r.displayName CONTAINS "Admin"
                    RETURN u.displayName as user, r.displayName as role
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'Stormspotter',
                        'type': 'PrivilegedUser',
                        'severity': 'Medium',
                        'user': record['user'],
                        'role': record['role'],
                        'description': f"User {record['user']} has administrative role {record['role']}"
                    })

                # Exposed resources
                result = session.run("""
                    MATCH (r:Resource)
                    WHERE r.publicIPAddress IS NOT NULL
                    RETURN r.name as resource, r.type as type, r.publicIPAddress as ip
                """)

                for record in result:
                    findings.append({
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'Stormspotter',
                        'type': 'ExposedResource',
                        'severity': 'High',
                        'resource': record['resource'],
                        'resource_type': record['type'],
                        'public_ip': record['ip'],
                        'description': f"Resource {record['resource']} is exposed to internet"
                    })

                # Send to SIEM
                self.send_to_siem(findings)

        finally:
            driver.close()

    def send_to_siem(self, findings):
        """Send findings to SIEM platform"""
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        for finding in findings:
            try:
                response = requests.post(
                    f"{self.siem_endpoint}/api/events",
                    headers=headers,
                    json=finding,
                    timeout=30
                )

                if response.status_code == 200:
                    print(f"[+] Sent finding: {finding['type']}")
                else:
                    print(f"[-] Failed to send finding: {response.status_code}")

            except Exception as e:
                print(f"[-] Error sending finding: {e}")

# Usage
siem = StormspotterSIEMIntegration("https://your-siem.com", "your-api-key")
siem.export_findings_to_siem("bolt://localhost:7687", "neo4j", "password")
```

## Troubleshooting

#### Authentication Issues

```bash
# Check Azure CLI authentication
az account show

# Re-authenticate if needed
az login --tenant "tenant-id"

# Verify service principal
az ad sp show --id "client-id"

# Test permissions
az role assignment list --assignee "client-id"
```

#### Neo4j Connection Issues

```bash
# Check Neo4j status
sudo systemctl status neo4j

# Check Neo4j logs
sudo journalctl -u neo4j -f

# Test connection
cypher-shell -u neo4j -p password

# Reset Neo4j password
sudo neo4j-admin set-initial-password newpassword
```

#### Collection Failures

```bash
# Enable debug logging
export STORMSPOTTER_LOG_LEVEL=DEBUG

# Check API rate limits
az rest --method get --url "https://management.azure.com/subscriptions/your-sub-id/providers/Microsoft.Resources/resources?api-version=2021-04-01" --query "value[0]"

# Verify permissions
az role assignment list --scope "/subscriptions/your-sub-id" --assignee "your-principal-id"

# Test specific resource collection
python3 stormspotter.py collect --resource-types "VirtualMachines" --verbose
```

#### Performance Issues

```bash
# Increase Neo4j memory
sudo nano /etc/neo4j/neo4j.conf

# Add/modify these settings in neo4j.conf (they are not shell commands):
# dbms.memory.heap.initial_size=2G
# dbms.memory.heap.max_size=4G
# dbms.memory.pagecache.size=1G

# Restart Neo4j
sudo systemctl restart neo4j

# Optimize collection
python3 stormspotter.py collect --batch-size 100 --parallel-workers 5
```

### Data Validation

```python
#!/usr/bin/env python3
def validate_stormspotter_data():
    """Validate collected Stormspotter data"""
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

    try:
        with driver.session() as session:
            # Check data completeness
            checks = {
                'users': session.run("MATCH (u:AzureADUser) RETURN count(u) as count").single()['count'],
                'resources': session.run("MATCH (r:Resource) RETURN count(r) as count").single()['count'],
                'roles': session.run("MATCH (r:AzureADRole) RETURN count(r) as count").single()['count'],
                'relationships': session.run("MATCH ()-[r]->() RETURN count(r) as count").single()['count']
            }

            print("=== STORMSPOTTER DATA VALIDATION ===")
            for check_type, count in checks.items():
                status = "✓" if count > 0 else "✗"
                print(f"{status} {check_type.capitalize()}: {count}")

            # Check for orphaned nodes
            orphaned = session.run("""
                MATCH (n)
                WHERE NOT (n)--()
                RETURN labels(n) as labels, count(n) as count
            """)

            print("\n=== ORPHANED NODES ===")
            for record in orphaned:
                print(f"- {record['labels']}: {record['count']}")

            return checks

    finally:
        driver.close()

# Run validation
validate_stormspotter_data()
```

## Resources

- [Official Stormspotter repository](https://github.com/Azure/Stormspotter)
- [Azure Red Team documentation](https://docs.microsoft.com/en-us/azure/security/)
- [Neo4j Cypher documentation](https://neo4j.com/docs/cypher-manual/current/)
- [Azure RBAC documentation](https://docs.microsoft.com/en-us/azure/role-based-access-control/)
- [Azure security best practices](https://docs.microsoft.com/en-us/azure/security/fundamentals/best-practices-and-patterns)
- [BloodHound for Azure AD](https://github.com/BloodHoundAD/AzureHound)
- [Azure attack paths](https://cloudbrothers.info/en/azure-attack-paths/)