Skip to content

Kube-hunter Cheat Sheet

Overview

Kube-hunter is a powerful open-source penetration testing tool specifically designed to hunt for security weaknesses in Kubernetes clusters. Developed by Aqua Security, this tool performs comprehensive security assessments by simulating the techniques that attackers might use to compromise Kubernetes environments. Kube-hunter operates by scanning Kubernetes clusters from both internal and external perspectives, identifying misconfigurations, exposed services, and potential attack vectors that could be exploited by malicious actors. The tool's strength lies in its ability to provide actionable security insights that help organizations strengthen their Kubernetes security posture before real attacks occur.

The core functionality of Kube-hunter revolves around its extensive knowledge base of Kubernetes-specific attack techniques and vulnerabilities. The tool performs automated discovery of Kubernetes components including API servers, etcd clusters, kubelet services, and dashboard interfaces, then systematically tests these components for common security issues such as anonymous access, privilege escalation opportunities, and insecure configurations. Kube-hunter's hunting methodology includes testing for exposed sensitive information, weak authentication mechanisms, container escape possibilities, and network segmentation issues that are commonly found in misconfigured Kubernetes deployments.

Kube-hunter's value proposition extends beyond simple vulnerability scanning by providing detailed explanations of discovered issues, their potential impact, and specific remediation guidance. The tool supports multiple execution modes including remote scanning for external assessments, internal scanning for comprehensive cluster analysis, and network scanning for discovering Kubernetes services across network ranges. With its ability to generate detailed reports in multiple formats and integrate into CI/CD pipelines, Kube-hunter has become an essential tool for DevSecOps teams, security professionals, and Kubernetes administrators who need to maintain robust security standards in their container orchestration environments.

Installation

Python Package Installation

Installing Kube-hunter using pip:

bash
# Install Kube-hunter
pip install kube-hunter

# Alternative: Install from source
git clone https://github.com/aquasecurity/kube-hunter.git
cd kube-hunter
pip install -r requirements.txt
python setup.py install

# Verify installation
kube-hunter --help

# Install additional dependencies
pip install requests urllib3 netaddr

# Update to latest version
pip install --upgrade kube-hunter

# Install specific version
pip install kube-hunter==0.6.8

Docker Installation

Running Kube-hunter in Docker containers:

bash
# Run Kube-hunter container for remote scanning
docker run --rm -it aquasec/kube-hunter

# Run with network access for internal scanning
docker run --rm --network host aquasec/kube-hunter --internal

# Run with custom configuration
docker run --rm -v $(pwd)/config:/config aquasec/kube-hunter --config /config/kube-hunter.conf

# Run with output to file
docker run --rm -v $(pwd)/reports:/reports aquasec/kube-hunter --report json --output /reports/kube-hunter-report.json

# Run specific hunting mode
docker run --rm aquasec/kube-hunter --remote 10.0.0.0/24

# Run with custom interface
docker run --rm --network host aquasec/kube-hunter --interface eth0

Kubernetes Deployment

Deploying Kube-hunter as a Kubernetes job:

yaml
# kube-hunter-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: kube-hunter
  namespace: security
spec:
  template:
    metadata:
      labels:
        app: kube-hunter
    spec:
      restartPolicy: Never
      serviceAccountName: kube-hunter
      containers:
      - name: kube-hunter
        image: aquasec/kube-hunter
        command: ["kube-hunter"]
        args: ["--pod", "--report", "json", "--log", "INFO"]
        volumeMounts:
        - name: output
          mountPath: /tmp
        env:
        - name: KUBERNETES_SERVICE_HOST
          value: "kubernetes.default.svc.cluster.local"
        - name: KUBERNETES_SERVICE_PORT
          value: "443"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
      volumes:
      - name: output
        emptyDir: {}
  backoffLimit: 1

---
# ServiceAccount for Kube-hunter
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kube-hunter
  namespace: security

