Kube-hunter Cheat Sheet
Overview
Kube-hunter is an open-source penetration testing tool from Aqua Security that hunts for security weaknesses in Kubernetes clusters. It simulates techniques an attacker might use, scanning a cluster from both external and internal perspectives, and reports misconfigurations, exposed services, and other attack vectors so they can be fixed before they are exploited.
Kube-hunter first discovers Kubernetes components such as API servers, etcd, kubelets, and dashboards, then tests each one for common issues: anonymous access, privilege escalation opportunities, and insecure configurations. Its hunters also look for exposed sensitive information, weak authentication, container escape paths, and network segmentation gaps that are typical of misconfigured deployments.
Each finding is reported with a description of the issue, its category, and its severity. The tool supports remote scanning of specified hosts for external assessments, internal scanning from a machine or pod inside the cluster, and network scanning of IP ranges. Reports are available in plain text, YAML, or JSON, so scans can be run from CI/CD pipelines by DevSecOps teams, security professionals, and Kubernetes administrators.
Installation
Python Package Installation
Installing Kube-hunter using pip:
bash
# Install Kube-hunter
pip install kube-hunter
# Alternative: Install from source
git clone https://github.com/aquasecurity/kube-hunter.git
cd kube-hunter
pip install -r requirements.txt
python setup.py install
# Verify installation
kube-hunter --help
# Install additional dependencies
pip install requests urllib3 netaddr
# Update to latest version
pip install --upgrade kube-hunter
# Install specific version
pip install kube-hunter==0.6.8
Docker Installation
Running Kube-hunter in Docker containers:
bash
# Run Kube-hunter interactively (prompts for the scan type)
docker run --rm -it aquasec/kube-hunter
# Run with host networking to scan the local network interfaces
docker run --rm --network host aquasec/kube-hunter --interface
# Scan a specific remote host
docker run --rm aquasec/kube-hunter --remote 10.0.0.100
# Scan an IP range
docker run --rm aquasec/kube-hunter --cidr 10.0.0.0/24
# Save a JSON report to a file (the report is printed to stdout)
mkdir -p reports
docker run --rm aquasec/kube-hunter --remote 10.0.0.100 --report json > reports/kube-hunter-report.json
Kubernetes Deployment
Deploying Kube-hunter as a Kubernetes job:
yaml
# kube-hunter-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: kube-hunter
  namespace: security
spec:
  template:
    metadata:
      labels:
        app: kube-hunter
    spec:
      restartPolicy: Never
      serviceAccountName: kube-hunter
      containers:
      - name: kube-hunter
        image: aquasec/kube-hunter
        command: ["kube-hunter"]
        args: ["--pod", "--report", "json", "--log", "INFO"]
        volumeMounts:
        - name: output
          mountPath: /tmp
        env:
        - name: KUBERNETES_SERVICE_HOST
          value: "kubernetes.default.svc.cluster.local"
        - name: KUBERNETES_SERVICE_PORT
          value: "443"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
      volumes:
      - name: output
        emptyDir: {}
  backoffLimit: 1
---
# ServiceAccount for Kube-hunter
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kube-hunter
  namespace: security
---
# ClusterRole for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kube-hunter
rules:
- apiGroups: [""]
  resources: ["pods", "services", "nodes"]
  verbs: ["get", "list"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list"]
- apiGroups: ["extensions"]
  resources: ["deployments"]
  verbs: ["get", "list"]
---
# ClusterRoleBinding for Kube-hunter
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kube-hunter
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kube-hunter
subjects:
- kind: ServiceAccount
  name: kube-hunter
  namespace: security
Virtual Environment Setup
bash
# Create virtual environment
python3 -m venv kube-hunter-env
source kube-hunter-env/bin/activate
# Install Kube-hunter
pip install kube-hunter
# Install additional tools
pip install kubernetes pyyaml
# Create configuration directory
mkdir -p ~/.kube-hunter
# Verify setup
kube-hunter --help
# Create wrapper script (writing to /usr/local/bin usually requires root)
sudo tee /usr/local/bin/kube-hunt > /dev/null << 'EOF'
#!/bin/bash
source /path/to/kube-hunter-env/bin/activate
kube-hunter "$@"
EOF
sudo chmod +x /usr/local/bin/kube-hunt
Basic Usage
Remote Scanning
Scanning Kubernetes clusters from an external perspective:
bash
# Scan a specific IP address or hostname
kube-hunter --remote 10.0.0.100
# Scan an IP range
kube-hunter --cidr 10.0.0.0/24
# Scan multiple targets (space-separated)
kube-hunter --remote 10.0.0.100 10.0.0.101 10.0.0.102
# Quick scan mode (limits subnet scanning to a /24)
kube-hunter --remote 10.0.0.100 --quick
# Verbose output
kube-hunter --remote 10.0.0.100 --log DEBUG
# Save results to file (the report is printed to stdout)
kube-hunter --remote 10.0.0.100 --report json > remote-scan.json
Internal Scanning
Scanning from within the Kubernetes cluster:
bash
# Internal scan of all local network interfaces
kube-hunter --interface
# Pod-based scanning (run from inside a pod)
kube-hunter --pod
# Active hunting (attempts to exploit findings and may change cluster state)
kube-hunter --interface --active
# Map the cluster's nodes without hunting
kube-hunter --interface --mapping
# Include hunter statistics in the report
kube-hunter --interface --statistics
# List the available hunters
kube-hunter --list
# Discover nodes through the Kubernetes API using a kubeconfig
kube-hunter --k8s-auto-discover-nodes --kubeconfig ~/.kube/config
Network Scanning
Discovering Kubernetes services across networks:
bash
# Network discovery scan
kube-hunter --cidr 10.0.0.0/24
# Scan multiple networks (comma-separated)
kube-hunter --cidr 10.0.0.0/24,192.168.1.0/24
# Exclude an address from the range with a leading "!"
kube-hunter --cidr '10.0.0.0/24,!10.0.0.8/32'
# Network scan with verbose logging
kube-hunter --cidr 10.0.0.0/24 --log DEBUG
# Network scan with output (the report is printed to stdout)
kube-hunter --cidr 10.0.0.0/24 --report json > network-scan.json
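Before scanning a range, it can help to know how many addresses it covers. The following is a minimal sketch using Python's standard `ipaddress` module; the function name `count_hosts` and the sample ranges are illustrative and not part of Kube-hunter.

```python
import ipaddress


def count_hosts(cidrs):
    """Return the number of usable host addresses for each CIDR string."""
    sizes = {}
    for cidr in cidrs:
        # strict=False tolerates host bits being set, e.g. 10.0.0.5/24
        network = ipaddress.ip_network(cidr, strict=False)
        sizes[cidr] = sum(1 for _ in network.hosts())
    return sizes


if __name__ == "__main__":
    for cidr, hosts in count_hosts(["10.0.0.0/24", "192.168.1.0/24"]).items():
        print(f"{cidr}: {hosts} hosts")
```

A large count is a hint to split the range or narrow the target list before starting a scan.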
Advanced Features
Custom Hunting Modules
Creating and using custom hunting modules:
python
# custom_hunter.py - Custom Kube-hunter module
from kube_hunter.core.types import Hunter
from kube_hunter.core.events import handler
from kube_hunter.core.events.types import OpenPortEvent


# Kube-hunter's own hunters register by decorating the class; the class is
# instantiated with the triggering event and its execute() method is called.
# The kubelet port (10250) is used here only as an example trigger.
@handler.subscribe(OpenPortEvent, predicate=lambda event: event.port == 10250)
class CustomSecurityHunter(Hunter):
    """Custom security hunter for specific vulnerabilities"""

    def __init__(self, event):
        self.event = event

    def execute(self):
        """Hunt for custom security vulnerabilities"""
        # Custom vulnerability detection logic
        self.check_custom_misconfigurations()
        self.check_exposed_secrets()
        self.check_privilege_escalation()

    def check_custom_misconfigurations(self):
        """Check for custom misconfigurations"""
        # Implementation for custom checks
        pass

    def check_exposed_secrets(self):
        """Check for exposed secrets"""
        # Implementation for secret detection
        pass

    def check_privilege_escalation(self):
        """Check for privilege escalation opportunities"""
        # Implementation for privilege escalation checks
        pass

# No separate registration call is needed: the decorator registers the hunter
# when the module is imported as part of kube-hunter's hunting modules.
bash
# List registered hunters with their class names
kube-hunter --list --raw-hunter-names
# Run only the selected hunters, identified by class name
kube-hunter --interface --custom CustomSecurityHunter
# Include active hunters in the listing
kube-hunter --active --list --raw-hunter-names
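Hunters are event-driven: a hunter subscribes to an event type and runs when a matching event is published. The following is a simplified, self-contained model of that publish/subscribe pattern, not Kube-hunter's actual implementation; `ToyHandler`, `PortOpened`, and `KubeletPortHunter` are hypothetical names used only for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass


class ToyHandler:
    """Tiny event bus that maps event types to subscribed hunter classes."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, predicate=None):
        # Used as a class decorator; registers the hunter and returns it unchanged
        def decorator(hunter_cls):
            self._subscribers[event_type].append((hunter_cls, predicate))
            return hunter_cls
        return decorator

    def publish(self, event):
        # Instantiate and run every hunter whose predicate accepts the event
        results = []
        for hunter_cls, predicate in self._subscribers[type(event)]:
            if predicate is None or predicate(event):
                results.append(hunter_cls(event).execute())
        return results


handler = ToyHandler()


@dataclass
class PortOpened:
    host: str
    port: int


@handler.subscribe(PortOpened, predicate=lambda e: e.port == 10250)
class KubeletPortHunter:
    def __init__(self, event):
        self.event = event

    def execute(self):
        return f"kubelet port open on {self.event.host}"


if __name__ == "__main__":
    print(handler.publish(PortOpened("10.0.0.100", 10250)))
    print(handler.publish(PortOpened("10.0.0.100", 8080)))
```

The predicate keeps the hunter from being instantiated for events it has no interest in, which is why the second publish call runs nothing.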
Advanced Configuration
Kube-hunter takes its settings from command-line flags and does not load a configuration file itself. The layout below is one way to keep scan settings in a single file for a wrapper script to translate into flags:
yaml
# kube-hunter-config.yaml
hunting:
  # Hunting modes
  remote: false
  internal: true
  pod: true
  # Network configuration
  network_scan: true
  interface: "eth0"
  timeout: 30
  threads: 20
  # Target configuration
  targets:
    - "10.0.0.0/24"
    - "kubernetes.default.svc.cluster.local"
  ports:
    - 6443 # API Server
    - 8080 # Insecure API Server
    - 10250 # Kubelet
    - 10255 # Read-only Kubelet
    - 2379 # etcd
    - 2380 # etcd peer
    - 30000-32767 # NodePort range
  # Hunting modules
  modules:
    enabled:
      - "discovery.apiserver"
      - "discovery.kubelet"
      - "discovery.etcd"
      - "discovery.dashboard"
      - "hunting.apiserver"
      - "hunting.kubelet"
      - "hunting.etcd"
    disabled:
      - "hunting.arp" # Disable ARP hunting in some environments
# Reporting configuration
reporting:
  format: "json"
  output_file: "/tmp/kube-hunter-report.json"
  include_hunter_statistics: true
  include_vulnerabilities_details: true
# Logging configuration
logging:
  level: "INFO"
  file: "/tmp/kube-hunter.log"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Authentication
authentication:
  kubeconfig: "~/.kube/config"
  service_account_token: "/var/run/secrets/kubernetes.io/serviceaccount/token"
# Custom hunters
custom_hunters:
  - "custom_hunter.py"
  - "advanced_checks.py"
bash
# Flags corresponding to the pod mode and reporting settings above
kube-hunter --pod --report json --statistics --log INFO > /tmp/kube-hunter-report.json
# Scan a single remote host with the same reporting options
kube-hunter --remote 10.0.0.100 --report json --log INFO > /tmp/kube-hunter-report.json
# Check that the settings file is valid YAML (requires PyYAML)
python3 -c "import sys, yaml; yaml.safe_load(open(sys.argv[1]))" kube-hunter-config.yaml
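The port list in a settings file like the one above mixes single ports with ranges such as `30000-32767`. The following is a minimal sketch of how a wrapper script could expand such entries; `parse_port_specs` and `port_allowed` are hypothetical helpers, not Kube-hunter functions.

```python
def parse_port_specs(specs):
    """Expand entries like 6443 or "30000-32767" into (start, end) tuples."""
    ranges = []
    for spec in specs:
        text = str(spec).strip()
        start_text, sep, end_text = text.partition("-")
        start = int(start_text)
        end = int(end_text) if sep else start
        if not (1 <= start <= end <= 65535):
            raise ValueError(f"invalid port spec: {spec!r}")
        ranges.append((start, end))
    return ranges


def port_allowed(port, ranges):
    """Return True if the port falls inside any (start, end) range."""
    return any(start <= port <= end for start, end in ranges)


if __name__ == "__main__":
    ranges = parse_port_specs([6443, 10250, "30000-32767"])
    print(ranges)
    print(port_allowed(31000, ranges))
```

Storing ranges as tuples avoids materializing every NodePort number when only membership checks are needed.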
Automated Hunting
Creating automated hunting workflows:
bash
#!/bin/bash
# automated-kube-hunting.sh - Automated Kubernetes security hunting
# Configuration
CLUSTER_NAME="production"
OUTPUT_DIR="/var/log/kube-hunter"
REPORT_FORMAT="json"
NOTIFICATION_EMAIL="security@company.com"
# Create output directory
mkdir -p "$OUTPUT_DIR"
# Function to run Kube-hunter scan
# The report is printed to stdout, so it is redirected to the output file
run_kube_hunter_scan() {
    local scan_type="$1"
    local target="$2"
    local output_file="$3"
    echo "Running $scan_type scan..."
    case "$scan_type" in
        "remote")
            kube-hunter --remote "$target" \
                --report "$REPORT_FORMAT" \
                --log INFO > "$output_file"
            ;;
        "internal")
            kube-hunter --interface \
                --report "$REPORT_FORMAT" \
                --log INFO > "$output_file"
            ;;
        "network")
            kube-hunter --cidr "$target" \
                --report "$REPORT_FORMAT" \
                --log INFO > "$output_file"
            ;;
        *)
            echo "Unknown scan type: $scan_type" >&2
            return 1
            ;;
    esac
    return $?
}
# Function to analyze results
analyze_results() {
    local report_file="$1"
    if [[ ! -f "$report_file" ]]; then
        echo "Report file not found: $report_file"
        return 1
    fi
    # Parse JSON report
    local vulnerabilities=$(jq '.vulnerabilities | length' "$report_file")
    local high_severity=$(jq '[.vulnerabilities[] | select(.severity == "high")] | length' "$report_file")
    local medium_severity=$(jq '[.vulnerabilities[] | select(.severity == "medium")] | length' "$report_file")
    echo "Analysis Results:"
    echo " Total vulnerabilities: $vulnerabilities"
    echo " High severity: $high_severity"
    echo " Medium severity: $medium_severity"
    # Check thresholds
    if [[ "$high_severity" -gt 0 ]]; then
        echo "WARNING: High severity vulnerabilities found!"
        return 2
    elif [[ "$medium_severity" -gt 5 ]]; then
        echo "WARNING: Multiple medium severity vulnerabilities found!"
        return 1
    fi
    return 0
}
# Function to send notification
send_notification() {
    local subject="$1"
    local body="$2"
    local report_file="$3"
    # Send email notification (requires mail command)
    if command -v mail >/dev/null 2>&1; then
        # -a attaches a file in mailx-style clients (other clients differ);
        # skip it when there is no report to attach
        if [[ -n "$report_file" && -f "$report_file" ]]; then
            echo "$body" | mail -s "$subject" -a "$report_file" "$NOTIFICATION_EMAIL"
        else
            echo "$body" | mail -s "$subject" "$NOTIFICATION_EMAIL"
        fi
    fi
    # Send to webhook (if configured)
    if [[ -n "${WEBHOOK_URL:-}" ]]; then
        curl -X POST "$WEBHOOK_URL" \
            -H "Content-Type: application/json" \
            -d "{\"subject\": \"$subject\", \"body\": \"$body\"}"
    fi
}
# Main execution
main() {
    echo "Starting automated Kube-hunter security assessment for $CLUSTER_NAME"
    local timestamp=$(date +%Y%m%d-%H%M%S)
    local report_file="$OUTPUT_DIR/kube-hunter-$CLUSTER_NAME-$timestamp.json"
    # Run internal scan
    if run_kube_hunter_scan "internal" "" "$report_file"; then
        echo "Kube-hunter scan completed successfully"
        # Analyze results
        analyze_results "$report_file"
        local analysis_result=$?
        case "$analysis_result" in
            0)
                echo "No critical security issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Clean" \
                    "Kube-hunter scan completed with no critical issues." \
                    "$report_file"
                ;;
            1)
                echo "Medium severity issues found"
                send_notification "Kube-hunter Report: $CLUSTER_NAME - Medium Issues" \
                    "Kube-hunter scan found medium severity security issues." \
                    "$report_file"
                ;;
            2)
                echo "High severity issues found!"
                send_notification "Kube-hunter ALERT: $CLUSTER_NAME - High Severity Issues" \
                    "ALERT: Kube-hunter scan found high severity security vulnerabilities!" \
                    "$report_file"
                ;;
        esac
    else
        echo "Kube-hunter scan failed"
        send_notification "Kube-hunter Error: $CLUSTER_NAME" \
            "Kube-hunter scan failed to complete." \
            ""
    fi
    # Cleanup old reports (keep last 30 days)
    find "$OUTPUT_DIR" -name "kube-hunter-*.json" -mtime +30 -delete
    echo "Automated hunting completed"
}
# Execute main function
main "$@"
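The threshold logic in `analyze_results` can also be expressed in Python, which removes the dependency on `jq`. The following is a minimal sketch, assuming the report is a JSON object with a `vulnerabilities` list whose entries carry a `severity` field, as the script above does; the function name `classify_report` and the sample data are illustrative.

```python
import json


def classify_report(report, medium_limit=5):
    """Return 2 for any high finding, 1 for more than medium_limit medium findings, else 0."""
    severities = [
        str(vuln.get("severity", "")).lower()
        for vuln in report.get("vulnerabilities", [])
    ]
    if severities.count("high") > 0:
        return 2
    if severities.count("medium") > medium_limit:
        return 1
    return 0


if __name__ == "__main__":
    sample = json.loads('{"vulnerabilities": [{"severity": "medium"}, {"severity": "high"}]}')
    print(classify_report(sample))
```

The return values mirror the shell function's exit codes, so the result can be passed straight to `sys.exit` in a wrapper.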
Automation Scripts
Comprehensive Security Assessment
python
#!/usr/bin/env python3
# Comprehensive Kubernetes security assessment with Kube-hunter
import subprocess
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class KubeHunterAssessment:
    def __init__(self, config_file=None):
        self.config_file = config_file
        self.load_configuration()
        self.setup_logging()

    def load_configuration(self):
        """Load assessment configuration"""
        default_config = {
            "scanning": {
                "modes": ["internal", "remote"],
                "targets": [],
                "timeout": 300,
                "threads": 20
            },
            "reporting": {
                "format": "json",
                "output_dir": "/tmp/kube-hunter-reports",
                "include_statistics": True
            },
            "notifications": {
                "email": {
                    "enabled": False,
                    "smtp_server": "localhost",
                    "smtp_port": 587,
                    "username": "",
                    "password": "",
                    "from": "kube-hunter@example.com",
                    "to": "security@example.com"
                },
                "webhook": {
                    "enabled": False,
                    "url": "",
                    "headers": {}
                }
            },
            "thresholds": {
                "high_severity_max": 0,
                "medium_severity_max": 5,
                "total_vulnerabilities_max": 10
            }
        }
        if self.config_file and os.path.exists(self.config_file):
            import yaml  # PyYAML, only needed when a config file is supplied
            with open(self.config_file, 'r') as f:
                user_config = yaml.safe_load(f) or {}
            # Merge recursively so a partial user section keeps its other defaults
            self.config = self._merge(default_config, user_config)
        else:
            self.config = default_config

    @staticmethod
    def _merge(defaults, overrides):
        """Recursively overlay overrides on defaults"""
        merged = dict(defaults)
        for key, value in overrides.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = KubeHunterAssessment._merge(merged[key], value)
            else:
                merged[key] = value
        return merged

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('kube-hunter-assessment.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    def run_kube_hunter(self, mode, target=None, output_file=None):
        """Run Kube-hunter scan"""
        if not output_file:
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            output_file = f"kube-hunter-{mode}-{timestamp}.json"
        # Build command
        cmd = ["kube-hunter"]
        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            cmd.append("--interface")
        elif mode == "pod":
            cmd.append("--pod")
        elif mode == "network" and target:
            cmd.extend(["--cidr", target])
        else:
            self.logger.error(f"Mode '{mode}' is unknown or is missing a target")
            return None, None
        # Add common options; the JSON report is printed to stdout
        cmd.extend(["--report", "json", "--log", "INFO"])
        # Add timeout if configured
        timeout = self.config.get("scanning", {}).get("timeout", 300)
        try:
            self.logger.info(f"Running Kube-hunter in {mode} mode...")
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            if result.returncode == 0:
                # Parse the report from stdout and keep a copy on disk
                scan_results = json.loads(result.stdout)
                with open(output_file, 'w') as f:
                    json.dump(scan_results, f, indent=2)
                self.logger.info(f"Kube-hunter scan completed: {output_file}")
                return scan_results, output_file
            else:
                self.logger.error(f"Kube-hunter scan failed: {result.stderr}")
                return None, None
        except subprocess.TimeoutExpired:
            self.logger.error(f"Kube-hunter scan timed out after {timeout} seconds")
            return None, None
        except Exception as e:
            self.logger.error(f"Error running Kube-hunter: {e}")
            return None, None
    def analyze_vulnerabilities(self, scan_results):
        """Analyze scan results for vulnerabilities"""
        if not scan_results:
            return None
        vulnerabilities = scan_results.get("vulnerabilities", [])
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": len(vulnerabilities),
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }
        # Analyze each vulnerability
        for vuln in vulnerabilities:
            severity = vuln.get("severity", "unknown").lower()
            category = vuln.get("category", "unknown")
            # Count by severity
            if severity in analysis["severity_breakdown"]:
                analysis["severity_breakdown"][severity] += 1
            # Count by category
            if category not in analysis["category_breakdown"]:
                analysis["category_breakdown"][category] = 0
            analysis["category_breakdown"][category] += 1
            # Collect critical and high severity vulnerabilities
            if severity == "critical":
                analysis["critical_vulnerabilities"].append(vuln)
            elif severity == "high":
                analysis["high_vulnerabilities"].append(vuln)
        # Generate recommendations
        analysis["recommendations"] = self.generate_recommendations(analysis)
        return analysis

    def generate_recommendations(self, analysis):
        """Generate security recommendations based on analysis"""
        recommendations = []
        # Critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Critical Vulnerabilities Immediately",
                "description": f"Found {len(analysis['critical_vulnerabilities'])} critical vulnerabilities that require immediate attention.",
                "action": "Review and remediate all critical vulnerabilities within 24 hours."
            })
        # High severity vulnerabilities
        if analysis["high_vulnerabilities"]:
            recommendations.append({
                "priority": "HIGH",
                "title": "Remediate High Severity Issues",
                "description": f"Found {len(analysis['high_vulnerabilities'])} high severity vulnerabilities.",
                "action": "Plan remediation for high severity vulnerabilities within 1 week."
            })
        # Category-specific recommendations
        categories = analysis["category_breakdown"]
        if categories.get("Access Risk", 0) > 0:
            recommendations.append({
                "priority": "HIGH",
                "title": "Review Access Controls",
                "description": "Access control vulnerabilities detected.",
                "action": "Review and strengthen RBAC policies and authentication mechanisms."
            })
        if categories.get("Information Disclosure", 0) > 0:
            recommendations.append({
                "priority": "MEDIUM",
                "title": "Prevent Information Disclosure",
                "description": "Information disclosure vulnerabilities found.",
                "action": "Review exposed services and implement proper access controls."
            })
        if categories.get("Remote Code Execution", 0) > 0:
            recommendations.append({
                "priority": "CRITICAL",
                "title": "Address Remote Code Execution Risks",
                "description": "Remote code execution vulnerabilities detected.",
                "action": "Immediately patch or isolate affected components."
            })
        return recommendations
    def check_thresholds(self, analysis):
        """Check if vulnerabilities exceed configured thresholds"""
        thresholds = self.config.get("thresholds", {})
        high_severity_count = analysis["severity_breakdown"]["high"] + analysis["severity_breakdown"]["critical"]
        medium_severity_count = analysis["severity_breakdown"]["medium"]
        total_vulnerabilities = analysis["total_vulnerabilities"]
        threshold_violations = []
        # Check high severity threshold
        high_max = thresholds.get("high_severity_max", 0)
        if high_severity_count > high_max:
            threshold_violations.append(f"High/Critical severity: {high_severity_count} > {high_max}")
        # Check medium severity threshold
        medium_max = thresholds.get("medium_severity_max", 5)
        if medium_severity_count > medium_max:
            threshold_violations.append(f"Medium severity: {medium_severity_count} > {medium_max}")
        # Check total vulnerabilities threshold
        total_max = thresholds.get("total_vulnerabilities_max", 10)
        if total_vulnerabilities > total_max:
            threshold_violations.append(f"Total vulnerabilities: {total_vulnerabilities} > {total_max}")
        return threshold_violations

    def generate_report(self, analysis, scan_results, output_file):
        """Generate comprehensive security report"""
        report = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "tool": "Kube-hunter",
                "assessment_type": "Kubernetes Security Assessment"
            },
            "executive_summary": {
                "total_vulnerabilities": analysis["total_vulnerabilities"],
                "critical_count": analysis["severity_breakdown"]["critical"],
                "high_count": analysis["severity_breakdown"]["high"],
                "medium_count": analysis["severity_breakdown"]["medium"],
                "low_count": analysis["severity_breakdown"]["low"],
                "risk_level": self.calculate_risk_level(analysis)
            },
            "detailed_analysis": analysis,
            "raw_results": scan_results,
            "recommendations": analysis["recommendations"]
        }
        # Save report
        report_file = output_file.replace('.json', '-report.json')
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)
        self.logger.info(f"Comprehensive report generated: {report_file}")
        return report, report_file

    def calculate_risk_level(self, analysis):
        """Calculate overall risk level"""
        critical_count = analysis["severity_breakdown"]["critical"]
        high_count = analysis["severity_breakdown"]["high"]
        medium_count = analysis["severity_breakdown"]["medium"]
        if critical_count > 0:
            return "CRITICAL"
        elif high_count > 3:
            return "HIGH"
        elif high_count > 0 or medium_count > 5:
            return "MEDIUM"
        elif medium_count > 0:
            return "LOW"
        else:
            return "MINIMAL"
    def send_notifications(self, analysis, report_file, threshold_violations):
        """Send notifications about assessment results"""
        # Prepare notification content
        subject = "Kube-hunter Security Assessment Results"
        if threshold_violations:
            subject += " - ALERT"
        body = self.generate_notification_body(analysis, threshold_violations)
        # Send email notification
        if self.config.get("notifications", {}).get("email", {}).get("enabled", False):
            self.send_email_notification(subject, body, report_file)
        # Send webhook notification
        if self.config.get("notifications", {}).get("webhook", {}).get("enabled", False):
            self.send_webhook_notification(analysis, threshold_violations)

    def generate_notification_body(self, analysis, threshold_violations):
        """Generate notification message body"""
        body = f"""
Kube-hunter Security Assessment Results
Generated: {analysis['timestamp']}
EXECUTIVE SUMMARY:
==================
Total Vulnerabilities: {analysis['total_vulnerabilities']}
Critical: {analysis['severity_breakdown']['critical']}
High: {analysis['severity_breakdown']['high']}
Medium: {analysis['severity_breakdown']['medium']}
Low: {analysis['severity_breakdown']['low']}
"""
        if threshold_violations:
            body += "⚠️ THRESHOLD VIOLATIONS:\n"
            for violation in threshold_violations:
                body += f" - {violation}\n"
            body += "\n"
        # Add critical vulnerabilities
        if analysis["critical_vulnerabilities"]:
            body += "CRITICAL VULNERABILITIES:\n"
            body += "========================\n"
            for vuln in analysis["critical_vulnerabilities"][:3]:
                body += f"- {vuln.get('vulnerability', 'Unknown')}\n"
                body += f" Description: {vuln.get('description', 'No description')}\n"
                body += f" Evidence: {vuln.get('evidence', 'No evidence')}\n\n"
        # Add recommendations
        if analysis["recommendations"]:
            body += "TOP RECOMMENDATIONS:\n"
            body += "===================\n"
            for rec in analysis["recommendations"][:3]:
                body += f"- [{rec['priority']}] {rec['title']}\n"
                body += f" {rec['action']}\n\n"
        return body
    def send_email_notification(self, subject, body, attachment_file):
        """Send email notification"""
        email_config = self.config.get("notifications", {}).get("email", {})
        try:
            msg = MIMEMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = email_config["to"]
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))
            # Attach report file
            if attachment_file and os.path.exists(attachment_file):
                with open(attachment_file, 'r') as f:
                    attachment = MIMEText(f.read())
                attachment.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_file)}"')
                msg.attach(attachment)
            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            if email_config.get("username") and email_config.get("password"):
                server.login(email_config["username"], email_config["password"])
            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()
            self.logger.info("Email notification sent successfully")
        except Exception as e:
            self.logger.error(f"Failed to send email notification: {e}")

    def send_webhook_notification(self, analysis, threshold_violations):
        """Send webhook notification"""
        webhook_config = self.config.get("notifications", {}).get("webhook", {})
        try:
            import requests
            payload = {
                "timestamp": analysis["timestamp"],
                "alert": bool(threshold_violations),
                "threshold_violations": threshold_violations,
                "summary": {
                    "total_vulnerabilities": analysis["total_vulnerabilities"],
                    "critical": analysis["severity_breakdown"]["critical"],
                    "high": analysis["severity_breakdown"]["high"],
                    "medium": analysis["severity_breakdown"]["medium"]
                },
                "recommendations": analysis["recommendations"][:3]
            }
            headers = webhook_config.get("headers", {})
            headers["Content-Type"] = "application/json"
            response = requests.post(
                webhook_config["url"],
                json=payload,
                headers=headers,
                timeout=30
            )
            if response.status_code == 200:
                self.logger.info("Webhook notification sent successfully")
            else:
                self.logger.error(f"Webhook notification failed: {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send webhook notification: {e}")
    def run_comprehensive_assessment(self, modes=None, targets=None):
        """Run comprehensive security assessment"""
        if not modes:
            modes = self.config.get("scanning", {}).get("modes", ["internal"])
        if not targets:
            targets = self.config.get("scanning", {}).get("targets", [])
        self.logger.info("Starting comprehensive Kube-hunter security assessment")
        all_results = []
        all_analyses = []
        # Run scans for each mode
        for mode in modes:
            if mode in ["remote", "network"] and targets:
                for target in targets:
                    scan_results, output_file = self.run_kube_hunter(mode, target)
                    if scan_results:
                        analysis = self.analyze_vulnerabilities(scan_results)
                        if analysis:
                            all_results.append((scan_results, output_file))
                            all_analyses.append(analysis)
            else:
                scan_results, output_file = self.run_kube_hunter(mode)
                if scan_results:
                    analysis = self.analyze_vulnerabilities(scan_results)
                    if analysis:
                        all_results.append((scan_results, output_file))
                        all_analyses.append(analysis)
        if not all_analyses:
            self.logger.error("No successful scans completed")
            return False
        # Combine analyses
        combined_analysis = self.combine_analyses(all_analyses)
        # Check thresholds
        threshold_violations = self.check_thresholds(combined_analysis)
        # Generate comprehensive report
        combined_results = {"scans": [result[0] for result in all_results]}
        report, report_file = self.generate_report(combined_analysis, combined_results, "combined-assessment.json")
        # Send notifications
        self.send_notifications(combined_analysis, report_file, threshold_violations)
        # Log summary
        self.logger.info(f"Assessment completed. Total vulnerabilities: {combined_analysis['total_vulnerabilities']}")
        if threshold_violations:
            self.logger.warning(f"Threshold violations: {threshold_violations}")
        return True

    def combine_analyses(self, analyses):
        """Combine multiple scan analyses"""
        combined = {
            "timestamp": datetime.now().isoformat(),
            "total_vulnerabilities": 0,
            "severity_breakdown": {
                "critical": 0,
                "high": 0,
                "medium": 0,
                "low": 0
            },
            "category_breakdown": {},
            "critical_vulnerabilities": [],
            "high_vulnerabilities": [],
            "recommendations": []
        }
        # Combine all analyses
        for analysis in analyses:
            combined["total_vulnerabilities"] += analysis["total_vulnerabilities"]
            # Combine severity counts
            for severity, count in analysis["severity_breakdown"].items():
                combined["severity_breakdown"][severity] += count
            # Combine category counts
            for category, count in analysis["category_breakdown"].items():
                if category not in combined["category_breakdown"]:
                    combined["category_breakdown"][category] = 0
                combined["category_breakdown"][category] += count
            # Combine vulnerabilities
            combined["critical_vulnerabilities"].extend(analysis["critical_vulnerabilities"])
            combined["high_vulnerabilities"].extend(analysis["high_vulnerabilities"])
        # Generate combined recommendations
        combined["recommendations"] = self.generate_recommendations(combined)
        return combined
# Usage
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Comprehensive Kube-hunter Security Assessment")
    parser.add_argument("--config", help="Configuration file path")
    parser.add_argument("--modes", nargs="+", choices=["internal", "remote", "pod", "network"],
                        default=["internal"], help="Scanning modes")
    parser.add_argument("--targets", nargs="+", help="Targets for remote/network scanning")
    args = parser.parse_args()
    assessment = KubeHunterAssessment(args.config)
    success = assessment.run_comprehensive_assessment(args.modes, args.targets)
    sys.exit(0 if success else 1)
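Because `combine_analyses` sums the counts from every scan, a finding seen by both an internal and a remote scan is counted twice. The following is a minimal sketch of de-duplicating findings before analysis, assuming each finding carries `vid`, `location`, and `vulnerability` fields; that identity key and the helper name `dedupe_findings` are assumptions made for illustration.

```python
def dedupe_findings(scans):
    """Merge vulnerability lists from several scans, keeping the first copy of each finding."""
    seen = set()
    unique = []
    for scan in scans:
        for vuln in scan.get("vulnerabilities", []):
            # Assumed identity of a finding: its ID, where it was found, and its name
            key = (vuln.get("vid"), vuln.get("location"), vuln.get("vulnerability"))
            if key not in seen:
                seen.add(key)
                unique.append(vuln)
    return unique


if __name__ == "__main__":
    internal = {"vulnerabilities": [{"vid": "KHV002", "location": "10.0.0.1:6443", "severity": "medium"}]}
    remote = {"vulnerabilities": [
        {"vid": "KHV002", "location": "10.0.0.1:6443", "severity": "medium"},
        {"vid": "KHV005", "location": "10.0.0.1:6443", "severity": "high"},
    ]}
    print(len(dedupe_findings([internal, remote])))
```

The merged list can be wrapped as `{"vulnerabilities": [...]}` and passed through a single analysis step instead of summing per-scan analyses.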
CI/CD Integration Script
python
#!/usr/bin/env python3
# Kube-hunter CI/CD integration
import json
import subprocess
import sys


class KubeHunterCICD:
    """Run Kube-hunter in a pipeline and gate the build on its findings."""

    def __init__(self, fail_on_high=True, fail_on_medium=False):
        self.fail_on_high = fail_on_high
        self.fail_on_medium = fail_on_medium

    def run_security_scan(self, mode="internal", target=None):
        """Run Kube-hunter security scan in CI/CD"""
        cmd = ["kube-hunter", "--report", "json"]
        if mode == "remote" and target:
            cmd.extend(["--remote", target])
        elif mode == "internal":
            # Scans all local network interfaces; older releases named this flag --internal
            cmd.append("--interface")
        elif mode == "pod":
            cmd.append("--pod")
        else:
            # Without a scan mode kube-hunter prompts interactively, which would hang a pipeline
            raise ValueError(f"Unsupported mode or missing target: mode={mode!r}")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode == 0:
                return json.loads(result.stdout)
            print(f"Kube-hunter scan failed: {result.stderr}")
            return None
        except Exception as e:
            # Covers a missing binary, a timeout, and unparseable JSON output
            print(f"Error running Kube-hunter: {e}")
            return None

    def evaluate_results(self, scan_results):
        """Evaluate scan results for CI/CD pipeline"""
        if not scan_results:
            return False, "Scan failed"
        vulnerabilities = scan_results.get("vulnerabilities", [])
        high_count = len([v for v in vulnerabilities if v.get("severity") == "high"])
        medium_count = len([v for v in vulnerabilities if v.get("severity") == "medium"])
        critical_count = len([v for v in vulnerabilities if v.get("severity") == "critical"])
        # Check failure conditions; critical findings always fail the build
        if critical_count > 0:
            return False, f"Critical vulnerabilities found: {critical_count}"
        if self.fail_on_high and high_count > 0:
            return False, f"High severity vulnerabilities found: {high_count}"
        if self.fail_on_medium and medium_count > 0:
            return False, f"Medium severity vulnerabilities found: {medium_count}"
        return True, f"Security scan passed. Vulnerabilities: {len(vulnerabilities)}"

    def generate_ci_report(self, scan_results, output_file="kube-hunter-ci-report.json"):
        """Generate CI/CD friendly report"""
        if not scan_results:
            return None
        vulnerabilities = scan_results.get("vulnerabilities", [])
        report = {
            "summary": {
                "total_vulnerabilities": len(vulnerabilities),
                "critical": len([v for v in vulnerabilities if v.get("severity") == "critical"]),
                "high": len([v for v in vulnerabilities if v.get("severity") == "high"]),
                "medium": len([v for v in vulnerabilities if v.get("severity") == "medium"]),
                "low": len([v for v in vulnerabilities if v.get("severity") == "low"]),
            },
            "vulnerabilities": vulnerabilities,
            "scan_metadata": scan_results.get("metadata", {}),
        }
        with open(output_file, "w") as f:
            json.dump(report, f, indent=2)
        return report


# Usage in CI/CD
if __name__ == "__main__":
    cicd = KubeHunterCICD(fail_on_high=True, fail_on_medium=False)
    # Run scan
    results = cicd.run_security_scan("internal")
    # Evaluate results
    passed, message = cicd.evaluate_results(results)
    # Generate report
    cicd.generate_ci_report(results)
    print(message)
    sys.exit(0 if passed else 1)
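The gating rules can be tried out without a cluster by feeding them a hand-written report. The sketch below is a standalone variant rather than part of the class above: it assumes the report is a dict with a `vulnerabilities` list whose entries carry a `severity` string, and the names `severity_gate` and `SEVERITY_RANK` are illustrative only. Instead of one boolean per severity, it fails at or above a single threshold.

```python
# Hypothetical helper: fail the build when any finding is at or above a chosen severity.
SEVERITY_RANK = {"low": 1, "medium": 2, "high": 3, "critical": 4}


def severity_gate(report, fail_at="high"):
    """Return (passed, offending) for a kube-hunter style report dict."""
    threshold = SEVERITY_RANK[fail_at]
    offending = [
        v for v in report.get("vulnerabilities", [])
        # Unknown or missing severities rank 0 and never trip the gate.
        if SEVERITY_RANK.get(str(v.get("severity", "")).lower(), 0) >= threshold
    ]
    return (not offending, offending)


if __name__ == "__main__":
    # Made-up sample data, not real scanner output.
    sample = {
        "vulnerabilities": [
            {"vulnerability": "Example finding A", "severity": "medium"},
            {"vulnerability": "Example finding B", "severity": "high"},
        ]
    }
    passed, offending = severity_gate(sample, fail_at="high")
    print(passed, [v["vulnerability"] for v in offending])
```

A single threshold keeps the pipeline configuration to one setting, at the cost of not being able to fail on medium while ignoring high.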
Integration Examples
Prometheus Monitoring
yaml
# kube-hunter-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kube-hunter-exporter
  template:
    metadata:
      labels:
        app: kube-hunter-exporter
    spec:
      serviceAccountName: kube-hunter-exporter
      containers:
      - name: kube-hunter-exporter
        image: custom/kube-hunter-exporter:latest
        ports:
        - containerPort: 8080
        env:
        - name: SCAN_INTERVAL
          value: "3600" # 1 hour
        - name: METRICS_PORT
          value: "8080"
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
---
apiVersion: v1
kind: Service
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
  labels:
    app: kube-hunter-exporter
spec:
  ports:
  - port: 8080
    targetPort: 8080
    name: metrics
  selector:
    app: kube-hunter-exporter
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kube-hunter-exporter
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: kube-hunter-exporter
  endpoints:
  - port: metrics
    interval: 60s
    path: /metrics
Grafana Dashboard
json
{
  "dashboard": {
    "title": "Kube-hunter Security Dashboard",
    "panels": [
      {
        "title": "Total Vulnerabilities",
        "type": "stat",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_total",
            "legendFormat": "Total Vulnerabilities"
          }
        ]
      },
      {
        "title": "Vulnerabilities by Severity",
        "type": "piechart",
        "targets": [
          {
            "expr": "kube_hunter_vulnerabilities_by_severity",
            "legendFormat": "{{severity}}"
          }
        ]
      },
      {
        "title": "Security Scan History",
        "type": "graph",
        "targets": [
          {
            "expr": "kube_hunter_scan_duration_seconds",
            "legendFormat": "Scan Duration"
          }
        ]
      }
    ]
  }
}
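The dashboard queries reference three metric names that the exporter is expected to publish; neither the exporter image nor these metrics ship with Kube-hunter. As a rough sketch of what such an exporter might serve at /metrics, the function below turns a report dict into Prometheus text exposition format using only the standard library. The helper name `render_metrics`, the `severity` label, and the report shape (a `vulnerabilities` list with `severity` fields) are assumptions.

```python
from collections import Counter


def render_metrics(report, scan_duration_seconds):
    """Render a kube-hunter style report as Prometheus text exposition format."""
    vulns = report.get("vulnerabilities", [])
    by_severity = Counter(str(v.get("severity", "unknown")).lower() for v in vulns)

    lines = [
        "# HELP kube_hunter_vulnerabilities_total Findings in the latest scan.",
        "# TYPE kube_hunter_vulnerabilities_total gauge",
        f"kube_hunter_vulnerabilities_total {len(vulns)}",
        "# HELP kube_hunter_vulnerabilities_by_severity Findings per severity.",
        "# TYPE kube_hunter_vulnerabilities_by_severity gauge",
    ]
    # One sample per severity, labelled so Grafana can use {{severity}} in legends.
    for severity in sorted(by_severity):
        lines.append(
            f'kube_hunter_vulnerabilities_by_severity{{severity="{severity}"}} '
            f"{by_severity[severity]}"
        )
    lines += [
        "# HELP kube_hunter_scan_duration_seconds Duration of the latest scan.",
        "# TYPE kube_hunter_scan_duration_seconds gauge",
        f"kube_hunter_scan_duration_seconds {scan_duration_seconds}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Made-up sample data, not real scanner output.
    sample = {"vulnerabilities": [{"severity": "high"}, {"severity": "low"}]}
    print(render_metrics(sample, scan_duration_seconds=42.0), end="")
```

The metric names are kept as the dashboard spells them. All three are exposed as gauges because each scan replaces the previous values rather than incrementing them.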
Troubleshooting
Common Issues
Permission Problems:
bash
# Check Kubernetes access
kubectl auth can-i get pods
kubectl auth can-i list services
# Verify service account permissions
kubectl describe serviceaccount kube-hunter -n security
kubectl describe clusterrolebinding kube-hunter
# Check pod security policies (PodSecurityPolicy was removed in Kubernetes 1.25; skip on newer clusters)
kubectl get psp
kubectl describe psp restricted
# Test network connectivity
kubectl run test-pod --image=busybox --rm -it -- nslookup kubernetes.default.svc.cluster.local
Network Connectivity Issues:
bash
# Check cluster networking
kubectl get nodes -o wide
kubectl get services -A
# Test API server connectivity (run from inside a pod; this DNS name resolves only in-cluster)
curl -k https://kubernetes.default.svc.cluster.local:443/version
# Check DNS resolution (also from inside a pod)
nslookup kubernetes.default.svc.cluster.local
# Verify network policies
kubectl get networkpolicies -A
kubectl describe networkpolicy -n kube-system
Scanning Problems:
bash
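# Run with debug logging (--interface scans all local interfaces; older releases called it --internal)
kube-hunter --interface --log DEBUG
# Raise the per-request network timeout (seconds) if probes are timing out
kube-hunter --interface --network-timeout 60
# Narrow the scan to a single host, such as the API server (set API_SERVER_HOST yourself)
kube-hunter --remote "$API_SERVER_HOST"
# Verify output format
kube-hunter --interface --report json | jq '.'
# Check the installed version and upgrade if it is outdated
# Option names vary between releases; confirm them with: kube-hunter --help
pip list | grep kube-hunter
pip install --upgrade kube-hunter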
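When `jq` is not available on the build agent, the same output check can be done in Python. This is a minimal sketch, assuming the JSON report has top-level `nodes`, `services` and `vulnerabilities` lists; verify that layout against the output of your Kube-hunter version. `check_report` is a hypothetical helper name.

```python
import json

EXPECTED_KEYS = ("nodes", "services", "vulnerabilities")  # assumed report layout


def check_report(raw_text):
    """Return a list of problems found in raw kube-hunter JSON output."""
    try:
        report = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg} at line {exc.lineno}"]
    if not isinstance(report, dict):
        return ["top-level value is not a JSON object"]
    problems = []
    for key in EXPECTED_KEYS:
        if key not in report:
            problems.append(f"missing key: {key}")
        elif not isinstance(report[key], list):
            problems.append(f"{key} is not a list")
    return problems


if __name__ == "__main__":
    # Stand-in text; in practice read the file written by `kube-hunter --pod --report json`.
    print(check_report('{"nodes": [], "services": []}'))
```

An empty list means the output parsed and has the expected shape. Log lines mixed into the JSON stream are a common cause of the "not valid JSON" result.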
Performance Optimization
Optimizing Kube-hunter performance:
bash
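# Reduce scan scope (--quick limits subnet scanning to a /24)
kube-hunter --interface --quick
# Limit the number of worker threads
kube-hunter --interface --num-worker-threads 10
# Restrict the scan to a specific subnet
kube-hunter --cidr 192.168.0.0/24
# Skip the extra active checks: hunting stays passive unless --active is given
kube-hunter --interface
# Cap total runtime in CI/CD with coreutils timeout
timeout 120 kube-hunter --pod --report json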
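Another way to reduce scan scope is to break a large address range into smaller subnets and scan them one at a time. The sketch below only builds the command lines and does not run them. It assumes Kube-hunter's network-scanning option is `--cidr`; the helper names `split_cidr` and `build_commands` are illustrative.

```python
import ipaddress


def split_cidr(cidr, new_prefix=24):
    """Split a network into subnets of new_prefix length, returned as strings."""
    network = ipaddress.ip_network(cidr, strict=False)
    if network.prefixlen >= new_prefix:
        # Already as small as requested, or smaller: scan it as-is.
        return [str(network)]
    return [str(subnet) for subnet in network.subnets(new_prefix=new_prefix)]


def build_commands(cidr, new_prefix=24):
    """Build one kube-hunter argv list per subnet (commands are not executed)."""
    return [
        ["kube-hunter", "--cidr", subnet, "--report", "json"]
        for subnet in split_cidr(cidr, new_prefix)
    ]


if __name__ == "__main__":
    for argv in build_commands("10.0.0.0/22"):
        print(" ".join(argv))
```

Running the batches sequentially keeps peak load on the network low, and a failed or slow subnet can be retried without repeating the whole range.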
Security Considerations
Safe Usage Practices
Scanning Ethics:
- Only scan clusters you own or have explicit permission to test
- Coordinate with operations teams before running scans
- Prefer passive hunting (the default); enable active hunting (--active) only with explicit approval, since it can change cluster state
- Implement proper access controls for scan results
- Keep Kube-hunter and its dependencies up to date
Data Protection:
- Encrypt sensitive scan results and reports
- Implement secure storage for vulnerability data
- Use secure channels for transmitting reports
- Clean up temporary files and logs regularly
- Implement data retention policies for compliance
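A retention policy for stored reports can be as simple as deleting files past a cutoff age. This is a minimal sketch, assuming reports are kept as `*.json` files in one directory and that file modification time is an acceptable proxy for scan date; the function name `prune_old_reports` and the 30-day window are illustrative, not a compliance recommendation.

```python
import os
import tempfile
import time
from pathlib import Path


def prune_old_reports(directory, max_age_days, now=None):
    """Delete *.json files in directory older than max_age_days; return their names."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    removed = []
    for path in sorted(Path(directory).glob("*.json")):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path.name)
    return removed


if __name__ == "__main__":
    # Demonstrate on throwaway files so no real reports are touched.
    with tempfile.TemporaryDirectory() as tmp:
        old, new = Path(tmp, "old.json"), Path(tmp, "new.json")
        old.write_text("{}")
        new.write_text("{}")
        forty_days_ago = time.time() - 40 * 86400
        os.utime(old, (forty_days_ago, forty_days_ago))
        print(prune_old_reports(tmp, max_age_days=30))
```

Returning the removed names makes it easy to log what was deleted, which retention audits often require.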
Operational Security
Monitoring and Alerting:
- Monitor Kube-hunter execution and results
- Set up alerting for critical security findings
- Track vulnerability trends over time
- Implement automated remediation workflows
- Review security configurations regularly
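Tracking trends comes down to comparing one scan with the previous one. The sketch below reports which findings are new and which have been resolved between two report dicts. It assumes each entry in `vulnerabilities` has `vid` and `location` fields that together identify a finding; check those field names against your own reports. The sample IDs and addresses are made up.

```python
def finding_key(vuln):
    """Identify a finding by its ID and location (field names are assumed)."""
    return (str(vuln.get("vid") or ""), str(vuln.get("location") or ""))


def diff_findings(previous, current):
    """Return (new, resolved) findings between two report dicts."""
    prev = {finding_key(v): v for v in previous.get("vulnerabilities", [])}
    curr = {finding_key(v): v for v in current.get("vulnerabilities", [])}
    new = [curr[k] for k in sorted(curr.keys() - prev.keys())]
    resolved = [prev[k] for k in sorted(prev.keys() - curr.keys())]
    return new, resolved


if __name__ == "__main__":
    last_week = {"vulnerabilities": [
        {"vid": "KHV002", "location": "10.0.0.1:443"},
        {"vid": "KHV005", "location": "10.0.0.1:443"},
    ]}
    today = {"vulnerabilities": [
        {"vid": "KHV002", "location": "10.0.0.1:443"},
        {"vid": "KHV007", "location": "10.0.0.2:10250"},
    ]}
    new, resolved = diff_findings(last_week, today)
    print("new:", [v["vid"] for v in new])
    print("resolved:", [v["vid"] for v in resolved])
```

Alerting only on the `new` list avoids repeating notifications for findings that are already known and being worked on.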
Integration Security:
- Secure the CI/CD pipeline integration
- Protect service account tokens and credentials
- Implement proper RBAC for Kubernetes deployments
- Monitor for unauthorized Kube-hunter usage
- Assess the security of the scanning infrastructure regularly