Kube-hunter hoja de trucos
Overview
Kube-hunter is a powerful open-source pruebas de penetración tool specifically designed to hunt for security weaknesses in Kubernetes clusters. Developed by Aqua Security, this tool performs comprehensive security assessments by simulating the techniques that attackers might use to compromise Kubernetes environments. Kube-hunter operates by scanning Kubernetes clusters from both internal and external perspectives, identifying misconfiguracións, exposed servicios, and potential attack vectors that could be exploited by malicious actors. The tool's strength lies in its ability to provide actionable security insights that help organizations strengthen their Kubernetes security posture before real attacks occur.
The core functionality of Kube-hunter revolves around its extensive knowledge base of Kubernetes-specific attack techniques and vulnerabilities. The tool performs automated discovery of Kubernetes components including API servers, etcd clusters, kubelet servicios, and dashboard interfaces, then systematically tests these components for common security issues such as anonymous access, escalada de privilegios oppuertounities, and insecure configuracións. Kube-hunter's hunting methodology includes testing for exposed sensitive information, weak autenticación mechanisms, container escape possibilities, and network segmentation issues that are commonly found in misconfigured Kubernetes deployments.
Kube-hunter's value proposition extends beyond simple vulnerabilidad scanning by providing detailed explanations of discovered issues, their potential Impacto, and specific remediation guidance. The tool suppuertos multiple execution modes including remote scanning for external assessments, internal scanning for comprehensive cluster analysis, and escaneo de red for discovering Kubernetes servicios across network ranges. With its ability to generate detailed repuertos in multiple formats and integrate into CI/CD pipelines, Kube-hunter has become an essential tool for DevSecOps teams, security professionals, and Kubernetes administrators who need to maintain robust security standards in their container orchestration environments.
instalación
Python Package instalación
Installing Kube-hunter using pip:
# Install Kube-hunter
pip install kube-hunter
# Alternative: Install from source
git clone https://github.com/aquasecurity/kube-hunter.git
cd kube-hunter
pip install -r requirements.txt
python setup.py install
# Verify instalación
kube-hunter --help
# Install additional dependencies
pip install requests urllib3 netaddr
# Update to latest version
pip install --upgrade kube-hunter
# Install specific version
pip install kube-hunter==0.6.8
Docker instalación
Running Kube-hunter in Docker containers:
# Run Kube-hunter container for remote scanning
docker run --rm -it aquasec/kube-hunter
# Run with network access for internal scanning
docker run --rm --network host aquasec/kube-hunter --internal
# Run with custom configuración
docker run --rm -v $(pwd)/config:/config aquasec/kube-hunter --config /config/kube-hunter.conf
# Run with output to file
docker run --rm -v $(pwd)/repuertos:/repuertos aquasec/kube-hunter --repuerto json --output /repuertos/kube-hunter-repuerto.json
# Run specific hunting mode
docker run --rm aquasec/kube-hunter --remote 10.0.0.0/24
# Run with custom interface
docker run --rm --network host aquasec/kube-hunter --interface eth0
Kubernetes Deployment
Deploying Kube-hunter as a Kubernetes job:
# kube-hunter-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: kube-hunter
namespace: security
spec:
template:
metadata:
labels:
app: kube-hunter
spec:
restartPolicy: Never
servicioAccountName: kube-hunter
containers:
- name: kube-hunter
image: aquasec/kube-hunter
comando: ["kube-hunter"]
args: ["--pod", "--repuerto", "json", "--log", "INFO"]
volumeMounts:
- name: output
mountPath: /tmp
env:
- name: KUBERNETES_servicio_host
value: "kubernetes.default.svc.cluster.local"
- name: KUBERNETES_servicio_puerto
value: "443"
resources:
limits:
memory: "256Mi"
cpu: "200m"
requests:
memory: "128Mi"
cpu: "100m"
volumes:
- name: output
emptyDir: \\\\{\\\\}
backoffLimit: 1
---
# servicioAccount for Kube-hunter
apiVersion: v1
kind: servicioAccount
metadata:
name: kube-hunter
namespace: security
---
# ClusterRole for Kube-hunter
apiVersion: rbac.autorización.k8s.io/v1
kind: ClusterRole
metadata:
name: kube-hunter
rules:
- apiGroups: [""]
resources: ["pods", "servicios", "nodes"]
verbs: ["get", "list"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get", "list"]
- apiGroups: ["extensions"]
resources: ["deployments"]
verbs: ["get", "list"]
---
# ClusterRoleBinding for Kube-hunter
apiVersion: rbac.autorización.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: kube-hunter
roleRef:
apiGroup: rbac.autorización.k8s.io
kind: ClusterRole
name: kube-hunter
subjects:
- kind: servicioAccount
name: kube-hunter
namespace: security
Virtual Environment Setup
# Create virtual environment
python3 -m venv kube-hunter-env
source kube-hunter-env/bin/activate
# Install Kube-hunter
pip install kube-hunter
# Install additional tools
pip install kubernetes pyyaml
# Create configuración directory
mkdir -p ~/.kube-hunter
# Verify setup
kube-hunter --version
# Create wrapper script
cat > /usr/local/bin/kube-hunt ``<< 'EOF'
#!/bin/bash
source /path/to/kube-hunter-env/bin/activate
kube-hunter "$@"
EOF
chmod +x /usr/local/bin/kube-hunt
Basic uso
Remote Scanning
Scanning Kubernetes clusters from external perspective:
# Basic remote scan
kube-hunter --remote
# Scan specific IP address
kube-hunter --remote 10.0.0.100
# Scan IP range
kube-hunter --remote 10.0.0.0/24
# Scan multiple objetivos
kube-hunter --remote 10.0.0.100,10.0.0.101,10.0.0.102
# Scan with custom puertos
kube-hunter --remote 10.0.0.100 --puerto 6443,8080,10250
# Quick scan mode
kube-hunter --remote 10.0.0.100 --quick
# Verbose output
kube-hunter --remote 10.0.0.100 --log DEBUG
# Save results to file
kube-hunter --remote 10.0.0.100 --repuerto json --output remote-scan.json
Internal Scanning
Scanning from within the Kubernetes cluster:
# Internal cluster scan
kube-hunter --internal
# Pod-based scanning
kube-hunter --pod
# Scan with specific interface
kube-hunter --internal --interface eth0
# Internal scan with network discovery
kube-hunter --internal --network-scan
# Scan specific servicios
kube-hunter --internal --servicios api-server,kubelet
# Internal scan with custom timeout
kube-hunter --internal --timeout 30
# Scan with autenticación
kube-hunter --internal --kubeconfig ~/.kube/config
# Internal scan with specific namespace
kube-hunter --internal --namespace kube-system
escaneo de red
Discovering Kubernetes servicios across networks:
# Network discovery scan
kube-hunter --network 10.0.0.0/24
# Scan multiple networks
kube-hunter --network 10.0.0.0/24,192.168.1.0/24
# Network scan with puerto range
kube-hunter --network 10.0.0.0/24 --puerto-range 1-65535
# Fast network scan
kube-hunter --network 10.0.0.0/24 --quick
# Network scan with custom hilos
kube-hunter --network 10.0.0.0/24 --hilos 50
# Scan with servicio detection
kube-hunter --network 10.0.0.0/24 --detect-servicios
# Network scan with output
kube-hunter --network 10.0.0.0/24 --repuerto json --output network-scan.json
Advanced Features
Custom Hunting Modules
Creating and using custom hunting modules:
# custom_hunter.py - Custom Kube-hunter module
from kube_hunter.core.types impuerto Hunter, KubernetesCluster
from kube_hunter.core.events impuerto handler
class CustomSecurityHunter(Hunter):
"""Custom security hunter for specific vulnerabilities"""
def __init__(self, event):
self.event = event
@handler.subscribe(KubernetesCluster)
def hunt_custom_vulnerabilities(self, event):
"""Hunt for custom security vulnerabilities"""
# Custom vulnerabilidad detection logic
self.check_custom_misconfiguracións(event)
self.check_exposed_secrets(event)
self.check_privilege_escalation(event)
def check_custom_misconfiguracións(self, event):
"""Check for custom misconfiguracións"""
# Implementation for custom checks
pass
def check_exposed_secrets(self, event):
"""Check for exposed secrets"""
# Implementation for secret detection
pass
def check_privilege_escalation(self, event):
"""Check for escalada de privilegios oppuertounities"""
# Implementation for escalada de privilegios checks
pass
# Register the custom hunter
def register_custom_hunters():
"""Register custom hunting modules"""
from kube_hunter.modules impuerto discovery
discovery.hunters.append(CustomSecurityHunter)
# Use custom hunting modules
kube-hunter --internal --custom-modules custom_hunter.py
# Run with multiple custom modules
kube-hunter --internal --custom-modules custom_hunter.py,advanced_checks.py
# Load custom configuración
kube-hunter --internal --config custom-config.yaml
Advanced configuración
Creating comprehensive Kube-hunter configuracións:
# kube-hunter-config.yaml
hunting:
# Hunting modes
remote: false
internal: true
pod: true
# Network configuración
network_scan: true
interface: "eth0"
timeout: 30
hilos: 20
# objetivo configuración
objetivos:
- "10.0.0.0/24"
- "kubernetes.default.svc.cluster.local"
puertos:
- 6443 # API Server
- 8080 # Insecure API Server
- 10250 # Kubelet
- 10255 # Read-only Kubelet
- 2379 # etcd
- 2380 # etcd peer
- 30000-32767 # Nodepuerto range
# Hunting modules
modules:
enabled:
- "discovery.apiserver"
- "discovery.kubelet"
- "discovery.etcd"
- "discovery.dashboard"
- "hunting.apiserver"
- "hunting.kubelet"
- "hunting.etcd"
disabled:
- "hunting.arp" # Disable ARP hunting in some environments
# Repuertoing configuración
repuertoing:
format: "json"
output_file: "/tmp/kube-hunter-repuerto.json"
include_hunter_statistics: true
include_vulnerabilities_details: true
# Logging configuración
logging:
level: "INFO"
file: "/tmp/kube-hunter.log"
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# autenticación
autenticación:
kubeconfig: "~/.kube/config"
servicio_account_token: "/var/run/secrets/kubernetes.io/servicioaccount/token"
# Custom hunters
custom_hunters:
- "custom_hunter.py"
- "advanced_checks.py"
# Run with configuración file
kube-hunter --config kube-hunter-config.yaml
# Override specific configuración
kube-hunter --config kube-hunter-config.yaml --remote 10.0.0.100
# Validate configuración
kube-hunter --config kube-hunter-config.yaml --validate-config
Automated Hunting
Creating automated hunting workflows:
#!/bin/bash
# automated-kube-hunting.sh - Automated Kubernetes security hunting
# configuración
CLUSTER_NAME="production"
OUTPUT_DIR="/var/log/kube-hunter"
REpuerto_FORMAT="json"
NOTIFICATION_EMAIL="security@company.com"
# Create output directory
mkdir -p "$OUTPUT_DIR"
# Function to run Kube-hunter scan
run_kube_hunter_scan() \\\{
local scan_type="$1"
local objetivo="$2"
local output_file="$3"
echo "Running $scan_type scan..."
case "$scan_type" in
"remote")
kube-hunter --remote "$objetivo" \
--repuerto "$REpuerto_FORMAT" \
--output "$output_file" \
--log INFO
;;
"internal")
kube-hunter --internal \
--repuerto "$REpuerto_FORMAT" \
--output "$output_file" \
--log INFO
;;
"network")
kube-hunter --network "$objetivo" \
--repuerto "$REpuerto_FORMAT" \
--output "$output_file" \
--log INFO
;;
esac
return $?
\\\}
# Function to analyze results
analyze_results() \\\{
local repuerto_file="$1"
if [[ ! -f "$repuerto_file" ]]; then
echo "Repuerto file not found: $repuerto_file"
return 1
fi
# Parse JSON repuerto
local vulnerabilities=$(jq '.vulnerabilities|length' "$repuerto_file")
| local high_severity=$(jq '[.vulnerabilities[] | select(.severity == "high")] | length' "$repuerto_file") |
| local medium_severity=$(jq '[.vulnerabilities[] | select(.severity == "medium")] | length' "$repuerto_file") |
echo "Analysis Results:"
echo " Total vulnerabilities: $vulnerabilities"
echo " High severity: $high_severity"
echo " Medium severity: $medium_severity"
# Check thresholds
if [[ "$high_severity" -gt 0 ]]; then
echo "WARNING: High severity vulnerabilities found!"
return 2
elif [[ "$medium_severity" -gt 5 ]]; then
echo "WARNING: Multiple medium severity vulnerabilities found!"
return 1
fi
return 0
\\\}
# Function to send notification
send_notification() \\\{
local subject="$1"
local body="$2"
local repuerto_file="$3"
# Send email notification (requires mail comando)
if comando -v mail >``/dev/null 2>&1; then
echo "$body"|mail -s "$subject" -a "$repuerto_file" "$NOTIFICATION_EMAIL"
fi
# Send to webhook (if configured)
if [[ -n "$WEBHOOK_URL" ]]; then
curl -X POST "$WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d "\\\\{\"subject\": \"$subject\", \"body\": \"$body\"\\\\}"
fi
\\\\}
# Main execution
main() \\\\{
echo "Starting automated Kube-hunter security assessment for $CLUSTER_NAME"
local timestamp=$(date +%Y%m%d-%H%M%S)
local repuerto_file="$OUTPUT_DIR/kube-hunter-$CLUSTER_NAME-$timestamp.json"
# Run internal scan
if run_kube_hunter_scan "internal" "" "$repuerto_file"; then
echo "Kube-hunter scan completed successfully"
# Analyze results
analyze_results "$repuerto_file"
local analysis_result=$?
case "$analysis_result" in
0)
echo "No critical security issues found"
send_notification "Kube-hunter Repuerto: $CLUSTER_NAME - Clean" \
"Kube-hunter scan completed with no critical issues." \
"$repuerto_file"
;;
1)
echo "Medium severity issues found"
send_notification "Kube-hunter Repuerto: $CLUSTER_NAME - Medium Issues" \
"Kube-hunter scan found medium severity security issues." \
"$repuerto_file"
;;
2)
echo "High severity issues found!"
send_notification "Kube-hunter ALERT: $CLUSTER_NAME - High Severity Issues" \
"ALERT: Kube-hunter scan found high severity security vulnerabilities!" \
"$repuerto_file"
;;
esac
else
echo "Kube-hunter scan failed"
send_notification "Kube-hunter Error: $CLUSTER_NAME" \
"Kube-hunter scan failed to complete." \
""
fi
# Cleanup old repuertos (keep last 30 days)
find "$OUTPUT_DIR" -name "kube-hunter-*.json" -mtime +30 -delete
echo "Automated hunting completed"
\\\\}
# Execute main function
main "$@"
Automation Scripts
Comprehensive Security Assessment
#!/usr/bin/env python3
# Comprehensive Kubernetes security assessment with Kube-hunter
impuerto subproceso
impuerto json
impuerto os
impuerto sys
impuerto argparse
from datetime impuerto datetime, timedelta
impuerto logging
impuerto smtplib
from email.mime.text impuerto MIMEText
from email.mime.multipart impuerto MIMEMultipart
class KubeHunterAssessment:
def __init__(self, config_file=None):
self.config_file = config_file
self.load_configuración()
self.setup_logging()
def load_configuración(self):
"""Load assessment configuración"""
default_config = \\\\{
"scanning": \\\\{
"modes": ["internal", "remote"],
"objetivos": [],
"timeout": 300,
"hilos": 20
\\\\},
"repuertoing": \\\\{
"format": "json",
"output_dir": "/tmp/kube-hunter-repuertos",
"include_statistics": True
\\\\},
"notifications": \\\\{
"email": \\\\{
"enabled": False,
"smtp_server": "localhost",
"smtp_puerto": 587,
"nombre de usuario": "",
"contraseña": "",
"from": "kube-hunter@ejemplo.com",
"to": "security@ejemplo.com"
\\\\},
"webhook": \\\\{
"enabled": False,
"url": "",
"headers": \\\\{\\\\}
\\\\}
\\\\},
"thresholds": \\\\{
"high_severity_max": 0,
"medium_severity_max": 5,
"total_vulnerabilities_max": 10
\\\\}
\\\\}
if self.config_file and os.path.exists(self.config_file):
with open(self.config_file, 'r') as f:
impuerto yaml
user_config = yaml.safe_load(f)
# Merge configuracións
self.config = \\\\{**default_config, **user_config\\\\}
else:
self.config = default_config
def setup_logging(self):
"""Setup logging configuración"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('kube-hunter-assessment.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def run_kube_hunter(self, mode, objetivo=None, output_file=None):
"""Run Kube-hunter scan"""
if not output_file:
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
output_file = f"kube-hunter-\\\\{mode\\\\}-\\\\{timestamp\\\\}.json"
# Build comando
cmd = ["kube-hunter"]
if mode == "remote" and objetivo:
cmd.extend(["--remote", objetivo])
elif mode == "internal":
cmd.append("--internal")
elif mode == "pod":
cmd.append("--pod")
elif mode == "network" and objetivo:
cmd.extend(["--network", objetivo])
# Add common opcións
cmd.extend([
"--repuerto", "json",
"--output", output_file,
"--log", "INFO"
])
# Add timeout if configured
timeout = self.config.get("scanning", \\\\{\\\\}).get("timeout", 300)
try:
self.logger.info(f"Running Kube-hunter in \\\\{mode\\\\} mode...")
result = subproceso.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
if result.returncode == 0:
self.logger.info(f"Kube-hunter scan completed: \\\\{output_file\\\\}")
# Load and return results
with open(output_file, 'r') as f:
scan_results = json.load(f)
return scan_results, output_file
else:
self.logger.error(f"Kube-hunter scan failed: \\\\{result.stderr\\\\}")
return None, None
except subproceso.TimeoutExpired:
self.logger.error(f"Kube-hunter scan timed out after \\\\{timeout\\\\} seconds")
return None, None
except Exception as e:
self.logger.error(f"Error running Kube-hunter: \\\\{e\\\\}")
return None, None
def analyze_vulnerabilities(self, scan_results):
"""Analyze scan results for vulnerabilities"""
if not scan_results:
return None
vulnerabilities = scan_results.get("vulnerabilities", [])
analysis = \\\\{
"timestamp": datetime.now().isoformat(),
"total_vulnerabilities": len(vulnerabilities),
"severity_breakdown": \\\\{
"critical": 0,
"high": 0,
"medium": 0,
"low": 0
\\\\},
"category_breakdown": \\\\{\\\\},
"critical_vulnerabilities": [],
"high_vulnerabilities": [],
"recommendations": []
\\\\}
# Analyze each vulnerabilidad
for vuln in vulnerabilities:
severity = vuln.get("severity", "unknown").lower()
category = vuln.get("category", "unknown")
# Count by severity
if severity in analysis["severity_breakdown"]:
analysis["severity_breakdown"][severity] += 1
# Count by category
if category not in analysis["category_breakdown"]:
analysis["category_breakdown"][category] = 0
analysis["category_breakdown"][category] += 1
# Collect critical and high severity vulnerabilities
if severity == "critical":
analysis["critical_vulnerabilities"].append(vuln)
elif severity == "high":
analysis["high_vulnerabilities"].append(vuln)
# Generate recommendations
analysis["recommendations"] = self.generate_recommendations(analysis)
return analysis
def generate_recommendations(self, analysis):
"""Generate security recommendations based on analysis"""
recommendations = []
# Critical vulnerabilities
if analysis["critical_vulnerabilities"]:
recommendations.append(\\\\{
"priority": "CRITICAL",
"title": "Address Critical Vulnerabilities Immediately",
"Descripción": f"Found \\\\{len(analysis['critical_vulnerabilities'])\\\\} critical vulnerabilities that require immediate attention.",
"action": "Review and remediate all critical vulnerabilities within 24 hours."
\\\\})
# High severity vulnerabilities
if analysis["high_vulnerabilities"]:
recommendations.append(\\\\{
"priority": "HIGH",
"title": "Remediate High Severity Issues",
"Descripción": f"Found \\\\{len(analysis['high_vulnerabilities'])\\\\} high severity vulnerabilities.",
"action": "Plan remediation for high severity vulnerabilities within 1 week."
\\\\})
# Category-specific recommendations
categories = analysis["category_breakdown"]
if categories.get("Access Risk", 0) > 0:
recommendations.append(\\\\{
"priority": "HIGH",
"title": "Review Control de Accesos",
"Descripción": "Control de Acceso vulnerabilities detected.",
"action": "Review and strengthen RBAC policies and autenticación mechanisms."
\\\\})
if categories.get("Information Disclosure", 0) > 0:
recommendations.append(\\\\{
"priority": "MEDIUM",
"title": "Prevent Information Disclosure",
"Descripción": "Information disclosure vulnerabilities found.",
"action": "Review exposed servicios and implement proper Control de Accesos."
\\\\})
if categories.get("Remote Code Execution", 0) > 0:
recommendations.append(\\\\{
"priority": "CRITICAL",
"title": "Address Remote Code Execution Risks",
"Descripción": "Remote code execution vulnerabilities detected.",
"action": "Immediately patch or isolate affected components."
\\\\})
return recommendations
def check_thresholds(self, analysis):
"""Check if vulnerabilities exceed configured thresholds"""
thresholds = self.config.get("thresholds", \\\\{\\\\})
high_severity_count = analysis["severity_breakdown"]["high"] + analysis["severity_breakdown"]["critical"]
medium_severity_count = analysis["severity_breakdown"]["medium"]
total_vulnerabilities = analysis["total_vulnerabilities"]
threshold_violations = []
# Check high severity threshold
high_max = thresholds.get("high_severity_max", 0)
if high_severity_count > high_max:
threshold_violations.append(f"High/Critical severity: \\\\{high_severity_count\\\\} > \\\\{high_max\\\\}")
# Check medium severity threshold
medium_max = thresholds.get("medium_severity_max", 5)
if medium_severity_count > medium_max:
threshold_violations.append(f"Medium severity: \\\\{medium_severity_count\\\\} > \\\\{medium_max\\\\}")
# Check total vulnerabilities threshold
total_max = thresholds.get("total_vulnerabilities_max", 10)
if total_vulnerabilities > total_max:
threshold_violations.append(f"Total vulnerabilities: \\\\{total_vulnerabilities\\\\} > \\\\{total_max\\\\}")
return threshold_violations
def generate_repuerto(self, analysis, scan_results, output_file):
"""Generate comprehensive security repuerto"""
repuerto = \\\\{
"metadata": \\\\{
"generated_at": datetime.now().isoformat(),
"tool": "Kube-hunter",
"assessment_type": "Kubernetes Security Assessment"
\\\\},
"executive_summary": \\\\{
"total_vulnerabilities": analysis["total_vulnerabilities"],
"critical_count": analysis["severity_breakdown"]["critical"],
"high_count": analysis["severity_breakdown"]["high"],
"medium_count": analysis["severity_breakdown"]["medium"],
"low_count": analysis["severity_breakdown"]["low"],
"risk_level": self.calculate_risk_level(analysis)
\\\\},
"detailed_analysis": analysis,
"raw_results": scan_results,
"recommendations": analysis["recommendations"]
\\\\}
# Save repuerto
repuerto_file = output_file.replace('.json', '-repuerto.json')
with open(repuerto_file, 'w') as f:
json.dump(repuerto, f, indent=2)
self.logger.info(f"Comprehensive repuerto generated: \\\\{repuerto_file\\\\}")
return repuerto, repuerto_file
def calculate_risk_level(self, analysis):
"""Calculate overall risk level"""
critical_count = analysis["severity_breakdown"]["critical"]
high_count = analysis["severity_breakdown"]["high"]
medium_count = analysis["severity_breakdown"]["medium"]
if critical_count > 0:
return "CRITICAL"
elif high_count > 3:
return "HIGH"
elif high_count > 0 or medium_count > 5:
return "MEDIUM"
elif medium_count > 0:
return "LOW"
else:
return "MINIMAL"
def send_notifications(self, analysis, repuerto_file, threshold_violations):
"""Send notifications about assessment results"""
# Prepare notification content
subject = "Kube-hunter Security Assessment Results"
if threshold_violations:
subject += " - ALERT"
body = self.generate_notification_body(analysis, threshold_violations)
# Send email notification
if self.config.get("notifications", \\\\{\\\\}).get("email", \\\\{\\\\}).get("enabled", False):
self.send_email_notification(subject, body, repuerto_file)
# Send webhook notification
if self.config.get("notifications", \\\\{\\\\}).get("webhook", \\\\{\\\\}).get("enabled", False):
self.send_webhook_notification(analysis, threshold_violations)
def generate_notification_body(self, analysis, threshold_violations):
"""Generate notification message body"""
body = f"""
Kube-hunter Security Assessment Results
Generated: \\\\{analysis['timestamp']\\\\}
EXECUTIVE SUMMARY:
==================
Total Vulnerabilities: \\\\{analysis['total_vulnerabilities']\\\\}
Critical: \\\\{analysis['severity_breakdown']['critical']\\\\}
High: \\\\{analysis['severity_breakdown']['high']\\\\}
Medium: \\\\{analysis['severity_breakdown']['medium']\\\\}
Low: \\\\{analysis['severity_breakdown']['low']\\\\}
"""
if threshold_violations:
body += "⚠️ THRESHOLD VIOLATIONS:\n"
for violation in threshold_violations:
body += f" - \\\\{violation\\\\}\n"
body += "\n"
# Add critical vulnerabilities
if analysis["critical_vulnerabilities"]:
body += "CRITICAL VULNERABILITIES:\n"
body += "========================\n"
for vuln in analysis["critical_vulnerabilities"][:3]:
body += f"- \\\\{vuln.get('vulnerabilidad', 'Unknown')\\\\}\n"
body += f" Descripción: \\\\{vuln.get('Descripción', 'No Descripción')\\\\}\n"
body += f" Evidence: \\\\{vuln.get('evidence', 'No evidence')\\\\}\n\n"
# Add recommendations
if analysis["recommendations"]:
body += "TOP RECOMMENDATIONS:\n"
body += "===================\n"
for rec in analysis["recommendations"][:3]:
body += f"- [\\\\{rec['priority']\\\\}] \\\\{rec['title']\\\\}\n"
body += f" \\\\{rec['action']\\\\}\n\n"
return body
def send_email_notification(self, subject, body, attachment_file):
"""Send email notification"""
email_config = self.config.get("notifications", \\\\{\\\\}).get("email", \\\\{\\\\})
try:
msg = MIMEMultipart()
msg['From'] = email_config["from"]
msg['To'] = email_config["to"]
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
# Attach repuerto file
if attachment_file and os.path.exists(attachment_file):
with open(attachment_file, 'r') as f:
attachment = MIMEText(f.read())
attachment.add_header('Content-Disposition', f'attachment; filename="\\\\{os.path.basename(attachment_file)\\\\}"')
msg.attach(attachment)
server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_puerto"])
server.starttls()
if email_config.get("nombre de usuario") and email_config.get("contraseña"):
server.login(email_config["nombre de usuario"], email_config["contraseña"])
text = msg.as_string()
server.sendmail(email_config["from"], email_config["to"], text)
server.quit()
self.logger.info("Email notification sent successfully")
except Exception as e:
self.logger.error(f"Failed to send email notification: \\\\{e\\\\}")
def send_webhook_notification(self, analysis, threshold_violations):
"""Send webhook notification"""
webhook_config = self.config.get("notifications", \\\\{\\\\}).get("webhook", \\\\{\\\\})
try:
impuerto requests
payload = \\\\{
"timestamp": analysis["timestamp"],
"alert": bool(threshold_violations),
"threshold_violations": threshold_violations,
"summary": \\\\{
"total_vulnerabilities": analysis["total_vulnerabilities"],
"critical": analysis["severity_breakdown"]["critical"],
"high": analysis["severity_breakdown"]["high"],
"medium": analysis["severity_breakdown"]["medium"]
\\\\},
"recommendations": analysis["recommendations"][:3]
\\\\}
headers = webhook_config.get("headers", \\\\{\\\\})
headers["Content-Type"] = "application/json"
response = requests.post(
webhook_config["url"],
json=payload,
headers=headers,
timeout=30
)
if response.status_code == 200:
self.logger.info("Webhook notification sent successfully")
else:
self.logger.error(f"Webhook notification failed: \\\\{response.status_code\\\\}")
except Exception as e:
self.logger.error(f"Failed to send webhook notification: \\\\{e\\\\}")
def run_comprehensive_assessment(self, modes=None, objetivos=None):
"""Run comprehensive security assessment"""
if not modes:
modes = self.config.get("scanning", \\\\{\\\\}).get("modes", ["internal"])
if not objetivos:
objetivos = self.config.get("scanning", \\\\{\\\\}).get("objetivos", [])
self.logger.info("Starting comprehensive Kube-hunter security assessment")
all_results = []
all_analyses = []
# Run scans for each mode
for mode in modes:
if mode in ["remote", "network"] and objetivos:
for objetivo in objetivos:
scan_results, output_file = self.run_kube_hunter(mode, objetivo)
if scan_results:
analysis = self.analyze_vulnerabilities(scan_results)
if analysis:
all_results.append((scan_results, output_file))
all_analyses.append(analysis)
else:
scan_results, output_file = self.run_kube_hunter(mode)
if scan_results:
analysis = self.analyze_vulnerabilities(scan_results)
if analysis:
all_results.append((scan_results, output_file))
all_analyses.append(analysis)
if not all_analyses:
self.logger.error("No successful scans completed")
return False
# Combine analyses
combined_analysis = self.combine_analyses(all_analyses)
# Check thresholds
threshold_violations = self.check_thresholds(combined_analysis)
# Generate comprehensive repuerto
combined_results = \\\\{"scans": [result[0] for result in all_results]\\\\}
repuerto, repuerto_file = self.generate_repuerto(combined_analysis, combined_results, "combined-assessment.json")
# Send notifications
self.send_notifications(combined_analysis, repuerto_file, threshold_violations)
# Log summary
self.logger.info(f"Assessment completed. Total vulnerabilities: \\\\{combined_analysis['total_vulnerabilities']\\\\}")
if threshold_violations:
self.logger.warning(f"Threshold violations: \\\\{threshold_violations\\\\}")
return True
def combine_analyses(self, analyses):
"""Combine multiple scan analyses"""
combined = \\\\{
"timestamp": datetime.now().isoformat(),
"total_vulnerabilities": 0,
"severity_breakdown": \\\\{
"critical": 0,
"high": 0,
"medium": 0,
"low": 0
\\\\},
"category_breakdown": \\\\{\\\\},
"critical_vulnerabilities": [],
"high_vulnerabilities": [],
"recommendations": []
\\\\}
# Combine all analyses
for analysis in analyses:
combined["total_vulnerabilities"] += analysis["total_vulnerabilities"]
# Combine severity counts
for severity, count in analysis["severity_breakdown"].items():
combined["severity_breakdown"][severity] += count
# Combine category counts
for category, count in analysis["category_breakdown"].items():
if category not in combined["category_breakdown"]:
combined["category_breakdown"][category] = 0
combined["category_breakdown"][category] += count
# Combine vulnerabilities
combined["critical_vulnerabilities"].extend(analysis["critical_vulnerabilities"])
combined["high_vulnerabilities"].extend(analysis["high_vulnerabilities"])
# Generate combined recommendations
combined["recommendations"] = self.generate_recommendations(combined)
return combined
# uso
if __name__ == "__main__":
parser = argparse.ArgumentParser(Descripción="Comprehensive Kube-hunter Security Assessment")
parser.add_argument("--config", help="configuración file path")
parser.add_argument("--modes", nargs="+", choices=["internal", "remote", "pod", "network"],
default=["internal"], help="Scanning modes")
parser.add_argument("--objetivos", nargs="+", help="objetivos for remote/escaneo de red")
args = parser.parse_args()
assessment = KubeHunterAssessment(args.config)
success = assessment.run_comprehensive_assessment(args.modes, args.objetivos)
sys.exit(0 if success else 1)
CI/CD Integration Script
#!/usr/bin/env python3
# Kube-hunter CI/CD integration
impuerto subproceso
impuerto json
impuerto sys
impuerto os
class KubeHunterCICD:
def __init__(self, fail_on_high=True, fail_on_medium=False):
self.fail_on_high = fail_on_high
self.fail_on_medium = fail_on_medium
def run_security_scan(self, mode="internal", objetivo=None):
"""Run Kube-hunter security scan in CI/CD"""
cmd = ["kube-hunter", "--repuerto", "json"]
if mode == "remote" and objetivo:
cmd.extend(["--remote", objetivo])
elif mode == "internal":
cmd.append("--internal")
elif mode == "pod":
cmd.append("--pod")
try:
result = subproceso.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
return json.loads(result.stdout)
else:
print(f"Kube-hunter scan failed: \\\\{result.stderr\\\\}")
return None
except Exception as e:
print(f"Error running Kube-hunter: \\\\{e\\\\}")
return None
def evaluate_results(self, scan_results):
"""Evaluate scan results for CI/CD pipeline"""
if not scan_results:
return False, "Scan failed"
vulnerabilities = scan_results.get("vulnerabilities", [])
high_count = len([v for v in vulnerabilities if v.get("severity") == "high"])
medium_count = len([v for v in vulnerabilities if v.get("severity") == "medium"])
critical_count = len([v for v in vulnerabilities if v.get("severity") == "critical"])
# Check failure conditions
if critical_count > 0:
return False, f"Critical vulnerabilities found: \\\\{critical_count\\\\}"
if self.fail_on_high and high_count > 0:
return False, f"High severity vulnerabilities found: \\\\{high_count\\\\}"
if self.fail_on_medium and medium_count > 0:
return False, f"Medium severity vulnerabilities found: \\\\{medium_count\\\\}"
return True, f"Security scan passed. Vulnerabilities: \\\\{len(vulnerabilities)\\\\}"
def generate_ci_repuerto(self, scan_results, output_file="kube-hunter-ci-repuerto.json"):
"""Generate CI/CD friendly repuerto"""
if not scan_results:
return None
vulnerabilities = scan_results.get("vulnerabilities", [])
repuerto = \\\\{
"summary": \\\\{
"total_vulnerabilities": len(vulnerabilities),
"critical": len([v for v in vulnerabilities if v.get("severity") == "critical"]),
"high": len([v for v in vulnerabilities if v.get("severity") == "high"]),
"medium": len([v for v in vulnerabilities if v.get("severity") == "medium"]),
"low": len([v for v in vulnerabilities if v.get("severity") == "low"])
\\\\},
"vulnerabilities": vulnerabilities,
"scan_metadata": scan_results.get("metadata", \\\\{\\\\})
\\\\}
with open(output_file, 'w') as f:
json.dump(repuerto, f, indent=2)
return repuerto
# uso in CI/CD
if __name__ == "__main__":
cicd = KubeHunterCICD(fail_on_high=True, fail_on_medium=False)
# Run scan
results = cicd.run_security_scan("internal")
# Evaluate results
passed, message = cicd.evaluate_results(results)
# Generate repuerto
cicd.generate_ci_repuerto(results)
print(message)
sys.exit(0 if passed else 1)
Integration ejemplos
Prometheus Monitoring
# kube-hunter-expuertoer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: kube-hunter-expuertoer
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: kube-hunter-expuertoer
template:
metadata:
labels:
app: kube-hunter-expuertoer
spec:
servicioAccountName: kube-hunter-expuertoer
containers:
- name: kube-hunter-expuertoer
image: custom/kube-hunter-expuertoer:latest
puertos:
- containerpuerto: 8080
env:
- name: SCAN_INTERVAL
value: "3600" # 1 hour
- name: METRICS_puerto
value: "8080"
resources:
limits:
memory: "256Mi"
cpu: "200m"
requests:
memory: "128Mi"
cpu: "100m"
---
apiVersion: v1
kind: servicio
metadata:
name: kube-hunter-expuertoer
namespace: monitoring
labels:
app: kube-hunter-expuertoer
spec:
puertos:
- puerto: 8080
objetivopuerto: 8080
name: metrics
selector:
app: kube-hunter-expuertoer
---
apiVersion: monitoring.coreos.com/v1
kind: servicioMonitor
metadata:
name: kube-hunter-expuertoer
namespace: monitoring
spec:
selector:
matchLabels:
app: kube-hunter-expuertoer
endpoints:
- puerto: metrics
interval: 60s
path: /metrics
Grafana Dashboard
\\\\{
"dashboard": \\\\{
"title": "Kube-hunter Security Dashboard",
"panels": [
\\\\{
"title": "Total Vulnerabilities",
"type": "stat",
"objetivos": [
\\\\{
"expr": "kube_hunter_vulnerabilities_total",
"legendFormat": "Total Vulnerabilities"
\\\\}
]
\\\\},
\\\\{
"title": "Vulnerabilities by Severity",
"type": "piechart",
"objetivos": [
\\\\{
"expr": "kube_hunter_vulnerabilities_by_severity",
"legendFormat": "\\\\{\\\\{severity\\\\}\\\\}"
\\\\}
]
\\\\},
\\\\{
"title": "Security Scan History",
"type": "graph",
"objetivos": [
\\\\{
"expr": "kube_hunter_scan_duration_seconds",
"legendFormat": "Scan Duration"
\\\\}
]
\\\\}
]
\\\\}
\\\\}
solución de problemas
Common Issues
Permission Problems:
# Check Kubernetes access
kubectl auth can-i get pods
kubectl auth can-i list servicios
# Verify servicio account permissions
kubectl describe servicioaccount kube-hunter -n security
kubectl describe clusterrolebinding kube-hunter
# Check pod security policies
kubectl get psp
kubectl describe psp restricted
# Test network connectivity
kubectl run test-pod --image=busybox --rm -it -- nslookup kubernetes.default.svc.cluster.local
Network Connectivity Issues:
# Check cluster networking
kubectl get nodes -o wide
kubectl get servicios -A
# Test API server connectivity
curl -k https://kubernetes.default.svc.cluster.local:443/version
# Check DNS resolution
nslookup kubernetes.default.svc.cluster.local
# Verify network policies
kubectl get networkpolicies -A
kubectl describe networkpolicy -n kube-system
Scanning Problems:
# Run with debug logging
kube-hunter --internal --log DEBUG
# Check for timeouts
kube-hunter --internal --timeout 60
# Test specific components
kube-hunter --internal --servicios api-server
# Verify output format
kube-hunter --internal --repuerto json|jq '.'
# Check for missing dependencies
pip list|grep kube-hunter
pip install --upgrade kube-hunter
Performance Optimization
Optimizing Kube-hunter performance:
# Reduce scan scope
kube-hunter --internal --quick
# Limit hilo count
kube-hunter --internal --hilos 10
# Use specific interface
kube-hunter --internal --interface eth0
# Skip slow checks
kube-hunter --internal --skip-slow-checks
# Optimize for CI/CD
kube-hunter --pod --repuerto json --timeout 120
Security Considerations
Safe uso Practices
Scanning Ethics: - Only scan clusters you own or have explicit permission to test - Coordinate with operations teams before running scans - Use read-only scanning modes when possible - Implement proper Control de Accesos for scan results - Regular updates of Kube-hunter and its dependencies
Data Protection: - Encrypt sensitive scan results and repuertos - Implement secure storage for vulnerabilidad data - Use secure channels for transmitting repuertos - Regular cleanup of temporary files and logs - Implement data retention policies for compliance
Operational Security
Monitoring and Alerting: - Monitor Kube-hunter execution and results - Set up alerting for critical security findings - Track vulnerabilidad trends over time - Implement automated remediation workflows - Regular review of security configuracións
Integration Security: - Secure CI/CD pipeline integration - Protect servicio account tokens and credenciales - Implement proper RBAC for Kubernetes deployments - Monitor for unauthorized Kube-hunter uso - Regular security assessment of scanning infrastructure