---
# ClusterRole for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kube-hunter
rules:
- apiGroups: [""]
  resources: ["pods", "services", "nodes"]
  verbs: ["get", "list"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list"]
- apiGroups: ["extensions"]
  resources: ["deployments"]
  verbs: ["get", "list"]

---
# ClusterRoleBinding for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kube-hunter
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kube-hunter
subjects:
- kind: ServiceAccount
  name: kube-hunter
  namespace: security

Virtual Environment Setup

bash
# Create virtual environment
python3 -m venv kube-hunter-env
source kube-hunter-env/bin/activate

# Install Kube-hunter
pip install kube-hunter

# Install additional tools
pip install kubernetes pyyaml

# Create configuration directory
mkdir -p ~/.kube-hunter

# Verify setup
kube-hunter --version

# Create wrapper script
cat > /usr/local/bin/kube-hunt << 'EOF'
#!/bin/bash
source /path/to/kube-hunter-env/bin/activate
kube-hunter "$@"
EOF

chmod +x /usr/local/bin/kube-hunt

Basic Usage

Remote Scanning

Scanning Kubernetes clusters from external perspective:

bash
# Basic remote scan
kube-hunter --remote

# Scan specific IP address
kube-hunter --remote 10.0.0.100

# Scan IP range
kube-hunter --remote 10.0.0.0/24

# Scan multiple targets
kube-hunter --remote 10.0.0.100,10.0.0.101,10.0.0.102

# Scan with custom ports
kube-hunter --remote 10.0.0.100 --port 6443,8080,10250

# Quick scan mode
kube-hunter --remote 10.0.0.100 --quick

# Verbose output
kube-hunter --remote 10.0.0.100 --log DEBUG

# Save results to file
kube-hunter --remote 10.0.0.100 --report json --output remote-scan.json

Internal Scanning

Scanning from within the Kubernetes cluster:

bash
# Internal cluster scan
kube-hunter --internal

# Pod-based scanning
kube-hunter --pod

# Scan with specific interface
kube-hunter --internal --interface eth0

# Internal scan with network discovery
kube-hunter --internal --network-scan

# Scan specific services
kube-hunter --internal --services api-server,kubelet

# Internal scan with custom timeout
kube-hunter --internal --timeout 30

# Scan with authentication
kube-hunter --internal --kubeconfig ~/.kube/config

# Internal scan with specific namespace
kube-hunter --internal --namespace kube-system

Network Scanning

Discovering Kubernetes services across networks:

bash
# Network discovery scan
kube-hunter --network 10.0.0.0/24

# Scan multiple networks
kube-hunter --network 10.0.0.0/24,192.168.1.0/24

# Network scan with port range
kube-hunter --network 10.0.0.0/24 --port-range 1-65535

# Fast network scan
kube-hunter --network 10.0.0.0/24 --quick

# Network scan with custom threads
kube-hunter --network 10.0.0.0/24 --threads 50

# Scan with service detection
kube-hunter --network 10.0.0.0/24 --detect-services

# Network scan with output
kube-hunter --network 10.0.0.0/24 --report json --output network-scan.json

Advanced Features

Custom Hunting Modules

Creating and using custom hunting modules:

python
# custom_hunter.py - Custom Kube-hunter module
from kube_hunter.core.types import Hunter, KubernetesCluster
from kube_hunter.core.events import handler

class CustomSecurityHunter(Hunter):
    """Custom security hunter for specific vulnerabilities"""
    
    def __init__(self, event):
        self.event = event
    
    @handler.subscribe(KubernetesCluster)
    def hunt_custom_vulnerabilities(self, event):
        """Hunt for custom security vulnerabilities"""
        
        # Custom vulnerability detection logic
        self.check_custom_misconfigurations(event)
        self.check_exposed_secrets(event)
        self.check_privilege_escalation(event)
    
    def check_custom_misconfigurations(self, event):
        """Check for custom misconfigurations"""
        
        # Implementation for custom checks
        pass
    
    def check_exposed_secrets(self, event):
        """Check for exposed secrets"""
        
        # Implementation for secret detection
        pass
    
    def check_privilege_escalation(self, event):
        """Check for privilege escalation opportunities"""
        
        # Implementation for privilege escalation checks
        pass

# Register the custom hunter
def register_custom_hunters():
    """Register custom hunting modules"""
    
    from kube_hunter.modules import discovery
    discovery.hunters.append(CustomSecurityHunter)
bash
# Use custom hunting modules
kube-hunter --internal --custom-modules custom_hunter.py

# Run with multiple custom modules
kube-hunter --internal --custom-modules custom_hunter.py,advanced_checks.py

# Load custom configuration
kube-hunter --internal --config custom-config.yaml

Advanced Configuration

Creating comprehensive Kube-hunter configurations:

yaml
# kube-hunter-config.yaml
hunting:
  # Hunting modes
  remote: false
  internal: true
  pod: true
  
  # Network configuration
  network_scan: true
  interface: "eth0"
  timeout: 30
  threads: 20
  
  # Target configuration
  targets:
    - "10.0.0.0/24"
    - "kubernetes.default.svc.cluster.local"
  
  ports:
    - 6443    # API Server
    - 8080    # Insecure API Server
    - 10250   # Kubelet
    - 10255   # Read-only Kubelet
    - 2379    # etcd
    - 2380    # etcd peer
    - 30000-32767  # NodePort range

# Hunting modules
modules:
  enabled:
    - "discovery.apiserver"
    - "discovery.kubelet"
    - "discovery.etcd"
    - "discovery.dashboard"
    - "hunting.apiserver"
    - "hunting.kubelet"
    - "hunting.etcd"
  
  disabled:
    - "hunting.arp"  # Disable ARP hunting in some environments

# Reporting configuration
reporting:
  format: "json"
  output_file: "/tmp/kube-hunter-report.json"
  include_hunter_statistics: true
  include_vulnerabilities_details: true

# Logging configuration
logging:
  level: "INFO"
  file: "/tmp/kube-hunter.log"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Authentication
authentication:
  kubeconfig: "~/.kube/config"
  service_account_token: "/var/run/secrets/kubernetes.io/serviceaccount/token"

# Custom hunters
custom_hunters:
  - "custom_hunter.py"
  - "advanced_checks.py"
bash
# Run with configuration file
kube-hunter --config kube-hunter-config.yaml

# Override specific configuration
kube-hunter --config kube-hunter-config.yaml --remote 10.0.0.100

# Validate configuration
kube-hunter --config kube-hunter-config.yaml --validate-config

Automated Hunting

Creating automated hunting workflows:

bash
#!/bin/bash
# automated-kube-hunting.sh - Automated Kubernetes security hunting

# Configuration
CLUSTER_NAME="production"
OUTPUT_DIR="/var/log/kube-hunter"
REPORT_FORMAT="json"
NOTIFICATION_EMAIL="security@company.com"

# Create output directory
mkdir -p "$OUTPUT_DIR"

# Function to run Kube-hunter scan
run_kube_hunter_scan() {
    local scan_type="$1"
    local target="$2"
    local output_file="$3"
    
    echo "Running $scan_type scan..."
    
    case "$scan_type" in
        "remote")
            kube-hunter --remote "$target" \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        "internal")
            kube-hunter --internal \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
        "network")
            kube-hunter --network "$target" \
                --report "$REPORT_FORMAT" \
                --output "$output_file" \
                --log INFO
            ;;
    esac
    
    return $?
}

