Boofuzz Cheat Sheet
Überblick
Boofuzz ist ein modernes, leistungsfähiges und erweiterbares Fuzzing-Framework, das für Netzwerkprotokolltests und Schwachstellenentdeckungen konzipiert ist. Es ist eine Gabel und Nachfolger des Sulley Fuzzing Frameworks und bietet erweiterte Fähigkeiten für Protokoll-Fuzzing, Crash-Erkennung und Testfallmanagement. Boofuzz zeichnet sich durch intelligente Mutation-basierte Tests durch Sicherheitslücken in Netzwerkprotokollen, Dateiformaten und Anwendungen aus.
RECHT *Key Features: Protokoll-Aware Fuzzing, intelligente Mutationsalgorithmen, Crash-Erkennung und -Überwachung, Session-Management, Web-Schnittstelle zur Überwachung, erweiterte Architektur und umfassende Protokollierung.
Installation und Inbetriebnahme
Python Installation
```bash
Install via pip (recommended)
pip install boofuzz
Install with optional dependencies
pip install boofuzz[dev]
Install from source
git clone https://github.com/jtpereyda/boofuzz.git cd boofuzz pip install -e .
Verify installation
python -c "import boofuzz; print(boofuzz.version)"
Install additional dependencies for advanced features
pip install pyserial # For serial communication pip install paramiko # For SSH monitoring pip install psutil # For process monitoring ```_
Virtual Environment Setup
```bash
Create virtual environment
python3 -m venv boofuzz-env source boofuzz-env/bin/activate
Install boofuzz and dependencies
pip install --upgrade pip pip install boofuzz pip install boofuzz[dev]
Install additional tools
pip install scapy wireshark-python
Create activation script
cat > activate_boofuzz.sh << 'EOF'
!/bin/bash
source boofuzz-env/bin/activate echo "Boofuzz environment activated" echo "Version: $(python -c 'import boofuzz; print(boofuzz.version)')" EOF
chmod +x activate_boofuzz.sh ```_
Docker Installation
```bash
Create Dockerfile
cat > Dockerfile << 'EOF' FROM python:3.9-slim
RUN apt-get update && apt-get install -y \ gcc \ git \ tcpdump \ netcat \ && rm -rf /var/lib/apt/lists/*
WORKDIR /app
RUN pip install boofuzz boofuzz[dev] scapy
COPY . .
EXPOSE 26001
CMD ["python", "-m", "boofuzz.web.app"] EOF
Build Docker image
docker build -t boofuzz .
Run Boofuzz in Docker
docker run -it --rm -p 26001:26001 boofuzz
Run with network access
docker run -it --rm --network host boofuzz python your_fuzzer.py
Create alias for easier usage
echo 'alias boofuzz-docker="docker run -it --rm --network host -v $(pwd):/app boofuzz"' >> ~/.bashrc source ~/.bashrc ```_
Entwicklung Umwelt Setup
```bash
Clone development repository
git clone https://github.com/jtpereyda/boofuzz.git cd boofuzz
Install in development mode
pip install -e .[dev]
Install pre-commit hooks
pre-commit install
Run tests
python -m pytest
Install additional development tools
pip install black flake8 mypy sphinx
Create development configuration
cat > dev_config.py << 'EOF' import boofuzz
Development configuration
BOOFUZZ_CONFIG = { 'web_port': 26001, 'log_level': 'DEBUG', 'crash_threshold': 3, 'restart_sleep_time': 5, 'max_mutations': 1000, 'receive_data_after_each_request': True, 'receive_data_after_fuzz': True }
Export configuration
boofuzz.helpers.update_config(BOOFUZZ_CONFIG) EOF ```_
Konfiguration und Abhängigkeiten
```bash
Install system dependencies (Ubuntu/Debian)
sudo apt-get update sudo apt-get install -y python3-dev gcc tcpdump wireshark-common
Install system dependencies (CentOS/RHEL)
sudo yum install -y python3-devel gcc tcpdump wireshark
Install system dependencies (macOS)
brew install python tcpdump wireshark
Configure permissions for packet capture
sudo setcap cap_net_raw+ep $(which tcpdump) sudo usermod -a -G wireshark $USER
Create boofuzz configuration directory
mkdir -p ~/.boofuzz
Create default configuration
cat > ~/.boofuzz/config.json << 'EOF' { "default_session": { "target_ip": "127.0.0.1", "target_port": 80, "receive_data_after_each_request": true, "receive_data_after_fuzz": true, "check_data_received_each_request": false, "restart_sleep_time": 5, "crash_threshold": 3 }, "logging": { "level": "INFO", "console_level": "INFO", "file_level": "DEBUG", "log_directory": "~/.boofuzz/logs" }, "web_interface": { "enabled": true, "host": "0.0.0.0", "port": 26001 }, "monitoring": { "process_monitor_enabled": true, "network_monitor_enabled": true, "crash_detection_enabled": true } } EOF
Set environment variables
export BOOFUZZ_CONFIG=~/.boofuzz/config.json export BOOFUZZ_LOG_DIR=~/.boofuzz/logs ```_
Grundlegende Fuzzing-Konzepte und Nutzung
Einfaches HTTP Fuzzing Beispiel
```python
!/usr/bin/env python3
Basic HTTP fuzzing with Boofuzz
import boofuzz from boofuzz import *
def main(): # Create a session session = Session( target=Target( connection=SocketConnection("127.0.0.1", 80, proto='tcp') ), # Optional: Add process monitor # monitors=[ProcessMonitor("127.0.0.1", 26002)] )
# Define HTTP request structure
s_initialize("HTTP_GET")
# HTTP method
s_string("GET", name="method")
s_delim(" ")
# URL path (this will be fuzzed)
s_string("/", name="path", fuzzable=True)
s_string("index.html", name="file", fuzzable=True)
s_delim(" ")
# HTTP version
s_string("HTTP/1.1", name="version")
s_static("\r\n")
# Host header
s_string("Host: ", name="host_header")
s_string("localhost", name="host_value", fuzzable=True)
s_static("\r\n")
# User-Agent header
s_string("User-Agent: ", name="ua_header")
s_string("Boofuzz/1.0", name="ua_value", fuzzable=True)
s_static("\r\n")
# Additional headers (fuzzable)
if s_block_start("custom_headers"):
s_string("X-Custom-Header: ", name="custom_header_name", fuzzable=True)
s_string("CustomValue", name="custom_header_value", fuzzable=True)
s_static("\r\n")
s_block_end()
# End of headers
s_static("\r\n")
# Connect the request to the session
session.connect(s_get("HTTP_GET"))
# Start fuzzing
print("Starting HTTP fuzzing session...")
session.fuzz()
if name == "main": main() ```_
TCP Protokoll Fuzzing
```python
!/usr/bin/env python3
TCP protocol fuzzing example
import boofuzz from boofuzz import * import time
def create_tcp_fuzzer(target_ip, target_port, protocol_name="TCP_PROTOCOL"): """Create a generic TCP protocol fuzzer"""
# Create session with monitoring
session = Session(
target=Target(
connection=SocketConnection(target_ip, target_port, proto='tcp')
),
monitors=[
# Process monitor (if target is local)
# ProcessMonitor(target_ip, 26002),
# Network monitor
NetworkMonitor(
host=target_ip,
start_commands=["tcpdump -i any -w capture.pcap"],
stop_commands=["pkill tcpdump"]
)
],
# Session configuration
receive_data_after_each_request=True,
receive_data_after_fuzz=True,
check_data_received_each_request=False,
restart_sleep_time=5,
crash_threshold=3
)
# Define protocol structure
s_initialize(protocol_name)
# Protocol header
if s_block_start("header"):
# Magic bytes or protocol identifier
s_binary("\\x41\\x42\\x43\\x44", name="magic", fuzzable=False)
# Version field
s_byte(0x01, name="version", fuzzable=True)
# Message type
s_byte(0x10, name="msg_type", fuzzable=True)
# Length field (will be calculated)
s_size("payload", length=4, name="length", endian=">")
# Flags
s_word(0x0000, name="flags", fuzzable=True, endian=">")
# Sequence number
s_dword(0x12345678, name="seq_num", fuzzable=True, endian=">")
s_block_end()
# Payload section
if s_block_start("payload"):
# Command field
s_string("COMMAND", name="command", fuzzable=True)
s_delim(" ")
# Parameters
s_string("param1", name="param1", fuzzable=True)
s_delim(" ")
s_string("param2", name="param2", fuzzable=True)
s_delim("\r\n")
# Data section
s_random("\\x00", min_length=0, max_length=1024, name="data")
s_block_end()
# Checksum (calculated over payload)
s_checksum("payload", algorithm="crc32", name="checksum", endian=">")
# End marker
s_static("\\x0D\\x0A")
return session, s_get(protocol_name)
def main(): # Configuration TARGET_IP = "127.0.0.1" TARGET_PORT = 9999
# Create fuzzer
session, request = create_tcp_fuzzer(TARGET_IP, TARGET_PORT)
# Connect request to session
session.connect(request)
# Configure fuzzing parameters
session.fuzz_data_logger.log_level = 1 # Detailed logging
# Start fuzzing
print(f"Starting TCP protocol fuzzing on {TARGET_IP}:{TARGET_PORT}")
print("Web interface available at: http://localhost:26001")
try:
session.fuzz()
except KeyboardInterrupt:
print("\\nFuzzing interrupted by user")
except Exception as e:
print(f"Fuzzing error: {e}")
finally:
print("Fuzzing session completed")
if name == "main": main() ```_
UDP Protokoll Fuzzing
```python
!/usr/bin/env python3
UDP protocol fuzzing example
import boofuzz from boofuzz import * import socket
def create_udp_fuzzer(target_ip, target_port): """Create UDP protocol fuzzer"""
# Create session
session = Session(
target=Target(
connection=UDPSocketConnection(target_ip, target_port)
),
# UDP-specific configuration
receive_data_after_each_request=True,
receive_data_after_fuzz=False, # UDP is connectionless
check_data_received_each_request=False,
restart_sleep_time=1, # Faster for UDP
crash_threshold=5
)
# Define UDP packet structure
s_initialize("UDP_PACKET")
# Simple UDP protocol
if s_block_start("udp_header"):
# Protocol identifier
s_static("UDP")
s_delim(":")
# Message ID
s_word(0x1234, name="msg_id", fuzzable=True, endian=">")
s_delim(":")
# Data length
s_size("udp_data", length=2, name="data_len", endian=">")
s_delim(":")
s_block_end()
# Data payload
if s_block_start("udp_data"):
# Command
s_string("PING", name="command", fuzzable=True)
s_delim("|")
# Timestamp
s_dword(0x12345678, name="timestamp", fuzzable=True, endian=">")
s_delim("|")
# Payload data
s_string("Hello World", name="payload", fuzzable=True)
s_block_end()
return session, s_get("UDP_PACKET")
def test_udp_server(): """Simple UDP server for testing"""
import threading
import socket
def udp_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('127.0.0.1', 9998))
print("UDP test server listening on 127.0.0.1:9998")
try:
while True:
data, addr = sock.recvfrom(1024)
print(f"Received from {addr}: {data}")
# Echo response
response = f"ECHO:{data.decode('utf-8', errors='ignore')}"
sock.sendto(response.encode(), addr)
except Exception as e:
print(f"UDP server error: {e}")
finally:
sock.close()
# Start server in background thread
server_thread = threading.Thread(target=udp_server, daemon=True)
server_thread.start()
return server_thread
def main(): # Start test server (optional) # test_udp_server() # time.sleep(1) # Give server time to start
# Configuration
TARGET_IP = "127.0.0.1"
TARGET_PORT = 9998
# Create fuzzer
session, request = create_udp_fuzzer(TARGET_IP, TARGET_PORT)
# Connect request to session
session.connect(request)
# Start fuzzing
print(f"Starting UDP protocol fuzzing on {TARGET_IP}:{TARGET_PORT}")
try:
session.fuzz()
except KeyboardInterrupt:
print("\\nFuzzing interrupted by user")
except Exception as e:
print(f"Fuzzing error: {e}")
if name == "main": main() ```_
Fortgeschrittene Fuzzing-Techniken
Datei Format Fuzzing
```python
!/usr/bin/env python3
File format fuzzing example
import boofuzz from boofuzz import * import os import tempfile
def create_pdf_fuzzer(): """Create a PDF file format fuzzer"""
s_initialize("PDF_FILE")
# PDF Header
s_static("%PDF-1.4\\n")
# Object 1 - Catalog
if s_block_start("catalog_obj"):
s_static("1 0 obj\\n")
s_static("<<\\n")
s_static("/Type /Catalog\\n")
s_static("/Pages 2 0 R\\n")
# Fuzzable catalog entries
if s_block_start("catalog_entries"):
s_string("/OpenAction", name="open_action_key", fuzzable=True)
s_delim(" ")
s_string("3 0 R", name="open_action_value", fuzzable=True)
s_static("\\n")
s_string("/PageMode", name="page_mode_key", fuzzable=True)
s_delim(" ")
s_string("/UseNone", name="page_mode_value", fuzzable=True)
s_static("\\n")
s_block_end()
s_static(">>\\n")
s_static("endobj\\n\\n")
s_block_end()
# Object 2 - Pages
if s_block_start("pages_obj"):
s_static("2 0 obj\\n")
s_static("<<\\n")
s_static("/Type /Pages\\n")
s_static("/Kids [4 0 R]\\n")
s_static("/Count 1\\n")
s_static(">>\\n")
s_static("endobj\\n\\n")
s_block_end()
# Object 3 - JavaScript Action (fuzzable)
if s_block_start("js_action_obj"):
s_static("3 0 obj\\n")
s_static("<<\\n")
s_static("/Type /Action\\n")
s_static("/S /JavaScript\\n")
s_static("/JS (")
# Fuzzable JavaScript content
s_string("app.alert('Hello World');", name="javascript_code", fuzzable=True)
s_static(")\\n")
s_static(">>\\n")
s_static("endobj\\n\\n")
s_block_end()
# Object 4 - Page
if s_block_start("page_obj"):
s_static("4 0 obj\\n")
s_static("<<\\n")
s_static("/Type /Page\\n")
s_static("/Parent 2 0 R\\n")
s_static("/MediaBox [0 0 612 792]\\n")
s_static("/Contents 5 0 R\\n")
s_static(">>\\n")
s_static("endobj\\n\\n")
s_block_end()
# Object 5 - Content Stream
if s_block_start("content_obj"):
s_static("5 0 obj\\n")
s_static("<<\\n")
s_size("content_stream", length=4, name="content_length")
s_static("\\n")
s_static(">>\\n")
s_static("stream\\n")
# Fuzzable content stream
if s_block_start("content_stream"):
s_static("BT\\n")
s_static("/F1 12 Tf\\n")
s_static("100 700 Td\\n")
s_static("(")
s_string("Hello, World!", name="text_content", fuzzable=True)
s_static(") Tj\\n")
s_static("ET")
s_block_end()
s_static("\\nendstream\\n")
s_static("endobj\\n\\n")
s_block_end()
# Cross-reference table
s_static("xref\\n")
s_static("0 6\\n")
s_static("0000000000 65535 f \\n")
s_static("0000000010 00000 n \\n")
s_static("0000000079 00000 n \\n")
s_static("0000000173 00000 n \\n")
s_static("0000000301 00000 n \\n")
s_static("0000000380 00000 n \\n")
# Trailer
s_static("trailer\\n")
s_static("<<\\n")
s_static("/Size 6\\n")
s_static("/Root 1 0 R\\n")
s_static(">>\\n")
s_static("startxref\\n")
s_static("492\\n")
s_static("%%EOF")
return s_get("PDF_FILE")
def create_image_fuzzer(): """Create an image file format fuzzer (PNG)"""
s_initialize("PNG_FILE")
# PNG signature
s_static("\\x89PNG\\r\\n\\x1a\\n")
# IHDR chunk
if s_block_start("ihdr_chunk"):
# Chunk length
s_dword(13, name="ihdr_length", endian=">")
# Chunk type
s_static("IHDR")
# IHDR data
s_dword(100, name="width", fuzzable=True, endian=">") # Width
s_dword(100, name="height", fuzzable=True, endian=">") # Height
s_byte(8, name="bit_depth", fuzzable=True) # Bit depth
s_byte(2, name="color_type", fuzzable=True) # Color type
s_byte(0, name="compression", fuzzable=True) # Compression
s_byte(0, name="filter", fuzzable=True) # Filter
s_byte(0, name="interlace", fuzzable=True) # Interlace
# CRC (simplified - should be calculated)
s_dword(0x12345678, name="ihdr_crc", fuzzable=True, endian=">")
s_block_end()
# Custom chunk (fuzzable)
if s_block_start("custom_chunk"):
# Chunk length
s_size("custom_data", length=4, name="custom_length", endian=">")
# Chunk type (fuzzable)
s_string("tEXt", name="chunk_type", fuzzable=True)
# Chunk data
if s_block_start("custom_data"):
s_string("Title", name="text_keyword", fuzzable=True)
s_static("\\x00")
s_string("Fuzzed Image", name="text_value", fuzzable=True)
s_block_end()
# CRC
s_dword(0x87654321, name="custom_crc", fuzzable=True, endian=">")
s_block_end()
# IDAT chunk (simplified)
if s_block_start("idat_chunk"):
s_size("idat_data", length=4, name="idat_length", endian=">")
s_static("IDAT")
# Compressed image data (fuzzable)
if s_block_start("idat_data"):
s_random("\\x00", min_length=10, max_length=1000, name="compressed_data")
s_block_end()
s_dword(0xABCDEF00, name="idat_crc", fuzzable=True, endian=">")
s_block_end()
# IEND chunk
s_dword(0, name="iend_length", endian=">")
s_static("IEND")
s_dword(0xAE426082, name="iend_crc", endian=">")
return s_get("PNG_FILE")
def file_format_fuzzer(file_format="pdf", output_dir="fuzzed_files"): """File format fuzzing session"""
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Create appropriate fuzzer
if file_format.lower() == "pdf":
request = create_pdf_fuzzer()
file_extension = ".pdf"
elif file_format.lower() == "png":
request = create_image_fuzzer()
file_extension = ".png"
else:
raise ValueError(f"Unsupported file format: {file_format}")
# Custom target for file generation
class FileTarget:
def __init__(self, output_dir, file_extension):
self.output_dir = output_dir
self.file_extension = file_extension
self.file_count = 0
def send(self, data):
self.file_count += 1
filename = f"fuzzed_{self.file_count:06d}{self.file_extension}"
filepath = os.path.join(self.output_dir, filename)
with open(filepath, 'wb') as f:
f.write(data)
print(f"Generated: {filepath}")
return len(data)
def recv(self, max_bytes):
return b"" # No response expected for file generation
# Create session with custom target
session = Session(
target=Target(
connection=FileTarget(output_dir, file_extension)
),
receive_data_after_each_request=False,
receive_data_after_fuzz=False
)
# Connect request to session
session.connect(request)
return session
def main(): print("File Format Fuzzing Example") print("===========================")
# Create PDF fuzzer
pdf_session = file_format_fuzzer("pdf", "fuzzed_pdfs")
print("Starting PDF fuzzing (generating 10 files)...")
try:
# Limit to 10 test cases for demonstration
pdf_session.fuzz(max_depth=1)
except KeyboardInterrupt:
print("\\nFuzzing interrupted")
print("PDF fuzzing completed")
# Create PNG fuzzer
png_session = file_format_fuzzer("png", "fuzzed_pngs")
print("Starting PNG fuzzing (generating 10 files)...")
try:
png_session.fuzz(max_depth=1)
except KeyboardInterrupt:
print("\\nFuzzing interrupted")
print("PNG fuzzing completed")
if name == "main": main() ```_
Web Application Fuzzing
```python
!/usr/bin/env python3
Advanced web application fuzzing
import boofuzz from boofuzz import * import requests import json
def create_http_post_fuzzer(): """Create HTTP POST request fuzzer"""
s_initialize("HTTP_POST")
# HTTP method and path
s_string("POST", name="method")
s_delim(" ")
s_string("/api/login", name="path", fuzzable=True)
s_delim(" ")
s_string("HTTP/1.1", name="version")
s_static("\\r\\n")
# Headers
s_string("Host: ", name="host_header")
s_string("localhost", name="host_value", fuzzable=True)
s_static("\\r\\n")
s_string("Content-Type: ", name="content_type_header")
s_string("application/json", name="content_type_value", fuzzable=True)
s_static("\\r\\n")
s_string("Content-Length: ", name="content_length_header")
s_size("post_body", length=4, name="content_length", signed=False)
s_static("\\r\\n")
# Custom headers (fuzzable)
if s_block_start("custom_headers"):
s_string("X-Requested-With: ", name="xhr_header")
s_string("XMLHttpRequest", name="xhr_value", fuzzable=True)
s_static("\\r\\n")
s_string("Authorization: ", name="auth_header")
s_string("Bearer token123", name="auth_value", fuzzable=True)
s_static("\\r\\n")
s_block_end()
# End of headers
s_static("\\r\\n")
# POST body (JSON)
if s_block_start("post_body"):
s_static('{"')
s_string("username", name="username_key", fuzzable=True)
s_static('":"')
s_string("admin", name="username_value", fuzzable=True)
s_static('","')
s_string("password", name="password_key", fuzzable=True)
s_static('":"')
s_string("password123", name="password_value", fuzzable=True)
s_static('","')
s_string("remember", name="remember_key", fuzzable=True)
s_static('":')
s_string("true", name="remember_value", fuzzable=True)
s_static('}')
s_block_end()
return s_get("HTTP_POST")
def create_xml_fuzzer(): """Create XML payload fuzzer"""
s_initialize("XML_PAYLOAD")
# XML declaration
s_static('<?xml version="1.0" encoding="UTF-8"?>\\n')
# Root element
s_static('<')
s_string("request", name="root_element", fuzzable=True)
s_static('>\\n')
# User element
s_static(' <')
s_string("user", name="user_element", fuzzable=True)
# User attributes
s_static(' ')
s_string("id", name="user_id_attr", fuzzable=True)
s_static('="')
s_string("123", name="user_id_value", fuzzable=True)
s_static('"')
s_static(' ')
s_string("role", name="user_role_attr", fuzzable=True)
s_static('="')
s_string("admin", name="user_role_value", fuzzable=True)
s_static('"')
s_static('>\\n')
# User data
s_static(' <')
s_string("name", name="name_element", fuzzable=True)
s_static('>')
s_string("John Doe", name="name_value", fuzzable=True)
s_static('</')
s_string("name", name="name_close", fuzzable=False)
s_static('>\\n')
s_static(' <')
s_string("email", name="email_element", fuzzable=True)
s_static('>')
s_string("john@example.com", name="email_value", fuzzable=True)
s_static('</')
s_string("email", name="email_close", fuzzable=False)
s_static('>\\n')
# Custom data section (highly fuzzable)
s_static(' <')
s_string("data", name="data_element", fuzzable=True)
s_static('>\\n')
# CDATA section
s_static(' <![CDATA[')
s_string("Some data here", name="cdata_content", fuzzable=True)
s_static(']]>\\n')
s_static(' </')
s_string("data", name="data_close", fuzzable=False)
s_static('>\\n')
# Close user element
s_static(' </')
s_string("user", name="user_close", fuzzable=False)
s_static('>\\n')
# Close root element
s_static('</')
s_string("request", name="root_close", fuzzable=False)
s_static('>')
return s_get("XML_PAYLOAD")
def create_soap_fuzzer(): """Create SOAP envelope fuzzer"""
s_initialize("SOAP_ENVELOPE")
# SOAP envelope
s_static('<?xml version="1.0" encoding="UTF-8"?>\\n')
s_static('<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">\\n')
# SOAP header (optional, fuzzable)
if s_block_start("soap_header"):
s_static(' <soap:Header>\\n')
# Authentication header
s_static(' <')
s_string("auth", name="auth_header_name", fuzzable=True)
s_static(' xmlns="http://example.com/auth">\\n')
s_static(' <')
s_string("username", name="auth_username_element", fuzzable=True)
s_static('>')
s_string("user123", name="auth_username_value", fuzzable=True)
s_static('</')
s_string("username", name="auth_username_close", fuzzable=False)
s_static('>\\n')
s_static(' <')
s_string("token", name="auth_token_element", fuzzable=True)
s_static('>')
s_string("abc123def456", name="auth_token_value", fuzzable=True)
s_static('</')
s_string("token", name="auth_token_close", fuzzable=False)
s_static('>\\n')
s_static(' </')
s_string("auth", name="auth_header_close", fuzzable=False)
s_static('>\\n')
s_static(' </soap:Header>\\n')
s_block_end()
# SOAP body
s_static(' <soap:Body>\\n')
# Method call
s_static(' <')
s_string("GetUserInfo", name="method_name", fuzzable=True)
s_static(' xmlns="http://example.com/webservice">\\n')
# Parameters
s_static(' <')
s_string("userId", name="param1_name", fuzzable=True)
s_static('>')
s_string("12345", name="param1_value", fuzzable=True)
s_static('</')
s_string("userId", name="param1_close", fuzzable=False)
s_static('>\\n')
s_static(' <')
s_string("includeDetails", name="param2_name", fuzzable=True)
s_static('>')
s_string("true", name="param2_value", fuzzable=True)
s_static('</')
s_string("includeDetails", name="param2_close", fuzzable=False)
s_static('>\\n')
# Complex parameter
s_static(' <')
s_string("options", name="complex_param_name", fuzzable=True)
s_static('>\\n')
s_static(' <')
s_string("format", name="option1_name", fuzzable=True)
s_static('>')
s_string("json", name="option1_value", fuzzable=True)
s_static('</')
s_string("format", name="option1_close", fuzzable=False)
s_static('>\\n')
s_static(' <')
s_string("maxResults", name="option2_name", fuzzable=True)
s_static('>')
s_string("100", name="option2_value", fuzzable=True)
s_static('</')
s_string("maxResults", name="option2_close", fuzzable=False)
s_static('>\\n')
s_static(' </')
s_string("options", name="complex_param_close", fuzzable=False)
s_static('>\\n')
# Close method
s_static(' </')
s_string("GetUserInfo", name="method_close", fuzzable=False)
s_static('>\\n')
s_static(' </soap:Body>\\n')
s_static('</soap:Envelope>')
return s_get("SOAP_ENVELOPE")
def web_application_fuzzer(target_ip, target_port, fuzzer_type="http_post"): """Web application fuzzing session"""
# Create appropriate fuzzer
if fuzzer_type == "http_post":
request = create_http_post_fuzzer()
elif fuzzer_type == "xml":
request = create_xml_fuzzer()
elif fuzzer_type == "soap":
request = create_soap_fuzzer()
else:
raise ValueError(f"Unsupported fuzzer type: {fuzzer_type}")
# Create session
session = Session(
target=Target(
connection=SocketConnection(target_ip, target_port, proto='tcp')
),
receive_data_after_each_request=True,
receive_data_after_fuzz=True,
check_data_received_each_request=True,
restart_sleep_time=2,
crash_threshold=5
)
# Connect request to session
session.connect(request)
return session
def main(): print("Web Application Fuzzing Example") print("===============================")
# Configuration
TARGET_IP = "127.0.0.1"
TARGET_PORT = 8080
# Test different fuzzer types
fuzzer_types = ["http_post", "xml", "soap"]
for fuzzer_type in fuzzer_types:
print(f"\\nStarting {fuzzer_type.upper()} fuzzing...")
try:
session = web_application_fuzzer(TARGET_IP, TARGET_PORT, fuzzer_type)
# Limit test cases for demonstration
session.fuzz(max_depth=1)
except KeyboardInterrupt:
print(f"\\n{fuzzer_type.upper()} fuzzing interrupted")
except Exception as e:
print(f"{fuzzer_type.upper()} fuzzing error: {e}")
print(f"{fuzzer_type.upper()} fuzzing completed")
if name == "main": main() ```_
Überwachung und Crash-Detection
Prozessüberwachung
```python
!/usr/bin/env python3
Advanced process monitoring with Boofuzz
import boofuzz from boofuzz import * import psutil import time import threading
class AdvancedProcessMonitor: """Enhanced process monitor with detailed crash detection"""
def __init__(self, host, port, process_name=None, pid=None):
self.host = host
self.port = port
self.process_name = process_name
self.pid = pid
self.baseline_metrics = {}
self.monitoring = False
self.crash_detected = False
def start_monitoring(self):
"""Start monitoring the target process"""
self.monitoring = True
self.crash_detected = False
# Find process if not specified
if not self.pid and self.process_name:
self.pid = self._find_process_by_name(self.process_name)
if not self.pid:
raise ValueError("Process not found or PID not specified")
# Collect baseline metrics
self._collect_baseline_metrics()
# Start monitoring thread
monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
monitor_thread.start()
print(f"Started monitoring process {self.pid} ({self.process_name})")
def stop_monitoring(self):
"""Stop monitoring"""
self.monitoring = False
print("Stopped process monitoring")
def _find_process_by_name(self, name):
"""Find process PID by name"""
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] == name:
return proc.info['pid']
return None
def _collect_baseline_metrics(self):
"""Collect baseline performance metrics"""
try:
process = psutil.Process(self.pid)
self.baseline_metrics = {
'cpu_percent': process.cpu_percent(),
'memory_percent': process.memory_percent(),
'memory_info': process.memory_info(),
'num_threads': process.num_threads(),
'num_fds': process.num_fds() if hasattr(process, 'num_fds') else 0,
'connections': len(process.connections()),
'status': process.status()
}
print(f"Baseline metrics collected: {self.baseline_metrics}")
except psutil.NoSuchProcess:
print(f"Process {self.pid} not found for baseline collection")
def _monitor_loop(self):
"""Main monitoring loop"""
consecutive_errors = 0
max_errors = 5
while self.monitoring:
try:
# Check if process still exists
if not psutil.pid_exists(self.pid):
self.crash_detected = True
print(f"CRASH DETECTED: Process {self.pid} no longer exists")
break
process = psutil.Process(self.pid)
# Check process status
status = process.status()
if status in [psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD]:
self.crash_detected = True
print(f"CRASH DETECTED: Process {self.pid} status: {status}")
break
# Monitor resource usage
current_metrics = {
'cpu_percent': process.cpu_percent(),
'memory_percent': process.memory_percent(),
'memory_info': process.memory_info(),
'num_threads': process.num_threads(),
'num_fds': process.num_fds() if hasattr(process, 'num_fds') else 0,
'connections': len(process.connections()),
'status': status
}
# Check for anomalies
self._check_anomalies(current_metrics)
consecutive_errors = 0
except psutil.NoSuchProcess:
self.crash_detected = True
print(f"CRASH DETECTED: Process {self.pid} disappeared")
break
except Exception as e:
consecutive_errors += 1
print(f"Monitoring error: {e}")
if consecutive_errors >= max_errors:
print(f"Too many monitoring errors, stopping")
break
time.sleep(1) # Monitor every second
def _check_anomalies(self, current_metrics):
"""Check for performance anomalies that might indicate issues"""
# Memory leak detection
if current_metrics['memory_percent'] > self.baseline_metrics['memory_percent'] * 2:
print(f"WARNING: Memory usage increased significantly: {current_metrics['memory_percent']:.1f}%")
# Thread explosion detection
if current_metrics['num_threads'] > self.baseline_metrics['num_threads'] * 3:
print(f"WARNING: Thread count increased significantly: {current_metrics['num_threads']}")
# File descriptor leak detection
if current_metrics['num_fds'] > self.baseline_metrics['num_fds'] * 2:
print(f"WARNING: File descriptor count increased: {current_metrics['num_fds']}")
# CPU usage spike detection
if current_metrics['cpu_percent'] > 90:
print(f"WARNING: High CPU usage: {current_metrics['cpu_percent']:.1f}%")
def is_crashed(self):
"""Check if crash was detected"""
return self.crash_detected
def restart_target(self):
"""Restart the target process (placeholder)"""
print(f"Restarting target process...")
# Implementation depends on how the target is started
# This could involve running a script, service restart, etc.
time.sleep(5) # Simulate restart time
print("Target process restarted")
def create_monitored_fuzzing_session(target_ip, target_port, process_name=None, pid=None): """Create fuzzing session with advanced monitoring"""
# Create process monitor
process_monitor = AdvancedProcessMonitor(target_ip, 26002, process_name, pid)
# Create network monitor
network_monitor = NetworkMonitor(
host=target_ip,
start_commands=[
f"tcpdump -i any -w fuzzing_capture_{int(time.time())}.pcap host {target_ip}"
],
stop_commands=["pkill tcpdump"]
)
# Create session with monitors
session = Session(
target=Target(
connection=SocketConnection(target_ip, target_port, proto='tcp')
),
monitors=[process_monitor, network_monitor],
receive_data_after_each_request=True,
receive_data_after_fuzz=True,
restart_sleep_time=10,
crash_threshold=3
)
return session, process_monitor
def main(): print("Advanced Monitoring Example") print("==========================")
# Configuration
TARGET_IP = "127.0.0.1"
TARGET_PORT = 9999
PROCESS_NAME = "target_app" # Replace with actual process name
# Create simple protocol fuzzer
s_initialize("MONITORED_PROTOCOL")
s_string("HELLO", name="greeting", fuzzable=True)
s_delim(" ")
s_string("WORLD", name="target", fuzzable=True)
s_static("\\r\\n")
request = s_get("MONITORED_PROTOCOL")
# Create monitored session
session, process_monitor = create_monitored_fuzzing_session(
TARGET_IP, TARGET_PORT, PROCESS_NAME
)
# Connect request to session
session.connect(request)
# Start monitoring
process_monitor.start_monitoring()
try:
print(f"Starting monitored fuzzing session...")
print(f"Web interface: http://localhost:26001")
# Start fuzzing
session.fuzz()
except KeyboardInterrupt:
print("\\nFuzzing interrupted by user")
except Exception as e:
print(f"Fuzzing error: {e}")
finally:
# Stop monitoring
process_monitor.stop_monitoring()
print("Monitoring stopped")
if name == "main": main() ```_
Benutzerdefinierte Crash-Detektion
```python
!/usr/bin/env python3
Custom crash detection mechanisms
import boofuzz from boofuzz import * import subprocess import re import time import os
class CustomCrashDetector: """Custom crash detection with multiple detection methods"""
def __init__(self, target_ip, target_port):
self.target_ip = target_ip
self.target_port = target_port
self.crash_indicators = []
def add_crash_indicator(self, indicator_type, pattern, description=""):
"""Add a crash detection indicator"""
self.crash_indicators.append({
'type': indicator_type,
'pattern': pattern,
'description': description
})
def check_for_crashes(self, response_data=None, log_data=None):
"""Check for crashes using multiple methods"""
crashes_detected = []
# Method 1: Network connectivity check
if not self._check_network_connectivity():
crashes_detected.append({
'type': 'network_failure',
'description': 'Target not responding to network requests'
})
# Method 2: Response data analysis
if response_data:
response_crashes = self._analyze_response_data(response_data)
crashes_detected.extend(response_crashes)
# Method 3: Log file analysis
if log_data:
log_crashes = self._analyze_log_data(log_data)
crashes_detected.extend(log_crashes)
# Method 4: System-level checks
system_crashes = self._check_system_indicators()
crashes_detected.extend(system_crashes)
return crashes_detected
def _check_network_connectivity(self):
"""Check if target is still responding"""
try:
# Try to connect to target
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((self.target_ip, self.target_port))
sock.close()
return result == 0
except:
return False
def _analyze_response_data(self, response_data):
"""Analyze response data for crash indicators"""
crashes = []
for indicator in self.crash_indicators:
if indicator['type'] == 'response_pattern':
if re.search(indicator['pattern'], response_data, re.IGNORECASE):
crashes.append({
'type': 'response_pattern_match',
'pattern': indicator['pattern'],
'description': indicator['description']
})
# Common crash patterns
common_patterns = [
(r'segmentation fault', 'Segmentation fault detected'),
(r'access violation', 'Access violation detected'),
(r'stack overflow', 'Stack overflow detected'),
(r'heap corruption', 'Heap corruption detected'),
(r'assertion failed', 'Assertion failure detected'),
(r'unhandled exception', 'Unhandled exception detected'),
(r'fatal error', 'Fatal error detected'),
(r'core dumped', 'Core dump detected')
]
for pattern, description in common_patterns:
if re.search(pattern, response_data, re.IGNORECASE):
crashes.append({
'type': 'common_crash_pattern',
'pattern': pattern,
'description': description
})
return crashes
def _analyze_log_data(self, log_data):
"""Analyze log data for crash indicators"""
crashes = []
# Look for crash indicators in logs
log_patterns = [
(r'\\[ERROR\\].*crash', 'Error log indicates crash'),
(r'\\[FATAL\\]', 'Fatal error in logs'),
(r'exception.*stack trace', 'Exception with stack trace'),
(r'signal \\d+', 'Signal received (potential crash)'),
(r'terminated unexpectedly', 'Unexpected termination'),
(r'memory.*corrupt', 'Memory corruption detected'),
(r'buffer overflow', 'Buffer overflow detected')
]
for pattern, description in log_patterns:
if re.search(pattern, log_data, re.IGNORECASE):
crashes.append({
'type': 'log_pattern_match',
'pattern': pattern,
'description': description
})
return crashes
def _check_system_indicators(self):
"""Check system-level crash indicators"""
crashes = []
# Check for core dumps
core_files = self._find_core_dumps()
if core_files:
crashes.append({
'type': 'core_dump',
'description': f'Core dump files found: {core_files}'
})
# Check system logs for crashes
system_crashes = self._check_system_logs()
crashes.extend(system_crashes)
return crashes
def _find_core_dumps(self):
"""Find recent core dump files"""
core_files = []
search_paths = ['/tmp', '/var/crash', '.']
for path in search_paths:
try:
for file in os.listdir(path):
if file.startswith('core') or file.endswith('.core'):
filepath = os.path.join(path, file)
# Check if file is recent (within last hour)
if time.time() - os.path.getmtime(filepath) < 3600:
core_files.append(filepath)
except:
continue
return core_files
def _check_system_logs(self):
"""Check system logs for crash indicators"""
crashes = []
try:
# Check dmesg for kernel messages
result = subprocess.run(['dmesg', '-T'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
dmesg_output = result.stdout
# Look for crash-related kernel messages
crash_patterns = [
r'segfault',
r'general protection fault',
r'kernel BUG',
r'Oops:',
r'Call Trace:'
]
for pattern in crash_patterns:
if re.search(pattern, dmesg_output, re.IGNORECASE):
crashes.append({
'type': 'kernel_message',
'pattern': pattern,
'description': f'Kernel crash indicator: {pattern}'
})
except:
pass
return crashes
class CrashAwareFuzzingSession: """Fuzzing session with enhanced crash detection"""
def __init__(self, target_ip, target_port):
self.target_ip = target_ip
self.target_port = target_port
self.crash_detector = CustomCrashDetector(target_ip, target_port)
self.crash_log = []
# Setup crash detection patterns
self._setup_crash_patterns()
def _setup_crash_patterns(self):
"""Setup application-specific crash patterns"""
# Add custom crash indicators
self.crash_detector.add_crash_indicator(
'response_pattern',
r'HTTP/1\\.1 500',
'HTTP 500 Internal Server Error'
)
self.crash_detector.add_crash_indicator(
'response_pattern',
r'Connection reset by peer',
'Connection reset - possible crash'
)
self.crash_detector.add_crash_indicator(
'response_pattern',
r'timeout',
'Request timeout - possible hang'
)
def create_session(self):
"""Create Boofuzz session with crash detection"""
# Custom target with crash detection
class CrashDetectingTarget:
def __init__(self, ip, port, crash_detector):
self.ip = ip
self.port = port
self.crash_detector = crash_detector
self.socket = None
def send(self, data):
try:
import socket
if not self.socket:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.ip, self.port))
self.socket.send(data)
return len(data)
except Exception as e:
# Check for crashes when send fails
crashes = self.crash_detector.check_for_crashes()
if crashes:
print(f"Crashes detected during send: {crashes}")
raise e
def recv(self, max_bytes):
try:
if self.socket:
data = self.socket.recv(max_bytes)
# Analyze response for crash indicators
crashes = self.crash_detector.check_for_crashes(
response_data=data.decode('utf-8', errors='ignore')
)
if crashes:
print(f"Crashes detected in response: {crashes}")
return data
return b""
except Exception as e:
# Check for crashes when recv fails
crashes = self.crash_detector.check_for_crashes()
if crashes:
print(f"Crashes detected during recv: {crashes}")
return b""
# Create session
session = Session(
target=Target(
connection=CrashDetectingTarget(self.target_ip, self.target_port, self.crash_detector)
),
receive_data_after_each_request=True,
receive_data_after_fuzz=True,
restart_sleep_time=10,
crash_threshold=1 # Detect crashes immediately
)
return session
def log_crash(self, crash_info, test_case_data):
"""Log crash information"""
crash_entry = {
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'crash_info': crash_info,
'test_case': test_case_data,
'target': f"{self.target_ip}:{self.target_port}"
}
self.crash_log.append(crash_entry)
# Save to file
with open(f"crash_log_{int(time.time())}.json", 'w') as f:
import json
json.dump(crash_entry, f, indent=2)
print(f"Crash logged: {crash_info}")
def main(): print("Custom Crash Detection Example") print("==============================")
# Configuration
TARGET_IP = "127.0.0.1"
TARGET_PORT = 9999
# Create crash-aware fuzzing session
fuzzing_session = CrashAwareFuzzingSession(TARGET_IP, TARGET_PORT)
# Create simple protocol for testing
s_initialize("CRASH_TEST_PROTOCOL")
s_string("TEST", name="command", fuzzable=True)
s_delim(" ")
s_string("DATA", name="data", fuzzable=True)
s_static("\\r\\n")
request = s_get("CRASH_TEST_PROTOCOL")
# Create session
session = fuzzing_session.create_session()
session.connect(request)
try:
print(f"Starting crash-aware fuzzing on {TARGET_IP}:{TARGET_PORT}")
session.fuzz()
except KeyboardInterrupt:
print("\\nFuzzing interrupted")
except Exception as e:
print(f"Fuzzing error: {e}")
print("Crash detection fuzzing completed")
if name == "main": main() ```_
Leistungsoptimierung und Fehlerbehebung
Leistung Tuning
```bash
Boofuzz performance optimization techniques
1. Memory optimization for large fuzzing campaigns
export PYTHONOPTIMIZE=1 export PYTHONDONTWRITEBYTECODE=1
2. Increase system limits for network connections
ulimit -n 65536 # Increase file descriptor limit echo 'net.core.somaxconn = 65536' | sudo tee -a /etc/sysctl.conf echo 'net.ipv4.tcp_max_syn_backlog = 65536' | sudo tee -a /etc/sysctl.conf sudo sysctl -p
3. Optimize Python garbage collection
export PYTHONGC=0 # Disable automatic garbage collection
4. Performance monitoring script
!/bin/bash
monitor_boofuzz_performance() { local output_file="boofuzz_performance_$(date +%s).log"
echo "Monitoring Boofuzz performance..."
echo "Timestamp,CPU%,Memory(MB),Network_Connections,Test_Cases" > "$output_file"
while true; do
if pgrep -f "boofuzz" > /dev/null; then
local pid=$(pgrep -f "boofuzz")
local cpu=$(ps -p $pid -o %cpu --no-headers)
local mem=$(ps -p $pid -o rss --no-headers | awk '{print $1/1024}')
local connections=$(ss -tuln | wc -l)
local timestamp=$(date +%s)
echo "$timestamp,$cpu,$mem,$connections,N/A" >> "$output_file"
fi
sleep 5
done
}
5. Optimize network settings for high-throughput fuzzing
optimize_network_for_fuzzing() { # Increase network buffers echo 'net.core.rmem_max = 134217728' | sudo tee -a /etc/sysctl.conf echo 'net.core.wmem_max = 134217728' | sudo tee -a /etc/sysctl.conf echo 'net.ipv4.tcp_rmem = 4096 87380 134217728' | sudo tee -a /etc/sysctl.conf echo 'net.ipv4.tcp_wmem = 4096 65536 134217728' | sudo tee -a /etc/sysctl.conf
# Optimize connection handling
echo 'net.ipv4.tcp_fin_timeout = 15' | sudo tee -a /etc/sysctl.conf
echo 'net.ipv4.tcp_tw_reuse = 1' | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
echo "Network optimizations applied"
}
6. Memory-efficient fuzzing configuration
create_efficient_config() { cat > efficient_boofuzz_config.py << 'EOF' import boofuzz
Memory-efficient configuration
EFFICIENT_CONFIG = { # Reduce memory usage 'receive_data_after_each_request': False, 'receive_data_after_fuzz': False, 'check_data_received_each_request': False,
# Optimize restart behavior
'restart_sleep_time': 1,
'crash_threshold': 1,
# Limit concurrent connections
'max_connections': 10,
# Reduce logging overhead
'log_level': 'WARNING',
'console_level': 'ERROR'
}
def apply_efficient_config(session): for key, value in EFFICIENT_CONFIG.items(): if hasattr(session, key): setattr(session, key, value) return session EOF }
Run optimizations
optimize_network_for_fuzzing create_efficient_config ```_
Probleme bei der Fehlerbehebung
```python
!/usr/bin/env python3
Boofuzz troubleshooting utilities
import boofuzz import sys import subprocess import socket import time import psutil
class BoofuzzTroubleshooter: """Comprehensive troubleshooting for Boofuzz issues"""
def __init__(self):
self.issues_found = []
self.solutions = []
def run_diagnostics(self):
"""Run comprehensive diagnostics"""
print("Boofuzz Troubleshooting Diagnostics")
print("===================================")
# Check Python environment
self._check_python_environment()
# Check Boofuzz installation
self._check_boofuzz_installation()
# Check system resources
self._check_system_resources()
# Check network configuration
self._check_network_configuration()
# Check permissions
self._check_permissions()
# Generate report
self._generate_report()
def _check_python_environment(self):
"""Check Python environment and dependencies"""
print("\\n1. Checking Python Environment...")
# Python version
python_version = sys.version_info
if python_version < (3, 6):
self.issues_found.append("Python version too old")
self.solutions.append("Upgrade to Python 3.6 or newer")
else:
print(f"✓ Python version: {python_version.major}.{python_version.minor}.{python_version.micro}")
# Required modules
required_modules = [
'boofuzz', 'socket', 'threading', 'time', 'struct',
'logging', 'json', 'subprocess', 'psutil'
]
missing_modules = []
for module in required_modules:
try:
__import__(module)
print(f"✓ Module '{module}': Available")
except ImportError:
missing_modules.append(module)
print(f"✗ Module '{module}': Missing")
if missing_modules:
self.issues_found.append(f"Missing modules: {missing_modules}")
self.solutions.append(f"Install missing modules: pip install {' '.join(missing_modules)}")
def _check_boofuzz_installation(self):
"""Check Boofuzz installation and configuration"""
print("\\n2. Checking Boofuzz Installation...")
try:
import boofuzz
print(f"✓ Boofuzz version: {boofuzz.__version__}")
# Test basic functionality
try:
session = boofuzz.Session(
target=boofuzz.Target(
connection=boofuzz.SocketConnection("127.0.0.1", 12345, proto='tcp')
)
)
print("✓ Boofuzz session creation: OK")
except Exception as e:
self.issues_found.append(f"Boofuzz session creation failed: {e}")
self.solutions.append("Check Boofuzz installation: pip install --upgrade boofuzz")
except ImportError:
self.issues_found.append("Boofuzz not installed")
self.solutions.append("Install Boofuzz: pip install boofuzz")
def _check_system_resources(self):
"""Check system resources and limits"""
print("\\n3. Checking System Resources...")
# Memory
memory = psutil.virtual_memory()
if memory.percent > 90:
self.issues_found.append("High memory usage")
self.solutions.append("Free up memory or add more RAM")
else:
print(f"✓ Memory usage: {memory.percent:.1f}%")
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > 90:
self.issues_found.append("High CPU usage")
self.solutions.append("Reduce CPU load or upgrade hardware")
else:
print(f"✓ CPU usage: {cpu_percent:.1f}%")
# Disk space
disk = psutil.disk_usage('/')
if disk.percent > 90:
self.issues_found.append("Low disk space")
self.solutions.append("Free up disk space")
else:
print(f"✓ Disk usage: {disk.percent:.1f}%")
# File descriptor limits
try:
import resource
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
if soft_limit < 1024:
self.issues_found.append("Low file descriptor limit")
self.solutions.append("Increase file descriptor limit: ulimit -n 65536")
else:
print(f"✓ File descriptor limit: {soft_limit}")
except:
print("? File descriptor limit: Unable to check")
def _check_network_configuration(self):
"""Check network configuration and connectivity"""
print("\\n4. Checking Network Configuration...")
# Test local connectivity
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
port = sock.getsockname()[1]
sock.close()
print(f"✓ Local socket binding: OK (test port {port})")
except Exception as e:
self.issues_found.append(f"Local socket binding failed: {e}")
self.solutions.append("Check network configuration and firewall settings")
# Test external connectivity
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex(('8.8.8.8', 53))
sock.close()
if result == 0:
print("✓ External connectivity: OK")
else:
self.issues_found.append("No external connectivity")
self.solutions.append("Check internet connection and firewall")
except Exception as e:
self.issues_found.append(f"External connectivity test failed: {e}")
self.solutions.append("Check network configuration")
# Check for port conflicts
common_ports = [26001, 26002, 80, 443, 8080, 9999]
for port in common_ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
if result == 0:
print(f"⚠ Port {port}: In use")
else:
print(f"✓ Port {port}: Available")
except:
print(f"? Port {port}: Unable to check")
def _check_permissions(self):
"""Check file and network permissions"""
print("\\n5. Checking Permissions...")
# Check if running as root (not recommended)
import os
if os.geteuid() == 0:
self.issues_found.append("Running as root")
self.solutions.append("Run as non-root user for security")
else:
print("✓ Running as non-root user")
# Check write permissions in current directory
try:
test_file = "boofuzz_permission_test.tmp"
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
print("✓ Write permissions: OK")
except Exception as e:
self.issues_found.append(f"No write permissions: {e}")
self.solutions.append("Check directory permissions")
# Check if can bind to privileged ports (requires root)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 80))
sock.close()
print("✓ Can bind to privileged ports")
except PermissionError:
print("⚠ Cannot bind to privileged ports (normal for non-root)")
except Exception as e:
print(f"? Privileged port binding: {e}")
def _generate_report(self):
"""Generate troubleshooting report"""
print("\\n" + "="*50)
print("TROUBLESHOOTING REPORT")
print("="*50)
if not self.issues_found:
print("✓ No issues found! Boofuzz should work correctly.")
else:
print(f"Found {len(self.issues_found)} issue(s):")
print()
for i, (issue, solution) in enumerate(zip(self.issues_found, self.solutions), 1):
print(f"{i}. Issue: {issue}")
print(f" Solution: {solution}")
print()
# Additional recommendations
print("GENERAL RECOMMENDATIONS:")
print("- Use virtual environments for Python projects")
print("- Monitor system resources during fuzzing")
print("- Keep Boofuzz updated to the latest version")
print("- Use process monitors for better crash detection")
print("- Configure appropriate timeouts for network operations")
print("- Use the web interface for monitoring fuzzing progress")
def test_basic_functionality(): """Test basic Boofuzz functionality"""
print("\\nTesting Basic Boofuzz Functionality...")
print("=====================================")
try:
# Test session creation
session = boofuzz.Session(
target=boofuzz.Target(
connection=boofuzz.SocketConnection("127.0.0.1", 12345, proto='tcp')
),
receive_data_after_each_request=False,
receive_data_after_fuzz=False
)
print("✓ Session creation: OK")
# Test protocol definition
boofuzz.s_initialize("TEST_PROTOCOL")
boofuzz.s_string("TEST", name="test_string")
boofuzz.s_delim(" ")
boofuzz.s_string("DATA", name="test_data", fuzzable=True)
boofuzz.s_static("\\r\\n")
request = boofuzz.s_get("TEST_PROTOCOL")
print("✓ Protocol definition: OK")
# Test session connection
session.connect(request)
print("✓ Session connection: OK")
print("✓ Basic functionality test passed!")
except Exception as e:
print(f"✗ Basic functionality test failed: {e}")
return False
return True
def fix_common_issues(): """Provide fixes for common Boofuzz issues"""
print("\\nCommon Boofuzz Issues and Fixes")
print("===============================")
fixes = {
"ImportError: No module named 'boofuzz'": [
"pip install boofuzz",
"pip install --upgrade boofuzz",
"Check if you're in the correct virtual environment"
],
"ConnectionRefusedError": [
"Check if target application is running",
"Verify target IP address and port",
"Check firewall settings",
"Ensure target is listening on specified port"
],
"PermissionError: [Errno 13] Permission denied": [
"Don't run as root unless necessary",
"Check file/directory permissions",
"Use non-privileged ports (>1024)",
"Run with appropriate user permissions"
],
"OSError: [Errno 24] Too many open files": [
"Increase file descriptor limit: ulimit -n 65536",
"Add 'ulimit -n 65536' to ~/.bashrc",
"Reduce concurrent connections in fuzzing session",
"Close unused file handles in custom code"
],
"Memory errors during fuzzing": [
"Reduce fuzzing depth and breadth",
"Use receive_data_after_each_request=False",
"Implement custom memory management",
"Monitor memory usage and restart sessions periodically"
],
"Web interface not accessible": [
"Check if port 26001 is available",
"Verify firewall allows access to port 26001",
"Try accessing via http://localhost:26001",
"Check if another Boofuzz instance is running"
],
"Slow fuzzing performance": [
"Reduce restart_sleep_time",
"Disable unnecessary data reception",
"Use faster connection types",
"Optimize target application",
"Use multiple fuzzing instances"
],
"Crashes not detected": [
"Implement custom crash detection",
"Use process monitors",
"Check crash_threshold setting",
"Monitor system logs for crash indicators",
"Implement network connectivity checks"
]
}
for issue, solutions in fixes.items():
print(f"\\nIssue: {issue}")
for i, solution in enumerate(solutions, 1):
print(f" {i}. {solution}")
def main(): """Main troubleshooting function"""
# Run diagnostics
troubleshooter = BoofuzzTroubleshooter()
troubleshooter.run_diagnostics()
# Test basic functionality
test_basic_functionality()
# Show common fixes
fix_common_issues()
if name == "main": main() ```_
Ressourcen und Dokumentation
Offizielle Mittel
- Boofuzz GitHub Repository - Hauptrepository und Quellcode
- Boofuzz Dokumentation - Umfassende Dokumentation und Tutorials
- Boofuzz Beispiele - Beispiel fuzzing Scripts und Protokolle
Gemeinschaftsmittel
- Fuzzing Community - Discord Community for fuzzers
- OWASP Testing Guide - Web Application Security Testing Methoden
- Fuzzing Book - Umfassender Leitfaden für Fuzzing-Techniken
Integrationsbeispiele
- Protocol Fuzzing Beispiele - Various Protocol fuzzing Implementations
- Security Testing Frameworks - Integration mit anderen Sicherheitstools
- Vulnerability Research - Forschungsmethoden und Werkzeuge
- Network Protocol Analysis - Protokollanalyse und Prüfwerkzeuge