Kube-hunter Cheat Sheet

Overview

Kube-hunter is an open-source penetration testing tool designed to hunt for security weaknesses in Kubernetes clusters. Developed by Aqua Security, it simulates the techniques that attackers might use to compromise Kubernetes environments. Kube-hunter scans clusters from both internal and external perspectives, identifying misconfigurations, exposed services, and potential attack vectors that malicious actors could exploit. Its findings help organizations strengthen their Kubernetes security posture before real attacks occur.

Kube-hunter draws on a knowledge base of Kubernetes-specific attack techniques and vulnerabilities. It automatically discovers Kubernetes components, including API servers, etcd clusters, kubelet services, and dashboard interfaces, then systematically tests them for common security issues such as anonymous access, privilege escalation opportunities, and insecure configurations. Its hunting methodology also tests for exposed sensitive information, weak authentication mechanisms, container escape possibilities, and network segmentation issues commonly found in misconfigured Kubernetes deployments.

Beyond simple vulnerability scanning, Kube-hunter provides detailed explanations of discovered issues, their potential impact, and specific remediation guidance. The tool supports multiple execution modes: remote scanning for external assessments, internal scanning for comprehensive cluster analysis, and network scanning for discovering Kubernetes services across network ranges. Because it can generate reports in multiple formats and integrate into CI/CD pipelines, Kube-hunter is a useful tool for DevSecOps teams, security professionals, and Kubernetes administrators who need to maintain security standards in their container orchestration environments.

Installation

Python Package Installation

Installing Kube-hunter using pip:

# Install Kube-hunter
pip install kube-hunter

# Alternative: Install from source
git clone https://github.com/aquasecurity/kube-hunter.git
cd kube-hunter
pip install -r requirements.txt
python setup.py install

# Verify Installation
kube-hunter --help

# Install additional dependencies
pip install requests urllib3 netaddr

# Update to latest version
pip install --upgrade kube-hunter

# Install specific version
pip install kube-hunter==0.6.8

Docker Installation

Running Kube-hunter in Docker containers:

# Run Kube-hunter container for remote scanning
docker run --rm -it aquasec/kube-hunter

# Run with network access for internal scanning
docker run --rm --network host aquasec/kube-hunter --internal

# Run with custom configuration
docker run --rm -v $(pwd)/config:/config aquasec/kube-hunter --config /config/kube-hunter.conf

# Run with output to file
docker run --rm -v $(pwd)/reports:/reports aquasec/kube-hunter --report json --output /reports/kube-hunter-report.json

# Run specific hunting mode
docker run --rm aquasec/kube-hunter --remote 10.0.0.0/24

# Run with custom interface
docker run --rm --network host aquasec/kube-hunter --interface eth0

Kubernetes Deployment

Deploying Kube-hunter as a Kubernetes job:

# kube-hunter-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: kube-hunter
  namespace: security
spec:
  template:
    metadata:
      labels:
        app: kube-hunter
    spec:
      restartPolicy: Never
      serviceAccountName: kube-hunter
      containers:
      - name: kube-hunter
        image: aquasec/kube-hunter
        command: ["kube-hunter"]
        args: ["--pod", "--report", "json", "--log", "INFO"]
        volumeMounts:
        - name: output
          mountPath: /tmp
        env:
        - name: KUBERNETES_SERVICE_HOST
          value: "kubernetes.default.svc.cluster.local"
        - name: KUBERNETES_SERVICE_PORT
          value: "443"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
      volumes:
      - name: output
        emptyDir: {}
  backoffLimit: 1

---
# ServiceAccount for Kube-hunter
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kube-hunter
  namespace: security

---
# ClusterRole for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kube-hunter
rules:
- apiGroups: [""]
  resources: ["pods", "services", "nodes"]
  verbs: ["get", "list"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list"]
- apiGroups: ["extensions"]
  resources: ["deployments"]
  verbs: ["get", "list"]

---
# ClusterRoleBinding for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kube-hunter
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kube-hunter
subjects:
- kind: ServiceAccount
  name: kube-hunter
  namespace: security
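
The ClusterRole above grants only `get` and `list`, which keeps the scanner's service account read-only. As a minimal sketch, the check below flags any verb outside a read-only set. The helper name `find_write_rules` and the `READ_ONLY_VERBS` set are illustrative assumptions, not part of Kube-hunter or Kubernetes; the rules are copied by hand from the manifest.

```python
# Hypothetical helper, not part of Kube-hunter: audits RBAC rules for write verbs.
READ_ONLY_VERBS = {"get", "list", "watch"}

# Rules copied from the ClusterRole manifest above.
CLUSTER_ROLE_RULES = [
    {"apiGroups": [""], "resources": ["pods", "services", "nodes"], "verbs": ["get", "list"]},
    {"apiGroups": ["apps"], "resources": ["deployments", "replicasets"], "verbs": ["get", "list"]},
    {"apiGroups": ["extensions"], "resources": ["deployments"], "verbs": ["get", "list"]},
]


def find_write_rules(rules):
    """Return (rule_index, verb) pairs for every verb that is not read-only."""
    violations = []
    for index, rule in enumerate(rules):
        for verb in rule.get("verbs", []):
            # A wildcard "*" is also reported, since it includes write verbs.
            if verb not in READ_ONLY_VERBS:
                violations.append((index, verb))
    return violations


if __name__ == "__main__":
    print(find_write_rules(CLUSTER_ROLE_RULES))
```

An empty result means every rule is limited to read access; any tuple in the result points at the rule and verb worth reviewing before the Job is deployed.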

Virtual Environment Setup

# Create virtual environment
python3 -m venv kube-hunter-env
source kube-hunter-env/bin/activate

# Install Kube-hunter
pip install kube-hunter

# Install additional tools
pip install kubernetes pyyaml

# Create configuration directory
mkdir -p ~/.kube-hunter

# Verify setup
kube-hunter --version

# Create wrapper script
cat > /usr/local/bin/kube-hunt << 'EOF'
#!/bin/bash
source /path/to/kube-hunter-env/bin/activate
kube-hunter "$@"
EOF

chmod +x /usr/local/bin/kube-hunt

Basic Usage

Remote Scanning

Scanning Kubernetes clusters from external perspective:

# Basic remote scan
kube-hunter --remote

# Scan specific IP address
kube-hunter --remote 10.0.0.100

# Scan IP range
kube-hunter --remote 10.0.0.0/24

# Scan multiple targets
kube-hunter --remote 10.0.0.100,10.0.0.101,10.0.0.102

# Scan with custom ports
kube-hunter --remote 10.0.0.100 --port 6443,8080,10250

# Quick scan mode
kube-hunter --remote 10.0.0.100 --quick

# Verbose output
kube-hunter --remote 10.0.0.100 --log DEBUG

# Save results to file
kube-hunter --remote 10.0.0.100 --report json --output remote-scan.json

Internal Scanning

Scanning from within the Kubernetes cluster:

# Internal cluster scan
kube-hunter --internal

# Pod-based scanning
kube-hunter --pod

# Scan with specific interface
kube-hunter --internal --interface eth0

# Internal scan with network discovery
kube-hunter --internal --network-scan

# Scan specific services
kube-hunter --internal --services api-server,kubelet

# Internal scan with custom timeout
kube-hunter --internal --timeout 30

# Scan with authentication
kube-hunter --internal --kubeconfig ~/.kube/config

# Internal scan with specific namespace
kube-hunter --internal --namespace kube-system

Network Scanning

Discovering Kubernetes services across networks:

# Network discovery scan
kube-hunter --network 10.0.0.0/24

# Scan multiple networks
kube-hunter --network 10.0.0.0/24,192.168.1.0/24

# Network scan with port range
kube-hunter --network 10.0.0.0/24 --port-range 1-65535

# Fast network scan
kube-hunter --network 10.0.0.0/24 --quick

# Network scan with custom threads
kube-hunter --network 10.0.0.0/24 --threads 50

# Scan with service detection
kube-hunter --network 10.0.0.0/24 --detect-services

# Network scan with output
kube-hunter --network 10.0.0.0/24 --report json --output network-scan.json
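
Before scanning a range, it can help to know how many hosts it covers. The following is a rough sketch using Python's standard `ipaddress` module; the function name `expand_targets` is a hypothetical helper, and it assumes targets are given as a comma-separated list of IPs or CIDR blocks, as in the commands above.

```python
import ipaddress
from typing import List


def expand_targets(spec: str) -> List[str]:
    """Expand a comma-separated list of IPs/CIDR ranges into host addresses."""
    hosts = []
    for part in spec.split(","):
        # strict=False tolerates host bits being set, e.g. 10.0.0.5/24.
        network = ipaddress.ip_network(part.strip(), strict=False)
        if network.num_addresses == 1:
            # A bare IP address is treated as a single-host network.
            hosts.append(str(network.network_address))
        else:
            # hosts() skips the network and broadcast addresses.
            hosts.extend(str(host) for host in network.hosts())
    return hosts


if __name__ == "__main__":
    targets = expand_targets("10.0.0.0/24,192.168.1.0/24")
    print(f"{len(targets)} hosts in scope")
```

Two /24 ranges yield 508 usable addresses, which gives a sense of how scan time grows as more networks are added.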

Advanced Features

Custom Hunting Modules

Creating and using custom hunting modules:

# custom_hunter.py - Custom Kube-hunter module
from kube_hunter.core.types import Hunter, KubernetesCluster
from kube_hunter.core.events import handler

class CustomSecurityHunter(Hunter):
    """Custom security hunter for specific vulnerabilities"""

    def __init__(self, event):
        self.event = event

    @handler.subscribe(KubernetesCluster)
    def hunt_custom_vulnerabilities(self, event):
        """Hunt for custom security vulnerabilities"""

        # Custom vulnerability detection logic
        self.check_custom_misconfigurations(event)
        self.check_exposed_secrets(event)
        self.check_privilege_escalation(event)

    def check_custom_misconfigurations(self, event):
        """Check for custom misconfigurations"""

        # Implementation for custom checks
        pass

    def check_exposed_secrets(self, event):
        """Check for exposed secrets"""

        # Implementation for secret detection
        pass

    def check_privilege_escalation(self, event):
        """Check for privilege escalation opportunities"""

        # Implementation for privilege escalation checks
        pass

# Register the custom hunter
def register_custom_hunters():
    """Register custom hunting modules"""

    from kube_hunter.modules import discovery
    discovery.hunters.append(CustomSecurityHunter)

# Use custom hunting modules
kube-hunter --internal --custom-modules custom_hunter.py

# Run with multiple custom modules
kube-hunter --internal --custom-modules custom_hunter.py,advanced_checks.py

# Load custom configuration
kube-hunter --internal --config custom-config.yaml

Advanced Configuration

Creating comprehensive Kube-hunter configurations:

# kube-hunter-config.yaml
hunting:
  # Hunting modes
  remote: false
  internal: true
  pod: true

  # Network configuration
  network_scan: true
  interface: "eth0"
  timeout: 30
  threads: 20

  # Target configuration
  targets:
    - "10.0.0.0/24"
    - "kubernetes.default.svc.cluster.local"

  ports:
    - 6443    # API Server
    - 8080    # Insecure API Server
    - 10250   # Kubelet
    - 10255   # Read-only Kubelet
    - 2379    # etcd
    - 2380    # etcd peer
    - 30000-32767  # NodePort range

# Hunting modules
modules:
  enabled:
    - "discovery.apiserver"
    - "discovery.kubelet"
    - "discovery.etcd"
    - "discovery.dashboard"
    - "hunting.apiserver"
    - "hunting.kubelet"
    - "hunting.etcd"

  disabled:
    - "hunting.arp"  # Disable ARP hunting in some environments

# Reporting configuration
reporting:
  format: "json"
  output_file: "/tmp/kube-hunter-report.json"
  include_hunter_statistics: true
  include_vulnerabilities_details: true

# Logging configuration
logging:
  level: "INFO"
  file: "/tmp/kube-hunter.log"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Authentication
authentication:
  kubeconfig: "~/.kube/config"
  service_account_token: "/var/run/secrets/kubernetes.io/serviceaccount/token"

# Custom hunters
custom_hunters:
  - "custom_hunter.py"
  - "advanced_checks.py"

# Run with configuration file
kube-hunter --config kube-hunter-config.yaml

# Override specific configuration
kube-hunter --config kube-hunter-config.yaml --remote 10.0.0.100

# Validate configuration
kube-hunter --config kube-hunter-config.yaml --validate-config
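
The `ports` list in the configuration above mixes single ports with a `30000-32767` range. As a small sketch, assuming a YAML loader returns the single ports as integers and the range as a string, the hypothetical helpers below (`normalize_ports` and `port_in_scope`, neither part of Kube-hunter) turn those entries into comparable ranges.

```python
def normalize_ports(entries):
    """Convert a mix of ints and 'start-end' strings into sorted (start, end) tuples."""
    ranges = []
    for entry in entries:
        text = str(entry).strip()
        # partition() leaves `end` empty when the entry is a single port.
        start, _, end = text.partition("-")
        low = int(start)
        high = int(end) if end else low
        if not (0 < low <= high <= 65535):
            raise ValueError(f"invalid port entry: {entry!r}")
        ranges.append((low, high))
    return sorted(ranges)


def port_in_scope(port, ranges):
    """Return True if the port falls inside any configured range."""
    return any(low <= port <= high for low, high in ranges)


if __name__ == "__main__":
    # Entries copied from the ports list in the configuration above.
    configured = normalize_ports([6443, 8080, 10250, 10255, 2379, 2380, "30000-32767"])
    print(port_in_scope(31000, configured), port_in_scope(22, configured))
```

Normalizing to tuples keeps the membership check simple and rejects malformed entries early, before a scan starts.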

Automated Hunting

Creating automated hunting workflows:

#!/bin/bash
# automated-kube-hunting.sh - Automated Kubernetes security hunting

# Configuration
CLUSTER_NAME="production"
OUTPUT_DIR="/var/log/kube-hunter"
REPORT_FORMAT="json"
NOTIFICATION_EMAIL="security@company.com"

# Create output directory
mkdir -p "$OUTPUT_DIR"

# Function to run Kube-hunter scan
run_kube_hunter_scan() {
    local scan_type="$1"
    local target="$2"
    local output_file="$3"

    echo "Running $scan_type scan..."

    case "$scan_type" in
        "remote")
            kube-hunter --remote "$target" \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        "internal")
            kube-hunter --internal \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        "network")
            kube-hunter --network "$target" \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        *)
            # Reject unknown scan types instead of silently succeeding
            echo "Unknown scan type: $scan_type" >&2
            return 1
            ;;
    esac

    return $?
}

# Function to analyze results
analyze_results() {
    local report_file="$1"

    if [[ ! -f "$report_file" ]]; then
        echo "Report file not found: $report_file"
        return 1
    fi

    # Parse JSON report
    local vulnerabilities=$(jq '.vulnerabilities|length' "$report_file")
    local high_severity=$(jq '[.vulnerabilities[] | select(.severity == "high")] | length' "$report_file")
    local medium_severity=$(jq '[.vulnerabilities[] | select(.severity == "medium")] | length' "$report_file")

    echo "Analysis Results:"
    echo "  Total vulnerabilities: $vulnerabilities"
    echo "  High severity: $high_severity"
    echo "  Medium severity: $medium_severity"

    # Check thresholds
    if [[ "$high_severity" -gt 0 ]]; then
        echo "WARNING: High severity vulnerabilities found!"
        return 2
    elif [[ "$medium_severity" -gt 5 ]]; then
        echo "WARNING: Multiple medium severity vulnerabilities found!"
        return 1
    fi

    return 0
}

# Function to send notification
send_notification() {
    local subject="$1"
    local body="$2"
    local report_file="$3"

    # Send email notification (requires mail command)
    if command -v mail >/dev/null 2>&1; then
        echo "$body"|mail -s "$subject" -a "$report_file" "$NOTIFICATION_EMAIL"
    fi

    # Send to webhook (if configured)
    if [[ -n "$WEBHOOK_URL" ]]; then
        curl -X POST "$WEBHOOK_URL" \
            -H "Content-Type: application/json" \
            -d "{\"subject\": \"$subject\", \"body\": \"$body\"}"
    fi
}

# Main execution
main() {
    echo "Starting automated Kube-hunter security assessment for $CLUSTER_NAME"

    local timestamp=$(date +%Y%m%d-%H%M%S)
    local report_file="$OUTPUT_DIR/kube-hunter-$CLUSTER_NAME-$timestamp.json"

    # Run internal scan
    if run_kube_hunter_scan "internal" "" "$report_file"; then
        echo "Kube-hunter scan completed successfully"

        # Analyze results
        analyze_results "$report_file"
        local analysis_result=$?

        case "$analysis_result" in
            0)
                echo "No critical security issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Clean" \
                    "Kube-hunter scan completed with no critical issues." \
                    "$report_file"
                ;;
            1)
                echo "Medium severity issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Medium Issues" \
                    "Kube-hunter scan found medium severity security issues." \
                    "$report_file"
                ;;
            2)
                echo "High severity issues found!"
                send_notification "Kube-hunter ALERT: $CLUSTER_NAME - High Severity Issues" \
                    "ALERT: Kube-hunter scan found high severity security vulnerabilities!" \
                    "$report_file"
                ;;
        esac
    else
        echo "Kube-hunter scan failed"
        send_notification "Kube-hunter Error: $CLUSTER_NAME" \
            "Kube-hunter scan failed to complete." \
            ""
    fi

    # Cleanup old reports (keep last 30 days)
    find "$OUTPUT_DIR" -name "kube-hunter-*.json" -mtime +30 -delete

    echo "Automated hunting completed"
}

# Execute main function
main "$@"
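
The `jq` queries in `analyze_results` count findings by severity and map them to an exit status. The same logic can be sketched in Python. The function name `summarize`, the `medium_limit` parameter, and the sample findings are illustrative assumptions; only the `vulnerabilities` list and its `severity` field come from the script above.

```python
import json
from collections import Counter

# Made-up sample data shaped like the report the script parses with jq.
SAMPLE_REPORT = json.loads("""
{
  "vulnerabilities": [
    {"severity": "high", "vulnerability": "Example finding A"},
    {"severity": "medium", "vulnerability": "Example finding B"},
    {"severity": "medium", "vulnerability": "Example finding C"}
  ]
}
""")


def summarize(report, medium_limit=5):
    """Count findings per severity and return (counts, status).

    Status mirrors the shell function: 2 if any high-severity finding exists,
    1 if medium findings exceed medium_limit, otherwise 0.
    """
    counts = Counter(
        finding.get("severity", "unknown").lower()
        for finding in report.get("vulnerabilities", [])
    )
    if counts["high"] > 0:
        status = 2
    elif counts["medium"] > medium_limit:
        status = 1
    else:
        status = 0
    return counts, status


if __name__ == "__main__":
    counts, status = summarize(SAMPLE_REPORT)
    print(dict(counts), "status:", status)
```

Returning the status alongside the counts lets a caller decide whether to fail a pipeline step or only log the summary.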

Automation Scripts

Comprehensive Security Assessment

#!/usr/bin/env python3
# Comprehensive Kubernetes security assessment with Kube-hunter

import subprocess
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class KubeHunterAssessment:
    def __init__(self, config_file=None):
        self.config_file = config_file
        self.load_configuration()
        self.setup_logging()

    def load_configuration(self):
        """Load assessment configuration"""
        default_config = {
            "scanning": {
                "modes": ["internal", "remote"],
                "targets": [],
                "timeout": 300,
                "threads": 20
            },
            "reporting": {
                "format": "json",
                "output_dir": "/tmp/kube-hunter-reports",
                "include_statistics": True
            },
            "notifications": {
                "email": {
                    "enabled": False,
                    "smtp_server": "localhost",
                    "smtp_port": 587,
                    "username": "",
                    "password": "",
                    "from": "kube-hunter@example.com",
                    "to": "security@example.com"
                },
                "webhook": {
                    "enabled": False,
                    "url": "",
                    "headers": {}
                }
            },
            "thresholds": {
                "high_severity_max": 0,
                "medium_severity_max": 5,
                "total_vulnerabilities_max": 10
            }
        }

        if self.config_file and os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                import yaml
                user_config = yaml.safe_load(f)
                # Merge configurations
                self.config = {**default_config, **user_config}
        else:
            self.config = default_config

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('kube-hunter-assessment.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def run_kube_hunter(self, mode, target=None, output_file=None):
        """Run Kube-hunter scan"""

        if not output_file:
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            output_file = f"kube-hunter-{mode}-{timestamp}.json"

        # Build command
        cmd = ["kube-hunter"]

        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--internal")
        elif mode == "pod":
            cmd.append("--pod")
        elif mode == "network" and target:
            cmd.extend(["--network", target])

        # Add common options
        cmd.extend([
            "--report", "json",
            "--output", output_file,
            "--log", "INFO"
        ])

        # Add timeout if configured
        timeout = self.config.get("scanning", {}).get("timeout", 300)

        try:
            self.logger.info(f"Running Kube-hunter in {mode} mode...")
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )

            if result.returncode == 0:
                self.logger.info(f"Kube-hunter scan completed: {output_file}")

                # Load and return results
                with open(output_file, 'r') as f:
                    scan_results = json.load(f)

                return scan_results, output_file
            else:
                self.logger.error(f"Kube-hunter scan failed: {result.stderr}")
                return None, None

        except subprocess.TimeoutExpired:
            self.logger.error(f"Kube-hunter scan timed out after {timeout} seconds")
            return None, None
        except Exception as e:
            self.logger.error(f"Error running Kube-hunter: {e}")
            return None, None

    def analyze_vulnerabilities(self, scan_results):
        """Analyze scan results for vulnerabilities"""

        if not scan_results:
            return None

        vulnerabilities = scan_results.get("vulnerabilities", [])

        analysis = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": len(vulnerabilities),
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }

        # Analyze each vulnerability
        for vuln in vulnerabilities:
            severity = vuln.get("severity", "unknown").lower()
            category = vuln.get("category", "unknown")

            # Count by severity
            if severity in analysis["severity_breakdown"]:
                analysis["severity_breakdown"][severity] += 1

            # Count by category
            if category not in analysis["category_breakdown"]:
                analysis["category_breakdown"][category] = 0
            analysis["category_breakdown"][category] += 1

            # Collect critical and high severity vulnerabilities
            if severity == "critical":
                analysis["critical_vulnerabilities"].append(vuln)
            elif severity == "high":
                analysis["high_vulnerabilities"].append(vuln)

        # Generate recommendations
        analysis["recommendations"] = self.generate_recommendations(analysis)

        return analysis

    def generate_recommendations(self, analysis):
        """Generate security recommendations based on analysis"""

        recommendations = []

        # Critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Critical Vulnerabilities Immediately",
                "description": f"Found {len(analysis['critical_vulnerabilities'])} critical vulnerabilities that require immediate attention.",
                "action": "Review and remediate all critical vulnerabilities within 24 hours."
            })

        # High severity vulnerabilities
        if analysis["high_vulnerabilities"]:
            recommendations.append({
                "priority": "HIGH",
                "title": "Remediate High Severity Issues",
                "description": f"Found {len(analysis['high_vulnerabilities'])} high severity vulnerabilities.",
                "action": "Plan remediation for high severity vulnerabilities within 1 week."
            })

        # Category-specific recommendations
        categories = analysis["category_breakdown"]

        if categories.get("Access Risk", 0) > 0:
            recommendations.append({
                "priority": "HIGH",
                "title": "Review Access Controls",
                "description": "Access control vulnerabilities detected.",
                "action": "Review and strengthen RBAC policies and authentication mechanisms."
            })

        if categories.get("Information Disclosure", 0) > 0:
            recommendations.append({
                "priority": "MEDIUM",
                "title": "Prevent Information Disclosure",
                "description": "Information disclosure vulnerabilities found.",
                "action": "Review exposed services and implement proper access controls."
            })

        if categories.get("Remote Code Execution", 0) > 0:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Remote Code Execution Risks",
                "description": "Remote code execution vulnerabilities detected.",
                "action": "Immediately patch or isolate affected components."
            })

        return recommendations

    def check_thresholds(self, analysis):
        """Check if vulnerabilities exceed configured thresholds"""

        thresholds = self.config.get("thresholds", {})

        high_severity_count = analysis["severity_breakdown"]["high"] + analysis["severity_breakdown"]["critical"]
        medium_severity_count = analysis["severity_breakdown"]["medium"]
        total_vulnerabilities = analysis["total_vulnerabilities"]

        threshold_violations = []

        # Check high severity threshold
        high_max = thresholds.get("high_severity_max", 0)
        if high_severity_count > high_max:
            threshold_violations.append(f"High/Critical severity: {high_severity_count} > {high_max}")

        # Check medium severity threshold
        medium_max = thresholds.get("medium_severity_max", 5)
        if medium_severity_count > medium_max:
            threshold_violations.append(f"Medium severity: {medium_severity_count} > {medium_max}")

        # Check total vulnerabilities threshold
        total_max = thresholds.get("total_vulnerabilities_max", 10)
        if total_vulnerabilities > total_max:
            threshold_violations.append(f"Total vulnerabilities: {total_vulnerabilities} > {total_max}")

        return threshold_violations

    def generate_report(self, analysis, scan_results, output_file):
        """Generate comprehensive security report"""

        report = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "tool": "Kube-hunter",
                "assessment_type": "Kubernetes Security Assessment"
            },
            "executive_summary": {
                "total_vulnerabilities": analysis["total_vulnerabilities"],
                "critical_count": analysis["severity_breakdown"]["critical"],
                "high_count": analysis["severity_breakdown"]["high"],
                "medium_count": analysis["severity_breakdown"]["medium"],
                "low_count": analysis["severity_breakdown"]["low"],
                "risk_level": self.calculate_risk_level(analysis)
            },
            "detailed_analysis": analysis,
            "raw_results": scan_results,
            "recommendations": analysis["recommendations"]
        }

        # Save report
        report_file = output_file.replace('.json', '-report.json')
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)

        self.logger.info(f"Comprehensive report generated: {report_file}")
        return report, report_file

    def calculate_risk_level(self, analysis):
        """Calculate overall risk level"""

        critical_count = analysis["severity_breakdown"]["critical"]
        high_count = analysis["severity_breakdown"]["high"]
        medium_count = analysis["severity_breakdown"]["medium"]

        if critical_count > 0:
            return "CRITICAL"
        elif high_count > 3:
            return "HIGH"
        elif high_count > 0 or medium_count > 5:
            return "MEDIUM"
        elif medium_count > 0:
            return "LOW"
        else:
            return "MINIMAL"

    def send_notifications(self, analysis, report_file, threshold_violations):
        """Send notifications about assessment results"""

        # Prepare notification content
        subject = "Kube-hunter Security Assessment Results"
        if threshold_violations:
            subject += " - ALERT"

        body = self.generate_notification_body(analysis, threshold_violations)

        # Send email notification
        if self.config.get("notifications", {}).get("email", {}).get("enabled", False):
            self.send_email_notification(subject, body, report_file)

        # Send webhook notification
        if self.config.get("notifications", {}).get("webhook", {}).get("enabled", False):
            self.send_webhook_notification(analysis, threshold_violations)

    def generate_notification_body(self, analysis, threshold_violations):
        """Generate notification message body"""

        body = f"""
Kube-hunter Security Assessment Results
Generated: {analysis['timestamp']}

EXECUTIVE SUMMARY:
==================
Total Vulnerabilities: {analysis['total_vulnerabilities']}
Critical: {analysis['severity_breakdown']['critical']}
High: {analysis['severity_breakdown']['high']}
Medium: {analysis['severity_breakdown']['medium']}
Low: {analysis['severity_breakdown']['low']}

"""

        if threshold_violations:
            body += "⚠️  THRESHOLD VIOLATIONS:\n"
            for violation in threshold_violations:
                body += f"  - {violation}\n"
            body += "\n"

        # Add critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            body += "CRITICAL VULNERABILITIES:\n"
            body += "========================\n"
            for vuln in analysis["critical_vulnerabilities"][:3]:
                body += f"- {vuln.get('vulnerability', 'Unknown')}\n"
                body += f"  Description: {vuln.get('description', 'No description')}\n"
                body += f"  Evidence: {vuln.get('evidence', 'No evidence')}\n\n"

        # Add recommendations
        if analysis["recommendations"]:
            body += "TOP RECOMMENDATIONS:\n"
            body += "===================\n"
            for rec in analysis["recommendations"][:3]:
                body += f"- [{rec['priority']}] {rec['title']}\n"
                body += f"  {rec['action']}\n\n"

        return body

    def send_email_notification(self, subject, body, attachment_file):
        """Send email notification"""

        email_config = self.config.get("notifications", {}).get("email", {})

        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject

            msg.attach(MIMEText(body, 'plain'))

            # Attach report file
            if attachment_file and os.path.exists(attachment_file):
                with open(attachment_file, 'r') as f:
                    attachment = MIMEText(f.read())
                    attachment.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_file)}"')
                    msg.attach(attachment)

            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()

            if email_config.get("username") and email_config.get("password"):
                server.login(email_config["username"], email_config["password"])

            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()

            self.logger.info("Email notification sent successfully")

        except Exception as e:
            self.logger.error(f"Failed to send email notification: {e}")

    def send_webhook_notification(self, analysis, threshold_violations):
        """Send webhook notification"""

        webhook_config = self.config.get("notifications", {}).get("webhook", {})

        try:
            import requests

            payload = {
                "timestamp": analysis["timestamp"],
                "alert": bool(threshold_violations),
                "threshold_violations": threshold_violations,
                "summary": {
                    "total_vulnerabilities": analysis["total_vulnerabilities"],
                    "critical": analysis["severity_breakdown"]["critical"],
                    "high": analysis["severity_breakdown"]["high"],
                    "medium": analysis["severity_breakdown"]["medium"]
                },
                "recommendations": analysis["recommendations"][:3]
            }

            headers = webhook_config.get("headers", {})
            headers["Content-Type"] = "application/json"

            response = requests.post(
                webhook_config["url"],
                json=payload,
                headers=headers,
                timeout=30
            )

            if response.status_code == 200:
                self.logger.info("Webhook notification sent successfully")
            else:
                self.logger.error(f"Webhook notification failed: {response.status_code}")

        except Exception as e:
            self.logger.error(f"Failed to send webhook notification: {e}")

    def run_comprehensive_assessment(self, modes=None, targets=None):
        """Run comprehensive security assessment"""

        if not modes:
            modes = self.config.get("scanning", {}).get("modes", ["internal"])

        if not targets:
            targets = self.config.get("scanning", {}).get("targets", [])

        self.logger.info("Starting comprehensive Kube-hunter security assessment")

        all_results = []
        all_analyses = []

        # Run scans for each mode
        for mode in modes:
            if mode in ["remote", "network"] and targets:
                for target in targets:
                    scan_results, output_file = self.run_kube_hunter(mode, target)
                    if scan_results:
                        analysis = self.analyze_vulnerabilities(scan_results)
                        if analysis:
                            all_results.append((scan_results, output_file))
                            all_analyses.append(analysis)
            else:
                scan_results, output_file = self.run_kube_hunter(mode)
                if scan_results:
                    analysis = self.analyze_vulnerabilities(scan_results)
                    if analysis:
                        all_results.append((scan_results, output_file))
                        all_analyses.append(analysis)

        if not all_analyses:
            self.logger.error("No successful scans completed")
            return False

        # Combine analyses
        combined_analysis = self.combine_analyses(all_analyses)

        # Check thresholds
        threshold_violations = self.check_thresholds(combined_analysis)

        # Generate comprehensive report
        combined_results = {"scans": [result[0] for result in all_results]}
        report, report_file = self.generate_report(combined_analysis, combined_results, "combined-assessment.json")

        # Send notifications
        self.send_notifications(combined_analysis, report_file, threshold_violations)

        # Log summary
        self.logger.info(f"Assessment completed. Total vulnerabilities: {combined_analysis['total_vulnerabilities']}")
        if threshold_violations:
            self.logger.warning(f"Threshold violations: {threshold_violations}")

        return True

    def combine_analyses(self, analyses):
        """Combine multiple scan analyses"""

        combined = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": 0,
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }

        # Combine all analyses
        for analysis in analyses:
            combined["total_vulnerabilities"] += analysis["total_vulnerabilities"]

            # Combine severity counts
            for severity, count in analysis["severity_breakdown"].items():
                combined["severity_breakdown"][severity] += count

            # Combine category counts
            for category, count in analysis["category_breakdown"].items():
                if category not in combined["category_breakdown"]:
                    combined["category_breakdown"][category] = 0
                combined["category_breakdown"][category] += count

            # Combine vulnerabilities
            combined["critical_vulnerabilities"].extend(analysis["critical_vulnerabilities"])
            combined["high_vulnerabilities"].extend(analysis["high_vulnerabilities"])

        # Generate combined recommendations
        combined["recommendations"] = self.generate_recommendations(combined)

        return combined

# Usage
if __name__ == "__main__":
    # argparse expects the lowercase keyword "description"
    parser = argparse.ArgumentParser(description="Comprehensive Kube-hunter Security Assessment")
    parser.add_argument("--config", help="Configuration file path")
    parser.add_argument("--modes", nargs="+", choices=["internal", "remote", "pod", "network"],
                       default=["internal"], help="Scanning modes")
    parser.add_argument("--targets", nargs="+", help="Targets for remote/network scans")

    args = parser.parse_args()

    assessment = KubeHunterAssessment(args.config)
    success = assessment.run_comprehensive_assessment(args.modes, args.targets)

    sys.exit(0 if success else 1)
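
The severity merge done in `combine_analyses` can be tried in isolation. The following is a minimal standalone sketch using `collections.Counter`; `merge_severity_counts` is a hypothetical helper name and the two sample analyses are made-up data, not output from a real scan.

```python
from collections import Counter

def merge_severity_counts(analyses):
    """Sum per-severity counts across several analysis dicts."""
    # Start from zeroed counters so every standard severity appears in the result
    totals = Counter({"critical": 0, "high": 0, "medium": 0, "low": 0})
    for analysis in analyses:
        # Counter.update adds counts instead of overwriting them
        totals.update(analysis.get("severity_breakdown", {}))
    return dict(totals)

# Made-up sample data shaped like the analyses used in the script
sample = [
    {"severity_breakdown": {"critical": 1, "high": 2, "medium": 0, "low": 3}},
    {"severity_breakdown": {"critical": 0, "high": 1, "medium": 4, "low": 0}},
]
merged = merge_severity_counts(sample)
print(merged)
```

Unlike a plain `+=` on a fixed dictionary, a `Counter` also accepts a severity label that was not pre-declared, which may help if a scan reports an unexpected level.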

CI/CD Integration Script

#!/usr/bin/env python3
# Kube-hunter CI/CD integration

import subprocess
import json
import sys
import os

class KubeHunterCICD:
    def __init__(self, fail_on_high=True, fail_on_medium=False):
        self.fail_on_high = fail_on_high
        self.fail_on_medium = fail_on_medium

    def run_security_scan(self, mode="internal", target=None):
        """Run Kube-hunter security scan in CI/CD"""

        cmd = ["kube-hunter", "--report", "json"]

        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--internal")
        elif mode == "pod":
            cmd.append("--pod")

        try:
            # Capture stdout so the JSON report can be parsed; stop after 5 minutes
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                print(f"Kube-hunter scan failed: {result.stderr}")
                return None

        except Exception as e:
            # Covers a missing binary, a timeout, and unparsable JSON output
            print(f"Error running Kube-hunter: {e}")
            return None

    def evaluate_results(self, scan_results):
        """Evaluate scan results for CI/CD pipeline"""

        if not scan_results:
            return False, "Scan failed"

        vulnerabilities = scan_results.get("vulnerabilities", [])

        high_count = len([v for v in vulnerabilities if v.get("severity") == "high"])
        medium_count = len([v for v in vulnerabilities if v.get("severity") == "medium"])
        critical_count = len([v for v in vulnerabilities if v.get("severity") == "critical"])

        # Check failure conditions
        if critical_count > 0:
            return False, f"Critical vulnerabilities found: {critical_count}"

        if self.fail_on_high and high_count > 0:
            return False, f"High severity vulnerabilities found: {high_count}"

        if self.fail_on_medium and medium_count > 0:
            return False, f"Medium severity vulnerabilities found: {medium_count}"

        return True, f"Security scan passed. Vulnerabilities: {len(vulnerabilities)}"

    def generate_ci_report(self, scan_results, output_file="kube-hunter-ci-report.json"):
        """Generate CI/CD friendly report"""

        if not scan_results:
            return None

        vulnerabilities = scan_results.get("vulnerabilities", [])

        report = {
            "summary": {
                "total_vulnerabilities": len(vulnerabilities),
                "critical": len([v for v in vulnerabilities if v.get("severity") == "critical"]),
                "high": len([v for v in vulnerabilities if v.get("severity") == "high"]),
                "medium": len([v for v in vulnerabilities if v.get("severity") == "medium"]),
                "low": len([v for v in vulnerabilities if v.get("severity") == "low"])
            },
            "vulnerabilities": vulnerabilities,
            "scan_metadata": scan_results.get("metadata", {})
        }

        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)

        return report

# Usage in CI/CD
if __name__ == "__main__":
    cicd = KubeHunterCICD(fail_on_high=True, fail_on_medium=False)

    # Run scan
    results = cicd.run_security_scan("internal")

    # Evaluate results
    passed, message = cicd.evaluate_results(results)

    # Generate report
    cicd.generate_ci_report(results)

    print(message)
    sys.exit(0 if passed else 1)
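
In a pipeline, the two failure switches are often set per job rather than hard-coded. A minimal sketch of one way to do that, assuming hypothetical environment variable names `KH_FAIL_ON_HIGH` and `KH_FAIL_ON_MEDIUM` (neither is defined by Kube-hunter or by the script above):

```python
import os

def env_flag(name, default, environ=None):
    """Read a boolean switch from the environment ("1", "true", "yes", "on" = enabled)."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}

def gate_options(environ=None):
    """Build keyword arguments for the KubeHunterCICD constructor."""
    return {
        "fail_on_high": env_flag("KH_FAIL_ON_HIGH", True, environ),
        "fail_on_medium": env_flag("KH_FAIL_ON_MEDIUM", False, environ),
    }

# Example with an explicit mapping instead of the real environment
options = gate_options({"KH_FAIL_ON_MEDIUM": "true"})
print(options)
```

The result can then be passed along as `KubeHunterCICD(**gate_options())`, so a stricter job only needs to export one variable.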

Integration Examples

Prometheus Monitoring

# kube-hunter-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kube-hunter-exporter
  template:
    metadata:
      labels:
        app: kube-hunter-exporter
    spec:
      serviceAccountName: kube-hunter-exporter
      containers:
      - name: kube-hunter-exporter
        image: custom/kube-hunter-exporter:latest
        ports:
        - containerPort: 8080
        env:
        - name: SCAN_INTERVAL
          value: "3600"  # 1 hour
        - name: METRICS_PORT
          value: "8080"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"

---
apiVersion: v1
kind: Service
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
  labels:
    app: kube-hunter-exporter
spec:
  ports:
  - port: 8080
    targetPort: 8080
    name: metrics
  selector:
    app: kube-hunter-exporter

---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: kube-hunter-exporter
  endpoints:
  - port: metrics
    interval: 60s
    path: /metrics
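
The manifest refers to a custom exporter image whose code is not shown here. As a rough idea of what such an exporter might serve on `/metrics`, the sketch below renders scan counts in the Prometheus text exposition format using only the standard library. The metric names are taken from the dashboard queries in this document; `render_metrics`, `MetricsHandler`, `serve` and the sample numbers are illustrative assumptions.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

def render_metrics(severity_counts, scan_duration_seconds):
    """Render scan results in the Prometheus text exposition format."""
    lines = [
        "# HELP kube_hunter_vulnerabilities_total Vulnerabilities found by the last scan",
        "# TYPE kube_hunter_vulnerabilities_total gauge",
        f"kube_hunter_vulnerabilities_total {sum(severity_counts.values())}",
        "# HELP kube_hunter_vulnerabilities_by_severity Vulnerabilities by severity",
        "# TYPE kube_hunter_vulnerabilities_by_severity gauge",
    ]
    for severity, count in sorted(severity_counts.items()):
        lines.append(f'kube_hunter_vulnerabilities_by_severity{{severity="{severity}"}} {count}')
    lines += [
        "# HELP kube_hunter_scan_duration_seconds Duration of the last scan",
        "# TYPE kube_hunter_scan_duration_seconds gauge",
        f"kube_hunter_scan_duration_seconds {scan_duration_seconds}",
    ]
    return "\n".join(lines) + "\n"

# Made-up latest results; a real exporter would refresh these every SCAN_INTERVAL seconds
LATEST = {"counts": {"high": 2, "medium": 5, "low": 1}, "duration": 42.5}

class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Only the path scraped by the ServiceMonitor is served
        if self.path != "/metrics":
            self.send_error(404)
            return
        body = render_metrics(LATEST["counts"], LATEST["duration"]).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

def serve(port=8080):
    """Serve /metrics until interrupted; intended as the container entry point."""
    HTTPServer(("", port), MetricsHandler).serve_forever()

print(render_metrics(LATEST["counts"], LATEST["duration"]))
```

Calling `serve()` would listen on port 8080, matching the container port and Service port in the manifest.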

Grafana Dashboard

{
  "dashboard": {
    "title": "Kube-hunter Security Dashboard",
    "panels": [
      {
        "title": "Total Vulnerabilities",
        "type": "stat",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_total",
            "legendFormat": "Total Vulnerabilities"
          }
        ]
      },
      {
        "title": "Vulnerabilities by Severity",
        "type": "piechart",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_by_severity",
            "legendFormat": "{{severity}}"
          }
        ]
      },
      {
        "title": "Security Scan History",
        "type": "graph",
        "targets": [
          {
            "expr": "kube_hunter_scan_duration_seconds",
            "legendFormat": "Scan Duration"
          }
        ]
      }
    ]
  }
}
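
Hand-editing nested dashboard JSON is error-prone, so one option is to generate it. The sketch below builds the same three panels from a small helper and prints the resulting JSON; `make_panel` and `make_dashboard` are hypothetical names, and each panel carries its query under the `targets` key that Grafana panels use.

```python
import json

def make_panel(title, panel_type, expr, legend):
    """Build one panel with a single Prometheus query target."""
    return {
        "title": title,
        "type": panel_type,
        "targets": [{"expr": expr, "legendFormat": legend}],
    }

def make_dashboard():
    """Assemble the Kube-hunter dashboard definition."""
    panels = [
        make_panel("Total Vulnerabilities", "stat",
                   "kube_hunter_vulnerabilities_total", "Total Vulnerabilities"),
        make_panel("Vulnerabilities by Severity", "piechart",
                   "kube_hunter_vulnerabilities_by_severity", "{{severity}}"),
        make_panel("Security Scan History", "graph",
                   "kube_hunter_scan_duration_seconds", "Scan Duration"),
    ]
    return {"dashboard": {"title": "Kube-hunter Security Dashboard", "panels": panels}}

dashboard = make_dashboard()
print(json.dumps(dashboard, indent=2))
```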

Troubleshooting

Common Issues

Permission Problems:

# Check Kubernetes access
kubectl auth can-i get pods
kubectl auth can-i list services

# Verify service account permissions
kubectl describe serviceaccount kube-hunter -n security
kubectl describe clusterrolebinding kube-hunter

# Check pod security policies (PodSecurityPolicy was removed in Kubernetes 1.25; older clusters only)
kubectl get psp
kubectl describe psp restricted

# Test network connectivity
kubectl run test-pod --image=busybox --rm -it -- nslookup kubernetes.default.svc.cluster.local

Network Connectivity Issues:

# Check cluster networking
kubectl get nodes -o wide
kubectl get services -A

# Test API server connectivity
curl -k https://kubernetes.default.svc.cluster.local:443/version

# Check DNS resolution
nslookup kubernetes.default.svc.cluster.local

# Verify network policies
kubectl get networkpolicies -A
kubectl describe networkpolicy -n kube-system

Scanning Problems:

# Run with debug logging
kube-hunter --internal --log DEBUG

# Check for timeouts
kube-hunter --internal --timeout 60

# Test specific components
kube-hunter --internal --services api-server

# Verify output format
kube-hunter --internal --report json | jq '.'

# Check for missing dependencies
pip list | grep kube-hunter
pip install --upgrade kube-hunter

Performance Optimization

Optimizing Kube-hunter performance:

# Reduce scan scope
kube-hunter --internal --quick

# Limit thread count
kube-hunter --internal --threads 10

# Use specific interface
kube-hunter --internal --interface eth0

# Skip slow checks
kube-hunter --internal --skip-slow-checks

# Optimize for CI/CD
kube-hunter --pod --report json --timeout 120

Security Considerations

Safe Usage Practices

Scanning Ethics:

- Only scan clusters you own or have explicit permission to test
- Coordinate with operations teams before running scans
- Use read-only scanning modes when possible
- Implement proper access controls for scan results
- Keep Kube-hunter and its dependencies up to date

Data Protection:

- Encrypt sensitive scan results and reports
- Implement secure storage for vulnerability data
- Use secure channels for transmitting reports
- Clean up temporary files and logs regularly
- Implement data retention policies for compliance
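
Two of the data protection points, restricted storage and retention, can be sketched in a few lines. The example below writes reports with owner-only permissions (effective on POSIX systems) and deletes `*.json` reports older than a retention window. `write_private`, `purge_old_reports` and the 30-day window are illustrative assumptions, and the demo runs in a temporary directory.

```python
import os
import tempfile
import time
from pathlib import Path

def write_private(path, text):
    """Write a report that only the owner can read or write (mode 0600)."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(text)

def purge_old_reports(directory, max_age_days, now=None):
    """Delete *.json reports older than max_age_days; return the removed names."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    removed = []
    for report in sorted(Path(directory).glob("*.json")):
        if report.stat().st_mtime < cutoff:
            report.unlink()
            removed.append(report.name)
    return removed

# Demo in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    old_report = os.path.join(tmp, "old-report.json")
    new_report = os.path.join(tmp, "new-report.json")
    write_private(old_report, "{}")
    write_private(new_report, "{}")
    # Backdate one file by 40 days so it falls outside the retention window
    stale = time.time() - 40 * 86400
    os.utime(old_report, (stale, stale))
    removed = purge_old_reports(tmp, max_age_days=30)
    remaining = sorted(os.listdir(tmp))

print(removed, remaining)
```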

Operational Security

Monitoring and Alerting:

- Monitor Kube-hunter execution and results
- Set up alerting for critical security findings
- Track vulnerability trends over time
- Implement automated remediation workflows
- Review security configurations regularly
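
Trend tracking can start as simply as comparing the severity summary of the latest scan with the previous one. A minimal sketch with made-up numbers; `severity_trend` and `regressions` are hypothetical helper names:

```python
def severity_trend(previous, current):
    """Return the per-severity change (current minus previous)."""
    severities = sorted(set(previous) | set(current))
    return {s: current.get(s, 0) - previous.get(s, 0) for s in severities}

def regressions(trend):
    """List the severities whose count increased since the previous scan."""
    return [s for s, delta in trend.items() if delta > 0]

# Made-up summaries from two consecutive scans
last_week = {"critical": 0, "high": 3, "medium": 5, "low": 2}
today = {"critical": 1, "high": 2, "medium": 5, "low": 4}

trend = severity_trend(last_week, today)
print(trend)
print(regressions(trend))
```

An alert rule could then fire whenever `regressions` returns `critical` or `high`.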

Integration Security:

- Secure CI/CD pipeline integration
- Protect service account tokens and credentials
- Implement proper RBAC for Kubernetes deployments
- Monitor for unauthorized Kube-hunter usage
- Assess the security of the scanning infrastructure regularly
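
Tokens tend to leak through logs, for instance when webhook headers from a configuration file are printed while debugging. The sketch below masks such values before they are logged; the list of sensitive header names is an assumption and should be adapted to the integration in use.

```python
# Header names treated as secrets (an assumed, non-exhaustive list)
SENSITIVE_HEADERS = {"authorization", "x-api-key", "cookie"}

def redact_headers(headers):
    """Return a copy of the headers that is safe to write to a log."""
    return {
        name: "***" if name.lower() in SENSITIVE_HEADERS else value
        for name, value in headers.items()
    }

headers = {"Authorization": "Bearer abc123", "Content-Type": "application/json"}
safe = redact_headers(headers)
print(safe)
```

The original dictionary is left untouched, so the real token is still sent with the request while only the masked copy reaches the log.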