# Function to analyze results
analyze_results() {
    local report_file="$1"
    
    if [[ ! -f "$report_file" ]]; then
        echo "Report file not found: $report_file"
        return 1
    fi
    
    # Parse JSON report
    local vulnerabilities=$(jq '.vulnerabilities | length' "$report_file")
    local high_severity=$(jq '[.vulnerabilities[] | select(.severity == "high")] | length' "$report_file")
    local medium_severity=$(jq '[.vulnerabilities[] | select(.severity == "medium")] | length' "$report_file")
    
    echo "Analysis Results:"
    echo "  Total vulnerabilities: $vulnerabilities"
    echo "  High severity: $high_severity"
    echo "  Medium severity: $medium_severity"
    
    # Check thresholds
    if [[ "$high_severity" -gt 0 ]]; then
        echo "WARNING: High severity vulnerabilities found!"
        return 2
    elif [[ "$medium_severity" -gt 5 ]]; then
        echo "WARNING: Multiple medium severity vulnerabilities found!"
        return 1
    fi
    
    return 0
}

# Function to send notification
send_notification() {
    local subject="$1"
    local body="$2"
    local report_file="$3"
    
    # Send email notification (requires mail command)
    if command -v mail >/dev/null 2>&1; then
        echo "$body" | mail -s "$subject" -a "$report_file" "$NOTIFICATION_EMAIL"
    fi
    
    # Send to webhook (if configured)
    if [[ -n "$WEBHOOK_URL" ]]; then
        curl -X POST "$WEBHOOK_URL" \
            -H "Content-Type: application/json" \
            -d "{\"subject\": \"$subject\", \"body\": \"$body\"}"
    fi
}

# Main execution
main() {
    echo "Starting automated Kube-hunter security assessment for $CLUSTER_NAME"
    
    local timestamp=$(date +%Y%m%d-%H%M%S)
    local report_file="$OUTPUT_DIR/kube-hunter-$CLUSTER_NAME-$timestamp.json"
    
    # Run internal scan
    if run_kube_hunter_scan "internal" "" "$report_file"; then
        echo "Kube-hunter scan completed successfully"
        
        # Analyze results
        analyze_results "$report_file"
        local analysis_result=$?
        
        case "$analysis_result" in
            0)
                echo "No critical security issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Clean" \
                    "Kube-hunter scan completed with no critical issues." \
                    "$report_file"
                ;;
            1)
                echo "Medium severity issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Medium Issues" \
                    "Kube-hunter scan found medium severity security issues." \
                    "$report_file"
                ;;
            2)
                echo "High severity issues found!"
                send_notification "Kube-hunter ALERT: $CLUSTER_NAME - High Severity Issues" \
                    "ALERT: Kube-hunter scan found high severity security vulnerabilities!" \
                    "$report_file"
                ;;
        esac
    else
        echo "Kube-hunter scan failed"
        send_notification "Kube-hunter Error: $CLUSTER_NAME" \
            "Kube-hunter scan failed to complete." \
            ""
    fi
    
    # Cleanup old reports (keep last 30 days)
    find "$OUTPUT_DIR" -name "kube-hunter-*.json" -mtime +30 -delete
    
    echo "Automated hunting completed"
}

