Kube-hunter Cheat Sheet

Overview

Kube-hunter is an open-source penetration testing tool designed to hunt for security weaknesses in Kubernetes clusters. Developed by Aqua Security, it assesses a cluster by simulating the techniques an attacker might use to compromise it. Kube-hunter scans clusters from both internal and external perspectives, identifying misconfigurations, exposed services, and potential attack vectors that malicious actors could exploit. Its findings are actionable, so organizations can strengthen their Kubernetes security posture before real attacks occur.

The core of Kube-hunter is its knowledge base of Kubernetes-specific attack techniques and vulnerabilities. The tool automatically discovers Kubernetes components, including API servers, etcd clusters, kubelet services, and dashboard interfaces, then systematically tests them for common security issues such as anonymous access, privilege escalation opportunities, and insecure configurations. Its hunting methodology also tests for exposed sensitive information, weak authentication mechanisms, container escape possibilities, and network segmentation issues commonly found in misconfigured Kubernetes deployments.

Kube-hunter goes beyond simple vulnerability scanning by explaining each discovered issue, its potential impact, and specific remediation guidance. The tool supports multiple execution modes: remote scanning for external assessments, internal scanning for comprehensive cluster analysis, and network scanning for discovering Kubernetes services across network ranges. Because it can generate detailed reports in multiple formats and integrate into CI/CD pipelines, Kube-hunter is a useful tool for DevSecOps teams, security professionals, and Kubernetes administrators who need to maintain security standards in their container orchestration environments.

Installation

Python Package Installation

Installing Kube-hunter using pip:

# Install Kube-hunter
pip install kube-hunter

# Alternative: Install from source
git clone https://github.com/aquasecurity/kube-hunter.git
cd kube-hunter
pip install -r requirements.txt
python setup.py install

# Verify installation
kube-hunter --help

# Install additional dependencies
pip install requests urllib3 netaddr

# Update to latest version
pip install --upgrade kube-hunter

# Install specific version
pip install kube-hunter==0.6.8
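
Before scripting around the tool, it can help to confirm from Python that the package is actually present. The following is a minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_version` is hypothetical, and the distribution name `kube-hunter` is taken from the pip commands above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str = "kube-hunter") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    if found:
        print(f"kube-hunter version: {found}")
    else:
        print("kube-hunter is not installed in this environment")
```

Returning `None` instead of raising keeps the check easy to use in a setup script that decides whether to run `pip install`.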

Docker Installation

Running Kube-hunter in Docker containers:

# Run Kube-hunter container for remote scanning
docker run --rm -it aquasec/kube-hunter

# Run with network access for internal scanning
docker run --rm --network host aquasec/kube-hunter --internal

# Run with custom configuration
docker run --rm -v $(pwd)/config:/config aquasec/kube-hunter --config /config/kube-hunter.conf

# Run with output to file
docker run --rm -v $(pwd)/reports:/reports aquasec/kube-hunter --report json --output /reports/kube-hunter-report.json

# Run specific hunting mode
docker run --rm aquasec/kube-hunter --remote 10.0.0.0/24

# Run with custom interface
docker run --rm --network host aquasec/kube-hunter --interface eth0

Kubernetes Deployment

Deploying Kube-hunter as a Kubernetes job:

# kube-hunter-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: kube-hunter
  namespace: security
spec:
  template:
    metadata:
      labels:
        app: kube-hunter
    spec:
      restartPolicy: Never
      serviceAccountName: kube-hunter
      containers:
      - name: kube-hunter
        image: aquasec/kube-hunter
        command: ["kube-hunter"]
        args: ["--pod", "--report", "json", "--log", "INFO"]
        volumeMounts:
        - name: output
          mountPath: /tmp
        env:
        - name: KUBERNETES_SERVICE_HOST
          value: "kubernetes.default.svc.cluster.local"
        - name: KUBERNETES_SERVICE_PORT
          value: "443"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
      volumes:
      - name: output
        emptyDir: {}
  backoffLimit: 1

---
# ServiceAccount for Kube-hunter
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kube-hunter
  namespace: security

---
# ClusterRole for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kube-hunter
rules:
- apiGroups: [""]
  resources: ["pods", "services", "nodes"]
  verbs: ["get", "list"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list"]
- apiGroups: ["extensions"]
  resources: ["deployments"]
  verbs: ["get", "list"]

---
# ClusterRoleBinding for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kube-hunter
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kube-hunter
subjects:
- kind: ServiceAccount
  name: kube-hunter
  namespace: security
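
The ClusterRole above grants only `get` and `list`, which keeps the scanner's own service account read-only. As a rough sketch, that property could be checked before a role is applied. The rules below are modeled on the manifest as plain Python data, and `READ_ONLY_VERBS` and `non_read_only_rules` are illustrative names, not part of any Kubernetes client library.

```python
# Verbs that only read cluster state.
READ_ONLY_VERBS = {"get", "list", "watch"}

# Rules modeled on the ClusterRole manifest above.
RULES = [
    {"apiGroups": [""], "resources": ["pods", "services", "nodes"], "verbs": ["get", "list"]},
    {"apiGroups": ["apps"], "resources": ["deployments", "replicasets"], "verbs": ["get", "list"]},
    {"apiGroups": ["extensions"], "resources": ["deployments"], "verbs": ["get", "list"]},
]


def non_read_only_rules(rules):
    """Return the rules that grant any verb outside READ_ONLY_VERBS."""
    return [rule for rule in rules if not set(rule["verbs"]) <= READ_ONLY_VERBS]


if __name__ == "__main__":
    offending = non_read_only_rules(RULES)
    print("read-only role" if not offending else f"write access granted by: {offending}")
```

A wildcard verb (`"*"`) is reported as well, because it is not a member of the read-only set.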

Virtual Environment Setup

# Create virtual environment
python3 -m venv kube-hunter-env
source kube-hunter-env/bin/activate

# Install Kube-hunter
pip install kube-hunter

# Install additional tools
pip install kubernetes pyyaml

# Create configuration directory
mkdir -p ~/.kube-hunter

# Verify setup
kube-hunter --version

# Create wrapper script
cat > /usr/local/bin/kube-hunt << 'EOF'
#!/bin/bash
source /path/to/kube-hunter-env/bin/activate
kube-hunter "$@"
EOF

chmod +x /usr/local/bin/kube-hunt

Basic Usage

Remote Scanning

Scanning Kubernetes clusters from an external perspective:

# Basic remote scan
kube-hunter --remote

# Scan specific IP address
kube-hunter --remote 10.0.0.100

# Scan IP range
kube-hunter --remote 10.0.0.0/24

# Scan multiple targets
kube-hunter --remote 10.0.0.100,10.0.0.101,10.0.0.102

# Scan with custom ports
kube-hunter --remote 10.0.0.100 --port 6443,8080,10250

# Quick scan mode
kube-hunter --remote 10.0.0.100 --quick

# Verbose output
kube-hunter --remote 10.0.0.100 --log DEBUG

# Save results to file
kube-hunter --remote 10.0.0.100 --report json --output remote-scan.json
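
Before pointing a scan at a range, it is often worth previewing exactly which hosts a target list covers. This is a small sketch using Python's standard `ipaddress` module; `expand_targets` is a hypothetical helper, and it accepts the same comma-separated IP and CIDR notation used in the examples above.

```python
import ipaddress
from typing import List


def expand_targets(spec: str) -> List[str]:
    """Expand a comma-separated list of IPs and CIDR blocks into host addresses."""
    hosts: List[str] = []
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue
        # strict=False tolerates host bits being set, e.g. 10.0.0.5/24.
        network = ipaddress.ip_network(item, strict=False)
        if network.num_addresses == 1:
            hosts.append(str(network.network_address))
        else:
            # hosts() skips the network and broadcast addresses.
            hosts.extend(str(host) for host in network.hosts())
    return hosts


if __name__ == "__main__":
    print(len(expand_targets("10.0.0.0/24")), "hosts in 10.0.0.0/24")
```

Counting the hosts first makes it obvious when a mistyped prefix length would turn a short scan into a very long one.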

Internal Scanning

Scanning from within the Kubernetes cluster:

# Internal cluster scan
kube-hunter --internal

# Pod-based scanning
kube-hunter --pod

# Scan with specific interface
kube-hunter --internal --interface eth0

# Internal scan with network discovery
kube-hunter --internal --network-scan

# Scan specific services
kube-hunter --internal --services api-server,kubelet

# Internal scan with custom timeout
kube-hunter --internal --timeout 30

# Scan with authentication
kube-hunter --internal --kubeconfig ~/.kube/config

# Internal scan with specific namespace
kube-hunter --internal --namespace kube-system
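
Which of these modes applies depends on where the scanner runs. By default Kubernetes sets the `KUBERNETES_SERVICE_HOST` environment variable in every pod and mounts a service account token at a standard path, so a wrapper script can use both as a heuristic. The sketch below assumes those defaults; `running_in_pod` is a hypothetical helper, and a pod with token automounting disabled would be reported as not in a pod.

```python
import os

# Default mount path of the pod's service account token.
TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"


def running_in_pod(environ=None, token_path=TOKEN_PATH):
    """Heuristic: True when the in-cluster env var is set and the token file exists."""
    env = os.environ if environ is None else environ
    return bool(env.get("KUBERNETES_SERVICE_HOST")) and os.path.isfile(token_path)


if __name__ == "__main__":
    where = "inside a pod" if running_in_pod() else "outside the cluster"
    print(f"Scanner appears to be running {where}")
```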

Network Scanning

Discovering Kubernetes services across networks:

# Network discovery scan
kube-hunter --network 10.0.0.0/24

# Scan multiple networks
kube-hunter --network 10.0.0.0/24,192.168.1.0/24

# Network scan with port range
kube-hunter --network 10.0.0.0/24 --port-range 1-65535

# Fast network scan
kube-hunter --network 10.0.0.0/24 --quick

# Network scan with custom threads
kube-hunter --network 10.0.0.0/24 --threads 50

# Scan with service detection
kube-hunter --network 10.0.0.0/24 --detect-services

# Network scan with output
kube-hunter --network 10.0.0.0/24 --report json --output network-scan.json
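
Service discovery of this kind starts with finding which well-known Kubernetes ports answer on a host. The sketch below is a plain TCP connect check using only the standard library. It illustrates the idea and is not how Kube-hunter itself performs discovery; `K8S_PORTS`, `is_port_open`, and `probe` are hypothetical names.

```python
import socket

# Default ports of common Kubernetes components.
K8S_PORTS = {
    6443: "API server",
    2379: "etcd client",
    10250: "kubelet",
    10255: "kubelet (read-only)",
}


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def probe(host, ports=K8S_PORTS, timeout=1.0):
    """Return {port: label} for every port that accepts a TCP connection."""
    return {port: label for port, label in ports.items() if is_port_open(host, port, timeout)}


if __name__ == "__main__":
    print(probe("127.0.0.1"))
```

An open port only shows that something is listening; identifying the component still requires speaking its protocol, which is the part a dedicated scanner adds.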

Advanced Features

Custom Hunting Modules

Creating and using custom hunting modules:

# custom_hunter.py - Custom Kube-hunter module
from kube_hunter.core.types import Hunter, KubernetesCluster
from kube_hunter.core.events import handler

class CustomSecurityHunter(Hunter):
    """Custom security hunter for specific vulnerabilities"""

    def __init__(self, event):
        self.event = event

    @handler.subscribe(KubernetesCluster)
    def hunt_custom_vulnerabilities(self, event):
        """Hunt for custom security vulnerabilities"""

        # Custom vulnerability detection logic
        self.check_custom_misconfigurations(event)
        self.check_exposed_secrets(event)
        self.check_privilege_escalation(event)

    def check_custom_misconfigurations(self, event):
        """Check for custom misconfigurations"""

        # Implementation for custom checks
        pass

    def check_exposed_secrets(self, event):
        """Check for exposed secrets"""

        # Implementation for secret detection
        pass

    def check_privilege_escalation(self, event):
        """Check for privilege escalation opportunities"""

        # Implementation for privilege escalation checks
        pass

# Register the custom hunter
def register_custom_hunters():
    """Register custom hunting modules"""

    from kube_hunter.modules import discovery
    discovery.hunters.append(CustomSecurityHunter)

# Use custom hunting modules
kube-hunter --internal --custom-modules custom_hunter.py

# Run with multiple custom modules
kube-hunter --internal --custom-modules custom_hunter.py,advanced_checks.py

# Load custom configuration
kube-hunter --internal --config custom-config.yaml
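
Custom hunters like the one above rely on an event-driven design: a hunter subscribes to an event type and runs when such an event is published. The following is a generic, self-contained sketch of that subscribe and publish pattern. It is not Kube-hunter's actual implementation, and every name in it (`EventBus`, `OpenPort`, `KubeletPortHunter`) is hypothetical.

```python
from collections import defaultdict


class EventBus:
    """Tiny publish/subscribe registry keyed by event class."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type):
        """Class decorator that registers a hunter class for an event type."""
        def decorator(hunter_cls):
            self._subscribers[event_type].append(hunter_cls)
            return hunter_cls
        return decorator

    def publish(self, event):
        """Instantiate and run every hunter subscribed to this event's type."""
        return [hunter_cls(event).execute() for hunter_cls in self._subscribers[type(event)]]


class OpenPort:
    """Event describing an open TCP port found during discovery."""

    def __init__(self, host, port):
        self.host = host
        self.port = port


bus = EventBus()


@bus.subscribe(OpenPort)
class KubeletPortHunter:
    """Flags hosts that expose the default kubelet port."""

    def __init__(self, event):
        self.event = event

    def execute(self):
        if self.event.port == 10250:
            return f"{self.event.host}:{self.event.port} looks like a kubelet"
        return None


if __name__ == "__main__":
    print(bus.publish(OpenPort("10.0.0.5", 10250)))
```

Keeping discovery (publishing events) separate from hunting (reacting to them) is what lets new checks be added without touching the scanner core.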

Advanced Configuration

Creating comprehensive Kube-hunter configurations:

# kube-hunter-config.yaml
hunting:
  # Hunting modes
  remote: false
  internal: true
  pod: true

  # Network configuration
  network_scan: true
  interface: "eth0"
  timeout: 30
  threads: 20

  # Target configuration
  targets:
    - "10.0.0.0/24"
    - "kubernetes.default.svc.cluster.local"

  ports:
    - 6443    # API Server
    - 8080    # Insecure API Server
    - 10250   # Kubelet
    - 10255   # Read-only Kubelet
    - 2379    # etcd
    - 2380    # etcd peer
    - 30000-32767  # NodePort range

# Hunting modules
modules:
  enabled:
    - "discovery.apiserver"
    - "discovery.kubelet"
    - "discovery.etcd"
    - "discovery.dashboard"
    - "hunting.apiserver"
    - "hunting.kubelet"
    - "hunting.etcd"

  disabled:
    - "hunting.arp"  # Disable ARP hunting in some environments

# Reporting configuration
reporting:
  format: "json"
  output_file: "/tmp/kube-hunter-report.json"
  include_hunter_statistics: true
  include_vulnerabilities_details: true

# Logging configuration
logging:
  level: "INFO"
  file: "/tmp/kube-hunter.log"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Authentication
authentication:
  kubeconfig: "~/.kube/config"
  service_account_token: "/var/run/secrets/kubernetes.io/serviceaccount/token"

# Custom hunters
custom_hunters:
  - "custom_hunter.py"
  - "advanced_checks.py"

# Run with configuration file
kube-hunter --config kube-hunter-config.yaml

# Override specific configuration
kube-hunter --config kube-hunter-config.yaml --remote 10.0.0.100

# Validate configuration
kube-hunter --config kube-hunter-config.yaml --validate-config
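
A port list that mixes single ports with a range such as `30000-32767` has to be expanded before it can be iterated over. The sketch below shows one way to do that with validation; `expand_ports` is a hypothetical helper, and the input format is assumed to match the port entries in the configuration above.

```python
def expand_ports(specs):
    """Expand entries such as 6443 or "30000-32767" into a sorted list of ints."""
    ports = set()
    for spec in specs:
        text = str(spec).strip()
        if "-" in text:
            start, end = (int(part) for part in text.split("-", 1))
            if start > end:
                raise ValueError(f"invalid port range: {text}")
            ports.update(range(start, end + 1))
        else:
            ports.add(int(text))
    invalid = sorted(port for port in ports if not 1 <= port <= 65535)
    if invalid:
        raise ValueError(f"ports out of range: {invalid}")
    return sorted(ports)


if __name__ == "__main__":
    expanded = expand_ports([6443, 10250, "30000-32767"])
    print(f"{len(expanded)} ports to check")
```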

Automated Hunting

Creating automated hunting workflows:

#!/bin/bash
# automated-kube-hunting.sh - Automated Kubernetes security hunting

# Configuration
CLUSTER_NAME="production"
OUTPUT_DIR="/var/log/kube-hunter"
REPORT_FORMAT="json"
NOTIFICATION_EMAIL="security@company.com"

# Create output directory
mkdir -p "$OUTPUT_DIR"

# Function to run Kube-hunter scan
run_kube_hunter_scan() {
    local scan_type="$1"
    local target="$2"
    local output_file="$3"

    echo "Running $scan_type scan..."

    case "$scan_type" in
        "remote")
            kube-hunter --remote "$target" \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        "internal")
            kube-hunter --internal \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        "network")
            kube-hunter --network "$target" \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
    esac

    return $?
}

# Function to analyze results
analyze_results() {
    local report_file="$1"

    if [[ ! -f "$report_file" ]]; then
        echo "Report file not found: $report_file"
        return 1
    fi

    # Parse JSON report
    local vulnerabilities=$(jq '.vulnerabilities|length' "$report_file")
    local high_severity=$(jq '[.vulnerabilities[] | select(.severity == "high")] | length' "$report_file")
    local medium_severity=$(jq '[.vulnerabilities[] | select(.severity == "medium")] | length' "$report_file")

    echo "Analysis Results:"
    echo "  Total vulnerabilities: $vulnerabilities"
    echo "  High severity: $high_severity"
    echo "  Medium severity: $medium_severity"

    # Check thresholds
    if [[ "$high_severity" -gt 0 ]]; then
        echo "WARNING: High severity vulnerabilities found!"
        return 2
    elif [[ "$medium_severity" -gt 5 ]]; then
        echo "WARNING: Multiple medium severity vulnerabilities found!"
        return 1
    fi

    return 0
}

# Function to send notification
send_notification() {
    local subject="$1"
    local body="$2"
    local report_file="$3"

    # Send email notification (requires mail command)
    if command -v mail >/dev/null 2>&1; then
        echo "$body"|mail -s "$subject" -a "$report_file" "$NOTIFICATION_EMAIL"
    fi

    # Send to webhook (if configured)
    if [[ -n "$WEBHOOK_URL" ]]; then
        curl -X POST "$WEBHOOK_URL" \
            -H "Content-Type: application/json" \
            -d "{\"subject\": \"$subject\", \"body\": \"$body\"}"
    fi
}

# Main execution
main() {
    echo "Starting automated Kube-hunter security assessment for $CLUSTER_NAME"

    local timestamp=$(date +%Y%m%d-%H%M%S)
    local report_file="$OUTPUT_DIR/kube-hunter-$CLUSTER_NAME-$timestamp.json"

    # Run internal scan
    if run_kube_hunter_scan "internal" "" "$report_file"; then
        echo "Kube-hunter scan completed successfully"

        # Analyze results
        analyze_results "$report_file"
        local analysis_result=$?

        case "$analysis_result" in
            0)
                echo "No critical security issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Clean" \
                    "Kube-hunter scan completed with no critical issues." \
                    "$report_file"
                ;;
            1)
                echo "Medium severity issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Medium Issues" \
                    "Kube-hunter scan found medium severity security issues." \
                    "$report_file"
                ;;
            2)
                echo "High severity issues found!"
                send_notification "Kube-hunter ALERT: $CLUSTER_NAME - High Severity Issues" \
                    "ALERT: Kube-hunter scan found high severity security vulnerabilities!" \
                    "$report_file"
                ;;
        esac
    else
        echo "Kube-hunter scan failed"
        send_notification "Kube-hunter Error: $CLUSTER_NAME" \
            "Kube-hunter scan failed to complete." \
            ""
    fi

    # Cleanup old reports (keep last 30 days)
    find "$OUTPUT_DIR" -name "kube-hunter-*.json" -mtime +30 -delete

    echo "Automated hunting completed"
}

# Execute main function
main "$@"
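
The threshold logic in the script's result analysis is easy to exercise in isolation. The sketch below restates it in Python under the same assumption the script makes, namely that the JSON report has a top-level `vulnerabilities` list whose entries carry a `severity` field. `severity_exit_code` is a hypothetical helper, and unlike the `jq` filters it compares severities case-insensitively.

```python
def severity_exit_code(report, medium_max=5):
    """Return 2 for any high finding, 1 for more than medium_max medium findings, else 0."""
    severities = [
        str(vuln.get("severity", "")).lower()
        for vuln in report.get("vulnerabilities", [])
    ]
    if severities.count("high") > 0:
        return 2
    if severities.count("medium") > medium_max:
        return 1
    return 0


if __name__ == "__main__":
    sample = {"vulnerabilities": [{"severity": "medium"}, {"severity": "high"}]}
    print(severity_exit_code(sample))
```

Mapping findings to a small set of exit codes keeps the check usable as a CI gate, where a nonzero status fails the pipeline step.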

Automation Scripts

Comprehensive Security Assessment

#!/usr/bin/env python3
# Comprehensive Kubernetes security assessment with Kube-hunter

import subprocess
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class KubeHunterAssessment:
    def __init__(self, config_file=None):
        self.config_file = config_file
        self.load_configuration()
        self.setup_logging()

    def load_configuration(self):
        """Load assessment configuration"""
        default_config = {
            "scanning": {
                "modes": ["internal", "remote"],
                "targets": [],
                "timeout": 300,
                "threads": 20
            },
            "reporting": {
                "format": "json",
                "output_dir": "/tmp/kube-hunter-reports",
                "include_statistics": True
            },
            "notifications": {
                "email": {
                    "enabled": False,
                    "smtp_server": "localhost",
                    "smtp_port": 587,
                    "username": "",
                    "password": "",
                    "from": "kube-hunter@example.com",
                    "to": "security@example.com"
                },
                "webhook": {
                    "enabled": False,
                    "url": "",
                    "headers": {}
                }
            },
            "thresholds": {
                "high_severity_max": 0,
                "medium_severity_max": 5,
                "total_vulnerabilities_max": 10
            }
        }

        if self.config_file and os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                import yaml
                user_config = yaml.safe_load(f)
                # Merge configurations (shallow: user sections replace default sections)
                self.config = {**default_config, **user_config}
        else:
            self.config = default_config

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('kube-hunter-assessment.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def run_kube_hunter(self, mode, target=None, output_file=None):
        """Run Kube-hunter scan"""

        if not output_file:
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            output_file = f"kube-hunter-{mode}-{timestamp}.json"

        # Build command
        cmd = ["kube-hunter"]

        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--internal")
        elif mode == "pod":
            cmd.append("--pod")
        elif mode == "network" and target:
            cmd.extend(["--network", target])

        # Add common options
        cmd.extend([
            "--report", "json",
            "--output", output_file,
            "--log", "INFO"
        ])

        # Add timeout if configured
        timeout = self.config.get("scanning", {}).get("timeout", 300)

        try:
            self.logger.info(f"Running Kube-hunter in {mode} mode...")
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )

            if result.returncode == 0:
                self.logger.info(f"Kube-hunter scan completed: {output_file}")

                # Load and return results
                with open(output_file, 'r') as f:
                    scan_results = json.load(f)

                return scan_results, output_file
            else:
                self.logger.error(f"Kube-hunter scan failed: {result.stderr}")
                return None, None

        except subprocess.TimeoutExpired:
            self.logger.error(f"Kube-hunter scan timed out after {timeout} seconds")
            return None, None
        except Exception as e:
            self.logger.error(f"Error running Kube-hunter: {e}")
            return None, None

    def analyze_vulnerabilities(self, scan_results):
        """Analyze scan results for vulnerabilities"""

        if not scan_results:
            return None

        vulnerabilities = scan_results.get("vulnerabilities", [])

        analysis = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": len(vulnerabilities),
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }

        # Analyze each vulnerability
        for vuln in vulnerabilities:
            severity = vuln.get("severity", "unknown").lower()
            category = vuln.get("category", "unknown")

            # Count by severity
            if severity in analysis["severity_breakdown"]:
                analysis["severity_breakdown"][severity] += 1

            # Count by category
            if category not in analysis["category_breakdown"]:
                analysis["category_breakdown"][category] = 0
            analysis["category_breakdown"][category] += 1

            # Collect critical and high severity vulnerabilities
            if severity == "critical":
                analysis["critical_vulnerabilities"].append(vuln)
            elif severity == "high":
                analysis["high_vulnerabilities"].append(vuln)

        # Generate recommendations
        analysis["recommendations"] = self.generate_recommendations(analysis)

        return analysis

    def generate_recommendations(self, analysis):
        """Generate security recommendations based on analysis"""

        recommendations = []

        # Critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Critical Vulnerabilities Immediately",
                "description": f"Found {len(analysis['critical_vulnerabilities'])} critical vulnerabilities that require immediate attention.",
                "action": "Review and remediate all critical vulnerabilities within 24 hours."
            })

        # High severity vulnerabilities
        if analysis["high_vulnerabilities"]:
            recommendations.append({
                "priority": "HIGH",
                "title": "Remediate High Severity Issues",
                "description": f"Found {len(analysis['high_vulnerabilities'])} high severity vulnerabilities.",
                "action": "Plan remediation for high severity vulnerabilities within 1 week."
            })

        # Category-specific recommendations
        categories = analysis["category_breakdown"]

        if categories.get("Access Risk", 0) > 0:
            recommendations.append({
                "priority": "HIGH",
                "title": "Review Access Controls",
                "description": "Access control vulnerabilities detected.",
                "action": "Review and strengthen RBAC policies and authentication mechanisms."
            })

        if categories.get("Information Disclosure", 0) > 0:
            recommendations.append({
                "priority": "MEDIUM",
                "title": "Prevent Information Disclosure",
                "description": "Information disclosure vulnerabilities found.",
                "action": "Review exposed services and implement proper access controls."
            })

        if categories.get("Remote Code Execution", 0) > 0:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Remote Code Execution Risks",
                "description": "Remote code execution vulnerabilities detected.",
                "action": "Immediately patch or isolate affected components."
            })

        return recommendations

    def check_thresholds(self, analysis):
        """Check if vulnerabilities exceed configured thresholds"""

        thresholds = self.config.get("thresholds", {})

        high_severity_count = analysis["severity_breakdown"]["high"] + analysis["severity_breakdown"]["critical"]
        medium_severity_count = analysis["severity_breakdown"]["medium"]
        total_vulnerabilities = analysis["total_vulnerabilities"]

        threshold_violations = []

        # Check high severity threshold
        high_max = thresholds.get("high_severity_max", 0)
        if high_severity_count > high_max:
            threshold_violations.append(f"High/Critical severity: {high_severity_count} > {high_max}")

        # Check medium severity threshold
        medium_max = thresholds.get("medium_severity_max", 5)
        if medium_severity_count > medium_max:
            threshold_violations.append(f"Medium severity: {medium_severity_count} > {medium_max}")

        # Check total vulnerabilities threshold
        total_max = thresholds.get("total_vulnerabilities_max", 10)
        if total_vulnerabilities > total_max:
            threshold_violations.append(f"Total vulnerabilities: {total_vulnerabilities} > {total_max}")

        return threshold_violations

    def generate_report(self, analysis, scan_results, output_file):
        """Generate comprehensive security report"""

        report = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "tool": "Kube-hunter",
                "assessment_type": "Kubernetes Security Assessment"
            },
            "executive_summary": {
                "total_vulnerabilities": analysis["total_vulnerabilities"],
                "critical_count": analysis["severity_breakdown"]["critical"],
                "high_count": analysis["severity_breakdown"]["high"],
                "medium_count": analysis["severity_breakdown"]["medium"],
                "low_count": analysis["severity_breakdown"]["low"],
                "risk_level": self.calculate_risk_level(analysis)
            },
            "detailed_analysis": analysis,
            "raw_results": scan_results,
            "recommendations": analysis["recommendations"]
        }

        # Save report
        report_file = output_file.replace('.json', '-report.json')
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)

        self.logger.info(f"Comprehensive report generated: {report_file}")
        return report, report_file

    def calculate_risk_level(self, analysis):
        """Calculate overall risk level"""

        critical_count = analysis["severity_breakdown"]["critical"]
        high_count = analysis["severity_breakdown"]["high"]
        medium_count = analysis["severity_breakdown"]["medium"]

        if critical_count > 0:
            return "CRITICAL"
        elif high_count > 3:
            return "HIGH"
        elif high_count > 0 or medium_count > 5:
            return "MEDIUM"
        elif medium_count > 0:
            return "LOW"
        else:
            return "MINIMAL"

    def send_notifications(self, analysis, report_file, threshold_violations):
        """Send notifications about assessment results"""

        # Prepare notification content
        subject = "Kube-hunter Security Assessment Results"
        if threshold_violations:
            subject += " - ALERT"

        body = self.generate_notification_body(analysis, threshold_violations)

        # Send email notification
        if self.config.get("notifications", {}).get("email", {}).get("enabled", False):
            self.send_email_notification(subject, body, report_file)

        # Send webhook notification
        if self.config.get("notifications", {}).get("webhook", {}).get("enabled", False):
            self.send_webhook_notification(analysis, threshold_violations)

    def generate_notification_body(self, analysis, threshold_violations):
        """Generate notification message body"""

        body = f"""
Kube-hunter Security Assessment Results
Generated: {analysis['timestamp']}

EXECUTIVE SUMMARY:
==================
Total Vulnerabilities: {analysis['total_vulnerabilities']}
Critical: {analysis['severity_breakdown']['critical']}
High: {analysis['severity_breakdown']['high']}
Medium: {analysis['severity_breakdown']['medium']}
Low: {analysis['severity_breakdown']['low']}

"""

        if threshold_violations:
            body += "⚠️  THRESHOLD VIOLATIONS:\n"
            for violation in threshold_violations:
                body += f"  - {violation}\n"
            body += "\n"

        # Add critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            body += "CRITICAL VULNERABILITIES:\n"
            body += "========================\n"
            for vuln in analysis["critical_vulnerabilities"][:3]:
                body += f"- {vuln.get('vulnerability', 'Unknown')}\n"
                body += f"  Description: {vuln.get('description', 'No description')}\n"
                body += f"  Evidence: {vuln.get('evidence', 'No evidence')}\n\n"

        # Add recommendations
        if analysis["recommendations"]:
            body += "TOP RECOMMENDATIONS:\n"
            body += "===================\n"
            for rec in analysis["recommendations"][:3]:
                body += f"- [{rec['priority']}] {rec['title']}\n"
                body += f"  {rec['action']}\n\n"

        return body

    def send_email_notification(self, subject, body, attachment_file):
        """Send email notification"""

        email_config = self.config.get("notifications", {}).get("email", {})

        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            # Attach report file
            if attachment_file and os.path.exists(attachment_file):
                with open(attachment_file, 'r') as f:
                    attachment = MIMEText(f.read())
                    attachment.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_file)}"')
                    msg.attach(attachment)

            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()

            if email_config.get("username") and email_config.get("password"):
                server.login(email_config["username"], email_config["password"])

            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()

            self.logger.info("Email notification sent successfully")

        except Exception as e:
            self.logger.error(f"Failed to send email notification: {e}")

    def send_webhook_notification(self, analysis, threshold_violations):
        """Send webhook notification"""

        webhook_config = self.config.get("notifications", {}).get("webhook", {})

        try:
            import requests

            payload = {
                "timestamp": analysis["timestamp"],
                "alert": bool(threshold_violations),
                "threshold_violations": threshold_violations,
                "summary": {
                    "total_vulnerabilities": analysis["total_vulnerabilities"],
                    "critical": analysis["severity_breakdown"]["critical"],
                    "high": analysis["severity_breakdown"]["high"],
                    "medium": analysis["severity_breakdown"]["medium"]
                },
                "recommendations": analysis["recommendations"][:3]
            }

            headers = webhook_config.get("headers", {})
            headers["Content-Type"] = "application/json"

            response = requests.post(
                webhook_config["url"],
                json=payload,
                headers=headers,
                timeout=30
            )

            # response.ok is True for any status code below 400 (e.g. 200, 202, 204)
            if response.ok:
                self.logger.info("Webhook notification sent successfully")
            else:
                self.logger.error(f"Webhook notification failed: {response.status_code}")

        except Exception as e:
            self.logger.error(f"Failed to send webhook notification: {e}")

    def run_comprehensive_assessment(self, modes=None, targets=None):
        """Run comprehensive security assessment"""

        if not modes:
            modes = self.config.get("scanning", {}).get("modes", ["internal"])

        if not targets:
            targets = self.config.get("scanning", {}).get("targets", [])

        self.logger.info("Starting comprehensive Kube-hunter security assessment")

        all_results = []
        all_analyses = []

        # Run scans for each mode
        for mode in modes:
            if mode in ["remote", "network"] and targets:
                for target in targets:
                    scan_results, output_file = self.run_kube_hunter(mode, target)
                    if scan_results:
                        analysis = self.analyze_vulnerabilities(scan_results)
                        if analysis:
                            all_results.append((scan_results, output_file))
                            all_analyses.append(analysis)
            else:
                scan_results, output_file = self.run_kube_hunter(mode)
                if scan_results:
                    analysis = self.analyze_vulnerabilities(scan_results)
                    if analysis:
                        all_results.append((scan_results, output_file))
                        all_analyses.append(analysis)

        if not all_analyses:
            self.logger.error("No successful scans completed")
            return False

        # Combine analyses
        combined_analysis = self.combine_analyses(all_analyses)

        # Check thresholds
        threshold_violations = self.check_thresholds(combined_analysis)

        # Generate comprehensive report
        combined_results = {"scans": [result[0] for result in all_results]}
        report, report_file = self.generate_report(combined_analysis, combined_results, "combined-assessment.json")

        # Send notifications
        self.send_notifications(combined_analysis, report_file, threshold_violations)

        # Log summary
        self.logger.info(f"Assessment completed. Total vulnerabilities: {combined_analysis['total_vulnerabilities']}")
        if threshold_violations:
            self.logger.warning(f"Threshold violations: {threshold_violations}")

        return True

    def combine_analyses(self, analyses):
        """Combine multiple scan analyses"""

        combined = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": 0,
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }

        # Combine all analyses
        for analysis in analyses:
            combined["total_vulnerabilities"] += analysis["total_vulnerabilities"]

            # Combine severity counts
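            for severity, count in analysis["severity_breakdown"].items():
                # Tolerate severities that are missing from the template above
                combined["severity_breakdown"][severity] = (
                    combined["severity_breakdown"].get(severity, 0) + count
                )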

            # Combine category counts
            for category, count in analysis["category_breakdown"].items():
                if category not in combined["category_breakdown"]:
                    combined["category_breakdown"][category] = 0
                combined["category_breakdown"][category] += count

            # Combine vulnerabilities
            combined["critical_vulnerabilities"].extend(analysis["critical_vulnerabilities"])
            combined["high_vulnerabilities"].extend(analysis["high_vulnerabilities"])

        # Generate combined recommendations
        combined["recommendations"] = self.generate_recommendations(combined)

        return combined

# Usage
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Comprehensive Kube-hunter Security Assessment")
    parser.add_argument("--config", help="Configuration file path")
    # default=None lets run_comprehensive_assessment fall back to the config file, then to "internal"
    parser.add_argument("--modes", nargs="+", choices=["internal", "remote", "pod", "network"],
                       default=None, help="Scanning modes")
    parser.add_argument("--targets", nargs="+", help="Targets for remote/network scanning")

    args = parser.parse_args()

    assessment = KubeHunterAssessment(args.config)
    success = assessment.run_comprehensive_assessment(args.modes, args.targets)

    sys.exit(0 if success else 1)

CI/CD Integration Script

#!/usr/bin/env python3
# Kube-hunter CI/CD integration

import subprocess
import json
import sys
import os

class KubeHunterCICD:
    def __init__(self, fail_on_high=True, fail_on_medium=False):
        self.fail_on_high = fail_on_high
        self.fail_on_medium = fail_on_medium

    def run_security_scan(self, mode="internal", target=None):
        """Run Kube-hunter security scan in CI/CD"""

        cmd = ["kube-hunter", "--report", "json"]

        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--internal")
        elif mode == "pod":
            cmd.append("--pod")

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                print(f"Kube-hunter scan failed: {result.stderr}")
                return None

        except Exception as e:
            print(f"Error running Kube-hunter: {e}")
            return None

    def evaluate_results(self, scan_results):
        """Evaluate scan results for CI/CD pipeline"""

        if not scan_results:
            return False, "Scan failed"

        vulnerabilities = scan_results.get("vulnerabilities", [])

        high_count = len([v for v in vulnerabilities if v.get("severity") == "high"])
        medium_count = len([v for v in vulnerabilities if v.get("severity") == "medium"])
        critical_count = len([v for v in vulnerabilities if v.get("severity") == "critical"])

        # Check failure conditions
        if critical_count > 0:
            return False, f"Critical vulnerabilities found: {critical_count}"

        if self.fail_on_high and high_count > 0:
            return False, f"High severity vulnerabilities found: {high_count}"

        if self.fail_on_medium and medium_count > 0:
            return False, f"Medium severity vulnerabilities found: {medium_count}"

        return True, f"Security scan passed. Vulnerabilities: {len(vulnerabilities)}"

    def generate_ci_report(self, scan_results, output_file="kube-hunter-ci-report.json"):
        """Generate CI/CD friendly report"""

        if not scan_results:
            return None

        vulnerabilities = scan_results.get("vulnerabilities", [])

        report = {
            "summary": {
                "total_vulnerabilities": len(vulnerabilities),
                "critical": len([v for v in vulnerabilities if v.get("severity") == "critical"]),
                "high": len([v for v in vulnerabilities if v.get("severity") == "high"]),
                "medium": len([v for v in vulnerabilities if v.get("severity") == "medium"]),
                "low": len([v for v in vulnerabilities if v.get("severity") == "low"])
            },
            "vulnerabilities": vulnerabilities,
            "scan_metadata": scan_results.get("metadata", {})
        }

        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)

        return report

# Usage in CI/CD
if __name__ == "__main__":
    cicd = KubeHunterCICD(fail_on_high=True, fail_on_medium=False)

    # Run scan
    results = cicd.run_security_scan("internal")

    # Evaluate results
    passed, message = cicd.evaluate_results(results)

    # Generate report
    cicd.generate_ci_report(results)

    print(message)
    sys.exit(0 if passed else 1)

Integration Examples

Prometheus Monitoring

# kube-hunter-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kube-hunter-exporter
  template:
    metadata:
      labels:
        app: kube-hunter-exporter
    spec:
      serviceAccountName: kube-hunter-exporter
      containers:
      - name: kube-hunter-exporter
        image: custom/kube-hunter-exporter:latest
        ports:
        - containerPort: 8080
        env:
        - name: SCAN_INTERVAL
          value: "3600"  # 1 hour
        - name: METRICS_PORT
          value: "8080"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"

---
apiVersion: v1
kind: Service
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
  labels:
    app: kube-hunter-exporter
spec:
  ports:
  - port: 8080
    targetPort: 8080
    name: metrics
  selector:
    app: kube-hunter-exporter

---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: kube-hunter-exporter
  endpoints:
  - port: metrics
    interval: 60s
    path: /metrics
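
The `custom/kube-hunter-exporter` image above is not a published image, and its code is not shown here. As a rough sketch of what such an exporter might serve at `/metrics`, the function below renders a list of findings in the Prometheus text exposition format. The function name `render_metrics` and the shape of the findings (dicts with a `severity` key) are assumptions for illustration; the metric names are taken from the Grafana dashboard in this section.

```python
from collections import Counter

# Severity levels used elsewhere in this cheat sheet
SEVERITIES = ("critical", "high", "medium", "low")


def render_metrics(vulnerabilities, scan_duration_seconds):
    """Render scan findings as Prometheus text exposition output.

    `vulnerabilities` is assumed to be a list of dicts with a "severity" key.
    """
    # Count findings per severity, ignoring case differences
    counts = Counter(str(v.get("severity", "")).lower() for v in vulnerabilities)

    lines = [
        "# HELP kube_hunter_vulnerabilities_total Findings in the latest scan.",
        "# TYPE kube_hunter_vulnerabilities_total gauge",
        f"kube_hunter_vulnerabilities_total {len(vulnerabilities)}",
        "# HELP kube_hunter_vulnerabilities_by_severity Findings per severity.",
        "# TYPE kube_hunter_vulnerabilities_by_severity gauge",
    ]
    for severity in SEVERITIES:
        # Doubled braces produce literal { } around the label set
        lines.append(
            f'kube_hunter_vulnerabilities_by_severity{{severity="{severity}"}} {counts[severity]}'
        )
    lines += [
        "# HELP kube_hunter_scan_duration_seconds Duration of the latest scan.",
        "# TYPE kube_hunter_scan_duration_seconds gauge",
        f"kube_hunter_scan_duration_seconds {scan_duration_seconds}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    sample = [{"severity": "high"}, {"severity": "low"}]
    print(render_metrics(sample, 12.5), end="")
```

A real exporter would rerun the scan every `SCAN_INTERVAL` seconds and serve this text over HTTP on `METRICS_PORT`. All three metrics are exposed as gauges because each scan replaces the previous values rather than accumulating them.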

Grafana Dashboard

{
  "dashboard": {
    "title": "Kube-hunter Security Dashboard",
    "panels": [
      {
        "title": "Total Vulnerabilities",
        "type": "stat",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_total",
            "legendFormat": "Total Vulnerabilities"
          }
        ]
      },
      {
        "title": "Vulnerabilities by Severity",
        "type": "piechart",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_by_severity",
            "legendFormat": "{{severity}}"
          }
        ]
      },
      {
        "title": "Security Scan History",
        "type": "graph",
        "targets": [
          {
            "expr": "kube_hunter_scan_duration_seconds",
            "legendFormat": "Scan Duration"
          }
        ]
      }
    ]
  }
}

Troubleshooting

Common Issues

Permission Problems:

# Check Kubernetes access
kubectl auth can-i get pods
kubectl auth can-i list services

# Verify service account permissions
kubectl describe serviceaccount kube-hunter -n security
kubectl describe clusterrolebinding kube-hunter

# Check pod security policies (PodSecurityPolicy was removed in Kubernetes 1.25; older clusters only)
kubectl get psp
kubectl describe psp restricted

# Test network connectivity
kubectl run test-pod --image=busybox --rm -it -- nslookup kubernetes.default.svc.cluster.local

Network Connectivity Issues:

# Check cluster networking
kubectl get nodes -o wide
kubectl get services -A

# Test API server connectivity
curl -k https://kubernetes.default.svc.cluster.local:443/version

# Check DNS resolution
nslookup kubernetes.default.svc.cluster.local

# Verify network policies
kubectl get networkpolicies -A
kubectl describe networkpolicy -n kube-system

Scanning Problems:

# Run with debug logging
kube-hunter --internal --log DEBUG

# Check for timeouts
kube-hunter --internal --timeout 60

# Test specific components
kube-hunter --internal --services api-server

# Verify output format
kube-hunter --internal --report json | jq '.'

# Check for missing dependencies
pip list | grep kube-hunter
pip install --upgrade kube-hunter

Performance Optimization

Optimizing Kube-hunter performance:

# Reduce scan scope
kube-hunter --internal --quick

# Limit thread count
kube-hunter --internal --threads 10

# Use specific interface
kube-hunter --internal --interface eth0

# Skip slow checks
kube-hunter --internal --skip-slow-checks

# Optimize for CI/CD
kube-hunter --pod --report json --timeout 120

Security Considerations

Safe Usage Practices

Scanning Ethics:
- Only scan clusters you own or have explicit permission to test
- Coordinate with operations teams before running scans
- Use read-only scanning modes when possible
- Apply proper access controls to scan results
- Keep Kube-hunter and its dependencies up to date

Data Protection:
- Encrypt sensitive scan results and reports
- Store vulnerability data securely
- Use secure channels for transmitting reports
- Clean up temporary files and logs regularly
- Implement data retention policies for compliance
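
The cleanup and retention items above can be automated with a small script. The following is a minimal sketch, assuming reports are stored as JSON files in a single directory; the function name `purge_old_reports`, the 30-day default, and the `*.json` pattern are illustrative choices, not part of Kube-hunter.

```python
import time
from pathlib import Path


def purge_old_reports(report_dir, max_age_days=30, pattern="*.json", now=None):
    """Delete report files older than max_age_days and return the removed paths."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400  # 86400 seconds per day

    removed = []
    for path in sorted(Path(report_dir).glob(pattern)):
        # Compare the file's last-modified time against the retention cutoff
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path)
    return removed


if __name__ == "__main__":
    # Hypothetical report directory; adjust to wherever reports are written
    for removed_path in purge_old_reports("./reports", max_age_days=30):
        print(f"Removed {removed_path}")
```

The check uses modification time, so a report that is rewritten or touched restarts its retention clock. Archive anything needed for compliance before enabling deletion.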

Operational Security

Monitoring and Alerting:
- Monitor Kube-hunter execution and results
- Set up alerting for critical security findings
- Track vulnerability trends over time
- Implement automated remediation workflows
- Review security configurations regularly
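
One simple way to track trends is to compare the severity counts of consecutive scans. The sketch below assumes each scan summary is a dict shaped like the `severity_breakdown` used in the assessment script above; the helper names `severity_trend` and `regressions` are hypothetical.

```python
SEVERITIES = ("critical", "high", "medium", "low")


def severity_trend(previous, current):
    """Return the per-severity change between two severity_breakdown dicts."""
    return {s: current.get(s, 0) - previous.get(s, 0) for s in SEVERITIES}


def regressions(previous, current):
    """List the severities whose count increased since the previous scan."""
    return [s for s, delta in severity_trend(previous, current).items() if delta > 0]


if __name__ == "__main__":
    last_week = {"critical": 0, "high": 2, "medium": 5, "low": 1}
    today = {"critical": 1, "high": 1, "medium": 5, "low": 1}
    print(severity_trend(last_week, today))
    print(regressions(last_week, today))
```

A non-empty result from `regressions` is a reasonable trigger for an alert, since it means at least one severity level got worse between scans.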

Integration Security:
- Secure the CI/CD pipeline integration
- Protect service account tokens and credentials
- Implement proper RBAC for Kubernetes deployments
- Monitor for unauthorized Kube-hunter use
- Assess the security of the scanning infrastructure regularly

References

  1. Kube-hunter GitHub Repository
  2. Kubernetes Security Best Practices
  3. CIS Kubernetes Benchmark
  4. NIST Container Security Guide
  5. Aqua Security Documentation