
# CloudMapper Cheat Sheet

## Overview

CloudMapper is a powerful open-source tool designed to help security professionals and cloud administrators visualize and analyze Amazon Web Services (AWS) environments for security assessment and compliance purposes. Developed by Duo Security (now part of Cisco), CloudMapper provides comprehensive capabilities for mapping AWS infrastructure, identifying security misconfigurations, and generating detailed reports on cloud resource relationships and potential security risks. The tool excels at producing visual network diagrams that illustrate the complex relationships between AWS services, making it easier to understand attack paths, data flows, and security boundaries in cloud environments.

CloudMapper's core functionality relies on collecting detailed information about AWS resources through the AWS API and turning that data into meaningful visualizations and security assessments. The tool can map Virtual Private Clouds (VPCs), subnets, security groups, EC2 instances, load balancers, databases, and many other AWS services, providing a comprehensive view of the cloud infrastructure. Its strength lies in flagging potential security issues such as overly permissive security groups, publicly accessible resources, unencrypted data stores, and complex network configurations that could introduce vulnerabilities.

CloudMapper supports multiple output formats, including interactive HTML reports, JSON data exports, and graphical network diagrams, making it suitable for both technical analysis and executive reporting. Its modular architecture allows customization and extension, while its command-line interface enables integration into automated security assessment workflows and continuous monitoring pipelines. With its focus on AWS security best practices and compliance frameworks, CloudMapper has become an essential tool for cloud security teams, penetration testers, and compliance auditors working with AWS environments.
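
The typical workflow follows the same collect-then-analyze pattern used throughout this cheat sheet: register an account, snapshot its metadata, then build visualizations and audits from the cached data. The sketch below strings those steps together using the command conventions of the examples in this document; exact subcommands and flags can vary between CloudMapper versions.

```bash
# Minimal end-to-end workflow (illustrative; flags may vary by version)
aws configure                                                          # set up credentials
cloudmapper configure add-account --config-file config.json \
    --name myaccount --id 123456789012                                 # register the account
cloudmapper collect --config config.json --account-name myaccount     # snapshot AWS metadata
cloudmapper visualize --config config.json --account-name myaccount   # build the network diagram
cloudmapper audit --config config.json --account-name myaccount       # run the security checks
```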

### Python Package Installation

Install CloudMapper using pip:

```bash
# Install CloudMapper
pip install cloudmapper

# Alternative: Install from source
git clone https://github.com/duo-labs/cloudmapper.git
cd cloudmapper
pip install -r requirements.txt
python setup.py install

# Verify installation
cloudmapper --help

# Install additional dependencies for visualization
pip install graphviz
pip install pydot

# Install AWS CLI (required for authentication)
pip install awscli

# Configure AWS credentials
aws configure
```

### Docker Installation

```bash
# Create CloudMapper Docker environment
cat > Dockerfile << 'EOF'
FROM python:3.8-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    graphviz \
    && rm -rf /var/lib/apt/lists/*

# Install CloudMapper
RUN pip install cloudmapper

# Create working directory
WORKDIR /cloudmapper

# Copy AWS credentials (mount at runtime)
VOLUME ["/root/.aws"]

CMD ["cloudmapper", "--help"]
EOF

# Build container
docker build -t cloudmapper .

# Run CloudMapper
docker run -it -v ~/.aws:/root/.aws -v $(pwd)/output:/cloudmapper/output cloudmapper

# Example usage
docker run -it -v ~/.aws:/root/.aws cloudmapper cloudmapper collect --account-name myaccount
```

### Virtual Environment Setup

```bash
# Create virtual environment
python3 -m venv cloudmapper-env
source cloudmapper-env/bin/activate

# Install CloudMapper
pip install cloudmapper

# Install additional tools
pip install boto3 botocore

# Create configuration directory
mkdir -p ~/.cloudmapper

# Verify setup
cloudmapper --version
```

### AWS Permissions Configuration

```bash
# Create IAM policy for CloudMapper
cat > cloudmapper-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:Describe*",
                "elasticloadbalancing:Describe*",
                "rds:Describe*",
                "s3:GetBucketLocation",
                "s3:GetBucketLogging",
                "s3:GetBucketPolicy",
                "s3:GetBucketVersioning",
                "s3:GetBucketWebsite",
                "s3:GetBucketAcl",
                "s3:ListAllMyBuckets",
                "cloudfront:ListDistributions",
                "cloudfront:GetDistribution",
                "route53:ListHostedZones",
                "route53:ListResourceRecordSets",
                "iam:GetAccountSummary",
                "iam:ListUsers",
                "iam:ListRoles",
                "iam:ListGroups",
                "iam:ListPolicies",
                "iam:GetRole",
                "iam:GetRolePolicy",
                "iam:ListRolePolicies",
                "iam:ListAttachedRolePolicies",
                "iam:GetPolicy",
                "iam:GetPolicyVersion",
                "lambda:ListFunctions",
                "lambda:GetFunction",
                "apigateway:GET"
            ],
            "Resource": "*"
        }
    ]
}
EOF

# Create IAM user and attach policy
aws iam create-user --user-name cloudmapper-user
aws iam put-user-policy --user-name cloudmapper-user --policy-name CloudMapperPolicy --policy-document file://cloudmapper-policy.json

# Create access keys
aws iam create-access-key --user-name cloudmapper-user
```

### Account Configuration

Configure AWS accounts for CloudMapper:

```bash
# Configure AWS account
cloudmapper configure add-account --config-file config.json --name myaccount --id 123456789012

# Alternative: Manual configuration
cat > config.json << 'EOF'
{
    "accounts": [
        {
            "id": "123456789012",
            "name": "production",
            "default": true
        },
        {
            "id": "123456789013",
            "name": "staging",
            "default": false
        }
    ]
}
EOF

# List configured accounts
cloudmapper configure list-accounts --config config.json

# Set default account
cloudmapper configure set-default --config config.json --name production
```

### Data Collection

Collect data from the AWS environment:

```bash
# Collect data for default account
cloudmapper collect --config config.json

# Collect data for specific account
cloudmapper collect --config config.json --account-name production

# Collect data for specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Collect with specific profile
cloudmapper collect --config config.json --profile myprofile

# Collect with custom output directory
cloudmapper collect --config config.json --output-dir ./aws-data

# Verbose collection
cloudmapper collect --config config.json --verbose

# Collect specific services only
cloudmapper collect --config config.json --services ec2,s3,iam
```

### Basic Visualization

Create basic network visualizations:

```bash
# Generate network diagram
cloudmapper visualize --config config.json --account-name production

# Generate diagram for specific regions
cloudmapper visualize --config config.json --regions us-east-1

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Generate diagram with custom output
cloudmapper visualize --config config.json --output-file network-diagram.html

# Generate PNG diagram
cloudmapper visualize --config config.json --output-format png

# Generate with custom styling
cloudmapper visualize --config config.json --style-file custom-style.css
```

### Security Analysis

Run security assessments:

```bash
# Run security audit
cloudmapper audit --config config.json --account-name production

# Run specific audit checks
cloudmapper audit --config config.json --checks security_groups,s3_buckets

# Generate audit report
cloudmapper audit --config config.json --output-file security-audit.json

# Run compliance checks
cloudmapper audit --config config.json --compliance-framework cis

# Audit with custom severity
cloudmapper audit --config config.json --min-severity medium

# Export audit results
cloudmapper audit --config config.json --format csv --output-file audit-results.csv
```

### Custom Visualization

Create advanced network visualizations:

```bash
# Generate detailed network map
cloudmapper visualize --config config.json \
    --account-name production \
    --regions us-east-1,us-west-2 \
    --include-services ec2,rds,elb,s3 \
    --output-file detailed-network.html

# Create VPC-focused diagram
cloudmapper visualize --config config.json \
    --vpc-only \
    --show-security-groups \
    --show-nacls \
    --output-file vpc-diagram.html

# Generate public resource map
cloudmapper public --config config.json \
    --account-name production \
    --output-file public-resources.html

# Create cross-region visualization
cloudmapper visualize --config config.json \
    --all-regions \
    --cross-region-connections \
    --output-file global-network.html

# Generate service dependency map
cloudmapper dependencies --config config.json \
    --service ec2 \
    --output-file ec2-dependencies.html
```

### Security Assessment

Comprehensive security analysis:

```bash
# Run comprehensive security audit
cloudmapper audit --config config.json \
    --account-name production \
    --all-checks \
    --output-file comprehensive-audit.json

# Check for public resources
cloudmapper public --config config.json \
    --account-name production \
    --services s3,ec2,rds,elb \
    --output-file public-exposure.json

# Analyze security groups
cloudmapper sg-audit --config config.json \
    --account-name production \
    --check-unused \
    --check-overpermissive \
    --output-file sg-audit.json

# Check IAM permissions
cloudmapper iam-audit --config config.json \
    --account-name production \
    --check-admin-users \
    --check-unused-roles \
    --output-file iam-audit.json

# Analyze network ACLs
cloudmapper nacl-audit --config config.json \
    --account-name production \
    --check-default-allow \
    --output-file nacl-audit.json
```

### Multi-Account Analysis

Analyze multiple AWS accounts:

```bash
# Configure multiple accounts
cat > multi-account-config.json << 'EOF'
{
    "accounts": [
        {
            "id": "111111111111",
            "name": "production",
            "profile": "prod-profile"
        },
        {
            "id": "222222222222",
            "name": "staging",
            "profile": "staging-profile"
        },
        {
            "id": "333333333333",
            "name": "development",
            "profile": "dev-profile"
        }
    ]
}
EOF

# Collect data from all accounts
for account in production staging development; do
    cloudmapper collect --config multi-account-config.json --account-name $account
done

# Generate cross-account visualization
cloudmapper visualize --config multi-account-config.json \
    --all-accounts \
    --cross-account-connections \
    --output-file cross-account-network.html

# Run security audit across all accounts
cloudmapper audit --config multi-account-config.json \
    --all-accounts \
    --output-file multi-account-audit.json

# Compare security posture across accounts
cloudmapper compare --config multi-account-config.json \
    --accounts production,staging,development \
    --output-file account-comparison.html
```

### Custom Reporting

Create custom reports and exports:

```bash
# Generate executive summary
cloudmapper report --config config.json \
    --account-name production \
    --template executive \
    --output-file executive-summary.html

# Create technical report
cloudmapper report --config config.json \
    --account-name production \
    --template technical \
    --include-recommendations \
    --output-file technical-report.html

# Export raw data
cloudmapper export --config config.json \
    --account-name production \
    --format json \
    --output-file raw-data.json

# Generate CSV exports
cloudmapper export --config config.json \
    --account-name production \
    --format csv \
    --services ec2,s3,rds \
    --output-dir csv-exports/

# Create compliance report
cloudmapper compliance --config config.json \
    --account-name production \
    --framework cis-aws \
    --output-file cis-compliance.html
```

### Automated AWS Security Assessment Script

```python
#!/usr/bin/env python3
# Automated AWS security assessment with CloudMapper

import subprocess
import json
import os
import boto3
from datetime import datetime
import logging

class CloudMapperAutomation:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.output_dir = f"cloudmapper-output-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        self.setup_logging()

        # Create output directory
        os.makedirs(self.output_dir, exist_ok=True)

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'{self.output_dir}/cloudmapper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load CloudMapper configuration"""
        try:
            with open(self.config_file, 'r') as f:
                config = json.load(f)
            return config
        except FileNotFoundError:
            self.logger.error(f"Configuration file {self.config_file} not found")
            return None
        except json.JSONDecodeError:
            self.logger.error(f"Invalid JSON in configuration file {self.config_file}")
            return None

    def run_cloudmapper_command(self, command):
        """Execute CloudMapper command"""
        try:
            self.logger.info(f"Running command: {' '.join(command)}")
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=1800  # 30 minute timeout
            )

            if result.returncode == 0:
                self.logger.info("Command completed successfully")
                return True, result.stdout
            else:
                self.logger.error(f"Command failed: {result.stderr}")
                return False, result.stderr

        except subprocess.TimeoutExpired:
            self.logger.error("Command timed out")
            return False, "Command timed out"
        except Exception as e:
            self.logger.error(f"Error running command: {e}")
            return False, str(e)

    def collect_account_data(self, account_name):
        """Collect data for specific account"""
        self.logger.info(f"Collecting data for account: {account_name}")

        command = [
            "cloudmapper", "collect",
            "--config", self.config_file,
            "--account-name", account_name
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Data collection completed for {account_name}")
        else:
            self.logger.error(f"Data collection failed for {account_name}: {output}")

        return success

    def generate_visualization(self, account_name):
        """Generate network visualization"""
        self.logger.info(f"Generating visualization for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-network.html")

        command = [
            "cloudmapper", "visualize",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Visualization generated: {output_file}")
        else:
            self.logger.error(f"Visualization failed for {account_name}: {output}")

        return success, output_file if success else None

    def run_security_audit(self, account_name):
        """Run security audit"""
        self.logger.info(f"Running security audit for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-audit.json")

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Security audit completed: {output_file}")
            return success, output_file
        else:
            self.logger.error(f"Security audit failed for {account_name}: {output}")
            return success, None

    def analyze_public_resources(self, account_name):
        """Analyze public resources"""
        self.logger.info(f"Analyzing public resources for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-public.html")

        command = [
            "cloudmapper", "public",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Public resources analysis completed: {output_file}")
        else:
            self.logger.error(f"Public resources analysis failed for {account_name}: {output}")

        return success, output_file if success else None

    def generate_reports(self, account_name):
        """Generate comprehensive reports"""
        self.logger.info(f"Generating reports for account: {account_name}")

        reports = {}

        # Executive summary
        exec_output = os.path.join(self.output_dir, f"{account_name}-executive.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "executive",
            "--output-file", exec_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["executive"] = exec_output

        # Technical report
        tech_output = os.path.join(self.output_dir, f"{account_name}-technical.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "technical",
            "--output-file", tech_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["technical"] = tech_output

        return reports

    def analyze_account(self, account_name):
        """Complete analysis for single account"""
        self.logger.info(f"Starting complete analysis for account: {account_name}")

        results = {
            "account_name": account_name,
            "timestamp": datetime.now().isoformat(),
            "success": True,
            "outputs": {}
        }

        # Collect data
        if not self.collect_account_data(account_name):
            results["success"] = False
            return results

        # Generate visualization
        success, viz_file = self.generate_visualization(account_name)
        if success:
            results["outputs"]["visualization"] = viz_file

        # Run security audit
        success, audit_file = self.run_security_audit(account_name)
        if success:
            results["outputs"]["audit"] = audit_file

        # Analyze public resources
        success, public_file = self.analyze_public_resources(account_name)
        if success:
            results["outputs"]["public_analysis"] = public_file

        # Generate reports
        reports = self.generate_reports(account_name)
        results["outputs"]["reports"] = reports

        self.logger.info(f"Analysis completed for account: {account_name}")
        return results

    def run_multi_account_analysis(self):
        """Run analysis across all configured accounts"""
        config = self.load_config()
        if not config:
            return None

        accounts = config.get("accounts", [])
        if not accounts:
            self.logger.error("No accounts configured")
            return None

        self.logger.info(f"Starting multi-account analysis for {len(accounts)} accounts")

        all_results = []

        for account in accounts:
            account_name = account.get("name")
            if account_name:
                result = self.analyze_account(account_name)
                all_results.append(result)

        # Generate summary report
        summary = self.generate_summary_report(all_results)

        self.logger.info("Multi-account analysis completed")
        return all_results, summary

    def generate_summary_report(self, results):
        """Generate summary report for all accounts"""
        summary = {
            "analysis_summary": {
                "total_accounts": len(results),
                "successful_accounts": len([r for r in results if r["success"]]),
                "failed_accounts": len([r for r in results if not r["success"]]),
                "timestamp": datetime.now().isoformat()
            },
            "account_results": results
        }

        summary_file = os.path.join(self.output_dir, "analysis-summary.json")
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        self.logger.info(f"Summary report generated: {summary_file}")
        return summary

    def cleanup_old_data(self, days_old=7):
        """Clean up old CloudMapper data"""
        self.logger.info(f"Cleaning up data older than {days_old} days")

        # This would implement cleanup logic
        # for old CloudMapper data files
        pass

# Usage
if __name__ == "__main__":
    automation = CloudMapperAutomation("config.json")
    # run_multi_account_analysis() returns None when no configuration is found
    analysis = automation.run_multi_account_analysis()

    if analysis:
        results, summary = analysis
        print(f"Analysis completed for {len(results)} accounts")
        print(f"Results saved in: {automation.output_dir}")
```

### Continuous Monitoring Script

```python
#!/usr/bin/env python3
# Continuous AWS monitoring with CloudMapper

import schedule
import time
import json
import os
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import boto3
import subprocess

class CloudMapperMonitoring:
    def __init__(self, config_file="monitoring-config.json"):
        self.config_file = config_file
        self.load_monitoring_config()
        self.last_results = {}

    def load_monitoring_config(self):
        """Load monitoring configuration"""
        with open(self.config_file, 'r') as f:
            self.config = json.load(f)

        self.accounts = self.config.get("accounts", [])
        self.monitoring_settings = self.config.get("monitoring", {})
        self.notification_settings = self.config.get("notifications", {})

    def run_scheduled_assessment(self):
        """Run scheduled security assessment"""
        print(f"Running scheduled assessment at {datetime.now()}")

        current_results = {}

        for account in self.accounts:
            account_name = account.get("name")

            # Run CloudMapper audit
            audit_result = self.run_audit(account_name)
            current_results[account_name] = audit_result

            # Compare with previous results
            if account_name in self.last_results:
                changes = self.compare_results(
                    self.last_results[account_name],
                    audit_result
                )

                if changes:
                    self.send_change_notification(account_name, changes)

        # Update last results
        self.last_results = current_results

        # Save results
        self.save_monitoring_results(current_results)

    def run_audit(self, account_name):
        """Run CloudMapper audit for account"""
        output_file = f"monitoring-audit-{account_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", "config.json",
            "--account-name", account_name,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    audit_data = json.load(f)

                # Clean up temporary file
                os.remove(output_file)

                return audit_data
            else:
                print(f"Audit failed for {account_name}: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running audit for {account_name}: {e}")
            return None

    def compare_results(self, previous, current):
        """Compare audit results to detect changes"""
        changes = []

        if not previous or not current:
            return changes

        # Compare security findings
        prev_findings = previous.get("findings", [])
        curr_findings = current.get("findings", [])

        # New findings
        prev_finding_ids = {f.get("id") for f in prev_findings}
        new_findings = [f for f in curr_findings if f.get("id") not in prev_finding_ids]

        if new_findings:
            changes.append({
                "type": "new_findings",
                "count": len(new_findings),
                "findings": new_findings
            })

        # Resolved findings
        curr_finding_ids = {f.get("id") for f in curr_findings}
        resolved_findings = [f for f in prev_findings if f.get("id") not in curr_finding_ids]

        if resolved_findings:
            changes.append({
                "type": "resolved_findings",
                "count": len(resolved_findings),
                "findings": resolved_findings
            })

        return changes

    def send_change_notification(self, account_name, changes):
        """Send notification about changes"""
        if not self.notification_settings.get("enabled", False):
            return

        subject = f"CloudMapper Alert: Changes detected in {account_name}"

        body = f"Changes detected in AWS account {account_name}:\n\n"

        for change in changes:
            change_type = change["type"]
            count = change["count"]

            if change_type == "new_findings":
                body += f"New security findings: {count}\n"
                for finding in change["findings"][:5]:  # Limit to first 5
                    body += f"  - {finding.get('title', 'Unknown')}\n"

            elif change_type == "resolved_findings":
                body += f"Resolved security findings: {count}\n"

        self.send_email(subject, body)

    def send_email(self, subject, body):
        """Send email notification"""
        email_config = self.notification_settings.get("email", {})

        if not email_config.get("enabled", False):
            return

        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            server.login(email_config["username"], email_config["password"])

            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()

            print(f"Notification sent: {subject}")

        except Exception as e:
            print(f"Failed to send email notification: {e}")

    def save_monitoring_results(self, results):
        """Save monitoring results to file"""
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        filename = f"monitoring-results-{timestamp}.json"

        with open(filename, 'w') as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "results": results
            }, f, indent=2)

    def start_monitoring(self):
        """Start continuous monitoring"""
        print("Starting CloudMapper continuous monitoring...")

        # Schedule assessments
        interval = self.monitoring_settings.get("interval_hours", 24)
        schedule.every(interval).hours.do(self.run_scheduled_assessment)

        # Run initial assessment
        self.run_scheduled_assessment()

        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Usage
if __name__ == "__main__":
    monitoring = CloudMapperMonitoring()
    monitoring.start_monitoring()
```
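
The monitoring class above reads a `monitoring-config.json` file with `accounts`, `monitoring`, and `notifications` sections. That schema is whatever the script looks up, not a CloudMapper format; the example below is a hypothetical file matching the keys used by `load_monitoring_config()`, `start_monitoring()`, and `send_email()`.

```bash
# Hypothetical monitoring-config.json matching the keys read by the script above
cat > monitoring-config.json << 'EOF'
{
    "accounts": [
        {"name": "production"},
        {"name": "staging"}
    ],
    "monitoring": {
        "interval_hours": 24
    },
    "notifications": {
        "enabled": true,
        "email": {
            "enabled": true,
            "from": "cloudmapper@example.com",
            "to": "security-team@example.com",
            "smtp_server": "smtp.example.com",
            "smtp_port": 587,
            "username": "cloudmapper@example.com",
            "password": "change-me"
        }
    }
}
EOF
```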

### Compliance Reporting Script

```python
#!/usr/bin/env python3
# CloudMapper compliance reporting

import json
import subprocess
from datetime import datetime
import pandas as pd

class CloudMapperCompliance:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.compliance_frameworks = {
            "cis": "CIS AWS Foundations Benchmark",
            "nist": "NIST Cybersecurity Framework",
            "pci": "PCI DSS",
            "sox": "Sarbanes-Oxley Act",
            "hipaa": "HIPAA Security Rule"
        }

    def run_compliance_audit(self, account_name, framework="cis"):
        """Run compliance audit for specific framework"""
        output_file = f"compliance-{framework}-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--compliance-framework", framework,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    compliance_data = json.load(f)
                return compliance_data
            else:
                print(f"Compliance audit failed: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running compliance audit: {e}")
            return None

    def generate_compliance_report(self, account_name, frameworks=None):
        """Generate comprehensive compliance report"""
        if frameworks is None:
            frameworks = ["cis", "nist"]

        report_data = {
            "account_name": account_name,
            "report_date": datetime.now().isoformat(),
            "frameworks": {}
        }

        for framework in frameworks:
            print(f"Running {framework.upper()} compliance audit...")

            audit_result = self.run_compliance_audit(account_name, framework)

            if audit_result:
                compliance_score = self.calculate_compliance_score(audit_result)

                report_data["frameworks"][framework] = {
                    "name": self.compliance_frameworks.get(framework, framework),
                    "score": compliance_score,
                    "findings": audit_result.get("findings", []),
                    "recommendations": audit_result.get("recommendations", [])
                }

        # Generate HTML report
        html_report = self.generate_html_compliance_report(report_data)

        # Save reports
        json_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"
        html_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.html"

        with open(json_file, 'w') as f:
            json.dump(report_data, f, indent=2)

        with open(html_file, 'w') as f:
            f.write(html_report)

        print(f"Compliance report generated: {html_file}")
        return report_data

    def calculate_compliance_score(self, audit_result):
        """Calculate compliance score from audit results"""
        findings = audit_result.get("findings", [])

        if not findings:
            return 100.0

        total_checks = len(findings)
        passed_checks = len([f for f in findings if f.get("status") == "PASS"])

        score = (passed_checks / total_checks) * 100
        return round(score, 2)

    def generate_html_compliance_report(self, report_data):
        """Generate HTML compliance report"""
        html_template = """
<!DOCTYPE html>
<html>
<head>
    <title>CloudMapper Compliance Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .header {{ background-color: #f0f0f0; padding: 20px; }}
        .framework {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; }}
        .score {{ font-size: 24px; font-weight: bold; }}
        .pass {{ color: green; }}
        .fail {{ color: red; }}
        .finding {{ margin: 10px 0; padding: 10px; background-color: #f9f9f9; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>Compliance Report</h1>
        <p>Account: {account_name}</p>
        <p>Report Date: {report_date}</p>
    </div>

    {frameworks_html}
</body>
</html>
        """

        frameworks_html = ""

        for framework_id, framework_data in report_data["frameworks"].items():
            score = framework_data["score"]
            score_class = "pass" if score >= 80 else "fail"

            findings_html = ""
            for finding in framework_data["findings"][:10]:  # Limit to first 10
                status = finding.get("status", "UNKNOWN")
                status_class = "pass" if status == "PASS" else "fail"

                findings_html += f"""
                <div class="finding">
                    <strong class="{status_class}">{status}</strong>: {finding.get("title", "Unknown")}
                    <br><small>{finding.get("description", "")}</small>
                </div>
                """

            frameworks_html += f"""
            <div class="framework">
                <h2>{framework_data["name"]}</h2>
                <div class="score {score_class}">Compliance Score: {score}%</div>
                <h3>Key Findings:</h3>
                {findings_html}
            </div>
            """

        return html_template.format(
            account_name=report_data["account_name"],
            report_date=report_data["report_date"],
            frameworks_html=frameworks_html
        )

# Usage
if __name__ == "__main__":
    compliance = CloudMapperCompliance()
    report = compliance.generate_compliance_report("production", ["cis", "nist"])
```
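
The scoring logic in `calculate_compliance_score()` assumes the audit JSON exposes a `findings` list where each entry carries a `status` of `PASS` or `FAIL`, plus the `title` and `description` fields used in the HTML report. That shape is an assumption of these helper scripts rather than a documented CloudMapper output schema; a minimal example of what they expect:

```bash
# Hypothetical audit output shape consumed by the compliance and monitoring scripts
cat > sample-audit.json << 'EOF'
{
    "findings": [
        {"id": "check-001", "status": "PASS", "title": "Root account MFA enabled", "description": "MFA is enforced on the root user"},
        {"id": "check-002", "status": "FAIL", "title": "Security group open to 0.0.0.0/0", "description": "Port 22 is reachable from the internet"}
    ]
}
EOF
```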

### SIEM Integration

```python
#!/usr/bin/env python3
# CloudMapper SIEM integration

import json
import requests
from datetime import datetime

class CloudMapperSIEMIntegration:
    def __init__(self, siem_config):
        self.siem_config = siem_config
        self.siem_type = siem_config.get("type", "splunk")

    def send_to_splunk(self, data):
        """Send CloudMapper data to Splunk"""
        splunk_config = self.siem_config.get("splunk", {})

        headers = {
            "Authorization": f"Splunk {splunk_config['token']}",
            "Content-Type": "application/json"
        }

        # Format data for Splunk
        events = []
        for finding in data.get("findings", []):
            event = {
                "time": datetime.now().timestamp(),
                "source": "cloudmapper",
                "sourcetype": "aws:security:finding",
                "event": finding
            }
            events.append(event)

        # Send to Splunk HEC
        response = requests.post(
            f"\\\\{splunk_config['url']\\\\}/services/collector/event",
            headers=headers,
            json=\\\\{"event": events\\\\}
        )

        return response.status_code == 200

    def send_to_elasticsearch(self, data):
        """Send CloudMapper data to Elasticsearch"""
        es_config = self.siem_config.get("elasticsearch", {})

        # Format data for Elasticsearch
        for finding in data.get("findings", []):
            doc = {
                "timestamp": datetime.now().isoformat(),
                "source": "cloudmapper",
                "finding": finding
            }

            # Index document
            response = requests.post(
                f"{es_config['url']}/cloudmapper-findings/_doc",
                auth=(es_config['username'], es_config['password']),
                json=doc
            )

    def send_findings(self, cloudmapper_data):
        """Send CloudMapper findings to SIEM"""
        if self.siem_type == "splunk":
            return self.send_to_splunk(cloudmapper_data)
        elif self.siem_type == "elasticsearch":
            return self.send_to_elasticsearch(cloudmapper_data)
        else:
            print(f"Unsupported SIEM type: {self.siem_type}")
            return False

# Usage
siem_config = {
    "type": "splunk",
    "splunk": {
        "url": "https://splunk.example.com:8088",
        "token": "your-hec-token"
    }
}

siem_integration = CloudMapperSIEMIntegration(siem_config)
```
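
To forward real results, load an audit file produced earlier (for example with `cloudmapper audit ... --output-file security-audit.json`) and hand it to `send_findings()`. A short usage sketch, assuming the audit JSON contains a `findings` list as in the previous examples:

```python
# Usage sketch: forward a saved CloudMapper audit to the configured SIEM
import json

with open("security-audit.json", "r") as f:   # file produced by `cloudmapper audit`
    audit_data = json.load(f)

if siem_integration.send_findings(audit_data):
    print("Findings forwarded to SIEM")
else:
    print("SIEM forwarding failed")
```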

### Terraform Integration

```python
#!/usr/bin/env python3
# CloudMapper and Terraform integration

import json
import subprocess
import os

class CloudMapperTerraformIntegration:
    def __init__(self, terraform_dir):
        self.terraform_dir = terraform_dir

    def analyze_terraform_plan(self, plan_file):
        """Analyze Terraform plan with CloudMapper"""

        # Parse Terraform plan
        with open(plan_file, 'r') as f:
            plan_data = json.load(f)

        # Extract AWS resources
        aws_resources = self.extract_aws_resources(plan_data)

        # Generate CloudMapper configuration
        config = self.generate_cloudmapper_config(aws_resources)

        return config

    def extract_aws_resources(self, plan_data):
        """Extract AWS resources from Terraform plan"""
        aws_resources = []

        for resource_change in plan_data.get("resource_changes", []):
            resource_type = resource_change.get("type", "")

            if resource_type.startswith("aws_"):
                aws_resources.append({
                    "type": resource_type,
                    "name": resource_change.get("name", ""),
                    "change": resource_change.get("change", {}),
                    "values": resource_change.get("change", {}).get("after", {})
                })

        return aws_resources

    def generate_cloudmapper_config(self, aws_resources):
        """Generate CloudMapper configuration from Terraform resources"""

        # This would generate appropriate CloudMapper configuration
        # based on the Terraform resources

        config = {
            "accounts": [
                {
                    "id": "123456789012",  # Would be extracted from Terraform
                    "name": "terraform-managed"
                }
            ]
        }

        return config

# Usage
terraform_integration = CloudMapperTerraformIntegration("/path/to/terraform")
```
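
`analyze_terraform_plan()` expects the plan in Terraform's JSON representation (the file that contains the `resource_changes` array), not the binary plan file. A typical way to produce that input:

```bash
# Export a Terraform plan in JSON form for analysis
terraform plan -out=tfplan
terraform show -json tfplan > plan.json
# plan.json now contains the "resource_changes" array parsed by the script above
```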

### Common Issues

**Authentication issues:**

```bash
# Check AWS credentials
aws sts get-caller-identity

# Verify IAM permissions
aws iam simulate-principal-policy \
    --policy-source-arn arn:aws:iam::123456789012:user/cloudmapper-user \
    --action-names ec2:DescribeInstances \
    --resource-arns "*"

# Test specific service access
aws ec2 describe-instances --region us-east-1
aws s3 ls

# Check CloudMapper configuration
cloudmapper configure list-accounts --config config.json
```

**Data collection issues:**

```bash
# Enable verbose logging
cloudmapper collect --config config.json --verbose

# Test specific regions
cloudmapper collect --config config.json --regions us-east-1

# Check for rate limiting
cloudmapper collect --config config.json --rate-limit 10

# Verify account access
aws organizations describe-account --account-id 123456789012

```

**Visualization issues:**

```bash
# Check dependencies
pip install graphviz pydot

# Install system graphviz
sudo apt install graphviz  # Ubuntu/Debian
brew install graphviz      # macOS

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Check output directory permissions
ls -la output/
chmod 755 output/

```

### Performance Optimization

Optimize CloudMapper performance:

```bash
# Use specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Limit services
cloudmapper collect --config config.json --services ec2,s3,iam

# Parallel collection
cloudmapper collect --config config.json --threads 4

# Use caching
cloudmapper collect --config config.json --cache-dir ./cache

# Optimize memory usage
export PYTHONHASHSEED=0
ulimit -v 4194304  # Limit virtual memory

```

## Security Considerations

### Data Protection

**Sensitive Data Handling:**

- CloudMapper collects detailed AWS configuration data
- Store output files securely, with appropriate access controls
- Encrypt sensitive reports and audit results (see the sketch below)
- Define retention policies for the collected data
- Use secure channels when transmitting reports
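
A minimal sketch of the encryption step, using GPG symmetric encryption on an archived output directory (tool choice and paths are illustrative, not specific to CloudMapper):

```bash
# Encrypt collected data and reports before storage or transfer
tar czf cloudmapper-output.tar.gz output/
gpg --symmetric --cipher-algo AES256 cloudmapper-output.tar.gz   # produces cloudmapper-output.tar.gz.gpg
shred -u cloudmapper-output.tar.gz                               # remove the plaintext archive
```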

**Access Controls:**

- Restrict CloudMapper's IAM permissions to the minimum required
- Use temporary credentials where possible (see the sketch below)
- Enforce multi-factor authentication for AWS access
- Rotate access keys and credentials regularly
- Monitor CloudMapper usage through CloudTrail
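
One way to follow the temporary-credentials recommendation is to run CloudMapper under an assumed role instead of long-lived access keys; a sketch using the AWS CLI (the role name is hypothetical):

```bash
# Assume a dedicated least-privilege role and export short-lived credentials
CREDS=$(aws sts assume-role \
    --role-arn arn:aws:iam::123456789012:role/CloudMapperReadOnly \
    --role-session-name cloudmapper-scan \
    --query 'Credentials.[AccessKeyId,SecretAccessKey,SessionToken]' \
    --output text)

export AWS_ACCESS_KEY_ID=$(echo "$CREDS" | cut -f1)
export AWS_SECRET_ACCESS_KEY=$(echo "$CREDS" | cut -f2)
export AWS_SESSION_TOKEN=$(echo "$CREDS" | cut -f3)

cloudmapper collect --config config.json --account-name production
```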

### Operational Security

**Secure Usage Practices:**

- Run CloudMapper from secure, monitored environments
- Validate configuration files before running them
- Keep CloudMapper and its dependencies up to date
- Watch for unauthorized use or configuration changes
- Log and audit CloudMapper activity (see the CloudTrail sketch below)
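
CloudMapper's API calls appear in CloudTrail like any other activity performed with its credentials, so auditing can start with a simple event lookup filtered on the dedicated user created earlier:

```bash
# Review recent API activity performed by the dedicated CloudMapper user
aws cloudtrail lookup-events \
    --lookup-attributes AttributeKey=Username,AttributeValue=cloudmapper-user \
    --max-results 50
```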

**Network Security:**

- Use a VPN or trusted networks when running CloudMapper
- Apply network segmentation to analysis environments
- Monitor network traffic during data collection
- Use secure protocols for all communications
- Perform regular security assessments of the CloudMapper infrastructure itself

## References

- CloudMapper GitHub Repository: https://github.com/duo-labs/cloudmapper
- AWS Security Best Practices
- NIST Cybersecurity Framework: https://www.nist.gov/cyberframework
- CIS AWS Foundations Benchmark