# Execute main function
main "$@"

Automation Scripts

Comprehensive Security Assessment

python
#!/usr/bin/env python3
# Comprehensive Kubernetes security assessment with Kube-hunter

import subprocess
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class KubeHunterAssessment:
    def __init__(self, config_file=None):
        self.config_file = config_file
        self.load_configuration()
        self.setup_logging()
        
    def load_configuration(self):
        """Load assessment configuration"""
        default_config = {
            "scanning": {
                "modes": ["internal", "remote"],
                "targets": [],
                "timeout": 300,
                "threads": 20
            },
            "reporting": {
                "format": "json",
                "output_dir": "/tmp/kube-hunter-reports",
                "include_statistics": True
            },
            "notifications": {
                "email": {
                    "enabled": False,
                    "smtp_server": "localhost",
                    "smtp_port": 587,
                    "username": "",
                    "password": "",
                    "from": "kube-hunter@example.com",
                    "to": "security@example.com"
                },
                "webhook": {
                    "enabled": False,
                    "url": "",
                    "headers": {}
                }
            },
            "thresholds": {
                "high_severity_max": 0,
                "medium_severity_max": 5,
                "total_vulnerabilities_max": 10
            }
        }
        
        if self.config_file and os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                import yaml
                user_config = yaml.safe_load(f)
                # Merge configurations
                self.config = {**default_config, **user_config}
        else:
            self.config = default_config
    
    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('kube-hunter-assessment.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def run_kube_hunter(self, mode, target=None, output_file=None):
        """Run Kube-hunter scan"""
        
        if not output_file:
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            output_file = f"kube-hunter-{mode}-{timestamp}.json"
        
        # Build command
        cmd = ["kube-hunter"]
        
        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--internal")
        elif mode == "pod":
            cmd.append("--pod")
        elif mode == "network" and target:
            cmd.extend(["--network", target])
        
        # Add common options
        cmd.extend([
            "--report", "json",
            "--output", output_file,
            "--log", "INFO"
        ])
        
        # Add timeout if configured
        timeout = self.config.get("scanning", {}).get("timeout", 300)
        
        try:
            self.logger.info(f"Running Kube-hunter in {mode} mode...")
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            
            if result.returncode == 0:
                self.logger.info(f"Kube-hunter scan completed: {output_file}")
                
                # Load and return results
                with open(output_file, 'r') as f:
                    scan_results = json.load(f)
                
                return scan_results, output_file
            else:
                self.logger.error(f"Kube-hunter scan failed: {result.stderr}")
                return None, None
                
        except subprocess.TimeoutExpired:
            self.logger.error(f"Kube-hunter scan timed out after {timeout} seconds")
            return None, None
        except Exception as e:
            self.logger.error(f"Error running Kube-hunter: {e}")
            return None, None
    
    def analyze_vulnerabilities(self, scan_results):
        """Analyze scan results for vulnerabilities"""
        
        if not scan_results:
            return None
        
        vulnerabilities = scan_results.get("vulnerabilities", [])
        
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": len(vulnerabilities),
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }
        
        # Analyze each vulnerability
        for vuln in vulnerabilities:
            severity = vuln.get("severity", "unknown").lower()
            category = vuln.get("category", "unknown")
            
            # Count by severity
            if severity in analysis["severity_breakdown"]:
                analysis["severity_breakdown"][severity] += 1
            
            # Count by category
            if category not in analysis["category_breakdown"]:
                analysis["category_breakdown"][category] = 0
            analysis["category_breakdown"][category] += 1
            
            # Collect critical and high severity vulnerabilities
            if severity == "critical":
                analysis["critical_vulnerabilities"].append(vuln)
            elif severity == "high":
                analysis["high_vulnerabilities"].append(vuln)
        
        # Generate recommendations
        analysis["recommendations"] = self.generate_recommendations(analysis)
        
        return analysis
    
    def generate_recommendations(self, analysis):
        """Generate security recommendations based on analysis"""
        
        recommendations = []
        
        # Critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Critical Vulnerabilities Immediately",
                "description": f"Found {len(analysis['critical_vulnerabilities'])} critical vulnerabilities that require immediate attention.",
                "action": "Review and remediate all critical vulnerabilities within 24 hours."
            })
        
        # High severity vulnerabilities
        if analysis["high_vulnerabilities"]:
            recommendations.append({
                "priority": "HIGH",
                "title": "Remediate High Severity Issues",
                "description": f"Found {len(analysis['high_vulnerabilities'])} high severity vulnerabilities.",
                "action": "Plan remediation for high severity vulnerabilities within 1 week."
            })
        
        # Category-specific recommendations
        categories = analysis["category_breakdown"]
        
        if categories.get("Access Risk", 0) > 0:
            recommendations.append({
                "priority": "HIGH",
                "title": "Review Access Controls",
                "description": "Access control vulnerabilities detected.",
                "action": "Review and strengthen RBAC policies and authentication mechanisms."
            })
        
        if categories.get("Information Disclosure", 0) > 0:
            recommendations.append({
                "priority": "MEDIUM",
                "title": "Prevent Information Disclosure",
                "description": "Information disclosure vulnerabilities found.",
                "action": "Review exposed services and implement proper access controls."
            })
        
        if categories.get("Remote Code Execution", 0) > 0:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Remote Code Execution Risks",
                "description": "Remote code execution vulnerabilities detected.",
                "action": "Immediately patch or isolate affected components."
            })
        
        return recommendations
    
    def check_thresholds(self, analysis):
        """Check if vulnerabilities exceed configured thresholds"""
        
        thresholds = self.config.get("thresholds", {})
        
        high_severity_count = analysis["severity_breakdown"]["high"] + analysis["severity_breakdown"]["critical"]
        medium_severity_count = analysis["severity_breakdown"]["medium"]
        total_vulnerabilities = analysis["total_vulnerabilities"]
        
        threshold_violations = []
        
        # Check high severity threshold
        high_max = thresholds.get("high_severity_max", 0)
        if high_severity_count > high_max:
            threshold_violations.append(f"High/Critical severity: {high_severity_count} > {high_max}")
        
        # Check medium severity threshold
        medium_max = thresholds.get("medium_severity_max", 5)
        if medium_severity_count > medium_max:
            threshold_violations.append(f"Medium severity: {medium_severity_count} > {medium_max}")
        
        # Check total vulnerabilities threshold
        total_max = thresholds.get("total_vulnerabilities_max", 10)
        if total_vulnerabilities > total_max:
            threshold_violations.append(f"Total vulnerabilities: {total_vulnerabilities} > {total_max}")
        
        return threshold_violations
    
    def generate_report(self, analysis, scan_results, output_file):
        """Generate comprehensive security report"""
        
        report = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "tool": "Kube-hunter",
                "assessment_type": "Kubernetes Security Assessment"
            },
            "executive_summary": {
                "total_vulnerabilities": analysis["total_vulnerabilities"],
                "critical_count": analysis["severity_breakdown"]["critical"],
                "high_count": analysis["severity_breakdown"]["high"],
                "medium_count": analysis["severity_breakdown"]["medium"],
                "low_count": analysis["severity_breakdown"]["low"],
                "risk_level": self.calculate_risk_level(analysis)
            },
            "detailed_analysis": analysis,
            "raw_results": scan_results,
            "recommendations": analysis["recommendations"]
        }
        
        # Save report
        report_file = output_file.replace('.json', '-report.json')
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)
        
        self.logger.info(f"Comprehensive report generated: {report_file}")
        return report, report_file
    
    def calculate_risk_level(self, analysis):
        """Calculate overall risk level"""
        
        critical_count = analysis["severity_breakdown"]["critical"]
        high_count = analysis["severity_breakdown"]["high"]
        medium_count = analysis["severity_breakdown"]["medium"]
        
        if critical_count > 0:
            return "CRITICAL"
        elif high_count > 3:
            return "HIGH"
        elif high_count > 0 or medium_count > 5:
            return "MEDIUM"
        elif medium_count > 0:
            return "LOW"
        else:
            return "MINIMAL"
    
    def send_notifications(self, analysis, report_file, threshold_violations):
        """Send notifications about assessment results"""
        
        # Prepare notification content
        subject = "Kube-hunter Security Assessment Results"
        if threshold_violations:
            subject += " - ALERT"
        
        body = self.generate_notification_body(analysis, threshold_violations)
        
        # Send email notification
        if self.config.get("notifications", {}).get("email", {}).get("enabled", False):
            self.send_email_notification(subject, body, report_file)
        
        # Send webhook notification
        if self.config.get("notifications", {}).get("webhook", {}).get("enabled", False):
            self.send_webhook_notification(analysis, threshold_violations)
    
    def generate_notification_body(self, analysis, threshold_violations):
        """Generate notification message body"""
        
        body = f"""
Kube-hunter Security Assessment Results
Generated: {analysis['timestamp']}

EXECUTIVE SUMMARY:
==================
Total Vulnerabilities: {analysis['total_vulnerabilities']}
Critical: {analysis['severity_breakdown']['critical']}
High: {analysis['severity_breakdown']['high']}
Medium: {analysis['severity_breakdown']['medium']}
Low: {analysis['severity_breakdown']['low']}

"""
        
        if threshold_violations:
            body += "⚠️  THRESHOLD VIOLATIONS:\n"
            for violation in threshold_violations:
                body += f"  - {violation}\n"
            body += "\n"
        
        # Add critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            body += "CRITICAL VULNERABILITIES:\n"
            body += "========================\n"
            for vuln in analysis["critical_vulnerabilities"][:3]:
                body += f"- {vuln.get('vulnerability', 'Unknown')}\n"
                body += f"  Description: {vuln.get('description', 'No description')}\n"
                body += f"  Evidence: {vuln.get('evidence', 'No evidence')}\n\n"
        
        # Add recommendations
        if analysis["recommendations"]:
            body += "TOP RECOMMENDATIONS:\n"
            body += "===================\n"
            for rec in analysis["recommendations"][:3]:
                body += f"- [{rec['priority']}] {rec['title']}\n"
                body += f"  {rec['action']}\n\n"
        
        return body
    
    def send_email_notification(self, subject, body, attachment_file):
        """Send email notification"""
        
        email_config = self.config.get("notifications", {}).get("email", {})
        
        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject
            
            msg.attach(MIMEText(body, 'plain'))
            
            # Attach report file
            if attachment_file and os.path.exists(attachment_file):
                with open(attachment_file, 'r') as f:
                    attachment = MIMEText(f.read())
                    attachment.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_file)}"')
                    msg.attach(attachment)
            
            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            
            if email_config.get("username") and email_config.get("password"):
                server.login(email_config["username"], email_config["password"])
            
            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()
            
            self.logger.info("Email notification sent successfully")
            
        except Exception as e:
            self.logger.error(f"Failed to send email notification: {e}")
    
    def send_webhook_notification(self, analysis, threshold_violations):
        """Send webhook notification"""
        
        webhook_config = self.config.get("notifications", {}).get("webhook", {})
        
        try:
            import requests
            
            payload = {
                "timestamp": analysis["timestamp"],
                "alert": bool(threshold_violations),
                "threshold_violations": threshold_violations,
                "summary": {
                    "total_vulnerabilities": analysis["total_vulnerabilities"],
                    "critical": analysis["severity_breakdown"]["critical"],
                    "high": analysis["severity_breakdown"]["high"],
                    "medium": analysis["severity_breakdown"]["medium"]
                },
                "recommendations": analysis["recommendations"][:3]
            }
            
            headers = webhook_config.get("headers", {})
            headers["Content-Type"] = "application/json"
            
            response = requests.post(
                webhook_config["url"],
                json=payload,
                headers=headers,
                timeout=30
            )
            
            if response.status_code == 200:
                self.logger.info("Webhook notification sent successfully")
            else:
                self.logger.error(f"Webhook notification failed: {response.status_code}")
                
        except Exception as e:
            self.logger.error(f"Failed to send webhook notification: {e}")
    
    def run_comprehensive_assessment(self, modes=None, targets=None):
        """Run comprehensive security assessment"""
        
        if not modes:
            modes = self.config.get("scanning", {}).get("modes", ["internal"])
        
        if not targets:
            targets = self.config.get("scanning", {}).get("targets", [])
        
        self.logger.info("Starting comprehensive Kube-hunter security assessment")
        
        all_results = []
        all_analyses = []
        
        # Run scans for each mode
        for mode in modes:
            if mode in ["remote", "network"] and targets:
                for target in targets:
                    scan_results, output_file = self.run_kube_hunter(mode, target)
                    if scan_results:
                        analysis = self.analyze_vulnerabilities(scan_results)
                        if analysis:
                            all_results.append((scan_results, output_file))
                            all_analyses.append(analysis)
            else:
                scan_results, output_file = self.run_kube_hunter(mode)
                if scan_results:
                    analysis = self.analyze_vulnerabilities(scan_results)
                    if analysis:
                        all_results.append((scan_results, output_file))
                        all_analyses.append(analysis)
        
        if not all_analyses:
            self.logger.error("No successful scans completed")
            return False
        
        # Combine analyses
        combined_analysis = self.combine_analyses(all_analyses)
        
        # Check thresholds
        threshold_violations = self.check_thresholds(combined_analysis)
        
        # Generate comprehensive report
        combined_results = {"scans": [result[0] for result in all_results]}
        report, report_file = self.generate_report(combined_analysis, combined_results, "combined-assessment.json")
        
        # Send notifications
        self.send_notifications(combined_analysis, report_file, threshold_violations)
        
        # Log summary
        self.logger.info(f"Assessment completed. Total vulnerabilities: {combined_analysis['total_vulnerabilities']}")
        if threshold_violations:
            self.logger.warning(f"Threshold violations: {threshold_violations}")
        
        return True
    
    def combine_analyses(self, analyses):
        """Combine multiple scan analyses"""
        
        combined = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": 0,
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }
        
        # Combine all analyses
        for analysis in analyses:
            combined["total_vulnerabilities"] += analysis["total_vulnerabilities"]
            
            # Combine severity counts
            for severity, count in analysis["severity_breakdown"].items():
                combined["severity_breakdown"][severity] += count
            
            # Combine category counts
            for category, count in analysis["category_breakdown"].items():
                if category not in combined["category_breakdown"]:
                    combined["category_breakdown"][category] = 0
                combined["category_breakdown"][category] += count
            
            # Combine vulnerabilities
            combined["critical_vulnerabilities"].extend(analysis["critical_vulnerabilities"])
            combined["high_vulnerabilities"].extend(analysis["high_vulnerabilities"])
        
        # Generate combined recommendations
        combined["recommendations"] = self.generate_recommendations(combined)
        
        return combined

