CloudMapper Cheat Sheet
Overview
CloudMapper is an open-source tool that helps security professionals and cloud administrators visualize and analyze Amazon Web Services (AWS) environments for security assessment and compliance. Developed by Duo Security (now part of Cisco), it maps AWS infrastructure, detects security misconfigurations, and produces detailed reports on cloud resource relationships and potential security risks. Its visual network diagrams illustrate the relationships between AWS services, making it easier to understand attack paths, data flows, and security boundaries within a cloud environment.
CloudMapper's core functionality is to collect detailed information about AWS resources through the AWS API and turn that data into visualizations and security assessments. It can map Virtual Private Clouds (VPCs), subnets, security groups, EC2 instances, load balancers, databases, and numerous other AWS services, giving a broad view of the cloud infrastructure. Its strength lies in identifying potential security issues such as overly permissive security groups, publicly accessible resources, unencrypted data stores, and complex network configurations that could introduce vulnerabilities.
CloudMapper supports multiple output formats, including interactive HTML reports, JSON data exports, and graphical network diagrams, which makes it suitable for both technical analysis and executive reporting. Its modular architecture allows customization and extension, and its command-line interface allows integration into automated security assessment workflows and continuous monitoring pipelines. With its focus on AWS security best practices and compliance frameworks, CloudMapper has become an essential tool for cloud security teams, penetration testers, and compliance auditors working with AWS environments.
Installation
Python Package Installation
Install CloudMapper with pip:
```bash
# Install CloudMapper
pip install cloudmapper

# Alternative: Install from source
git clone https://github.com/duo-labs/cloudmapper.git
cd cloudmapper
pip install -r requirements.txt
python setup.py install

# Verify installation
cloudmapper --help

# Install additional dependencies for visualization
pip install graphviz
pip install pydot

# Install AWS CLI (required for authentication)
pip install awscli

# Configure AWS credentials
aws configure
```
Docker Installation
```bash
# Create CloudMapper Docker environment
cat > Dockerfile << 'EOF'
FROM python:3.8-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    graphviz \
    && rm -rf /var/lib/apt/lists/*

# Install CloudMapper
RUN pip install cloudmapper

# Create working directory
WORKDIR /cloudmapper

# AWS credentials are not copied into the image; mount them at runtime
VOLUME ["/root/.aws"]

CMD ["cloudmapper", "--help"]
EOF

# Build container
docker build -t cloudmapper .

# Run CloudMapper
docker run -it -v ~/.aws:/root/.aws -v "$(pwd)/output:/cloudmapper/output" cloudmapper

# Example usage
docker run -it -v ~/.aws:/root/.aws cloudmapper cloudmapper collect --account-name myaccount
```
Virtual Environment Setup
```bash
# Create virtual environment
python3 -m venv cloudmapper-env
source cloudmapper-env/bin/activate

# Install CloudMapper
pip install cloudmapper

# Install additional tools
pip install boto3 botocore

# Create configuration directory
mkdir -p ~/.cloudmapper

# Verify setup
cloudmapper --version
```
AWS Permissions Setup
```bash
# Create IAM policy for CloudMapper (read-only actions)
cat > cloudmapper-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ec2:Describe*",
        "elasticloadbalancing:Describe*",
        "rds:Describe*",
        "s3:GetBucketLocation",
        "s3:GetBucketLogging",
        "s3:GetBucketPolicy",
        "s3:GetBucketVersioning",
        "s3:GetBucketWebsite",
        "s3:GetBucketAcl",
        "s3:ListAllMyBuckets",
        "cloudfront:ListDistributions",
        "cloudfront:GetDistribution",
        "route53:ListHostedZones",
        "route53:ListResourceRecordSets",
        "iam:GetAccountSummary",
        "iam:ListUsers",
        "iam:ListRoles",
        "iam:ListGroups",
        "iam:ListPolicies",
        "iam:GetRole",
        "iam:GetRolePolicy",
        "iam:ListRolePolicies",
        "iam:ListAttachedRolePolicies",
        "iam:GetPolicy",
        "iam:GetPolicyVersion",
        "lambda:ListFunctions",
        "lambda:GetFunction",
        "apigateway:GET"
      ],
      "Resource": "*"
    }
  ]
}
EOF

# Create IAM user and attach policy
aws iam create-user --user-name cloudmapper-user
aws iam put-user-policy \
    --user-name cloudmapper-user \
    --policy-name CloudMapperPolicy \
    --policy-document file://cloudmapper-policy.json

# Create access keys
aws iam create-access-key --user-name cloudmapper-user
```
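Before attaching a policy like the one above, it can be worth checking that it really grants read-only access. The following is a minimal sketch, not part of CloudMapper: the function name `non_read_only_actions` and the prefix list are assumptions made for illustration, and the heuristic only looks at action names, not at conditions or resources.

```python
import json

# Assumption: an action counts as read-only when its operation name starts
# with one of these prefixes. "GET" covers API Gateway's HTTP-verb style action.
READ_ONLY_PREFIXES = ("Describe", "Get", "List", "GET")


def non_read_only_actions(policy: dict) -> list:
    """Return every allowed action that does not look read-only."""
    flagged = []
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may be a bare object
        statements = [statements]
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):  # a single action may be a bare string
            actions = [actions]
        for action in actions:
            # "service:Operation" -> "Operation"; a bare "*" stays "*"
            operation = action.split(":", 1)[-1]
            if not operation.startswith(READ_ONLY_PREFIXES):
                flagged.append(action)
    return flagged


# Hypothetical policy with one write action mixed in
sample_policy = json.loads("""
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["ec2:Describe*", "s3:GetBucketAcl", "iam:ListUsers", "s3:PutObject"],
      "Resource": "*"
    }
  ]
}
""")

print(non_read_only_actions(sample_policy))  # prints ['s3:PutObject']
```

An empty result suggests the policy only contains describe, get, and list style actions; anything returned deserves a manual look before the policy is attached.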
Basic Usage
Account Configuration
Set up AWS accounts for CloudMapper:
```bash
# Configure AWS account
cloudmapper configure add-account --config-file config.json --name myaccount --id 123456789012

# Alternative: Manual configuration
cat > config.json << 'EOF'
{
  "accounts": [
    {
      "id": "123456789012",
      "name": "production",
      "default": true
    },
    {
      "id": "123456789013",
      "name": "staging",
      "default": false
    }
  ]
}
EOF

# List configured accounts
cloudmapper configure list-accounts --config config.json

# Set default account
cloudmapper configure set-default --config config.json --name production
```
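A hand-written `config.json` is easy to get subtly wrong. The sketch below checks the structure shown above (an `accounts` list with `id`, `name`, and `default` keys) before any collection run. It is an illustration under that assumption, not CloudMapper's own validation; `validate_config` is a hypothetical helper name.

```python
import re


def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems = []
    accounts = config.get("accounts", [])
    if not accounts:
        problems.append("no accounts configured")
    seen_names = set()
    for account in accounts:
        name = account.get("name", "")
        account_id = str(account.get("id", ""))
        if not name:
            problems.append(f"account {account_id or '?'} has no name")
        elif name in seen_names:
            problems.append(f"duplicate account name: {name}")
        seen_names.add(name)
        # AWS account IDs are exactly 12 digits
        if not re.fullmatch(r"\d{12}", account_id):
            problems.append(f"account {name or '?'} has invalid id: {account_id!r}")
    # At most one account should be marked as the default
    defaults = [a for a in accounts if a.get("default")]
    if len(defaults) > 1:
        problems.append("more than one default account")
    return problems


good_config = {
    "accounts": [
        {"id": "123456789012", "name": "production", "default": True},
        {"id": "123456789013", "name": "staging", "default": False},
    ]
}
bad_config = {
    "accounts": [
        {"id": "1234", "name": "dev", "default": True},
        {"id": "123456789012", "name": "dev", "default": True},
    ]
}

print(validate_config(good_config))  # prints []
for problem in validate_config(bad_config):
    print(problem)
```

Keeping the ID as a string matters here: a 12-digit account ID with leading zeros would lose them if it were stored as a JSON number.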
Data Collection
Collect AWS environment data:
```bash
# Collect data for default account
cloudmapper collect --config config.json

# Collect data for specific account
cloudmapper collect --config config.json --account-name production

# Collect data for specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Collect with specific profile
cloudmapper collect --config config.json --profile myprofile

# Collect with custom output directory
cloudmapper collect --config config.json --output-dir ./aws-data

# Verbose collection
cloudmapper collect --config config.json --verbose

# Collect specific services only
cloudmapper collect --config config.json --services ec2,s3,iam
```
Basic Visualization
Create basic network visualizations:
```bash
# Generate network diagram
cloudmapper visualize --config config.json --account-name production

# Generate diagram for specific regions
cloudmapper visualize --config config.json --regions us-east-1

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Generate diagram with custom output
cloudmapper visualize --config config.json --output-file network-diagram.html

# Generate PNG diagram
cloudmapper visualize --config config.json --output-format png

# Generate with custom styling
cloudmapper visualize --config config.json --style-file custom-style.css
```
Security Analysis
Run security assessments:
```bash
# Run security audit
cloudmapper audit --config config.json --account-name production

# Run specific audit checks
cloudmapper audit --config config.json --checks security_groups,s3_buckets

# Generate audit report
cloudmapper audit --config config.json --output-file security-audit.json

# Run compliance checks
cloudmapper audit --config config.json --compliance-framework cis

# Audit with custom severity
cloudmapper audit --config config.json --min-severity medium

# Export audit results
cloudmapper audit --config config.json --format csv --output-file audit-results.csv
```
Advanced Features
Custom Visualization
Create advanced network visualizations:
```bash
# Generate detailed network map
cloudmapper visualize --config config.json \
    --account-name production \
    --regions us-east-1,us-west-2 \
    --include-services ec2,rds,elb,s3 \
    --output-file detailed-network.html

# Create VPC-focused diagram
cloudmapper visualize --config config.json \
    --vpc-only \
    --show-security-groups \
    --show-nacls \
    --output-file vpc-diagram.html

# Generate public resource map
cloudmapper public --config config.json \
    --account-name production \
    --output-file public-resources.html

# Create cross-region visualization
cloudmapper visualize --config config.json \
    --all-regions \
    --cross-region-connections \
    --output-file global-network.html

# Generate service dependency map
cloudmapper dependencies --config config.json \
    --service ec2 \
    --output-file ec2-dependencies.html
```
Security Assessment
Comprehensive security analysis:
```bash
# Run comprehensive security audit
cloudmapper audit --config config.json \
    --account-name production \
    --all-checks \
    --output-file comprehensive-audit.json

# Check for public resources
cloudmapper public --config config.json \
    --account-name production \
    --services s3,ec2,rds,elb \
    --output-file public-exposure.json

# Analyze security groups
cloudmapper sg-audit --config config.json \
    --account-name production \
    --check-unused \
    --check-overpermissive \
    --output-file sg-audit.json

# Check IAM permissions
cloudmapper iam-audit --config config.json \
    --account-name production \
    --check-admin-users \
    --check-unused-roles \
    --output-file iam-audit.json

# Analyze network ACLs
cloudmapper nacl-audit --config config.json \
    --account-name production \
    --check-default-allow \
    --output-file nacl-audit.json
```
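To make "overly permissive security group" concrete, here is a minimal sketch of the idea behind such a check: flag every ingress rule that is open to the whole internet. It is not CloudMapper's implementation. The field names follow the `SecurityGroups` entries returned by `aws ec2 describe-security-groups`; the function name `find_open_ingress` and the sample group IDs are hypothetical.

```python
def find_open_ingress(security_groups: list) -> list:
    """Return (group_id, protocol, from_port, to_port) for rules open to the world."""
    findings = []
    for group in security_groups:
        for permission in group.get("IpPermissions", []):
            cidrs = [r.get("CidrIp") for r in permission.get("IpRanges", [])]
            cidrs += [r.get("CidrIpv6") for r in permission.get("Ipv6Ranges", [])]
            # 0.0.0.0/0 and ::/0 match every IPv4 / IPv6 source address
            if "0.0.0.0/0" in cidrs or "::/0" in cidrs:
                findings.append((
                    group.get("GroupId"),
                    permission.get("IpProtocol"),
                    permission.get("FromPort"),
                    permission.get("ToPort"),
                ))
    return findings


# Hypothetical sample shaped like the EC2 API response
sample_groups = [
    {
        "GroupId": "sg-0aaa",
        "IpPermissions": [
            {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22,
             "IpRanges": [{"CidrIp": "0.0.0.0/0"}]},
        ],
    },
    {
        "GroupId": "sg-0bbb",
        "IpPermissions": [
            {"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
             "IpRanges": [{"CidrIp": "10.0.0.0/16"}]},
        ],
    },
]

print(find_open_ingress(sample_groups))  # prints [('sg-0aaa', 'tcp', 22, 22)]
```

A real audit would also weigh which ports are exposed, since world-open 443 on a load balancer is usually intended while world-open 22 rarely is.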
Multi-Account Analysis
Analyze multiple AWS accounts:
```bash
# Configure multiple accounts
cat > multi-account-config.json << 'EOF'
{
  "accounts": [
    {"id": "111111111111", "name": "production", "profile": "prod-profile"},
    {"id": "222222222222", "name": "staging", "profile": "staging-profile"},
    {"id": "333333333333", "name": "development", "profile": "dev-profile"}
  ]
}
EOF

# Collect data from all accounts
for account in production staging development; do
    cloudmapper collect --config multi-account-config.json --account-name "$account"
done

# Generate cross-account visualization
cloudmapper visualize --config multi-account-config.json \
    --all-accounts \
    --cross-account-connections \
    --output-file cross-account-network.html

# Run security audit across all accounts
cloudmapper audit --config multi-account-config.json \
    --all-accounts \
    --output-file multi-account-audit.json

# Compare security posture across accounts
cloudmapper compare --config multi-account-config.json \
    --accounts production,staging,development \
    --output-file account-comparison.html
```
Reporting
Create custom reports and exports:
```bash
# Generate executive summary
cloudmapper report --config config.json \
    --account-name production \
    --template executive \
    --output-file executive-summary.html

# Create technical report
cloudmapper report --config config.json \
    --account-name production \
    --template technical \
    --include-recommendations \
    --output-file technical-report.html

# Export raw data
cloudmapper export --config config.json \
    --account-name production \
    --format json \
    --output-file raw-data.json

# Generate CSV exports
cloudmapper export --config config.json \
    --account-name production \
    --format csv \
    --services ec2,s3,rds \
    --output-dir csv-exports/

# Create compliance report
cloudmapper compliance --config config.json \
    --account-name production \
    --framework cis-aws \
    --output-file cis-compliance.html
```
Automation Scripts
Automated AWS Security Assessment
```python
#!/usr/bin/env python3
# Automated AWS security assessment with CloudMapper

import json
import logging
import os
import subprocess
from datetime import datetime


class CloudMapperAutomation:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.output_dir = f"cloudmapper-output-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

        # Create the output directory first: the log file is written into it
        os.makedirs(self.output_dir, exist_ok=True)
        self.setup_logging()

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'{self.output_dir}/cloudmapper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load CloudMapper configuration"""
        try:
            with open(self.config_file, 'r') as f:
                config = json.load(f)
            return config
        except FileNotFoundError:
            self.logger.error(f"Configuration file {self.config_file} not found")
            return None
        except json.JSONDecodeError:
            self.logger.error(f"Invalid JSON in configuration file {self.config_file}")
            return None

    def run_cloudmapper_command(self, command):
        """Execute CloudMapper command"""
        try:
            self.logger.info(f"Running command: {' '.join(command)}")

            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=1800  # 30 minute timeout
            )

            if result.returncode == 0:
                self.logger.info("Command completed successfully")
                return True, result.stdout
            else:
                self.logger.error(f"Command failed: {result.stderr}")
                return False, result.stderr

        except subprocess.TimeoutExpired:
            self.logger.error("Command timed out")
            return False, "Command timed out"
        except Exception as e:
            self.logger.error(f"Error running command: {e}")
            return False, str(e)

    def collect_account_data(self, account_name):
        """Collect data for specific account"""
        self.logger.info(f"Collecting data for account: {account_name}")

        command = [
            "cloudmapper", "collect",
            "--config", self.config_file,
            "--account-name", account_name
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Data collection completed for {account_name}")
        else:
            self.logger.error(f"Data collection failed for {account_name}: {output}")

        return success

    def generate_visualization(self, account_name):
        """Generate network visualization"""
        self.logger.info(f"Generating visualization for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-network.html")

        command = [
            "cloudmapper", "visualize",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Visualization generated: {output_file}")
        else:
            self.logger.error(f"Visualization failed for {account_name}: {output}")

        return success, output_file if success else None

    def run_security_audit(self, account_name):
        """Run security audit"""
        self.logger.info(f"Running security audit for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-audit.json")

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Security audit completed: {output_file}")
            return success, output_file
        else:
            self.logger.error(f"Security audit failed for {account_name}: {output}")
            return success, None

    def analyze_public_resources(self, account_name):
        """Analyze public resources"""
        self.logger.info(f"Analyzing public resources for account: {account_name}")

        output_file = os.path.join(self.output_dir, f"{account_name}-public.html")

        command = [
            "cloudmapper", "public",
            "--config", self.config_file,
            "--account-name", account_name,
            "--output-file", output_file
        ]

        success, output = self.run_cloudmapper_command(command)

        if success:
            self.logger.info(f"Public resources analysis completed: {output_file}")
        else:
            self.logger.error(f"Public resources analysis failed for {account_name}: {output}")

        return success, output_file if success else None

    def generate_reports(self, account_name):
        """Generate comprehensive reports"""
        self.logger.info(f"Generating reports for account: {account_name}")

        reports = {}

        # Executive summary
        exec_output = os.path.join(self.output_dir, f"{account_name}-executive.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "executive",
            "--output-file", exec_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["executive"] = exec_output

        # Technical report
        tech_output = os.path.join(self.output_dir, f"{account_name}-technical.html")
        command = [
            "cloudmapper", "report",
            "--config", self.config_file,
            "--account-name", account_name,
            "--template", "technical",
            "--output-file", tech_output
        ]

        success, _ = self.run_cloudmapper_command(command)
        if success:
            reports["technical"] = tech_output

        return reports

    def analyze_account(self, account_name):
        """Complete analysis for single account"""
        self.logger.info(f"Starting complete analysis for account: {account_name}")

        results = {
            "account_name": account_name,
            "timestamp": datetime.now().isoformat(),
            "success": True,
            "outputs": {}
        }

        # Collect data; nothing else can run without it
        if not self.collect_account_data(account_name):
            results["success"] = False
            return results

        # Generate visualization
        success, viz_file = self.generate_visualization(account_name)
        if success:
            results["outputs"]["visualization"] = viz_file

        # Run security audit
        success, audit_file = self.run_security_audit(account_name)
        if success:
            results["outputs"]["audit"] = audit_file

        # Analyze public resources
        success, public_file = self.analyze_public_resources(account_name)
        if success:
            results["outputs"]["public_analysis"] = public_file

        # Generate reports
        reports = self.generate_reports(account_name)
        results["outputs"]["reports"] = reports

        self.logger.info(f"Analysis completed for account: {account_name}")
        return results

    def run_multi_account_analysis(self):
        """Run analysis across all configured accounts"""
        # Always return a (results, summary) pair so callers can unpack it
        config = self.load_config()
        if not config:
            return None, None

        accounts = config.get("accounts", [])
        if not accounts:
            self.logger.error("No accounts configured")
            return None, None

        self.logger.info(f"Starting multi-account analysis for {len(accounts)} accounts")

        all_results = []

        for account in accounts:
            account_name = account.get("name")
            if account_name:
                result = self.analyze_account(account_name)
                all_results.append(result)

        # Generate summary report
        summary = self.generate_summary_report(all_results)

        self.logger.info("Multi-account analysis completed")
        return all_results, summary

    def generate_summary_report(self, results):
        """Generate summary report for all accounts"""
        summary = {
            "analysis_summary": {
                "total_accounts": len(results),
                "successful_accounts": len([r for r in results if r["success"]]),
                "failed_accounts": len([r for r in results if not r["success"]]),
                "timestamp": datetime.now().isoformat()
            },
            "account_results": results
        }

        summary_file = os.path.join(self.output_dir, "analysis-summary.json")
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        self.logger.info(f"Summary report generated: {summary_file}")
        return summary

    def cleanup_old_data(self, days_old=7):
        """Clean up old CloudMapper data"""
        self.logger.info(f"Cleaning up data older than {days_old} days")

        # This would implement cleanup logic
        # for old CloudMapper data files
        pass


# Usage
if __name__ == "__main__":
    automation = CloudMapperAutomation("config.json")
    results, summary = automation.run_multi_account_analysis()

    if results:
        print(f"Analysis completed for {len(results)} accounts")
        print(f"Results saved in: {automation.output_dir}")
```
Continuous Monitoring Script
```python
#!/usr/bin/env python3
# Continuous AWS monitoring with CloudMapper

import json
import os
import smtplib
import subprocess
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import schedule  # third-party: pip install schedule


class CloudMapperMonitoring:
    def __init__(self, config_file="monitoring-config.json"):
        self.config_file = config_file
        self.load_monitoring_config()
        self.last_results = {}

    def load_monitoring_config(self):
        """Load monitoring configuration"""
        with open(self.config_file, 'r') as f:
            self.config = json.load(f)

        self.accounts = self.config.get("accounts", [])
        self.monitoring_settings = self.config.get("monitoring", {})
        self.notification_settings = self.config.get("notifications", {})

    def run_scheduled_assessment(self):
        """Run scheduled security assessment"""
        print(f"Running scheduled assessment at {datetime.now()}")

        current_results = {}

        for account in self.accounts:
            account_name = account.get("name")

            # Run CloudMapper audit
            audit_result = self.run_audit(account_name)
            current_results[account_name] = audit_result

            # Compare with previous results
            if account_name in self.last_results:
                changes = self.compare_results(
                    self.last_results[account_name],
                    audit_result
                )

                if changes:
                    self.send_change_notification(account_name, changes)

        # Update last results
        self.last_results = current_results

        # Save results
        self.save_monitoring_results(current_results)

    def run_audit(self, account_name):
        """Run CloudMapper audit for account"""
        output_file = f"monitoring-audit-{account_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", "config.json",
            "--account-name", account_name,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    audit_data = json.load(f)

                # Clean up temporary file
                os.remove(output_file)

                return audit_data
            else:
                print(f"Audit failed for {account_name}: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running audit for {account_name}: {e}")
            return None

    def compare_results(self, previous, current):
        """Compare audit results to detect changes"""
        changes = []

        # A failed audit (None) cannot be compared
        if not previous or not current:
            return changes

        # Compare security findings
        prev_findings = previous.get("findings", [])
        curr_findings = current.get("findings", [])

        # New findings
        prev_finding_ids = {f.get("id") for f in prev_findings}
        new_findings = [f for f in curr_findings if f.get("id") not in prev_finding_ids]

        if new_findings:
            changes.append({
                "type": "new_findings",
                "count": len(new_findings),
                "findings": new_findings
            })

        # Resolved findings
        curr_finding_ids = {f.get("id") for f in curr_findings}
        resolved_findings = [f for f in prev_findings if f.get("id") not in curr_finding_ids]

        if resolved_findings:
            changes.append({
                "type": "resolved_findings",
                "count": len(resolved_findings),
                "findings": resolved_findings
            })

        return changes

    def send_change_notification(self, account_name, changes):
        """Send notification about changes"""
        if not self.notification_settings.get("enabled", False):
            return

        subject = f"CloudMapper Alert: Changes detected in {account_name}"

        body = f"Changes detected in AWS account {account_name}:\n\n"

        for change in changes:
            change_type = change["type"]
            count = change["count"]

            if change_type == "new_findings":
                body += f"New security findings: {count}\n"
                for finding in change["findings"][:5]:  # Limit to first 5
                    body += f"  - {finding.get('title', 'Unknown')}\n"

            elif change_type == "resolved_findings":
                body += f"Resolved security findings: {count}\n"

        self.send_email(subject, body)

    def send_email(self, subject, body):
        """Send email notification"""
        email_config = self.notification_settings.get("email", {})

        if not email_config.get("enabled", False):
            return

        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            server.login(email_config["username"], email_config["password"])

            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()

            print(f"Notification sent: {subject}")

        except Exception as e:
            print(f"Failed to send email notification: {e}")

    def save_monitoring_results(self, results):
        """Save monitoring results to file"""
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        filename = f"monitoring-results-{timestamp}.json"

        with open(filename, 'w') as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "results": results
            }, f, indent=2)

    def start_monitoring(self):
        """Start continuous monitoring"""
        print("Starting CloudMapper continuous monitoring...")

        # Schedule assessments
        interval = self.monitoring_settings.get("interval_hours", 24)
        schedule.every(interval).hours.do(self.run_scheduled_assessment)

        # Run initial assessment
        self.run_scheduled_assessment()

        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute


# Usage
if __name__ == "__main__":
    monitoring = CloudMapperMonitoring()
    monitoring.start_monitoring()
```
Compliance Reporting Script
```python
#!/usr/bin/env python3
# CloudMapper compliance reporting

import json
import subprocess
from datetime import datetime


class CloudMapperCompliance:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.compliance_frameworks = {
            "cis": "CIS AWS Foundations Benchmark",
            "nist": "NIST Cybersecurity Framework",
            "pci": "PCI DSS",
            "sox": "Sarbanes-Oxley Act",
            "hipaa": "HIPAA Security Rule"
        }

    def run_compliance_audit(self, account_name, framework="cis"):
        """Run compliance audit for specific framework"""
        output_file = f"compliance-{framework}-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"

        command = [
            "cloudmapper", "audit",
            "--config", self.config_file,
            "--account-name", account_name,
            "--compliance-framework", framework,
            "--output-file", output_file
        ]

        try:
            result = subprocess.run(command, capture_output=True, text=True)

            if result.returncode == 0:
                with open(output_file, 'r') as f:
                    compliance_data = json.load(f)
                return compliance_data
            else:
                print(f"Compliance audit failed: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running compliance audit: {e}")
            return None

    def generate_compliance_report(self, account_name, frameworks=None):
        """Generate comprehensive compliance report"""
        if frameworks is None:
            frameworks = ["cis", "nist"]

        report_data = {
            "account_name": account_name,
            "report_date": datetime.now().isoformat(),
            "frameworks": {}
        }

        for framework in frameworks:
            print(f"Running {framework.upper()} compliance audit...")

            audit_result = self.run_compliance_audit(account_name, framework)

            if audit_result:
                compliance_score = self.calculate_compliance_score(audit_result)

                report_data["frameworks"][framework] = {
                    "name": self.compliance_frameworks.get(framework, framework),
                    "score": compliance_score,
                    "findings": audit_result.get("findings", []),
                    "recommendations": audit_result.get("recommendations", [])
                }

        # Generate HTML report
        html_report = self.generate_html_compliance_report(report_data)

        # Save reports
        json_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.json"
        html_file = f"compliance-report-{account_name}-{datetime.now().strftime('%Y%m%d')}.html"

        with open(json_file, 'w') as f:
            json.dump(report_data, f, indent=2)

        with open(html_file, 'w') as f:
            f.write(html_report)

        print(f"Compliance report generated: {html_file}")
        return report_data

    def calculate_compliance_score(self, audit_result):
        """Calculate compliance score from audit results"""
        findings = audit_result.get("findings", [])

        if not findings:
            return 100.0

        # Score is the percentage of checks whose status is PASS
        total_checks = len(findings)
        passed_checks = len([f for f in findings if f.get("status") == "PASS"])

        score = (passed_checks / total_checks) * 100
        return round(score, 2)

    def generate_html_compliance_report(self, report_data):
        """Generate HTML compliance report"""
        # Placeholders are filled in by str.format() at the end of this method
        html_template = """<!DOCTYPE html>
<html>
<head>
    <title>Compliance Report</title>
</head>
<body>
    <h1>Compliance Report</h1>
    <p>Account: {account_name}</p>
    <p>Report Date: {report_date}</p>
    {frameworks_html}
</body>
</html>
"""

        frameworks_html = ""

        for framework_id, framework_data in report_data["frameworks"].items():
            score = framework_data["score"]
            score_class = "pass" if score >= 80 else "fail"

            findings_html = ""
            for finding in framework_data["findings"][:10]:  # Limit to first 10
                status = finding.get("status", "UNKNOWN")
                status_class = "pass" if status == "PASS" else "fail"

                findings_html += f"""
                <div class="finding">
                    <strong class="{status_class}">{status}</strong>: {finding.get('title', 'Unknown')}
                    <br><small>{finding.get('description', '')}</small>
                </div>
                """

            frameworks_html += f"""
            <div class="framework">
                <h2>{framework_data['name']}</h2>
                <div class="score {score_class}">Compliance Score: {score}%</div>
                <h3>Key Findings:</h3>
                {findings_html}
            </div>
            """

        return html_template.format(
            account_name=report_data["account_name"],
            report_date=report_data["report_date"],
            frameworks_html=frameworks_html
        )


# Usage
if __name__ == "__main__":
    compliance = CloudMapperCompliance()
    report = compliance.generate_compliance_report("production", ["cis", "nist"])
```
Integration Examples
SIEM Integration
```python
#!/usr/bin/env python3
# CloudMapper SIEM integration

import json
from datetime import datetime

import requests  # third-party: pip install requests


class CloudMapperSIEMIntegration:
    def __init__(self, siem_config):
        self.siem_config = siem_config
        self.siem_type = siem_config.get("type", "splunk")

    def send_to_splunk(self, data):
        """Send CloudMapper data to Splunk"""
        splunk_config = self.siem_config.get("splunk", {})

        headers = {
            "Authorization": f"Splunk {splunk_config['token']}",
            "Content-Type": "application/json"
        }

        # Format data for Splunk: one HEC event per finding
        events = []
        for finding in data.get("findings", []):
            event = {
                "time": datetime.now().timestamp(),
                "source": "cloudmapper",
                "sourcetype": "aws:security:finding",
                "event": finding
            }
            events.append(event)

        if not events:
            return True  # nothing to send

        # Send to Splunk HEC; a batch is a sequence of JSON event objects
        response = requests.post(
            f"{splunk_config['url']}/services/collector/event",
            headers=headers,
            data="\n".join(json.dumps(event) for event in events)
        )

        return response.status_code == 200

    def send_to_elasticsearch(self, data):
        """Send CloudMapper data to Elasticsearch"""
        es_config = self.siem_config.get("elasticsearch", {})

        all_indexed = True

        # Format data for Elasticsearch
        for finding in data.get("findings", []):
            doc = {
                "timestamp": datetime.now().isoformat(),
                "source": "cloudmapper",
                "finding": finding
            }

            # Index document
            response = requests.post(
                f"{es_config['url']}/cloudmapper-findings/_doc",
                auth=(es_config['username'], es_config['password']),
                json=doc
            )
            all_indexed = all_indexed and response.ok

        return all_indexed

    def send_findings(self, cloudmapper_data):
        """Send CloudMapper findings to SIEM"""
        if self.siem_type == "splunk":
            return self.send_to_splunk(cloudmapper_data)
        elif self.siem_type == "elasticsearch":
            return self.send_to_elasticsearch(cloudmapper_data)
        else:
            print(f"Unsupported SIEM type: {self.siem_type}")
            return False


# Usage
siem_config = {
    "type": "splunk",
    "splunk": {
        "url": "https://splunk.example.com:8088",
        "token": "your-hec-token"
    }
}

siem_integration = CloudMapperSIEMIntegration(siem_config)
```
Terraform Integration
```python
#!/usr/bin/env python3
# CloudMapper and Terraform integration

import json


class CloudMapperTerraformIntegration:
    def __init__(self, terraform_dir):
        self.terraform_dir = terraform_dir

    def analyze_terraform_plan(self, plan_file):
        """Analyze Terraform plan with CloudMapper"""
        # Parse Terraform plan (JSON form of the plan)
        with open(plan_file, 'r') as f:
            plan_data = json.load(f)

        # Extract AWS resources
        aws_resources = self.extract_aws_resources(plan_data)

        # Generate CloudMapper configuration
        config = self.generate_cloudmapper_config(aws_resources)

        return config

    def extract_aws_resources(self, plan_data):
        """Extract AWS resources from Terraform plan"""
        aws_resources = []

        for resource_change in plan_data.get("resource_changes", []):
            resource_type = resource_change.get("type", "")

            if resource_type.startswith("aws_"):
                aws_resources.append({
                    "type": resource_type,
                    "name": resource_change.get("name", ""),
                    "change": resource_change.get("change", {}),
                    "values": resource_change.get("change", {}).get("after", {})
                })

        return aws_resources

    def generate_cloudmapper_config(self, aws_resources):
        """Generate CloudMapper configuration from Terraform resources"""
        # This would generate appropriate CloudMapper configuration
        # based on the Terraform resources
        config = {
            "accounts": [
                {
                    "id": "123456789012",  # Would be extracted from Terraform
                    "name": "terraform-managed"
                }
            ]
        }

        return config


# Usage
terraform_integration = CloudMapperTerraformIntegration("/path/to/terraform")
```
Troubleshooting
Common Issues
Authentication issues:
```bash
# Check AWS credentials
aws sts get-caller-identity

# Verify IAM permissions
aws iam simulate-principal-policy \
    --policy-source-arn arn:aws:iam::123456789012:user/cloudmapper-user \
    --action-names ec2:DescribeInstances \
    --resource-arns "*"

# Test specific service access
aws ec2 describe-instances --region us-east-1
aws s3 ls

# Check CloudMapper configuration
cloudmapper configure list-accounts --config config.json
```
Data collection issues:
```bash
# Enable verbose logging
cloudmapper collect --config config.json --verbose

# Test specific regions
cloudmapper collect --config config.json --regions us-east-1

# Check for rate limiting
cloudmapper collect --config config.json --rate-limit 10

# Verify account access
aws organizations describe-account --account-id 123456789012
```
Visualization issues:
```bash
# Check dependencies
pip install graphviz pydot

# Install system graphviz
sudo apt install graphviz  # Ubuntu/Debian
brew install graphviz      # macOS

# Generate simplified diagram
cloudmapper visualize --config config.json --simplified

# Check output directory permissions
ls -la output/
chmod 755 output/
```
Performance Optimization
Optimize CloudMapper performance:
```bash
# Use specific regions
cloudmapper collect --config config.json --regions us-east-1,us-west-2

# Limit services
cloudmapper collect --config config.json --services ec2,s3,iam

# Parallel collection
cloudmapper collect --config config.json --threads 4

# Use caching
cloudmapper collect --config config.json --cache-dir ./cache

# Optimize memory usage
export PYTHONHASHSEED=0
ulimit -v 4194304  # Limit virtual memory
```
Security Considerations
Data Protection
**Sensitive data handling:**
- CloudMapper collects detailed AWS configuration data
- Store output files securely with appropriate access controls
- Encrypt sensitive reports and audit results
- Implement data protection policies for collected information
- Use secure channels for transmitting reports

**Access controls:**
- Limit CloudMapper IAM permissions to the minimum required
- Use temporary credentials where possible
- Implement multi-factor authentication for AWS access
- Rotate access keys and credentials regularly
- Monitor CloudMapper usage through CloudTrail
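As one way to apply "store output files securely", the sketch below restricts a report directory to its owner. It assumes a POSIX file system, and `restrict_to_owner` is a hypothetical helper, not a CloudMapper feature; the temporary directory only stands in for a real output folder.

```python
import os
import stat
import tempfile


def restrict_to_owner(path: str) -> None:
    """Make everything under path accessible to the owning user only."""
    for root, _dirs, files in os.walk(path):
        os.chmod(root, stat.S_IRWXU)  # directories: rwx for owner (0o700)
        for name in files:
            # files: read/write for owner only (0o600)
            os.chmod(os.path.join(root, name), stat.S_IRUSR | stat.S_IWUSR)


# Demonstration on a throwaway directory standing in for a report folder
with tempfile.TemporaryDirectory() as output_dir:
    report = os.path.join(output_dir, "audit.json")
    with open(report, "w") as f:
        f.write("{}")
    restrict_to_owner(output_dir)
    mode = stat.S_IMODE(os.stat(report).st_mode)
    print(oct(mode))
```

File permissions only protect against other local users; reports that leave the machine still need encryption and a secure transfer channel.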
Operational Security
**Secure usage practices:**
- Run CloudMapper from secure, monitored environments
- Validate configuration files before execution
- Keep CloudMapper and its dependencies up to date
- Monitor for unauthorized usage or configuration changes
- Implement logging and auditing of CloudMapper activity

**Network security:**
- Use a VPN or secure networks when running CloudMapper
- Implement network segmentation for analysis environments
- Monitor network traffic during data collection
- Use secure protocols for all communication
- Regularly assess the security of the CloudMapper infrastructure
References
- [CloudMapper GitHub Repository](https://github.com/duo-labs/cloudmapper)
- CIS AWS Foundations Benchmark
- NIST Cybersecurity Framework