
CloudMapper Cheat Sheet

Overview

CloudMapper is an open-source tool that helps security professionals and cloud administrators visualize and analyze Amazon Web Services (AWS) environments for security assessment and compliance. Developed by Duo Security (now part of Cisco), it maps AWS infrastructure, identifies security misconfigurations, and generates reports on the relationships between cloud resources and the risks they carry. Its network diagrams show how AWS services connect to one another, which makes attack paths, data flows, and security boundaries easier to understand.

CloudMapper collects detailed information about AWS resources through the AWS API and turns that data into visualizations and security assessments. It can map Virtual Private Clouds (VPCs), subnets, security groups, EC2 instances, load balancers, databases, and many other AWS services. It can flag issues such as overly permissive security groups, publicly accessible resources, unencrypted data stores, and complex network configurations that might introduce vulnerabilities.

CloudMapper supports multiple output formats, including interactive HTML reports, JSON data exports, and graphical network diagrams, which makes it suitable for both technical analysis and executive reporting. Its modular architecture allows customization and extension, and its command-line interface fits into automated security assessment workflows and continuous monitoring pipelines. With its focus on AWS security best practices and compliance frameworks, CloudMapper is a useful tool for cloud security teams, penetration testers, and compliance auditors working with AWS environments.
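
One of the checks mentioned above, spotting security groups that are open to the whole internet, can be sketched in a few lines of Python. This is only an illustration of the idea, not CloudMapper's own implementation: the function name `find_open_ingress` and the sample data are hypothetical, and the input is assumed to have the shape of the `SecurityGroups` list returned by the EC2 `describe-security-groups` call.

```python
def find_open_ingress(security_groups):
    """Return (group_id, from_port, to_port) for ingress rules open to any address."""
    open_cidrs = {"0.0.0.0/0", "::/0"}
    findings = []
    for group in security_groups:
        for rule in group.get("IpPermissions", []):
            cidrs = {r.get("CidrIp") for r in rule.get("IpRanges", [])}
            cidrs |= {r.get("CidrIpv6") for r in rule.get("Ipv6Ranges", [])}
            if cidrs & open_cidrs:
                # FromPort/ToPort are absent when the rule covers all protocols
                findings.append(
                    (group["GroupId"], rule.get("FromPort"), rule.get("ToPort"))
                )
    return findings


# Hypothetical sample shaped like the output of describe-security-groups
sample_groups = [
    {
        "GroupId": "sg-0123456789abcdef0",
        "IpPermissions": [
            {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22,
             "IpRanges": [{"CidrIp": "0.0.0.0/0"}], "Ipv6Ranges": []},
            {"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
             "IpRanges": [{"CidrIp": "10.0.0.0/8"}], "Ipv6Ranges": []},
        ],
    },
]

print(find_open_ingress(sample_groups))
```

Only the SSH rule is reported here, because the HTTPS rule is restricted to a private range.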

Installation

Python Package Installation

Installing CloudMapper using pip:

# Install CloudMapper
pip install cloudmapper

# Alternative: Install from source
git clone https://github.com/duo-labs/cloudmapper.git
cd cloudmapper
pip install -r requirements.txt
python setup.py install

# Verify installation
cloudmapper --help

# Install additional dependencies for visualization
pip install graphviz
pip install pydot

# Install AWS CLI (required for authentication)
pip install awscli

# Configure AWS credentials
aws configure

Docker Installation

# Create CloudMapper Docker environment
cat > Dockerfile << 'EOF'
FROM python:3.8-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    graphviz \
    && rm -rf /var/lib/apt/lists/*

# Install CloudMapper
RUN pip install cloudmapper

# Create working directory
WORKDIR /cloudmapper

# AWS credentials are not copied into the image; mount them at runtime
VOLUME ["/root/.aws"]

CMD ["cloudmapper", "--help"]
EOF

# Build container
docker build -t cloudmapper .

# Run CloudMapper
docker run -it -v ~/.aws:/root/.aws -v $(pwd)/output:/cloudmapper/output cloudmapper

# Example usage
docker run -it -v ~/.aws:/root/.aws cloudmapper cloudmapper collect --account-name myaccount

Virtual Environment Setup

# Create virtual environment
python3 -m venv cloudmapper-env
source cloudmapper-env/bin/activate

# Install CloudMapper
pip install cloudmapper

# Install additional tools
pip install boto3 botocore

# Create configuration directory
mkdir -p ~/.cloudmapper

# Verify setup
cloudmapper --version

AWS Permissions Setup

# Create IAM policy for CloudMapper
cat > cloudmapper-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:Describe*",
                "elasticloadbalancing:Describe*",
                "rds:Describe*",
                "s3:GetBucketLocation",
                "s3:GetBucketLogging",
                "s3:GetBucketPolicy",
                "s3:GetBucketVersioning",
                "s3:GetBucketWebsite",
                "s3:GetBucketAcl",
                "s3:ListAllMyBuckets",
                "cloudfront:ListDistributions",
                "cloudfront:GetDistribution",
                "route53:ListHostedZones",
                "route53:ListResourceRecordSets",
                "iam:GetAccountSummary",
                "iam:ListUsers",
                "iam:ListRoles",
                "iam:ListGroups",
                "iam:ListPolicies",
                "iam:GetRole",
                "iam:GetRolePolicy",
                "iam:ListRolePolicies",
                "iam:ListAttachedRolePolicies",
                "iam:GetPolicy",
                "iam:GetPolicyVersion",
                "lambda:ListFunctions",
                "lambda:GetFunction",
                "apigateway:GET"
            ],
            "Resource": "*"
        }
    ]
}
EOF

# Create IAM user and attach policy
aws iam create-user --user-name cloudmapper-user
aws iam put-user-policy --user-name cloudmapper-user --policy-name CloudMapperPolicy --policy-document file://cloudmapper-policy.json

# Create access keys
aws iam create-access-key --user-name cloudmapper-user
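
Before attaching a policy like the one above, it can be worth confirming that it grants only read-style actions. The following is a rough sketch under a simplifying assumption: an action counts as read-only when its operation name starts with Describe, Get, or List (case-insensitively, so that `apigateway:GET` passes). The helper name `non_read_only_actions` and the sample policy are hypothetical.

```python
READ_ONLY_PREFIXES = ("describe", "get", "list")


def non_read_only_actions(policy):
    """Return allowed actions that do not look read-only under the prefix heuristic."""
    flagged = []
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may be a bare object
        statements = [statements]
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):  # a single action may be a bare string
            actions = [actions]
        for action in actions:
            _service, _, operation = action.partition(":")
            if not operation.lower().startswith(READ_ONLY_PREFIXES):
                flagged.append(action)
    return flagged


# Hypothetical policy with one write action mixed in
sample_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["ec2:Describe*", "s3:PutObject", "apigateway:GET"],
            "Resource": "*",
        }
    ],
}

print(non_read_only_actions(sample_policy))
```

The prefix heuristic is deliberately crude: a wildcard action such as `*` is flagged, and a few read-only AWS operations that use other verbs would be flagged as well, so treat the result as a prompt for manual review.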

Basic Usage

Account Configuration

Setting up AWS accounts for CloudMapper:

# Configure AWS account
cloudmapper configure add-account --config-file config.json --name myaccount --id 123456789012

# Alternative: Manual configuration
cat > config.json << 'EOF'
{
    "accounts": [
        {
            "id": "123456789012",
            "name": "production",
            "default": true
        },
        {
            "id": "123456789013",
            "name": "staging",
            "default": false
        }
    ]
}
EOF

# List configured accounts
cloudmapper configure list-accounts --config config.json

# Set default account
cloudmapper configure set-default --config config.json --name production
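
A script that wraps these commands usually needs to read the same `config.json`. The sketch below assumes the file layout shown above (an `accounts` list whose entries carry `id`, `name`, and an optional `default` flag). The helper names `default_account` and `invalid_account_ids` are hypothetical.

```python
import json


def default_account(config):
    """Return the account flagged as default, else the first one, else None."""
    accounts = config.get("accounts", [])
    for account in accounts:
        if account.get("default"):
            return account
    return accounts[0] if accounts else None


def invalid_account_ids(config):
    """Return names of accounts whose id is not a 12-digit string."""
    bad = []
    for account in config.get("accounts", []):
        account_id = str(account.get("id", ""))
        if not (account_id.isdigit() and len(account_id) == 12):
            bad.append(account.get("name"))
    return bad


# Inline copy of the example configuration so the sketch runs on its own
config = json.loads("""
{
    "accounts": [
        {"id": "123456789012", "name": "production", "default": true},
        {"id": "123456789013", "name": "staging", "default": false}
    ]
}
""")

print(default_account(config)["name"])
print(invalid_account_ids(config))
```

AWS account IDs are always 12 digits, so keeping them as strings in the JSON avoids losing leading zeros.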

Data Collection

Collecting AWS environment data:

# Collect data for default account
cloudmapper collect --config config.json

# Collect data for specific account
cloudmapper collect --config config.json --account-name production

# Collect data for specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Collect with specific profile
cloudmapper collect --config config.json --profile myprofile

# Collect with custom output directory
cloudmapper collect --config config.json --output-dir ./aws-data

# Verbose collection
cloudmapper collect --config config.json --verbose

# Collect specific services only
cloudmapper collect --config config.json --services ec2,s3,iam

Basic Visualization

Creating basic network visualizations:

# Generate network diagram
cloudmapper visualize --config config.json --account-name production

# Generate diagram for specific regions
cloudmapper visualize --config config.json --regions us-east-1

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Generate diagram with custom output
cloudmapper visualize --config config.json --output-file network-diagram.html

# Generate PNG diagram
cloudmapper visualize --config config.json --output-format png

# Generate with custom styling
cloudmapper visualize --config config.json --style-file custom-style.css

Security Analysis

Running security assessments:

# Run security audit
cloudmapper audit --config config.json --account-name production

# Run specific audit checks
cloudmapper audit --config config.json --checks security_groups,s3_buckets

# Generate audit report
cloudmapper audit --config config.json --output-file security-audit.json

# Run compliance checks
cloudmapper audit --config config.json --compliance-framework cis

# Audit with custom severity
cloudmapper audit --config config.json --min-severity medium

# Export audit results
cloudmapper audit --config config.json --format csv --output-file audit-results.csv

Advanced Features

Custom Visualization

Creating advanced network visualizations:

# Generate detailed network map
cloudmapper visualize --config config.json \
    --account-name production \
    --regions us-east-1,us-west-2 \
    --include-services ec2,rds,elb,s3 \
    --output-file detailed-network.html

# Create VPC-focused diagram
cloudmapper visualize --config config.json \
    --vpc-only \
    --show-security-groups \
    --show-nacls \
    --output-file vpc-diagram.html

# Generate public resource map
cloudmapper public --config config.json \
    --account-name production \
    --output-file public-resources.html

# Create cross-region visualization
cloudmapper visualize --config config.json \
    --all-regions \
    --cross-region-connections \
    --output-file global-network.html

# Generate service dependency map
cloudmapper dependencies --config config.json \
    --service ec2 \
    --output-file ec2-dependencies.html

Security Assessment

Comprehensive security analysis:

# Run comprehensive security audit
cloudmapper audit --config config.json \
    --account-name production \
    --all-checks \
    --output-file comprehensive-audit.json

# Check for public resources
cloudmapper public --config config.json \
    --account-name production \
    --services s3,ec2,rds,elb \
    --output-file public-exposure.json

# Analyze security groups
cloudmapper sg-audit --config config.json \
    --account-name production \
    --check-unused \
    --check-overpermissive \
    --output-file sg-audit.json

# Check IAM permissions
cloudmapper iam-audit --config config.json \
    --account-name production \
    --check-admin-users \
    --check-unused-roles \
    --output-file iam-audit.json

# Analyze network ACLs
cloudmapper nacl-audit --config config.json \
    --account-name production \
    --check-default-allow \
    --output-file nacl-audit.json

Multi-Account Analysis

Analyzing multiple AWS accounts:

# Configure multiple accounts
cat > multi-account-config.json << 'EOF'
{
    "accounts": [
        {
            "id": "111111111111",
            "name": "production",
            "profile": "prod-profile"
        },
        {
            "id": "222222222222",
            "name": "staging",
            "profile": "staging-profile"
        },
        {
            "id": "333333333333",
            "name": "development",
            "profile": "dev-profile"
        }
    ]
}
EOF

# Collect data from all accounts
for account in production staging development; do
    cloudmapper collect --config multi-account-config.json --account-name $account
done

# Generate cross-account visualization
cloudmapper visualize --config multi-account-config.json \
    --all-accounts \
    --cross-account-connections \
    --output-file cross-account-network.html

# Run security audit across all accounts
cloudmapper audit --config multi-account-config.json \
    --all-accounts \
    --output-file multi-account-audit.json

# Compare security posture across accounts
cloudmapper compare --config multi-account-config.json \
    --accounts production,staging,development \
    --output-file account-comparison.html

Custom Reporting

Creating custom reports and exports:

# Generate executive summary
cloudmapper report --config config.json \
    --account-name production \
    --template executive \
    --output-file executive-summary.html

# Create technical report
cloudmapper report --config config.json \
    --account-name production \
    --template technical \
    --include-recommendations \
    --output-file technical-report.html

# Export raw data
cloudmapper export --config config.json \
    --account-name production \
    --format json \
    --output-file raw-data.json

# Generate CSV exports
cloudmapper export --config config.json \
    --account-name production \
    --format csv \
    --services ec2,s3,rds \
    --output-dir csv-exports/

# Create compliance report
cloudmapper compliance --config config.json \
    --account-name production \
    --framework cis-aws \
    --output-file cis-compliance.html

Automation Scripts

Automated AWS Security Assessment

#!/usr/bin/env python3
# Automated AWS security assessment with CloudMapper

import subprocess
import json
import os
from datetime import datetime
import logging

class CloudMapperAutomation:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.output_dir = f"cloudmapper-output-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

        # Create the output directory first: the log file handler writes into it
        os.makedirs(self.output_dir, exist_ok=True)
        self.setup_logging()

    def setup_logging(self):
        """Set up logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'{self.output_dir}/cloudmapper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load CloudMapper configuration"""
        try:
            with open(self.config_file, 'r') as f:
                config = json.load(f)
            return config
        except FileNotFoundError:
            self.logger.error(f"Configuration file {self.config_file} not found")
            return None
        except json.JSONDecodeError:
            self.logger.error(f"Invalid JSON in configuration file {self.config_file}")
            return None

    def run_cloudmapper_command(self, command):
        """Execute a CloudMapper command and return (success, output)"""
        try:
            self.logger.info(f"Running command: {' '.join(command)}")
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=1800  # 30 minute timeout
            )

            if result.returncode == 0:
                self.logger.info("Command completed successfully")
                return True, result.stdout
            else:
                self.logger.error(f"Command failed: {result.stderr}")
                return False, result.stderr

        except subprocess.TimeoutExpired:
            self.logger.error("Command timed out")
            return False, "Command timed out"
        except Exception as e:
            self.logger.error(f"Error running command: {e}")
            return False, str(e)

    def collect_account_data(self, account_name):
        """Collect data for specific account"""
        self.logger.info(f"Collecting data for account: {account_name}")

        command = [
            "cloudmapper", "collect",
            "--config", self.config_file,
            "--account-name", account_name
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Data collection completed for {account_name}")
        else:
            self.logger.error(f"Data collection failed for {account_name}: {output}")

        return success

    def generate_visualization(self, account_name):
        """Generate network visualization"""
        self.logger.info(f"Generating visualization for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-network.html")

        command = [
            "cloudmapper", "visualize",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Visualization generated: {output_file}")
        else:
            self.logger.error(f"Visualization failed for {account_name}: {output}")

        return success, output_file if success else None

    def run_security_audit(self, account_name):
        """Run security audit"""
        self.logger.info(f"Running security audit for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-audit.json")

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Security audit completed: {output_file}")
            return success, output_file
        else:
            self.logger.error(f"Security audit failed for {account_name}: {output}")
            return success, None

    def analyze_public_resources(self, account_name):
        """Analyze public resources"""
        self.logger.info(f"Analyzing public resources for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-public.html")

        command = [
            "cloudmapper", "public",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Public resources analysis completed: {output_file}")
        else:
            self.logger.error(f"Public resources analysis failed for {account_name}: {output}")

        return success, output_file if success else None

    def generate_reports(self, account_name):
        """Generate comprehensive reports"""
        self.logger.info(f"Generating reports for account: {account_name}")

        reports = {}

        # Executive summary
        exec_output = os.path.join(self.output_dir, f"{account_name}-executive.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "executive",
            "--output-file", exec_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["executive"] = exec_output

        # Technical report
        tech_output = os.path.join(self.output_dir, f"{account_name}-technical.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "technical",
            "--output-file", tech_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["technical"] = tech_output

        return reports

    def analyze_account(self, account_name):
        """Complete analysis for single account"""
        self.logger.info(f"Starting complete analysis for account: {account_name}")

        results = {
            "account_name": account_name,
            "timestamp": datetime.now().isoformat(),
            "success": True,
            "outputs": {}
        }

        # Collect data; every later step depends on it, so stop on failure
        if not self.collect_account_data(account_name):
            results["success"] = False
            return results

        # Generate visualization
        success, viz_file = self.generate_visualization(account_name)
        if success:
            results["outputs"]["visualization"] = viz_file

        # Run security audit
        success, audit_file = self.run_security_audit(account_name)
        if success:
            results["outputs"]["audit"] = audit_file

        # Analyze public resources
        success, public_file = self.analyze_public_resources(account_name)
        if success:
            results["outputs"]["public_analysis"] = public_file

        # Generate reports
        reports = self.generate_reports(account_name)
        results["outputs"]["reports"] = reports

        self.logger.info(f"Analysis completed for account: {account_name}")
        return results

    def run_multi_account_analysis(self):
        """Run analysis across all configured accounts.

        Returns (results, summary), or (None, None) if nothing could be run.
        """
        config = self.load_config()
        if not config:
            return None, None

        accounts = config.get("accounts", [])
        if not accounts:
            self.logger.error("No accounts configured")
            return None, None

        self.logger.info(f"Starting multi-account analysis for {len(accounts)} accounts")

        all_results = []

        for account in accounts:
            account_name = account.get("name")
            if account_name:
                result = self.analyze_account(account_name)
                all_results.append(result)

        # Generate summary report
        summary = self.generate_summary_report(all_results)

        self.logger.info("Multi-account analysis completed")
        return all_results, summary

    def generate_summary_report(self, results):
        """Generate summary report for all accounts"""
        summary = {
            "analysis_summary": {
                "total_accounts": len(results),
                "successful_accounts": len([r for r in results if r["success"]]),
                "failed_accounts": len([r for r in results if not r["success"]]),
                "timestamp": datetime.now().isoformat()
            },
            "account_results": results
        }

        summary_file = os.path.join(self.output_dir, "analysis-summary.json")
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        self.logger.info(f"Summary report generated: {summary_file}")
        return summary

    def cleanup_old_data(self, days_old=7):
        """Clean up old CloudMapper data"""
        self.logger.info(f"Cleaning up data older than {days_old} days")

        # This would implement cleanup logic
        # for old CloudMapper data files
        pass

# Usage
if __name__ == "__main__":
    automation = CloudMapperAutomation("config.json")
    results, summary = automation.run_multi_account_analysis()

    if results:
        print(f"Analysis completed for {len(results)} accounts")
        print(f"Results saved in: {automation.output_dir}")
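
The `cleanup_old_data` method above is left as a stub. One possible way to fill it in, sketched here as a standalone function, is to delete output directories whose modification time is older than the cutoff. The function name `cleanup_old_outputs`, the `dry_run` switch, and the assumption that all runs live under one base directory with the `cloudmapper-output-` prefix used by the script are illustrative choices, not part of CloudMapper.

```python
import shutil
import time
from pathlib import Path


def cleanup_old_outputs(base_dir=".", days_old=7,
                        prefix="cloudmapper-output-", dry_run=True):
    """Remove output directories older than days_old; return their names."""
    cutoff = time.time() - days_old * 86400  # seconds per day
    removed = []
    for path in Path(base_dir).glob(f"{prefix}*"):
        # Only touch directories that match the prefix and are past the cutoff
        if path.is_dir() and path.stat().st_mtime < cutoff:
            if not dry_run:
                shutil.rmtree(path)
            removed.append(path.name)
    return sorted(removed)


if __name__ == "__main__":
    # Dry run: list what would be deleted without removing anything
    print(cleanup_old_outputs(dry_run=True))
```

Defaulting to `dry_run=True` means a mistaken base directory only produces a list, which is a sensible guard for a function that deletes directory trees.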

Continuous Monitoring Script

#!/usr/bin/env python3
# Continuous AWS monitoring with CloudMapper

import schedule  # third-party: pip install schedule
import subprocess
import time
import json
import os
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class CloudMapperMonitoring:
    def __init__(self, config_file="monitoring-config.json"):
        self.config_file = config_file
        self.load_monitoring_config()
        self.last_results = {}

    def load_monitoring_config(self):
        """Load monitoring configuration"""
        with open(self.config_file, 'r') as f:
            self.config = json.load(f)

        self.accounts = self.config.get("accounts", [])
        self.monitoring_settings = self.config.get("monitoring", {})
        self.notification_settings = self.config.get("notifications", {})

    def run_scheduled_assessment(self):
        """Run scheduled security assessment"""
        print(f"Running scheduled assessment at {datetime.now()}")

        current_results = {}

        for account in self.accounts:
            account_name = account.get("name")

            # Run CloudMapper audit
            audit_result = self.run_audit(account_name)
            current_results[account_name] = audit_result

            # Compare with previous results
            if account_name in self.last_results:
                changes = self.compare_results(
                    self.last_results[account_name],
                    audit_result
                )

                if changes:
                    self.send_change_notification(account_name, changes)

        # Update last results
        self.last_results = current_results

        # Save results
        self.save_monitoring_results(current_results)

    def run_audit(self, account_name):
        """Run CloudMapper audit for account"""
        output_file = f"monitoring-audit-{account_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", "config.json",
            "--account-name", account_name,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    audit_data = json.load(f)

                # Clean up temporary file
                os.remove(output_file)

                return audit_data
            else:
                print(f"Audit failed for {account_name}: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running audit for {account_name}: {e}")
            return None

    def compare_results(self, previous, current):
        """Compare audit results to detect changes"""
        changes = []

        # A failed audit returns None, so there is nothing to compare
        if not previous or not current:
            return changes

        # Compare security findings
        prev_findings = previous.get("findings", [])
        curr_findings = current.get("findings", [])

        # New findings
        prev_finding_ids = {f.get("id") for f in prev_findings}
        new_findings = [f for f in curr_findings if f.get("id") not in prev_finding_ids]

        if new_findings:
            changes.append({
                "type": "new_findings",
                "count": len(new_findings),
                "findings": new_findings
            })

        # Resolved findings
        curr_finding_ids = {f.get("id") for f in curr_findings}
        resolved_findings = [f for f in prev_findings if f.get("id") not in curr_finding_ids]

        if resolved_findings:
            changes.append({
                "type": "resolved_findings",
                "count": len(resolved_findings),
                "findings": resolved_findings
            })

        return changes

    def send_change_notification(self, account_name, changes):
        """Send notification about changes"""
        if not self.notification_settings.get("enabled", False):
            return

        subject = f"CloudMapper Alert: Changes detected in {account_name}"

        body = f"Changes detected in AWS account {account_name}:\n\n"

        for change in changes:
            change_type = change["type"]
            count = change["count"]

            if change_type == "new_findings":
                body += f"New security findings: {count}\n"
                for finding in change["findings"][:5]:  # Limit to first 5
                    body += f"  - {finding.get('title', 'Unknown')}\n"

            elif change_type == "resolved_findings":
                body += f"Resolved security findings: {count}\n"

        self.send_email(subject, body)

    def send_email(self, subject, body):
        """Send email notification"""
        email_config = self.notification_settings.get("email", {})

        if not email_config.get("enabled", False):
            return

        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            server.login(email_config["username"], email_config["password"])

            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()

            print(f"Notification sent: {subject}")

        except Exception as e:
            print(f"Failed to send email notification: {e}")

    def save_monitoring_results(self, results):
        """Save monitoring results to file"""
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        filename = f"monitoring-results-{timestamp}.json"

        with open(filename, 'w') as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "results": results
            }, f, indent=2)

    def start_monitoring(self):
        """Start continuous monitoring"""
        print("Starting CloudMapper continuous monitoring...")

        # Schedule assessments
        interval = self.monitoring_settings.get("interval_hours", 24)
        schedule.every(interval).hours.do(self.run_scheduled_assessment)

        # Run initial assessment
        self.run_scheduled_assessment()

        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Usage
if __name__ == "__main__":
    monitoring = CloudMapperMonitoring()
    monitoring.start_monitoring()
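
The change detection in `compare_results` boils down to a set difference on finding identifiers. The standalone sketch below isolates that step so it can be tried without running an audit. It assumes, as the script does, that each finding is a dictionary with an `id` field; the function name `diff_findings` and the sample findings are hypothetical.

```python
def diff_findings(previous, current, key="id"):
    """Return (new, resolved) findings by comparing identifiers between two runs."""
    prev_by_id = {finding.get(key): finding for finding in previous}
    curr_by_id = {finding.get(key): finding for finding in current}
    # Present now but not before: newly introduced findings
    new = [f for fid, f in curr_by_id.items() if fid not in prev_by_id]
    # Present before but not now: findings that were resolved
    resolved = [f for fid, f in prev_by_id.items() if fid not in curr_by_id]
    return new, resolved


# Hypothetical findings from two consecutive audit runs
yesterday = [
    {"id": "SG_OPEN_22", "title": "SSH open to the internet"},
    {"id": "S3_PUBLIC", "title": "Public S3 bucket"},
]
today = [
    {"id": "S3_PUBLIC", "title": "Public S3 bucket"},
    {"id": "IAM_NO_MFA", "title": "User without MFA"},
]

new, resolved = diff_findings(yesterday, today)
print("new:", [f["id"] for f in new])
print("resolved:", [f["id"] for f in resolved])
```

This only works if identifiers are stable between runs; if the audit output lacks such a field, a composite key (for example check name plus resource) would be needed instead.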

Compliance Repuertoing Script

#!/usr/bin/env python3
# CloudMapper compliance repuertoing

impuerto json
impuerto subproceso
from datetime impuerto datetime
impuerto pandas as pd

class CloudMapperCompliance:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.compliance_frameworks = \\\\{
            "cis": "CIS AWS Foundations Benchmark",
            "nist": "NIST Cybersecurity Framework",
            "pci": "PCI DSS",
            "sox": "Sarbanes-Oxley Act",
            "hipaa": "HIPAA Security Rule"
        \\\\}

    def run_compliance_audit(self, account_name, framework="cis"):
        """Run compliance audit for specific framework"""
        output_file = f"compliance-\\\\{framework\\\\}-\\\\{account_name\\\\}-\\\\{datetime.now().strftime('%Y%m%d')\\\\}.json"

        comando = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--compliance-framework", framework,
            "--output-file", output_file
        ]

        try:
            result = subproceso.run(comando, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    compliance_data = json.load(f)
                return compliance_data
            else:
                print(f"Compliance audit failed: \\\\{result.stderr\\\\}")
                return None

        except Exception as e:
            print(f"Error running compliance audit: \\\\{e\\\\}")
            return None

    def generate_compliance_report(self, account_name, frameworks=None):
        """Generate comprehensive compliance report"""
        if frameworks is None:
            frameworks = ["cis", "nist"]

        report_data = {
            "account_name": account_name,
            "report_date": datetime.now().isoformat(),
            "frameworks": {}
        }

        for framework in frameworks:
            print(f"Running {framework.upper()} compliance audit...")

            audit_result = self.run_compliance_audit(account_name, framework)

            if audit_result:
                compliance_score = self.calculate_compliance_score(audit_result)

                report_data["frameworks"][framework] = {
                    "name": self.compliance_frameworks.get(framework, framework),
                    "score": compliance_score,
                    "findings": audit_result.get("findings", []),
                    "recommendations": audit_result.get("recommendations", [])
                }

        # Generate HTML report
        html_report = self.generate_html_compliance_report(report_data)

        # Save reports
        json_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"
        html_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.html"

        with open(json_file, 'w') as f:
            json.dump(report_data, f, indent=2)

        with open(html_file, 'w') as f:
            f.write(html_report)

        print(f"Compliance report generated: {html_file}")
        return report_data

    def calculate_compliance_score(self, audit_result):
        """Calculate compliance score from audit results"""
        findings = audit_result.get("findings", [])

        if not findings:
            return 100.0

        total_checks = len(findings)
        passed_checks = len([f for f in findings if f.get("status") == "PASS"])

        score = (passed_checks / total_checks) * 100
        return round(score, 2)

    def generate_html_compliance_report(self, report_data):
        """Generate HTML compliance report"""
        # CSS braces are doubled so that str.format() treats them as literals
        html_template = """
<!DOCTYPE html>
<html>
<head>
    <title>CloudMapper Compliance Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .header {{ background-color: #f0f0f0; padding: 20px; }}
        .framework {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; }}
        .score {{ font-size: 24px; font-weight: bold; }}
        .pass {{ color: green; }}
        .fail {{ color: red; }}
        .finding {{ margin: 10px 0; padding: 10px; background-color: #f9f9f9; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>Compliance Report</h1>
        <p>Account: {account_name}</p>
        <p>Report Date: {report_date}</p>
    </div>

    {frameworks_html}
</body>
</html>
        """

        frameworks_html = ""

        for framework_id, framework_data in report_data["frameworks"].items():
            score = framework_data["score"]
            score_class = "pass" if score >= 80 else "fail"

            findings_html = ""
            for finding in framework_data["findings"][:10]:  # Limit to first 10
                status = finding.get("status", "UNKNOWN")
                status_class = "pass" if status == "PASS" else "fail"

                findings_html += f"""
                <div class="finding">
                    <strong class="{status_class}">{status}</strong>: {finding.get('title', 'Unknown')}
                    <br><small>{finding.get('description', '')}</small>
                </div>
                """

            frameworks_html += f"""
            <div class="framework">
                <h2>{framework_data['name']}</h2>
                <div class="score {score_class}">Compliance Score: {score}%</div>
                <h3>Key Findings:</h3>
                {findings_html}
            </div>
            """

        return html_template.format(
            account_name=report_data["account_name"],
            report_date=report_data["report_date"],
            frameworks_html=frameworks_html
        )

# Usage
if __name__ == "__main__":
    compliance = CloudMapperCompliance()
    report = compliance.generate_compliance_report("production", ["cis", "nist"])
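
To make the scoring rule concrete, here is a minimal standalone sketch of the same pass-ratio calculation used by calculate_compliance_score. The function name `compliance_score` and the `sample_findings` data are illustrative assumptions, not CloudMapper output.

```python
def compliance_score(findings):
    """Return the percentage of findings whose status is PASS."""
    if not findings:
        # Mirrors the class method: an empty findings list scores 100
        return 100.0
    passed = sum(1 for f in findings if f.get("status") == "PASS")
    return round(passed / len(findings) * 100, 2)


# Hypothetical findings, for illustration only
sample_findings = [
    {"title": "Root account MFA enabled", "status": "PASS"},
    {"title": "CloudTrail enabled in all regions", "status": "PASS"},
    {"title": "Security group allows 0.0.0.0/0 on port 22", "status": "FAIL"},
]

print(compliance_score(sample_findings))  # 66.67
```

Note that an empty findings list scores 100, so an audit that collected nothing looks fully compliant. It may be worth checking that findings are present before trusting the score.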

Integration Examples

SIEM Integration

#!/usr/bin/env python3
# CloudMapper SIEM integration

import json
import requests
from datetime import datetime

class CloudMapperSIEMIntegration:
    def __init__(self, siem_config):
        self.siem_config = siem_config
        self.siem_type = siem_config.get("type", "splunk")

    def send_to_splunk(self, data):
        """Send CloudMapper data to Splunk"""
        splunk_config = self.siem_config.get("splunk", {})

        headers = {
            "Authorization": f"Splunk {splunk_config['token']}",
            "Content-Type": "application/json"
        }

        # Format data for Splunk
        events = []
        for finding in data.get("findings", []):
            event = {
                "time": datetime.now().timestamp(),
                "source": "cloudmapper",
                "sourcetype": "aws:security:finding",
                "event": finding
            }
            events.append(event)

        if not events:
            return True  # Nothing to send

        # Send to Splunk HEC; a batch is sent as a stack of JSON objects,
        # one per event, rather than a single object wrapping the list
        response = requests.post(
            f"{splunk_config['url']}/services/collector/event",
            headers=headers,
            data="\n".join(json.dumps(event) for event in events)
        )

        return response.status_code == 200

    def send_to_elasticsearch(self, data):
        """Send CloudMapper data to Elasticsearch"""
        es_config = self.siem_config.get("elasticsearch", {})

        success = True

        # Format data for Elasticsearch
        for finding in data.get("findings", []):
            doc = {
                "timestamp": datetime.now().isoformat(),
                "source": "cloudmapper",
                "finding": finding
            }

            # Index document
            response = requests.post(
                f"{es_config['url']}/cloudmapper-findings/_doc",
                auth=(es_config['username'], es_config['password']),
                json=doc
            )

            # Indexing a new document normally returns 201 Created
            if response.status_code not in (200, 201):
                success = False

        return success

    def send_findings(self, cloudmapper_data):
        """Send CloudMapper findings to SIEM"""
        if self.siem_type == "splunk":
            return self.send_to_splunk(cloudmapper_data)
        elif self.siem_type == "elasticsearch":
            return self.send_to_elasticsearch(cloudmapper_data)
        else:
            print(f"Unsupported SIEM type: {self.siem_type}")
            return False

# Usage
siem_config = {
    "type": "splunk",
    "splunk": {
        "url": "https://splunk.example.com:8088",
        "token": "your-hec-token"
    }
}

siem_integration = CloudMapperSIEMIntegration(siem_config)
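
Splunk's HTTP Event Collector accepts several events in one request when the JSON event objects are stacked in the request body. The sketch below shows one way to build such a body from a list of findings. The helper name `build_hec_payload` and the sample findings are assumptions made for illustration.

```python
import json
import time


def build_hec_payload(findings, source="cloudmapper",
                      sourcetype="aws:security:finding", timestamp=None):
    """Serialize findings as stacked HEC event objects, one per line."""
    ts = time.time() if timestamp is None else timestamp
    lines = []
    for finding in findings:
        envelope = {
            "time": ts,
            "source": source,
            "sourcetype": sourcetype,
            "event": finding,
        }
        lines.append(json.dumps(envelope, sort_keys=True))
    return "\n".join(lines)


# Hypothetical findings, for illustration only
payload = build_hec_payload(
    [{"id": "sg-open"}, {"id": "s3-public"}], timestamp=1700000000
)
print(payload)
```

The resulting string can be passed as the `data=` argument of `requests.post`, so each finding is indexed as its own event with its own metadata.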

Terraform Integration

#!/usr/bin/env python3
# CloudMapper and Terraform integration

import json
import subprocess
import os

class CloudMapperTerraformIntegration:
    def __init__(self, terraform_dir):
        self.terraform_dir = terraform_dir

    def analyze_terraform_plan(self, plan_file):
        """Analyze Terraform plan with CloudMapper"""

        # Parse Terraform plan
        with open(plan_file, 'r') as f:
            plan_data = json.load(f)

        # Extract AWS resources
        aws_resources = self.extract_aws_resources(plan_data)

        # Generate CloudMapper configuration
        config = self.generate_cloudmapper_config(aws_resources)

        return config

    def extract_aws_resources(self, plan_data):
        """Extract AWS resources from Terraform plan"""
        aws_resources = []

        for resource_change in plan_data.get("resource_changes", []):
            resource_type = resource_change.get("type", "")

            if resource_type.startswith("aws_"):
                aws_resources.append({
                    "type": resource_type,
                    "name": resource_change.get("name", ""),
                    "change": resource_change.get("change", {}),
                    "values": resource_change.get("change", {}).get("after", {})
                })

        return aws_resources

    def generate_cloudmapper_config(self, aws_resources):
        """Generate CloudMapper configuration from Terraform resources"""

        # This would generate appropriate CloudMapper configuration
        # based on the Terraform resources

        config = {
            "accounts": [
                {
                    "id": "123456789012",  # Would be extracted from Terraform
                    "name": "terraform-managed"
                }
            ]
        }

        return config

# Usage
terraform_integration = CloudMapperTerraformIntegration("/path/to/terraform")
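
As an example of what can be done with the extracted resources, the sketch below scans a plan for security groups whose planned ingress rules are open to 0.0.0.0/0. It assumes `plan_data` was produced by `terraform show -json` and only inspects inline `ingress` blocks of `aws_security_group`. Standalone rule resources are not covered, and the function name and sample plan are hypothetical.

```python
def find_open_ingress(plan_data):
    """Return (address, from_port, to_port) for planned rules open to the world."""
    open_rules = []
    for rc in plan_data.get("resource_changes", []):
        if rc.get("type") != "aws_security_group":
            continue
        # "after" is null for resources that are being destroyed
        after = (rc.get("change") or {}).get("after") or {}
        for rule in after.get("ingress") or []:
            if "0.0.0.0/0" in (rule.get("cidr_blocks") or []):
                open_rules.append(
                    (rc.get("address"), rule.get("from_port"), rule.get("to_port"))
                )
    return open_rules


# Hypothetical, heavily trimmed plan for illustration only
sample_plan = {
    "resource_changes": [
        {
            "address": "aws_security_group.web",
            "type": "aws_security_group",
            "change": {"actions": ["create"], "after": {"ingress": [
                {"from_port": 22, "to_port": 22, "cidr_blocks": ["0.0.0.0/0"]}
            ]}},
        },
        {
            "address": "aws_security_group.db",
            "type": "aws_security_group",
            "change": {"actions": ["create"], "after": {"ingress": [
                {"from_port": 5432, "to_port": 5432, "cidr_blocks": ["10.0.0.0/16"]}
            ]}},
        },
        {
            "address": "aws_security_group.old",
            "type": "aws_security_group",
            "change": {"actions": ["delete"], "after": None},
        },
    ]
}

print(find_open_ingress(sample_plan))  # [('aws_security_group.web', 22, 22)]
```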

Troubleshooting

Common Issues

Authentication Problems:

# Check AWS credentials
aws sts get-caller-identity

# Verify IAM permissions
aws iam simulate-principal-policy \
    --policy-source-arn arn:aws:iam::123456789012:user/cloudmapper-user \
    --action-names ec2:DescribeInstances \
    --resource-arns "*"

# Test specific service access
aws ec2 describe-instances --region us-east-1
aws s3 ls

# Check CloudMapper configuration
cloudmapper configure list-accounts --config config.json
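
A frequent cause of authentication failures is that the active credentials resolve to a different account than the one named in the configuration. The sketch below compares the JSON printed by `aws sts get-caller-identity` with the accounts in a config file. It assumes the config has an `accounts` list with `id` and `name` keys, as in the Terraform example in this document. The function name and sample values are hypothetical.

```python
import json


def match_identity_to_config(identity, config):
    """Return the configured account name for the caller's account ID, or None."""
    caller_account = identity.get("Account")
    for account in config.get("accounts", []):
        if str(account.get("id")) == caller_account:
            return account.get("name")
    return None


# Sample output of `aws sts get-caller-identity` (placeholder values)
identity = json.loads(
    '{"UserId": "AIDAEXAMPLE", "Account": "123456789012", '
    '"Arn": "arn:aws:iam::123456789012:user/cloudmapper-user"}'
)
config = {"accounts": [{"id": "123456789012", "name": "production"}]}

print(match_identity_to_config(identity, config))  # production
```

A result of `None` means the credentials in use belong to an account that is not listed in the configuration.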

Data Collection Issues:

# Enable verbose logging
cloudmapper collect --config config.json --verbose

# Test specific regions
cloudmapper collect --config config.json --regions us-east-1

# Check for rate limiting
cloudmapper collect --config config.json --rate-limit 10

# Verify account access
aws organizations describe-account --account-id 123456789012
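
When your own scripts call AWS APIs around a collection run, throttling is usually handled by retrying with exponential backoff. The following is a generic sketch of that pattern and is not part of CloudMapper. The names `call_with_backoff` and `flaky_describe` are hypothetical, and the `sleep` parameter is injectable so the logic can be exercised without waiting.

```python
import random
import time


def call_with_backoff(func, max_attempts=5, base_delay=1.0,
                      sleep=time.sleep, is_throttle=lambda exc: True):
    """Call func(), retrying with exponential backoff plus jitter on failure."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            # Give up on the last attempt or on errors that are not throttling
            if attempt == max_attempts - 1 or not is_throttle(exc):
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)


# Simulated API call that is throttled twice before succeeding
attempts = {"count": 0}


def flaky_describe():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("Throttling: rate exceeded")
    return "ok"


delays = []
result = call_with_backoff(flaky_describe, sleep=delays.append)
print(result, len(delays))  # ok 2
```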

Visualization Problems:

# Check dependencies
pip install graphviz pydot

# Install system graphviz
sudo apt install graphviz  # Ubuntu/Debian
brew install graphviz      # macOS

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Check output directory permissions
ls -la output/
chmod 755 output/
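
The dependency checks above can also be scripted. This sketch reports whether the Graphviz `dot` binary is on the PATH and whether the Python packages named in the pip command are importable. The function name `check_visualization_deps` is an assumption made for illustration.

```python
import importlib.util
import shutil


def check_visualization_deps(binaries=("dot",), modules=("graphviz", "pydot")):
    """Report which system binaries and Python modules can be found."""
    status = {}
    for binary in binaries:
        status[f"binary:{binary}"] = shutil.which(binary) is not None
    for module in modules:
        status[f"module:{module}"] = importlib.util.find_spec(module) is not None
    return status


for name, found in check_visualization_deps().items():
    print(f"{name}: {'found' if found else 'MISSING'}")
```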

Performance Optimization

Optimizing CloudMapper performance:

# Use specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Limit services
cloudmapper collect --config config.json --services ec2,s3,iam

# Parallel collection
cloudmapper collect --config config.json --threads 4

# Use caching
cloudmapper collect --config config.json --cache-dir ./cache

# Constrain memory usage
export PYTHONHASHSEED=0  # Fix the hash seed for reproducible runs
ulimit -v 4194304        # Limit virtual memory to 4 GiB (value is in KiB)

Security Considerations

Data Protection

Sensitive Data Handling:
- CloudMapper collects detailed AWS configuration data
- Store output files securely with appropriate access controls
- Encrypt sensitive reports and audit results
- Implement data retention policies for collected information
- Use secure channels for transmitting reports

Access Control:
- Limit CloudMapper IAM permissions to the minimum required
- Use temporary credentials when possible
- Implement multi-factor authentication for AWS access
- Rotate access keys and credentials regularly
- Monitor CloudMapper usage through CloudTrail
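
One small, concrete step toward storing output files securely is to create report files with owner-only permissions. The sketch below assumes a POSIX system. The helper name `write_private_json` and the file name are hypothetical.

```python
import json
import os
import stat
import tempfile


def write_private_json(path, data):
    """Write data as JSON to a file readable and writable only by its owner."""
    # Passing 0o600 to os.open applies the restrictive mode when the file is
    # created, so a new file is never briefly readable by other users
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        json.dump(data, f, indent=2)
    # Also tighten permissions if the file already existed with a looser mode
    os.chmod(path, 0o600)


with tempfile.TemporaryDirectory() as tmp:
    report_path = os.path.join(tmp, "audit-report.json")
    write_private_json(report_path, {"findings": []})
    mode = stat.S_IMODE(os.stat(report_path).st_mode)
    print(oct(mode))  # 0o600 on POSIX systems
```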

Operational Security

Safe Usage Practices:
- Run CloudMapper from secure, monitored environments
- Validate configuration files before execution
- Keep CloudMapper and its dependencies up to date
- Monitor for unauthorized usage or configuration changes
- Implement logging and auditing of CloudMapper activities

Network Security:
- Use a VPN or other secure network for CloudMapper execution
- Implement network segmentation for analysis environments
- Monitor network traffic during data collection
- Use secure protocols for all communications
- Run regular security assessments of CloudMapper infrastructure