# Usage
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Comprehensive Kube-hunter Security Assessment")
    parser.add_argument("--config", help="Configuration file path")
    parser.add_argument("--modes", nargs="+", choices=["internal", "remote", "pod", "network"], 
                       default=["internal"], help="Scanning modes")
    parser.add_argument("--targets", nargs="+", help="Targets for remote/network scanning")
    
    args = parser.parse_args()
    
    assessment = KubeHunterAssessment(args.config)
    success = assessment.run_comprehensive_assessment(args.modes, args.targets)
    
    sys.exit(0 if success else 1)

CI/CD Integration Script

python
#!/usr/bin/env python3
# Kube-hunter CI/CD integration

import subprocess
import json
import sys
import os

class KubeHunterCICD:
    def __init__(self, fail_on_high=True, fail_on_medium=False):
        self.fail_on_high = fail_on_high
        self.fail_on_medium = fail_on_medium
    
    def run_security_scan(self, mode="internal", target=None):
        """Run Kube-hunter security scan in CI/CD"""
        
        cmd = ["kube-hunter", "--report", "json"]
        
        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--internal")
        elif mode == "pod":
            cmd.append("--pod")
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            
            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                print(f"Kube-hunter scan failed: {result.stderr}")
                return None
                
        except Exception as e:
            print(f"Error running Kube-hunter: {e}")
            return None
    
    def evaluate_results(self, scan_results):
        """Evaluate scan results for CI/CD pipeline"""
        
        if not scan_results:
            return False, "Scan failed"
        
        vulnerabilities = scan_results.get("vulnerabilities", [])
        
        high_count = len([v for v in vulnerabilities if v.get("severity") == "high"])
        medium_count = len([v for v in vulnerabilities if v.get("severity") == "medium"])
        critical_count = len([v for v in vulnerabilities if v.get("severity") == "critical"])
        
        # Check failure conditions
        if critical_count > 0:
            return False, f"Critical vulnerabilities found: {critical_count}"
        
        if self.fail_on_high and high_count > 0:
            return False, f"High severity vulnerabilities found: {high_count}"
        
        if self.fail_on_medium and medium_count > 0:
            return False, f"Medium severity vulnerabilities found: {medium_count}"
        
        return True, f"Security scan passed. Vulnerabilities: {len(vulnerabilities)}"
    
    def generate_ci_report(self, scan_results, output_file="kube-hunter-ci-report.json"):
        """Generate CI/CD friendly report"""
        
        if not scan_results:
            return None
        
        vulnerabilities = scan_results.get("vulnerabilities", [])
        
        report = {
            "summary": {
                "total_vulnerabilities": len(vulnerabilities),
                "critical": len([v for v in vulnerabilities if v.get("severity") == "critical"]),
                "high": len([v for v in vulnerabilities if v.get("severity") == "high"]),
                "medium": len([v for v in vulnerabilities if v.get("severity") == "medium"]),
                "low": len([v for v in vulnerabilities if v.get("severity") == "low"])
            },
            "vulnerabilities": vulnerabilities,
            "scan_metadata": scan_results.get("metadata", {})
        }
        
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)
        
        return report

