CloudMapper Cheat Sheet

Overview

CloudMapper is a powerful open-source tool that helps security professionals and cloud administrators visualize and analyze Amazon Web Services (AWS) environments for security assessment and compliance. Developed by Duo Security (now part of Cisco), CloudMapper provides comprehensive capabilities for mapping AWS infrastructure, detecting security misconfigurations, and producing detailed reports on cloud resource relationships and potential security risks. The tool excels at generating visual network diagrams that illustrate the complex relationships between AWS services, making it easier to understand attack paths, data flows, and security boundaries within cloud environments.

CloudMapper's core functionality revolves around its ability to gather detailed information about AWS resources through the AWS APIs and to transform that data into meaningful visualizations and security assessments. The tool can map Virtual Private Clouds (VPCs), subnets, security groups, EC2 instances, load balancers, databases, and numerous other AWS services, providing a comprehensive view of the cloud infrastructure. CloudMapper's strength lies in identifying potential security issues such as overly permissive security groups, publicly accessible resources, unencrypted data stores, and complex network configurations that could introduce security vulnerabilities.

CloudMapper supports multiple output formats, including interactive HTML reports, JSON data exports, and graphical network diagrams, making it suitable for both technical analysis and executive reporting. The tool's modular architecture allows customization and extension, while its command-line interface enables integration into automated security assessment workflows and continuous monitoring pipelines. With its focus on AWS security best practices and compliance frameworks, CloudMapper has become an essential tool for cloud security teams, penetration testers, and compliance auditors working with AWS environments.
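
A typical engagement follows the same handful of steps regardless of account size: register the account, collect a configuration snapshot, then generate visualizations and audits from that snapshot. A minimal end-to-end sketch using the commands covered below (exact flags vary between CloudMapper releases, so treat this as an outline rather than a canonical invocation):

```bash
# Register the account, pull a snapshot of its configuration, then analyze it offline
cloudmapper configure add-account --config-file config.json --name myaccount --id 123456789012
cloudmapper collect --config config.json --account-name myaccount
cloudmapper visualize --config config.json --account-name myaccount
cloudmapper audit --config config.json --account-name myaccount
```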

Installation

Python Package Installation

Install CloudMapper with pip:

```bash
# Install CloudMapper
pip install cloudmapper

# Alternative: Install from source
git clone https://github.com/duo-labs/cloudmapper.git
cd cloudmapper
pip install -r requirements.txt
python setup.py install

# Verify installation
cloudmapper --help

# Install additional dependencies for visualization
pip install graphviz
pip install pydot

# Install AWS CLI (required for authentication)
pip install awscli

# Configure AWS credentials
aws configure
```

Docker Installation

```bash
# Create CloudMapper Docker environment
cat > Dockerfile << 'EOF'
FROM python:3.8-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    graphviz \
    && rm -rf /var/lib/apt/lists/*

# Install CloudMapper
RUN pip install cloudmapper

# Create working directory
WORKDIR /cloudmapper

# Copy AWS credentials (mount at runtime)
VOLUME ["/root/.aws"]

CMD ["cloudmapper", "--help"]
EOF

# Build container
docker build -t cloudmapper .

# Run CloudMapper
docker run -it -v ~/.aws:/root/.aws -v $(pwd)/output:/cloudmapper/output cloudmapper

# Example usage
docker run -it -v ~/.aws:/root/.aws cloudmapper cloudmapper collect --account-name myaccount
```

Virtual Environment Setup

```bash
# Create virtual environment
python3 -m venv cloudmapper-env
source cloudmapper-env/bin/activate

# Install CloudMapper
pip install cloudmapper

# Install additional tools
pip install boto3 botocore

# Create configuration directory
mkdir -p ~/.cloudmapper

# Verify setup
cloudmapper --version
```

AWS Permissions Setup

```bash
# Create IAM policy for CloudMapper
cat > cloudmapper-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ec2:Describe*",
        "elasticloadbalancing:Describe*",
        "rds:Describe*",
        "s3:GetBucketLocation",
        "s3:GetBucketLogging",
        "s3:GetBucketPolicy",
        "s3:GetBucketVersioning",
        "s3:GetBucketWebsite",
        "s3:GetBucketAcl",
        "s3:ListAllMyBuckets",
        "cloudfront:ListDistributions",
        "cloudfront:GetDistribution",
        "route53:ListHostedZones",
        "route53:ListResourceRecordSets",
        "iam:GetAccountSummary",
        "iam:ListUsers",
        "iam:ListRoles",
        "iam:ListGroups",
        "iam:ListPolicies",
        "iam:GetRole",
        "iam:GetRolePolicy",
        "iam:ListRolePolicies",
        "iam:ListAttachedRolePolicies",
        "iam:GetPolicy",
        "iam:GetPolicyVersion",
        "lambda:ListFunctions",
        "lambda:GetFunction",
        "apigateway:GET"
      ],
      "Resource": "*"
    }
  ]
}
EOF

# Create IAM user and attach policy
aws iam create-user --user-name cloudmapper-user
aws iam put-user-policy --user-name cloudmapper-user --policy-name CloudMapperPolicy --policy-document file://cloudmapper-policy.json

# Create access keys
aws iam create-access-key --user-name cloudmapper-user
```
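
If maintaining a custom policy is too much overhead, the AWS-managed SecurityAudit and ViewOnlyAccess policies grant comparable read-only access and are commonly used for CloudMapper-style collection; a sketch:

```bash
# Alternative: attach AWS-managed read-only audit policies to the CloudMapper user
aws iam attach-user-policy --user-name cloudmapper-user \
  --policy-arn arn:aws:iam::aws:policy/SecurityAudit
aws iam attach-user-policy --user-name cloudmapper-user \
  --policy-arn arn:aws:iam::aws:policy/job-function/ViewOnlyAccess
```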

Basic Usage

Account Configuration

Setting up AWS accounts for CloudMapper:

```bash
# Configure AWS account
cloudmapper configure add-account --config-file config.json --name myaccount --id 123456789012

# Alternative: Manual configuration
cat > config.json << 'EOF'
{
  "accounts": [
    {
      "id": "123456789012",
      "name": "production",
      "default": true
    },
    {
      "id": "123456789013",
      "name": "staging",
      "default": false
    }
  ]
}
EOF

# List configured accounts
cloudmapper configure list-accounts --config config.json

# Set default account
cloudmapper configure set-default --config config.json --name production
```

Data Collection

Collecting AWS environment data:

```bash
# Collect data for default account
cloudmapper collect --config config.json

# Collect data for specific account
cloudmapper collect --config config.json --account-name production

# Collect data for specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Collect with specific profile
cloudmapper collect --config config.json --profile myprofile

# Collect with custom output directory
cloudmapper collect --config config.json --output-dir ./aws-data

# Verbose collection
cloudmapper collect --config config.json --verbose

# Collect specific services only
cloudmapper collect --config config.json --services ec2,s3,iam
```
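
Collected data lands on disk as one JSON file per API call, which makes quick spot checks easy. The directory layout and file naming below reflect CloudMapper's default account-data/ structure and are assumptions if a custom output directory is used:

```bash
# Spot-check what was collected for one account and region (paths are assumptions)
ls account-data/production/us-east-1/

# Count collected EC2 instances; file name assumed to follow the <service>-<api-call>.json pattern
jq '[.Reservations[].Instances[]] | length' \
  account-data/production/us-east-1/ec2-describe-instances.json
```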

Basic Visualization

Creating basic network visualizations:

```bash
# Generate network diagram
cloudmapper visualize --config config.json --account-name production

# Generate diagram for specific regions
cloudmapper visualize --config config.json --regions us-east-1

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Generate diagram with custom output
cloudmapper visualize --config config.json --output-file network-diagram.html

# Generate PNG diagram
cloudmapper visualize --config config.json --output-format png

# Generate with custom styling
cloudmapper visualize --config config.json --style-file custom-style.css
```
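
Note that in the upstream duo-labs release the interactive diagram is produced in two steps, preparing the collected data and then serving it from a local web server, rather than through a single command; a sketch (subcommand names from the upstream project, flags may differ between versions):

```bash
# Prepare collected data and serve the interactive diagram locally
cloudmapper prepare --config config.json --account production
cloudmapper webserver   # then open http://127.0.0.1:8000 in a browser
```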

Security Analysis

Running security assessments:

```bash
# Run security audit
cloudmapper audit --config config.json --account-name production

# Run specific audit checks
cloudmapper audit --config config.json --checks security_groups,s3_buckets

# Generate audit report
cloudmapper audit --config config.json --output-file security-audit.json

# Run compliance checks
cloudmapper audit --config config.json --compliance-framework cis

# Audit with custom severity
cloudmapper audit --config config.json --min-severity medium

# Export audit results
cloudmapper audit --config config.json --format csv --output-file audit-results.csv
```
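
The JSON audit output can be post-processed with jq for ticketing or dashboards. The findings/severity field names below are assumptions about the report structure; adjust them to match the actual output:

```bash
# Keep only high and critical findings (field names are assumptions)
jq '[.findings[] | select(.severity == "HIGH" or .severity == "CRITICAL")]' \
  security-audit.json > high-severity-findings.json

# Count findings per severity
jq '[.findings[].severity] | group_by(.) | map({severity: .[0], count: length})' security-audit.json
```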

Advanced Features

Custom Visualization

Creating advanced network visualizations:

```bash
# Generate detailed network map
cloudmapper visualize --config config.json \
  --account-name production \
  --regions us-east-1,us-west-2 \
  --include-services ec2,rds,elb,s3 \
  --output-file detailed-network.html

# Create VPC-focused diagram
cloudmapper visualize --config config.json \
  --vpc-only \
  --show-security-groups \
  --show-nacls \
  --output-file vpc-diagram.html

# Generate public resource map
cloudmapper public --config config.json \
  --account-name production \
  --output-file public-resources.html

# Create cross-region visualization
cloudmapper visualize --config config.json \
  --all-regions \
  --cross-region-connections \
  --output-file global-network.html

# Generate service dependency map
cloudmapper dependencies --config config.json \
  --service ec2 \
  --output-file ec2-dependencies.html
```

Security Assessment

Comprehensive security analysis:

```bash
# Run comprehensive security audit
cloudmapper audit --config config.json \
  --account-name production \
  --all-checks \
  --output-file comprehensive-audit.json

# Check for public resources
cloudmapper public --config config.json \
  --account-name production \
  --services s3,ec2,rds,elb \
  --output-file public-exposure.json

# Analyze security groups
cloudmapper sg-audit --config config.json \
  --account-name production \
  --check-unused \
  --check-overpermissive \
  --output-file sg-audit.json

# Check IAM permissions
cloudmapper iam-audit --config config.json \
  --account-name production \
  --check-admin-users \
  --check-unused-roles \
  --output-file iam-audit.json

# Analyze network ACLs
cloudmapper nacl-audit --config config.json \
  --account-name production \
  --check-default-allow \
  --output-file nacl-audit.json
```

Multi-Account Analysis

Analyzing multiple AWS accounts:

```bash
# Configure multiple accounts
cat > multi-account-config.json << 'EOF'
{
  "accounts": [
    {
      "id": "111111111111",
      "name": "production",
      "profile": "prod-profile"
    },
    {
      "id": "222222222222",
      "name": "staging",
      "profile": "staging-profile"
    },
    {
      "id": "333333333333",
      "name": "development",
      "profile": "dev-profile"
    }
  ]
}
EOF

# Collect data from all accounts
for account in production staging development; do
  cloudmapper collect --config multi-account-config.json --account-name "$account"
done

# Generate cross-account visualization
cloudmapper visualize --config multi-account-config.json \
  --all-accounts \
  --cross-account-connections \
  --output-file cross-account-network.html

# Run security audit across all accounts
cloudmapper audit --config multi-account-config.json \
  --all-accounts \
  --output-file multi-account-audit.json

# Compare security posture across accounts
cloudmapper compare --config multi-account-config.json \
  --accounts production,staging,development \
  --output-file account-comparison.html
```
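
The per-account profiles referenced above can be backed by cross-account roles in ~/.aws/config, so a single set of credentials reaches every account. Role and profile names here are placeholders:

```bash
# Back each configured profile with a cross-account role (role/profile names are placeholders)
cat >> ~/.aws/config << 'EOF'
[profile prod-profile]
role_arn = arn:aws:iam::111111111111:role/CloudMapperAudit
source_profile = default

[profile staging-profile]
role_arn = arn:aws:iam::222222222222:role/CloudMapperAudit
source_profile = default
EOF

# Verify that the role can be assumed before collecting
aws sts get-caller-identity --profile prod-profile
```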

Reporting

Creating custom reports and exports:

```bash
# Generate executive summary
cloudmapper report --config config.json \
  --account-name production \
  --template executive \
  --output-file executive-summary.html

# Create technical report
cloudmapper report --config config.json \
  --account-name production \
  --template technical \
  --include-recommendations \
  --output-file technical-report.html

# Export raw data
cloudmapper export --config config.json \
  --account-name production \
  --format json \
  --output-file raw-data.json

# Generate CSV exports
cloudmapper export --config config.json \
  --account-name production \
  --format csv \
  --services ec2,s3,rds \
  --output-dir csv-exports/

# Create compliance report
cloudmapper compliance --config config.json \
  --account-name production \
  --framework cis-aws \
  --output-file cis-compliance.html
```

Automation Scripts

Automated AWS Security Assessment

```python
#!/usr/bin/env python3
# Automated AWS security assessment with CloudMapper

import subprocess
import json
import os
import boto3
from datetime import datetime
import logging


class CloudMapperAutomation:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.output_dir = f"cloudmapper-output-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

        # Create output directory before logging writes into it
        os.makedirs(self.output_dir, exist_ok=True)
        self.setup_logging()

    def setup_logging(self):
        """Set up logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'{self.output_dir}/cloudmapper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load CloudMapper configuration"""
        try:
            with open(self.config_file, 'r') as f:
                config = json.load(f)
            return config
        except FileNotFoundError:
            self.logger.error(f"Configuration file {self.config_file} not found")
            return None
        except json.JSONDecodeError:
            self.logger.error(f"Invalid JSON in configuration file {self.config_file}")
            return None

    def run_cloudmapper_command(self, command):
        """Execute a CloudMapper command"""
        try:
            self.logger.info(f"Running command: {' '.join(command)}")
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=1800  # 30 minute timeout
            )

            if result.returncode == 0:
                self.logger.info("Command completed successfully")
                return True, result.stdout
            else:
                self.logger.error(f"Command failed: {result.stderr}")
                return False, result.stderr

        except subprocess.TimeoutExpired:
            self.logger.error("Command timed out")
            return False, "Command timed out"
        except Exception as e:
            self.logger.error(f"Error running command: {e}")
            return False, str(e)

    def collect_account_data(self, account_name):
        """Collect data for a specific account"""
        self.logger.info(f"Collecting data for account: {account_name}")

        command = [
            "cloudmapper", "collect",
            "--config", self.config_file,
            "--account-name", account_name
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Data collection completed for {account_name}")
        else:
            self.logger.error(f"Data collection failed for {account_name}: {output}")

        return success

    def generate_visualization(self, account_name):
        """Generate network visualization"""
        self.logger.info(f"Generating visualization for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-network.html")

        command = [
            "cloudmapper", "visualize",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Visualization generated: {output_file}")
        else:
            self.logger.error(f"Visualization failed for {account_name}: {output}")

        return success, output_file if success else None

    def run_security_audit(self, account_name):
        """Run security audit"""
        self.logger.info(f"Running security audit for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-audit.json")

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Security audit completed: {output_file}")
            return success, output_file
        else:
            self.logger.error(f"Security audit failed for {account_name}: {output}")
            return success, None

    def analyze_public_resources(self, account_name):
        """Analyze public resources"""
        self.logger.info(f"Analyzing public resources for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-public.html")

        command = [
            "cloudmapper", "public",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Public resources analysis completed: {output_file}")
        else:
            self.logger.error(f"Public resources analysis failed for {account_name}: {output}")

        return success, output_file if success else None

    def generate_reports(self, account_name):
        """Generate comprehensive reports"""
        self.logger.info(f"Generating reports for account: {account_name}")

        reports = {}

        # Executive summary
        exec_output = os.path.join(self.output_dir, f"{account_name}-executive.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "executive",
            "--output-file", exec_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["executive"] = exec_output

        # Technical report
        tech_output = os.path.join(self.output_dir, f"{account_name}-technical.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "technical",
            "--output-file", tech_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["technical"] = tech_output

        return reports

    def analyze_account(self, account_name):
        """Complete analysis for a single account"""
        self.logger.info(f"Starting complete analysis for account: {account_name}")

        results = {
            "account_name": account_name,
            "timestamp": datetime.now().isoformat(),
            "success": True,
            "outputs": {}
        }

        # Collect data
        if not self.collect_account_data(account_name):
            results["success"] = False
            return results

        # Generate visualization
        success, viz_file = self.generate_visualization(account_name)
        if success:
            results["outputs"]["visualization"] = viz_file

        # Run security audit
        success, audit_file = self.run_security_audit(account_name)
        if success:
            results["outputs"]["audit"] = audit_file

        # Analyze public resources
        success, public_file = self.analyze_public_resources(account_name)
        if success:
            results["outputs"]["public_analysis"] = public_file

        # Generate reports
        reports = self.generate_reports(account_name)
        results["outputs"]["reports"] = reports

        self.logger.info(f"Analysis completed for account: {account_name}")
        return results

    def run_multi_account_analysis(self):
        """Run analysis across all configured accounts"""
        config = self.load_config()
        if not config:
            return None

        accounts = config.get("accounts", [])
        if not accounts:
            self.logger.error("No accounts configured")
            return None

        self.logger.info(f"Starting multi-account analysis for {len(accounts)} accounts")

        all_results = []

        for account in accounts:
            account_name = account.get("name")
            if account_name:
                result = self.analyze_account(account_name)
                all_results.append(result)

        # Generate summary report
        summary = self.generate_summary_report(all_results)

        self.logger.info("Multi-account analysis completed")
        return all_results, summary

    def generate_summary_report(self, results):
        """Generate summary report for all accounts"""
        summary = {
            "analysis_summary": {
                "total_accounts": len(results),
                "successful_accounts": len([r for r in results if r["success"]]),
                "failed_accounts": len([r for r in results if not r["success"]]),
                "timestamp": datetime.now().isoformat()
            },
            "account_results": results
        }

        summary_file = os.path.join(self.output_dir, "analysis-summary.json")
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        self.logger.info(f"Summary report generated: {summary_file}")
        return summary

    def cleanup_old_data(self, days_old=7):
        """Clean up old CloudMapper data"""
        self.logger.info(f"Cleaning up data older than {days_old} days")

        # This would implement cleanup logic
        # for old CloudMapper data files
        pass


# Usage
if __name__ == "__main__":
    automation = CloudMapperAutomation("config.json")
    analysis = automation.run_multi_account_analysis()

    if analysis:
        results, summary = analysis
        print(f"Analysis completed for {len(results)} accounts")
        print(f"Results saved in: {automation.output_dir}")
```
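
To run this assessment on a schedule without the monitoring script below, the automation script can simply be run from cron. Script and log paths here are placeholders:

```bash
# Example cron entry, added via `crontab -e`: run the assessment every night at 02:00
# (script path and log path are placeholders)
0 2 * * * cd /opt/cloudmapper-automation && /usr/bin/python3 cloudmapper_automation.py >> /var/log/cloudmapper-automation.log 2>&1
```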

Continuous Monitoring Script

```python
#!/usr/bin/env python3
# Continuous AWS monitoring with CloudMapper

import schedule
import time
import json
import os
import subprocess
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import boto3


class CloudMapperMonitoring:
    def __init__(self, config_file="monitoring-config.json"):
        self.config_file = config_file
        self.load_monitoring_config()
        self.last_results = {}

    def load_monitoring_config(self):
        """Load monitoring configuration"""
        with open(self.config_file, 'r') as f:
            self.config = json.load(f)

        self.accounts = self.config.get("accounts", [])
        self.monitoring_settings = self.config.get("monitoring", {})
        self.notification_settings = self.config.get("notifications", {})

    def run_scheduled_assessment(self):
        """Run scheduled security assessment"""
        print(f"Running scheduled assessment at {datetime.now()}")

        current_results = {}

        for account in self.accounts:
            account_name = account.get("name")

            # Run CloudMapper audit
            audit_result = self.run_audit(account_name)
            current_results[account_name] = audit_result

            # Compare with previous results
            if account_name in self.last_results:
                changes = self.compare_results(
                    self.last_results[account_name],
                    audit_result
                )

                if changes:
                    self.send_change_notification(account_name, changes)

        # Update last results
        self.last_results = current_results

        # Save results
        self.save_monitoring_results(current_results)

    def run_audit(self, account_name):
        """Run CloudMapper audit for an account"""
        output_file = f"monitoring-audit-{account_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", "config.json",
            "--account-name", account_name,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    audit_data = json.load(f)

                # Clean up temporary file
                os.remove(output_file)

                return audit_data
            else:
                print(f"Audit failed for {account_name}: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running audit for {account_name}: {e}")
            return None

    def compare_results(self, previous, current):
        """Compare audit results to detect changes"""
        changes = []

        if not previous or not current:
            return changes

        # Compare security findings
        prev_findings = previous.get("findings", [])
        curr_findings = current.get("findings", [])

        # New findings
        prev_finding_ids = {f.get("id") for f in prev_findings}
        new_findings = [f for f in curr_findings if f.get("id") not in prev_finding_ids]

        if new_findings:
            changes.append({
                "type": "new_findings",
                "count": len(new_findings),
                "findings": new_findings
            })

        # Resolved findings
        curr_finding_ids = {f.get("id") for f in curr_findings}
        resolved_findings = [f for f in prev_findings if f.get("id") not in curr_finding_ids]

        if resolved_findings:
            changes.append({
                "type": "resolved_findings",
                "count": len(resolved_findings),
                "findings": resolved_findings
            })

        return changes

    def send_change_notification(self, account_name, changes):
        """Send notification about changes"""
        if not self.notification_settings.get("enabled", False):
            return

        subject = f"CloudMapper Alert: Changes detected in {account_name}"

        body = f"Changes detected in AWS account {account_name}:\n\n"

        for change in changes:
            change_type = change["type"]
            count = change["count"]

            if change_type == "new_findings":
                body += f"New security findings: {count}\n"
                for finding in change["findings"][:5]:  # Limit to first 5
                    body += f"  - {finding.get('title', 'Unknown')}\n"

            elif change_type == "resolved_findings":
                body += f"Resolved security findings: {count}\n"

        self.send_email(subject, body)

    def send_email(self, subject, body):
        """Send email notification"""
        email_config = self.notification_settings.get("email", {})

        if not email_config.get("enabled", False):
            return

        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            server.login(email_config["username"], email_config["password"])

            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()

            print(f"Notification sent: {subject}")

        except Exception as e:
            print(f"Failed to send email notification: {e}")

    def save_monitoring_results(self, results):
        """Save monitoring results to file"""
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        filename = f"monitoring-results-{timestamp}.json"

        with open(filename, 'w') as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "results": results
            }, f, indent=2)

    def start_monitoring(self):
        """Start continuous monitoring"""
        print("Starting CloudMapper continuous monitoring...")

        # Schedule assessments
        interval = self.monitoring_settings.get("interval_hours", 24)
        schedule.every(interval).hours.do(self.run_scheduled_assessment)

        # Run initial assessment
        self.run_scheduled_assessment()

        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute


# Usage
if __name__ == "__main__":
    monitoring = CloudMapperMonitoring()
    monitoring.start_monitoring()
```
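
The monitoring script expects a monitoring-config.json containing the keys it reads (accounts, monitoring.interval_hours, and the notification settings). A minimal example with placeholder values:

```bash
# Minimal monitoring configuration for the script above (all values are placeholders)
cat > monitoring-config.json << 'EOF'
{
  "accounts": [
    { "id": "123456789012", "name": "production" }
  ],
  "monitoring": {
    "interval_hours": 24
  },
  "notifications": {
    "enabled": true,
    "email": {
      "enabled": true,
      "from": "cloudmapper@example.com",
      "to": "security-team@example.com",
      "smtp_server": "smtp.example.com",
      "smtp_port": 587,
      "username": "cloudmapper",
      "password": "change-me"
    }
  }
}
EOF
```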

Compliance Reporting Script

```python
#!/usr/bin/env python3
# CloudMapper compliance reporting

import json
import subprocess
from datetime import datetime
import pandas as pd


class CloudMapperCompliance:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.compliance_frameworks = {
            "cis": "CIS AWS Foundations Benchmark",
            "nist": "NIST Cybersecurity Framework",
            "pci": "PCI DSS",
            "sox": "Sarbanes-Oxley Act",
            "hipaa": "HIPAA Security Rule"
        }

    def run_compliance_audit(self, account_name, framework="cis"):
        """Run compliance audit for a specific framework"""
        output_file = f"compliance-{framework}-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--compliance-framework", framework,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    compliance_data = json.load(f)
                return compliance_data
            else:
                print(f"Compliance audit failed: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running compliance audit: {e}")
            return None

    def generate_compliance_report(self, account_name, frameworks=None):
        """Generate comprehensive compliance report"""
        if frameworks is None:
            frameworks = ["cis", "nist"]

        report_data = {
            "account_name": account_name,
            "report_date": datetime.now().isoformat(),
            "frameworks": {}
        }

        for framework in frameworks:
            print(f"Running {framework.upper()} compliance audit...")

            audit_result = self.run_compliance_audit(account_name, framework)

            if audit_result:
                compliance_score = self.calculate_compliance_score(audit_result)

                report_data["frameworks"][framework] = {
                    "name": self.compliance_frameworks.get(framework, framework),
                    "score": compliance_score,
                    "findings": audit_result.get("findings", []),
                    "recommendations": audit_result.get("recommendations", [])
                }

        # Generate HTML report
        html_report = self.generate_html_compliance_report(report_data)

        # Save reports
        json_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"
        html_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.html"

        with open(json_file, 'w') as f:
            json.dump(report_data, f, indent=2)

        with open(html_file, 'w') as f:
            f.write(html_report)

        print(f"Compliance report generated: {html_file}")
        return report_data

    def calculate_compliance_score(self, audit_result):
        """Calculate compliance score from audit results"""
        findings = audit_result.get("findings", [])

        if not findings:
            return 100.0

        total_checks = len(findings)
        passed_checks = len([f for f in findings if f.get("status") == "PASS"])

        score = (passed_checks / total_checks) * 100
        return round(score, 2)

    def generate_html_compliance_report(self, report_data):
        """Generate HTML compliance report"""
        html_template = """
        <html>
        <head><title>CloudMapper Compliance Report</title></head>
        <body>
        <h1>Compliance Report</h1>
        <p>Account: {account_name}</p>
        <p>Report Date: {report_date}</p>
        {frameworks_html}
        </body>
        </html>
        """

        frameworks_html = ""

        for framework_id, framework_data in report_data["frameworks"].items():
            score = framework_data["score"]
            score_class = "pass" if score >= 80 else "fail"

            findings_html = ""
            for finding in framework_data["findings"][:10]:  # Limit to first 10
                status = finding.get("status", "UNKNOWN")
                status_class = "pass" if status == "PASS" else "fail"

                findings_html += f"""
                <div class="finding">
                    <strong class="{status_class}">{status}</strong>: {finding.get('title', 'Unknown')}
                    <br><small>{finding.get('description', '')}</small>
                </div>
                """

            frameworks_html += f"""
            <div class="framework">
                <h2>{framework_data['name']}</h2>
                <div class="score {score_class}">Compliance Score: {score}%</div>
                <h3>Key Findings:</h3>
                {findings_html}
            </div>
            """

        return html_template.format(
            account_name=report_data["account_name"],
            report_date=report_data["report_date"],
            frameworks_html=frameworks_html
        )


# Usage
if __name__ == "__main__":
    compliance = CloudMapperCompliance()
    report = compliance.generate_compliance_report("production", ["cis", "nist"])
```

Integration Examples

SIEM Integration

```python
#!/usr/bin/env python3
# CloudMapper SIEM integration

import json
import requests
from datetime import datetime


class CloudMapperSIEMIntegration:
    def __init__(self, siem_config):
        self.siem_config = siem_config
        self.siem_type = siem_config.get("type", "splunk")

    def send_to_splunk(self, data):
        """Send CloudMapper data to Splunk"""
        splunk_config = self.siem_config.get("splunk", {})

        headers = {
            "Authorization": f"Splunk {splunk_config['token']}",
            "Content-Type": "application/json"
        }

        # Format data for Splunk
        events = []
        for finding in data.get("findings", []):
            event = {
                "time": datetime.now().timestamp(),
                "source": "cloudmapper",
                "sourcetype": "aws:security:finding",
                "event": finding
            }
            events.append(event)

        # Send to Splunk HEC
        response = requests.post(
            f"{splunk_config['url']}/services/collector/event",
            headers=headers,
            json={"event": events}
        )

        return response.status_code == 200

    def send_to_elasticsearch(self, data):
        """Send CloudMapper data to Elasticsearch"""
        es_config = self.siem_config.get("elasticsearch", {})

        success = True

        # Format data for Elasticsearch
        for finding in data.get("findings", []):
            doc = {
                "timestamp": datetime.now().isoformat(),
                "source": "cloudmapper",
                "finding": finding
            }

            # Index document
            response = requests.post(
                f"{es_config['url']}/cloudmapper-findings/_doc",
                auth=(es_config['username'], es_config['password']),
                json=doc
            )

            if response.status_code not in (200, 201):
                success = False

        return success

    def send_findings(self, cloudmapper_data):
        """Send CloudMapper findings to SIEM"""
        if self.siem_type == "splunk":
            return self.send_to_splunk(cloudmapper_data)
        elif self.siem_type == "elasticsearch":
            return self.send_to_elasticsearch(cloudmapper_data)
        else:
            print(f"Unsupported SIEM type: {self.siem_type}")
            return False


# Usage
siem_config = {
    "type": "splunk",
    "splunk": {
        "url": "https://splunk.example.com:8088",
        "token": "your-hec-token"
    }
}

siem_integration = CloudMapperSIEMIntegration(siem_config)
```
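
Before wiring CloudMapper output into the integration, the Splunk HTTP Event Collector endpoint can be smoke-tested directly. URL and token are placeholders:

```bash
# Send a single test event to the Splunk HTTP Event Collector (URL and token are placeholders)
curl -k "https://splunk.example.com:8088/services/collector/event" \
  -H "Authorization: Splunk your-hec-token" \
  -d '{"event": {"source": "cloudmapper", "message": "HEC connectivity test"}, "sourcetype": "aws:security:finding"}'
```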

Terraform Integration

```python
#!/usr/bin/env python3
# CloudMapper and Terraform integration

import json
import subprocess
import os


class CloudMapperTerraformIntegration:
    def __init__(self, terraform_dir):
        self.terraform_dir = terraform_dir

    def analyze_terraform_plan(self, plan_file):
        """Analyze a Terraform plan with CloudMapper"""

        # Parse Terraform plan (JSON output of `terraform show -json`)
        with open(plan_file, 'r') as f:
            plan_data = json.load(f)

        # Extract AWS resources
        aws_resources = self.extract_aws_resources(plan_data)

        # Generate CloudMapper configuration
        config = self.generate_cloudmapper_config(aws_resources)

        return config

    def extract_aws_resources(self, plan_data):
        """Extract AWS resources from a Terraform plan"""
        aws_resources = []

        for resource_change in plan_data.get("resource_changes", []):
            resource_type = resource_change.get("type", "")

            if resource_type.startswith("aws_"):
                aws_resources.append({
                    "type": resource_type,
                    "name": resource_change.get("name", ""),
                    "change": resource_change.get("change", {}),
                    "values": resource_change.get("change", {}).get("after", {})
                })

        return aws_resources

    def generate_cloudmapper_config(self, aws_resources):
        """Generate CloudMapper configuration from Terraform resources"""

        # This would generate appropriate CloudMapper configuration
        # based on the Terraform resources
        config = {
            "accounts": [
                {
                    "id": "123456789012",  # Would be extracted from Terraform
                    "name": "terraform-managed"
                }
            ]
        }

        return config


# Usage
terraform_integration = CloudMapperTerraformIntegration("/path/to/terraform")
```
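
The plan_file consumed above is the JSON form of a Terraform plan, which Terraform can emit directly:

```bash
# Produce a machine-readable plan for the integration script
cd /path/to/terraform
terraform plan -out=tfplan.binary
terraform show -json tfplan.binary > plan.json
```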

Troubleshooting

Common Issues

Authentication problems:

```bash
# Check AWS credentials
aws sts get-caller-identity

# Verify IAM permissions
aws iam simulate-principal-policy \
  --policy-source-arn arn:aws:iam::123456789012:user/cloudmapper-user \
  --action-names ec2:DescribeInstances \
  --resource-arns "*"

# Test specific service access
aws ec2 describe-instances --region us-east-1
aws s3 ls

# Check CloudMapper configuration
cloudmapper configure list-accounts --config config.json
```

Data collection problems:

```bash
# Enable verbose logging
cloudmapper collect --config config.json --verbose

# Test specific regions
cloudmapper collect --config config.json --regions us-east-1

# Check for rate limiting
cloudmapper collect --config config.json --rate-limit 10

# Verify account access
aws organizations describe-account --account-id 123456789012
```

Visualization problems:

```bash
# Check dependencies
pip install graphviz pydot

# Install system graphviz
sudo apt install graphviz   # Ubuntu/Debian
brew install graphviz       # macOS

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Check output directory permissions
ls -la output/
chmod 755 output/
```

Performance Optimization

Optimizing CloudMapper performance:

```bash
# Use specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Limit services
cloudmapper collect --config config.json --services ec2,s3,iam

# Parallel collection
cloudmapper collect --config config.json --threads 4

# Use caching
cloudmapper collect --config config.json --cache-dir ./cache

# Optimize memory usage
export PYTHONHASHSEED=0
ulimit -v 4194304   # Limit virtual memory
```

Security Considerations

Data Protection

**Sensitive data handling:**

- CloudMapper collects detailed AWS configuration data
- Store output files securely with appropriate access controls
- Encrypt sensitive reports and audit results
- Apply data protection policies to the collected information
- Use secure channels when transferring reports

**Access controls:**

- Limit CloudMapper's IAM permissions to the minimum required
- Use temporary credentials where possible
- Enforce multi-factor authentication for AWS access
- Rotate access keys and credentials regularly
- Monitor CloudMapper usage through CloudTrail (see the sketch below)
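
The last point can be covered with CloudTrail's event history; for example, to review recent API calls made by the dedicated CloudMapper user created in the installation section:

```bash
# List recent API activity of the cloudmapper-user IAM user
aws cloudtrail lookup-events \
  --lookup-attributes AttributeKey=Username,AttributeValue=cloudmapper-user \
  --max-results 50 \
  --query 'Events[].{Time:EventTime,Name:EventName,Source:EventSource}' \
  --output table
```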

Operational Security

**Secure usage practices:**

- Run CloudMapper from secure, monitored environments
- Validate configuration files before execution
- Keep CloudMapper and its dependencies up to date
- Monitor for unauthorized use or configuration changes
- Log and audit CloudMapper activity

**Network security:**

- Use VPNs or otherwise secured networks when running CloudMapper
- Segment the networks used for analysis environments
- Monitor network traffic during data collection
- Use secure protocols for all communications
- Regularly assess the security of the CloudMapper infrastructure itself