# Usage in CI/CD
if __name__ == "__main__":
    cicd = KubeHunterCICD(fail_on_high=True, fail_on_medium=False)
    
    # Run scan
    results = cicd.run_security_scan("internal")
    
    # Evaluate results
    passed, message = cicd.evaluate_results(results)
    
    # Generate report
    cicd.generate_ci_report(results)
    
    print(message)
    sys.exit(0 if passed else 1)

Integration Examples

Prometheus Monitoring

yaml
# kube-hunter-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kube-hunter-exporter
  template:
    metadata:
      labels:
        app: kube-hunter-exporter
    spec:
      serviceAccountName: kube-hunter-exporter
      containers:
      - name: kube-hunter-exporter
        image: custom/kube-hunter-exporter:latest
        ports:
        - containerPort: 8080
        env:
        - name: SCAN_INTERVAL
          value: "3600"  # 1 hour
        - name: METRICS_PORT
          value: "8080"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"

---
apiVersion: v1
kind: Service
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
  labels:
    app: kube-hunter-exporter
spec:
  ports:
  - port: 8080
    targetPort: 8080
    name: metrics
  selector:
    app: kube-hunter-exporter

---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: kube-hunter-exporter
  endpoints:
  - port: metrics
    interval: 60s
    path: /metrics

Grafana Dashboard

json
{
  "dashboard": {
    "title": "Kube-hunter Security Dashboard",
    "panels": [
      {
        "title": "Total Vulnerabilities",
        "type": "stat",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_total",
            "legendFormat": "Total Vulnerabilities"
          }
        ]
      },
      {
        "title": "Vulnerabilities by Severity",
        "type": "piechart",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_by_severity",
            "legendFormat": "{{severity}}"
          }
        ]
      },
      {
        "title": "Security Scan History",
        "type": "graph",
        "targets": [
          {
            "expr": "kube_hunter_scan_duration_seconds",
            "legendFormat": "Scan Duration"
          }
        ]
      }
    ]
  }
}

Troubleshooting

Common Issues

Permission Problems:

bash
# Check Kubernetes access
kubectl auth can-i get pods
kubectl auth can-i list services

# Verify service account permissions
kubectl describe serviceaccount kube-hunter -n security
kubectl describe clusterrolebinding kube-hunter

# Check pod security policies
kubectl get psp
kubectl describe psp restricted

# Test network connectivity
kubectl run test-pod --image=busybox --rm -it -- nslookup kubernetes.default.svc.cluster.local

Network Connectivity Issues:

bash
# Check cluster networking
kubectl get nodes -o wide
kubectl get services -A

# Test API server connectivity
curl -k https://kubernetes.default.svc.cluster.local:443/version

# Check DNS resolution
nslookup kubernetes.default.svc.cluster.local

# Verify network policies
kubectl get networkpolicies -A
kubectl describe networkpolicy -n kube-system

Scanning Problems:

bash
# Run with debug logging
kube-hunter --internal --log DEBUG

# Check for timeouts
kube-hunter --internal --timeout 60

# Test specific components
kube-hunter --internal --services api-server

# Verify output format
kube-hunter --internal --report json | jq '.'

# Check for missing dependencies
pip list | grep kube-hunter
pip install --upgrade kube-hunter

Performance Optimization

Optimizing Kube-hunter performance:

bash
# Reduce scan scope
kube-hunter --internal --quick

# Limit thread count
kube-hunter --internal --threads 10

# Use specific interface
kube-hunter --internal --interface eth0

# Skip slow checks
kube-hunter --internal --skip-slow-checks

# Optimize for CI/CD
kube-hunter --pod --report json --timeout 120

Security Considerations

Safe Usage Practices

Scanning Ethics:

  • Only scan clusters you own or have explicit permission to test
  • Coordinate with operations teams before running scans
  • Use read-only scanning modes when possible
  • Implement proper access controls for scan results
  • Regular updates of Kube-hunter and its dependencies

Data Protection:

  • Encrypt sensitive scan results and reports
  • Implement secure storage for vulnerability data
  • Use secure channels for transmitting reports
  • Regular cleanup of temporary files and logs
  • Implement data retention policies for compliance

Operational Security

Monitoring and Alerting:

  • Monitor Kube-hunter execution and results
  • Set up alerting for critical security findings
  • Track vulnerability trends over time
  • Implement automated remediation workflows
  • Regular review of security configurations

Integration Security:

  • Secure CI/CD pipeline integration
  • Protect service account tokens and credentials
  • Implement proper RBAC for Kubernetes deployments
  • Monitor for unauthorized Kube-hunter usage
  • Regular security assessment of scanning infrastructure

References

  1. Kube-hunter GitHub Repository
  2. Kubernetes Security Best Practices
  3. CIS Kubernetes Benchmark
  4. NIST Container Security Guide
  5. Aqua Security Documentation