# REVEN Cheat Sheet

## Overview

REVEN by Tetrane is an advanced dynamic binary analysis and reverse-debugging platform that provides full-system record-and-replay capabilities. It enables time-travel debugging: analysts record a program's execution, then navigate backward and forward through the execution timeline to understand complex behaviors, find vulnerabilities, and analyze malware.

**Key strengths:** time-travel debugging, full-system recording, memory visualization, execution tracing, vulnerability research, and advanced dynamic analysis.
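The core idea behind time-travel debugging can be illustrated with a toy model (plain Python, deliberately unrelated to the REVEN API): every executed step is kept in an immutable recorded list, so a debugger can move its cursor backward exactly as cheaply as forward.

```python
# Toy model of time-travel debugging: the whole execution history is
# recorded up front, so "navigation" is just moving an index over it.
class ToyTrace:
    def __init__(self, instructions):
        self._instructions = list(instructions)  # the recorded execution
        self.position = 0                        # current point in time

    def current(self):
        return self._instructions[self.position]

    def step_forward(self, count=1):
        self.position = min(self.position + count, len(self._instructions) - 1)
        return self.current()

    def step_backward(self, count=1):
        self.position = max(self.position - count, 0)
        return self.current()

trace = ToyTrace(["push ebp", "mov ebp, esp", "call 0x401000", "ret"])
trace.step_forward(2)   # -> "call 0x401000"
trace.step_backward()   # -> "mov ebp, esp"
```

REVEN records at the hypervisor level rather than in-process, but the navigation model its replay commands expose is the same: a fixed timeline plus a movable cursor.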
## Installation and Setup

### System Requirements and Installation

```bash
# System Requirements:
# - Linux (Ubuntu 18.04+ or CentOS 7+)
# - Intel CPU with VT-x support
# - 32GB RAM minimum (64GB+ recommended)
# - 1TB+ SSD storage for recordings
# - NVIDIA GPU (optional, for acceleration)
# REVEN Installation (Enterprise License Required)
# Contact Tetrane for licensing: https://www.tetrane.com/
# Download REVEN installer
wget https://download.tetrane.com/reven-installer.run
# Make installer executable
chmod +x reven-installer.run
# Run installer as root
sudo ./reven-installer.run
# Follow installation wizard
# - Choose installation directory (/opt/reven recommended)
# - Configure license server
# - Set up user permissions
# - Configure storage paths
# Post-installation setup
sudo usermod -a -G reven $USER
sudo systemctl enable reven-server
sudo systemctl start reven-server
# Verify installation
reven --version
reven-server status
```

### License Configuration

```bash
# License Server Configuration
# Edit license configuration
sudo nano /opt/reven/etc/license.conf
# Example license configuration:
[license]
server_host = license.company.com
server_port = 27000
license_file = /opt/reven/etc/reven.lic
# Floating license configuration
[floating_license]
enabled = true
server_host = 192.168.1.100
server_port = 27000
checkout_timeout = 3600
# Verify license
reven license status
reven license info
# Test license connectivity
reven license test-connection
```

### Environment Setup

```bash
# REVEN Environment Variables
export REVEN_HOME=/opt/reven
export REVEN_PROJECTS=/data/reven/projects
export REVEN_RECORDINGS=/data/reven/recordings
export PATH=$REVEN_HOME/bin:$PATH
# Add to ~/.bashrc for persistence
echo 'export REVEN_HOME=/opt/reven' >> ~/.bashrc
echo 'export REVEN_PROJECTS=/data/reven/projects' >> ~/.bashrc
echo 'export REVEN_RECORDINGS=/data/reven/recordings' >> ~/.bashrc
echo 'export PATH=$REVEN_HOME/bin:$PATH' >> ~/.bashrc
# Create project directories
mkdir -p $REVEN_PROJECTS
mkdir -p $REVEN_RECORDINGS
sudo chown -R $USER:reven $REVEN_PROJECTS
sudo chown -R $USER:reven $REVEN_RECORDINGS
# Configure storage quotas
sudo setquota -u $USER 100G 120G 0 0 /data
# Set up VM templates directory
mkdir -p $REVEN_HOME/vm-templates
```

## Recording and Replay

### VM Setup and Configuration

```bash
# Create VM for recording
reven vm create --name "analysis-vm" \
    --os windows10 \
    --memory 4096 \
    --disk 50G \
    --iso /path/to/windows10.iso
# Configure VM settings
reven vm configure analysis-vm \
    --cpu-count 2 \
    --enable-nested-virtualization \
    --disable-aslr \
    --enable-debug-symbols
# Install VM operating system
reven vm install analysis-vm \
    --unattended \
    --admin-password "Password123!" \
    --timezone "UTC"
# Create VM snapshot for recording
reven vm snapshot analysis-vm \
    --name "clean-state" \
    --description "Clean Windows 10 installation"
# List available VMs
reven vm list
# VM management commands
reven vm start analysis-vm
reven vm stop analysis-vm
reven vm reset analysis-vm
reven vm delete analysis-vm
```

### Execution Recording

```bash
# Start recording session
reven record start \
    --vm analysis-vm \
    --snapshot clean-state \
    --name "malware-analysis-001" \
    --duration 300 \
    --output-dir $REVEN_RECORDINGS/malware-001
# Recording with specific triggers
reven record start \
    --vm analysis-vm \
    --snapshot clean-state \
    --name "exploit-analysis" \
    --trigger-on-process "notepad.exe" \
    --trigger-on-module "kernel32.dll" \
    --max-size 10G
# Interactive recording session
reven record interactive \
    --vm analysis-vm \
    --snapshot clean-state \
    --name "interactive-session"
# Recording with custom configuration
reven record start \
    --vm analysis-vm \
    --config recording-config.json \
    --name "custom-recording"
# Example recording configuration (recording-config.json)
cat > recording-config.json << 'EOF'
{
    "recording": {
        "max_duration": 600,
        "max_size": "20GB",
        "compression": "lz4",
        "memory_tracking": true,
        "syscall_tracking": true,
        "network_tracking": true
    },
    "triggers": {
        "start_triggers": [
            {"type": "process", "name": "malware.exe"},
            {"type": "file_access", "path": "C:\\Windows\\System32\\"}
        ],
        "stop_triggers": [
            {"type": "process_exit", "name": "malware.exe"},
            {"type": "timeout", "seconds": 300}
        ]
    },
    "filters": {
        "exclude_processes": ["dwm.exe", "winlogon.exe"],
        "include_modules": ["ntdll.dll", "kernel32.dll", "user32.dll"]
    }
}
EOF
# Monitor recording progress
reven record status malware-analysis-001
reven record list
# Stop recording
reven record stop malware-analysis-001
# Recording management
reven record pause malware-analysis-001
reven record resume malware-analysis-001
reven record cancel malware-analysis-001
```
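Before launching a long recording it can pay off to sanity-check a configuration like the example above. A minimal validator sketch (plain Python; the required section names are assumptions taken from the example file, not a documented REVEN schema):

```python
import json

# Sections assumed from the example recording-config.json above;
# REVEN's real schema may differ.
REQUIRED_SECTIONS = {"recording", "triggers", "filters"}

def validate_recording_config(text):
    """Return a list of problems found in a recording config (empty = OK)."""
    problems = []
    try:
        config = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc}"]
    missing = REQUIRED_SECTIONS - set(config)
    problems += [f"missing section: {name}" for name in sorted(missing)]
    # A recording with no stop trigger may run until it hits size limits
    if not config.get("triggers", {}).get("stop_triggers"):
        problems.append("no stop_triggers: recording may never end")
    return problems
```

Usage: `validate_recording_config(open("recording-config.json").read())` before passing the file to `reven record start --config`.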
### Replay and Analysis Setup

```bash
# Load recording for analysis
reven replay load $REVEN_RECORDINGS/malware-001
# Create analysis project
reven project create \
--name "malware-analysis" \
--recording $REVEN_RECORDINGS/malware-001 \
--description "Analysis of malware sample XYZ"
# Open project in REVEN GUI
reven gui --project malware-analysis
# Command-line replay navigation
reven replay goto --instruction 1000
reven replay goto --time 10.5
reven replay goto --address 0x401000
# Replay control commands
reven replay step-forward
reven replay step-backward
reven replay run-forward --count 100
reven replay run-backward --count 50
# Set breakpoints in replay
reven replay breakpoint set --address 0x401000
reven replay breakpoint set --function "CreateFileA"
reven replay breakpoint set --module "ntdll.dll"
# List and manage breakpoints
reven replay breakpoint list
reven replay breakpoint delete --id 1
reven replay breakpoint disable --id 2
```

## Time-Travel Debugging

### Navigation and Control

```python
# REVEN Python API for time-travel debugging
import reven2
# Connect to REVEN server
server = reven2.RevenServer("localhost", 13370)
# Open recording
recording = server.open_recording("/data/reven/recordings/malware-001")
# Get execution trace
trace = recording.trace
# Navigate through execution
print(f"Total instructions: {len(trace)}")
# Go to specific instruction
instruction = trace[1000]
print(f"Instruction: {instruction}")
print(f"Address: {hex(instruction.address)}")
print(f"Mnemonic: {instruction.mnemonic}")
# Time-travel navigation
def navigate_execution(trace, start_idx, end_idx):
    """Navigate through an execution range, flagging calls and returns."""
    for i in range(start_idx, min(end_idx, len(trace))):
        instruction = trace[i]
        print(f"[{i:08d}] {hex(instruction.address):>10} {instruction.mnemonic}")
        # Check for interesting events
        if instruction.mnemonic.startswith("call"):
            print("  -> Function call detected")
        if instruction.mnemonic.startswith("ret"):
            print("  <- Function return detected")

# Navigate execution range
navigate_execution(trace, 1000, 1100)

# Find specific instructions
def find_instructions(trace, mnemonic_pattern):
    """Find instructions matching a mnemonic pattern."""
    matches = []
    for i, instruction in enumerate(trace):
        if mnemonic_pattern.lower() in instruction.mnemonic.lower():
            matches.append((i, instruction))
    return matches

# Find all call instructions
call_instructions = find_instructions(trace, "call")
print(f"Found {len(call_instructions)} call instructions")

# Display first 10 calls
for i, (idx, instruction) in enumerate(call_instructions[:10]):
    print(f"Call {i+1}: [{idx:08d}] {hex(instruction.address)} {instruction.mnemonic}")
```

### Memory Analysis and Tracking

```python
# Memory analysis with REVEN
def analyze_memory_access(trace, address_range):
    """Analyze memory accesses in an address range."""
    memory_accesses = []
    for i, instruction in enumerate(trace):
        # Get memory accesses for this instruction
        for access in instruction.memory_accesses():
            if address_range[0] <= access.address <= address_range[1]:
                memory_accesses.append({
                    'instruction_index': i,
                    'address': access.address,
                    'size': access.size,
                    'type': access.type,  # read/write
                    'value': access.value
                })
    return memory_accesses

# Analyze heap memory accesses
heap_start = 0x00400000
heap_end = 0x00500000
heap_accesses = analyze_memory_access(trace, (heap_start, heap_end))
print(f"Heap memory accesses: {len(heap_accesses)}")

# Track specific memory location
def track_memory_location(trace, target_address):
    """Track all accesses to a specific memory location."""
    accesses = []
    for i, instruction in enumerate(trace):
        for access in instruction.memory_accesses():
            if access.address == target_address:
                accesses.append({
                    'instruction_index': i,
                    'instruction': instruction,
                    'type': access.type,
                    'value': access.value,
                    'timestamp': instruction.timestamp
                })
    return accesses

# Track critical memory location
critical_address = 0x00401234
memory_timeline = track_memory_location(trace, critical_address)
print(f"Memory location {hex(critical_address)} accessed {len(memory_timeline)} times")

# Display memory timeline
for access in memory_timeline:
    print(f"[{access['instruction_index']:08d}] {access['type']}: {access['value']:08x}")

# Memory diff analysis
def memory_diff_analysis(trace, start_idx, end_idx, memory_range):
    """Analyze memory changes between two points in time."""
    start_instruction = trace[start_idx]
    end_instruction = trace[end_idx]

    # Get memory state at start
    start_memory = {}
    for addr in range(memory_range[0], memory_range[1], 4):
        try:
            start_memory[addr] = start_instruction.memory_read(addr, 4)
        except Exception:
            pass

    # Get memory state at end
    end_memory = {}
    for addr in range(memory_range[0], memory_range[1], 4):
        try:
            end_memory[addr] = end_instruction.memory_read(addr, 4)
        except Exception:
            pass

    # Find differences
    changes = []
    for addr in start_memory:
        if addr in end_memory and start_memory[addr] != end_memory[addr]:
            changes.append({
                'address': addr,
                'old_value': start_memory[addr],
                'new_value': end_memory[addr]
            })
    return changes

# Analyze memory changes during function execution
function_start = 1000
function_end = 2000
stack_range = (0x7fff0000, 0x7fff1000)
memory_changes = memory_diff_analysis(trace, function_start, function_end, stack_range)
print(f"Memory changes during function: {len(memory_changes)}")
for change in memory_changes[:10]:  # Show first 10 changes
    print(f"{hex(change['address'])}: {change['old_value']:08x} -> {change['new_value']:08x}")
```
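The helpers above need a loaded REVEN trace to run. To show the access-scanning pattern end to end, here is the same logic exercised against a hand-built mock trace (the `MockAccess`/`MockInstruction` classes are illustrative stand-ins, not reven2 types):

```python
from dataclasses import dataclass, field

@dataclass
class MockAccess:
    address: int
    size: int
    type: str       # "read" or "write"
    value: int

@dataclass
class MockInstruction:
    address: int
    accesses: list = field(default_factory=list)

    def memory_accesses(self):
        return self.accesses

def accesses_in_range(trace, lo, hi):
    """Collect all memory accesses whose address falls in [lo, hi]."""
    hits = []
    for i, ins in enumerate(trace):
        for acc in ins.memory_accesses():
            if lo <= acc.address <= hi:
                hits.append((i, acc))
    return hits

mock_trace = [
    MockInstruction(0x401000, [MockAccess(0x500000, 4, "write", 0xdeadbeef)]),
    MockInstruction(0x401005, [MockAccess(0x7fff0000, 8, "read", 0)]),
    MockInstruction(0x40100a, [MockAccess(0x500010, 4, "read", 0xdeadbeef)]),
]
heap_hits = accesses_in_range(mock_trace, 0x500000, 0x50ffff)
# two of the three accesses fall in the heap range; the stack read does not
```

Swapping `mock_trace` for a real reven2 trace is the only change needed, which also makes this style of analysis easy to unit-test offline.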
### Function Call Analysis

```python
# Function call analysis and tracing
def analyze_function_calls(trace, start_idx=0, end_idx=None):
    """Analyze function calls in an execution trace."""
    if end_idx is None:
        end_idx = len(trace)
    call_stack = []
    function_calls = []
    for i in range(start_idx, min(end_idx, len(trace))):
        instruction = trace[i]
        if instruction.mnemonic.startswith("call"):
            # Function call
            call_info = {
                'instruction_index': i,
                'caller_address': instruction.address,
                'target_address': instruction.operands[0] if instruction.operands else None,
                'stack_depth': len(call_stack),
                'timestamp': instruction.timestamp
            }
            call_stack.append(call_info)
            function_calls.append(call_info)
        elif instruction.mnemonic.startswith("ret"):
            # Function return
            if call_stack:
                call_info = call_stack.pop()
                call_info['return_index'] = i
                call_info['duration'] = i - call_info['instruction_index']
    return function_calls, call_stack

# Analyze function calls
function_calls, remaining_stack = analyze_function_calls(trace, 0, 10000)
print(f"Function calls analyzed: {len(function_calls)}")
print(f"Unclosed calls: {len(remaining_stack)}")

# Display function call hierarchy
def display_call_hierarchy(function_calls, max_depth=5):
    """Display the function call hierarchy."""
    for call in function_calls:
        if call['stack_depth'] <= max_depth:
            indent = "  " * call['stack_depth']
            target = hex(call['target_address']) if call['target_address'] else "unknown"
            duration = call.get('duration', 'ongoing')
            print(f"{indent}[{call['instruction_index']:08d}] Call to {target} (duration: {duration})")

display_call_hierarchy(function_calls[:20])

# API call tracking
def track_api_calls(trace, api_addresses):
    """Track specific API calls."""
    api_calls = []
    for i, instruction in enumerate(trace):
        if instruction.mnemonic.startswith("call"):
            target = instruction.operands[0] if instruction.operands else None
            if target in api_addresses:
                api_name = api_addresses[target]
                # Get function arguments (simplified)
                args = []
                try:
                    # Assume the x86-64 calling convention
                    registers = instruction.registers
                    args = [
                        registers.get('rcx', 0),
                        registers.get('rdx', 0),
                        registers.get('r8', 0),
                        registers.get('r9', 0)
                    ]
                except Exception:
                    pass
                api_calls.append({
                    'instruction_index': i,
                    'api_name': api_name,
                    'arguments': args,
                    'caller_address': instruction.address
                })
    return api_calls

# Define API addresses (example)
api_addresses = {
    0x77701234: "CreateFileA",
    0x77701456: "ReadFile",
    0x77701678: "WriteFile",
    0x77701890: "CloseHandle"
}
api_calls = track_api_calls(trace, api_addresses)
print(f"API calls tracked: {len(api_calls)}")
for call in api_calls[:10]:
    print(f"[{call['instruction_index']:08d}] {call['api_name']} called from {hex(call['caller_address'])}")
    print(f"  Arguments: {[hex(arg) for arg in call['arguments']]}")
```

## Advanced Analysis Techniques

### Data Flow Analysis

```python
# Data flow analysis with REVEN
def trace_data_flow(trace, source_address, target_address):
    """Trace data flow from a source address to a target address."""
    data_flow = []
    tracked_values = set()
    for i, instruction in enumerate(trace):
        # Check if the instruction reads from the source
        for access in instruction.memory_accesses():
            if access.address == source_address and access.type == 'read':
                tracked_values.add(access.value)
                data_flow.append({
                    'instruction_index': i,
                    'type': 'source_read',
                    'address': access.address,
                    'value': access.value
                })
        # Check if the instruction writes a tracked value to the target
        for access in instruction.memory_accesses():
            if (access.address == target_address and
                    access.type == 'write' and
                    access.value in tracked_values):
                data_flow.append({
                    'instruction_index': i,
                    'type': 'target_write',
                    'address': access.address,
                    'value': access.value
                })
    return data_flow

# Trace data flow from input buffer to output buffer
input_buffer = 0x00401000
output_buffer = 0x00402000
data_flow = trace_data_flow(trace, input_buffer, output_buffer)
print(f"Data flow events: {len(data_flow)}")
for event in data_flow:
    event_type = event['type']
    addr = hex(event['address'])
    value = hex(event['value'])
    print(f"[{event['instruction_index']:08d}] {event_type}: {addr} = {value}")

# Taint analysis
def taint_analysis(trace, taint_sources, max_instructions=10000):
    """Perform taint analysis on an execution trace."""
    tainted_memory = set()
    tainted_registers = set()
    taint_events = []
    for i, instruction in enumerate(trace[:max_instructions]):
        # Check for taint sources
        for access in instruction.memory_accesses():
            if access.address in taint_sources:
                tainted_memory.add(access.address)
                taint_events.append({
                    'instruction_index': i,
                    'type': 'taint_source',
                    'address': access.address,
                    'value': access.value
                })
        # Propagate taint through memory operations
        for access in instruction.memory_accesses():
            if access.type == 'read' and access.address in tainted_memory:
                # Taint spreads to the destination
                if instruction.destination:
                    if instruction.destination.type == 'memory':
                        tainted_memory.add(instruction.destination.address)
                    elif instruction.destination.type == 'register':
                        tainted_registers.add(instruction.destination.name)
                    taint_events.append({
                        'instruction_index': i,
                        'type': 'taint_propagation',
                        'source': access.address,
                        'destination': instruction.destination
                    })
        # Check for tainted data usage in critical operations
        if instruction.mnemonic in ['call', 'jmp', 'cmp']:
            for operand in instruction.operands:
                if (operand.type == 'memory' and operand.address in tainted_memory) or \
                   (operand.type == 'register' and operand.name in tainted_registers):
                    taint_events.append({
                        'instruction_index': i,
                        'type': 'tainted_control_flow',
                        'instruction': instruction,
                        'tainted_operand': operand
                    })
    return taint_events, tainted_memory, tainted_registers

# Perform taint analysis
taint_sources = {0x00401000, 0x00401004, 0x00401008}  # Input buffer addresses
taint_events, tainted_mem, tainted_regs = taint_analysis(trace, taint_sources)
print(f"Taint events: {len(taint_events)}")
print(f"Tainted memory locations: {len(tainted_mem)}")
print(f"Tainted registers: {len(tainted_regs)}")

# Display critical taint events
critical_events = [e for e in taint_events if e['type'] == 'tainted_control_flow']
print(f"Critical taint events (control flow): {len(critical_events)}")
for event in critical_events[:5]:
    idx = event['instruction_index']
    instr = event['instruction']
    print(f"[{idx:08d}] Tainted control flow: {hex(instr.address)} {instr.mnemonic}")
```
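Stripped of the reven2 objects, the propagation rule used above reduces to "a destination becomes tainted when any of its sources is tainted, and loses taint when overwritten with clean data". A self-contained sketch over `(dst, srcs)` move tuples (a toy instruction model, not REVEN's):

```python
def propagate_taint(steps, initially_tainted):
    """steps: iterable of (dst, srcs) data moves.
    Returns the final tainted set and the indices of steps that spread taint."""
    tainted = set(initially_tainted)
    events = []
    for i, (dst, srcs) in enumerate(steps):
        if any(s in tainted for s in srcs):
            if dst not in tainted:
                events.append(i)       # taint newly reached dst here
            tainted.add(dst)
        else:
            tainted.discard(dst)       # dst overwritten with clean data
    return tainted, events

# User input lands in eax, flows to ebx and ecx; ecx is then overwritten
steps = [("ebx", ["eax"]), ("ecx", ["ebx"]), ("ecx", ["edx"])]
tainted, events = propagate_taint(steps, {"eax"})
# tainted == {"eax", "ebx"}: ecx was tainted at step 1 but cleaned at step 2
```

The same kill-on-overwrite rule is what keeps real taint engines from over-approximating once a register is reloaded from untainted memory.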
### Vulnerability Analysis

```python
# Vulnerability detection and analysis
def detect_buffer_overflows(trace, buffer_ranges):
    """Detect potential buffer overflow vulnerabilities."""
    potential_overflows = []
    for i, instruction in enumerate(trace):
        for access in instruction.memory_accesses():
            if access.type == 'write':
                # Check writes against each monitored buffer's bounds
                for buffer_start, buffer_end in buffer_ranges:
                    if buffer_start <= access.address < buffer_end:
                        # Write starts inside the buffer - check for boundary violations
                        if access.address + access.size > buffer_end:
                            potential_overflows.append({
                                'instruction_index': i,
                                'instruction': instruction,
                                'buffer_start': buffer_start,
                                'buffer_end': buffer_end,
                                'write_address': access.address,
                                'write_size': access.size,
                                'overflow_bytes': (access.address + access.size) - buffer_end
                            })
    return potential_overflows

# Define buffer ranges to monitor
buffer_ranges = [
    (0x00401000, 0x00401100),  # 256-byte buffer
    (0x7fff0000, 0x7fff1000),  # Stack buffer
]
overflows = detect_buffer_overflows(trace, buffer_ranges)
print(f"Potential buffer overflows detected: {len(overflows)}")
for overflow in overflows:
    idx = overflow['instruction_index']
    addr = hex(overflow['write_address'])
    size = overflow['write_size']
    overflow_bytes = overflow['overflow_bytes']
    print(f"[{idx:08d}] Buffer overflow: write to {addr} (size {size}, overflow {overflow_bytes} bytes)")

# Use-after-free detection
def detect_use_after_free(trace, heap_operations):
    """Detect use-after-free vulnerabilities."""
    freed_memory = set()
    uaf_violations = []
    for i, instruction in enumerate(trace):
        # Track heap operations
        if instruction.address in heap_operations:
            operation = heap_operations[instruction.address]
            if operation['type'] == 'free':
                # Get the freed address from the function argument
                freed_address = operation['address']
                freed_memory.add(freed_address)
            elif operation['type'] == 'malloc':
                # Remove from the freed set if reallocated
                allocated_address = operation['address']
                freed_memory.discard(allocated_address)
        # Check for accesses to freed memory
        for access in instruction.memory_accesses():
            if access.address in freed_memory:
                uaf_violations.append({
                    'instruction_index': i,
                    'instruction': instruction,
                    'freed_address': access.address,
                    'access_type': access.type,
                    'access_size': access.size
                })
    return uaf_violations

# Define heap operations (simplified)
heap_operations = {
    0x77701234: {'type': 'malloc', 'address': 0x00500000},
    0x77701456: {'type': 'free', 'address': 0x00500000},
}
uaf_violations = detect_use_after_free(trace, heap_operations)
print(f"Use-after-free violations: {len(uaf_violations)}")
for violation in uaf_violations:
    idx = violation['instruction_index']
    addr = hex(violation['freed_address'])
    access_type = violation['access_type']
    print(f"[{idx:08d}] Use-after-free: {access_type} access to freed memory {addr}")

# Format string vulnerability detection
def detect_format_string_vulns(trace, printf_functions):
    """Detect format string vulnerabilities."""
    format_vulns = []
    for i, instruction in enumerate(trace):
        if instruction.mnemonic.startswith("call"):
            target = instruction.operands[0] if instruction.operands else None
            if target in printf_functions:
                # Analyze the format string argument
                try:
                    # Get the format string (first argument, x86-64)
                    format_string_addr = instruction.registers.get('rcx', 0)
                    format_string = instruction.memory_read_string(format_string_addr)
                    # Check for a user-controlled format string
                    if '%n' in format_string or format_string.count('%') > 10:
                        format_vulns.append({
                            'instruction_index': i,
                            'function': printf_functions[target],
                            'format_string': format_string,
                            'format_string_address': format_string_addr
                        })
                except Exception:
                    pass
    return format_vulns

# Define printf-family functions
printf_functions = {
    0x77701234: "printf",
    0x77701456: "sprintf",
    0x77701678: "fprintf",
}
format_vulns = detect_format_string_vulns(trace, printf_functions)
print(f"Format string vulnerabilities: {len(format_vulns)}")
for vuln in format_vulns:
    idx = vuln['instruction_index']
    func = vuln['function']
    fmt_str = vuln['format_string'][:50]  # Truncate for display
    print(f"[{idx:08d}] Format string vuln in {func}: '{fmt_str}'")
```
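The bounds check at the heart of `detect_buffer_overflows` is independent of REVEN and easy to exercise directly: a write overflows when `write_address + write_size` crosses the buffer's end. An isolated sketch of that arithmetic:

```python
def overflow_bytes(buffer_start, buffer_end, write_address, write_size):
    """Bytes written past buffer_end by a write that starts inside the buffer.
    0 means the write stays in bounds; None means it never touches the buffer."""
    if not (buffer_start <= write_address < buffer_end):
        return None
    return max(0, (write_address + write_size) - buffer_end)

# 256-byte buffer at 0x401000: a 16-byte write at offset 0xf8 spills 8 bytes
# overflow_bytes(0x401000, 0x401100, 0x4010f8, 16) == 8
```

Keeping the arithmetic in a small pure function like this makes the detector trivially testable without a recorded trace.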
### Cryptographic Analysis

```python
# Cryptographic algorithm detection and analysis
def detect_crypto_algorithms(trace, crypto_patterns):
    """Detect cryptographic algorithms in an execution trace."""
    crypto_detections = []
    for i, instruction in enumerate(trace):
        # Check for crypto-specific instruction patterns
        if instruction.mnemonic in ['aes', 'sha', 'xor']:
            crypto_detections.append({
                'instruction_index': i,
                'type': 'crypto_instruction',
                'algorithm': instruction.mnemonic,
                'address': instruction.address
            })
        # Check for crypto constants
        for access in instruction.memory_accesses():
            if access.type == 'read':
                value = access.value
                # Check against known crypto constants
                for algorithm, constants in crypto_patterns.items():
                    if value in constants:
                        crypto_detections.append({
                            'instruction_index': i,
                            'type': 'crypto_constant',
                            'algorithm': algorithm,
                            'constant': value,
                            'address': access.address
                        })
    return crypto_detections

# Define crypto patterns and constants
crypto_patterns = {
    'AES': [0x63636363, 0x7c7c7c7c, 0x77777777, 0x7b7b7b7b],     # AES S-box constants
    'SHA1': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],    # SHA1 initial values
    'SHA256': [0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a],  # SHA256 initial values
    'MD5': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],     # MD5 initial values
}
crypto_detections = detect_crypto_algorithms(trace, crypto_patterns)
print(f"Cryptographic algorithm detections: {len(crypto_detections)}")

# Group by algorithm
crypto_by_algorithm = {}
for detection in crypto_detections:
    algorithm = detection['algorithm']
    if algorithm not in crypto_by_algorithm:
        crypto_by_algorithm[algorithm] = []
    crypto_by_algorithm[algorithm].append(detection)
for algorithm, detections in crypto_by_algorithm.items():
    print(f"{algorithm}: {len(detections)} detections")

# Analyze crypto key usage
def analyze_crypto_keys(trace, key_addresses):
    """Analyze cryptographic key usage."""
    key_usage = []
    for i, instruction in enumerate(trace):
        for access in instruction.memory_accesses():
            if access.address in key_addresses:
                key_usage.append({
                    'instruction_index': i,
                    'key_address': access.address,
                    'access_type': access.type,
                    'instruction': instruction,
                    'context': 'crypto_operation'
                })
    return key_usage

# Define crypto key addresses
key_addresses = {0x00401200, 0x00401210, 0x00401220}
key_usage = analyze_crypto_keys(trace, key_addresses)
print(f"Cryptographic key usage events: {len(key_usage)}")
for usage in key_usage[:10]:
    idx = usage['instruction_index']
    addr = hex(usage['key_address'])
    access_type = usage['access_type']
    print(f"[{idx:08d}] Key {access_type}: {addr}")
```
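The same constant-matching idea also works over raw memory dumps rather than per-access values. A sketch that scans a byte buffer for 32-bit crypto constants (little-endian byte order assumed, as on x86):

```python
import struct

# SHA-256 initial hash values (same constants as crypto_patterns above)
SHA256_IV = {0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a}

def scan_for_constants(data, constants):
    """Return (offset, value) pairs for every 32-bit LE constant found in data."""
    hits = []
    for off in range(0, len(data) - 3):
        (word,) = struct.unpack_from("<I", data, off)
        if word in constants:
            hits.append((off, word))
    return hits

# Build a small dump containing one SHA-256 constant at offset 8
dump = b"\x00" * 8 + struct.pack("<I", 0x6a09e667) + b"\xff" * 4
hits = scan_for_constants(dump, SHA256_IV)
# hits == [(8, 0x6a09e667)]
```

Scanning at every byte offset (not just aligned ones) catches constants embedded in unaligned structures at the cost of a few false positives.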
## Scripting and Automation

### REVEN Python API

```python
# Advanced REVEN Python scripting
import reven2
import json
import time
from datetime import datetime
class REVENAnalyzer:
def __init__(self, server_host="localhost", server_port=13370):
self.server = reven2.RevenServer(server_host, server_port)
self.recording = None
self.trace = None
self.results = {}
def load_recording(self, recording_path):
"""Load recording for analysis"""
try:
self.recording = self.server.open_recording(recording_path)
self.trace = self.recording.trace
print(f"Loaded recording: {recording_path}")
print(f"Total instructions: {len(self.trace)}")
return True
except Exception as e:
print(f"Error loading recording: {e}")
return False
def analyze_execution_flow(self, start_idx=0, end_idx=None):
"""Analyze execution flow patterns"""
if not self.trace:
print("No trace loaded")
return None
if end_idx is None:
end_idx = min(len(self.trace), start_idx + 10000) # Limit for performance
flow_analysis = {
'total_instructions': end_idx - start_idx,
'function_calls': 0,
'function_returns': 0,
'jumps': 0,
'conditional_jumps': 0,
'loops_detected': 0,
'call_stack_depth': 0,
'unique_addresses': set()
}
call_stack = []
for i in range(start_idx, end_idx):
instruction = self.trace[i]
flow_analysis['unique_addresses'].add(instruction.address)
mnemonic = instruction.mnemonic.lower()
if mnemonic.startswith('call'):
flow_analysis['function_calls'] += 1
call_stack.append(instruction.address)
flow_analysis['call_stack_depth'] = max(
flow_analysis['call_stack_depth'],
len(call_stack)
)
elif mnemonic.startswith('ret'):
flow_analysis['function_returns'] += 1
if call_stack:
call_stack.pop()
elif mnemonic.startswith('j'):
flow_analysis['jumps'] += 1
if mnemonic in ['je', 'jne', 'jz', 'jnz', 'jl', 'jg', 'jle', 'jge']:
flow_analysis['conditional_jumps'] += 1
flow_analysis['unique_addresses'] = len(flow_analysis['unique_addresses'])
flow_analysis['final_call_stack_depth'] = len(call_stack)
return flow_analysis
def detect_anomalies(self, start_idx=0, end_idx=None):
"""Detect execution anomalies"""
if not self.trace:
return None
if end_idx is None:
end_idx = min(len(self.trace), start_idx + 10000)
anomalies = {
'suspicious_instructions': [],
'unusual_memory_patterns': [],
'potential_exploits': [],
'anti_analysis_techniques': []
}
for i in range(start_idx, end_idx):
instruction = self.trace[i]
# Detect suspicious instructions
if self.is_suspicious_instruction(instruction):
anomalies['suspicious_instructions'].append({
'index': i,
'address': instruction.address,
'mnemonic': instruction.mnemonic,
'reason': self.get_suspicion_reason(instruction)
})
# Detect unusual memory patterns
memory_anomaly = self.check_memory_anomaly(instruction)
if memory_anomaly:
anomalies['unusual_memory_patterns'].append({
'index': i,
'address': instruction.address,
'anomaly': memory_anomaly
})
# Detect potential exploits
exploit_indicator = self.check_exploit_indicators(instruction)
if exploit_indicator:
anomalies['potential_exploits'].append({
'index': i,
'address': instruction.address,
'indicator': exploit_indicator
})
return anomalies
def is_suspicious_instruction(self, instruction):
"""Check if instruction is suspicious"""
suspicious_patterns = [
'int 0x80', # System call
'sysenter', # System call
'syscall', # System call
'rdtsc', # Timestamp counter (anti-debugging)
'cpuid', # CPU identification (anti-VM)
]
return any(pattern in instruction.mnemonic.lower() for pattern in suspicious_patterns)
def get_suspicion_reason(self, instruction):
"""Get reason for instruction suspicion"""
mnemonic = instruction.mnemonic.lower()
if 'int' in mnemonic or 'syscall' in mnemonic or 'sysenter' in mnemonic:
return "System call detected"
elif 'rdtsc' in mnemonic:
return "Timestamp counter access (anti-debugging)"
elif 'cpuid' in mnemonic:
return "CPU identification (anti-VM)"
else:
return "Unknown suspicious pattern"
def check_memory_anomaly(self, instruction):
"""Check for memory access anomalies"""
for access in instruction.memory_accesses():
# Check for executable memory writes
if access.type == 'write' and self.is_executable_memory(access.address):
return "Write to executable memory (code injection)"
# Check for large memory operations
if access.size > 1024:
return f"Large memory operation ({access.size} bytes)"
# Check for null pointer dereference
if access.address < 0x1000:
return "Potential null pointer dereference"
return None
def is_executable_memory(self, address):
"""Check if memory address is in executable region"""
# Simplified check - in practice would use memory map
executable_ranges = [
(0x00400000, 0x00500000), # Typical executable range
(0x77700000, 0x77800000), # DLL range
]
return any(start <= address < end for start, end in executable_ranges)
def check_exploit_indicators(self, instruction):
"""Check for exploit indicators"""
# Check for ROP gadgets
if instruction.mnemonic.startswith('ret') and len(instruction.operands) > 0:
return "Potential ROP gadget"
# Check for stack pivot
if 'esp' in instruction.mnemonic or 'rsp' in instruction.mnemonic:
if instruction.mnemonic.startswith('mov') or instruction.mnemonic.startswith('xchg'):
return "Potential stack pivot"
# Check for shellcode patterns
if instruction.mnemonic in ['nop', 'inc', 'dec'] and self.is_shellcode_context(instruction):
return "Potential shellcode pattern"
return None
def is_shellcode_context(self, instruction):
"""Check if instruction is in shellcode context"""
# Simplified heuristic - check for executable heap/stack
return 0x00400000 <= instruction.address <= 0x00500000
def generate_timeline(self, events, output_file="timeline.json"):
"""Generate execution timeline"""
timeline = {
'metadata': {
'generated_at': datetime.now().isoformat(),
'total_events': len(events),
'recording_path': str(self.recording.path) if self.recording else None
},
'events': []
}
for event in events:
timeline_event = {
'timestamp': event.get('timestamp', 0),
'instruction_index': event.get('instruction_index', 0),
'address': hex(event.get('address', 0)),
'type': event.get('type', 'unknown'),
'description': event.get('description', ''),
'severity': event.get('severity', 'info')
}
timeline['events'].append(timeline_event)
# Sort by instruction index
        timeline['events'].sort(key=lambda x: x['instruction_index'])
        with open(output_file, 'w') as f:
            json.dump(timeline, f, indent=2)
        print(f"Timeline saved to {output_file}")
        return timeline

    def export_analysis_results(self, output_file="reven_analysis.json"):
        """Export analysis results"""
        export_data = {
            'analysis_metadata': {
                'timestamp': datetime.now().isoformat(),
                'recording_path': str(self.recording.path) if self.recording else None,
                'trace_length': len(self.trace) if self.trace else 0
            },
            'results': self.results
        }
        with open(output_file, 'w') as f:
            json.dump(export_data, f, indent=2)
        print(f"Analysis results exported to {output_file}")

    def run_comprehensive_analysis(self, start_idx=0, end_idx=None):
        """Run comprehensive analysis"""
        print("Starting comprehensive REVEN analysis...")
        start_time = time.time()

        # Execution flow analysis
        print("Analyzing execution flow...")
        flow_results = self.analyze_execution_flow(start_idx, end_idx)
        self.results['execution_flow'] = flow_results

        # Anomaly detection
        print("Detecting anomalies...")
        anomaly_results = self.detect_anomalies(start_idx, end_idx)
        self.results['anomalies'] = anomaly_results

        # Generate timeline from detected anomalies
        print("Generating timeline...")
        timeline_events = []
        if anomaly_results:
            for category, items in anomaly_results.items():
                for item in items:
                    timeline_events.append({
                        'instruction_index': item['index'],
                        'address': item['address'],
                        'type': category,
                        'description': item.get('reason', item.get('anomaly', item.get('indicator', ''))),
                        'severity': 'warning'
                    })
        timeline = self.generate_timeline(timeline_events)
        self.results['timeline'] = timeline

        # Export results
        self.export_analysis_results()
        elapsed_time = time.time() - start_time
        print(f"Comprehensive analysis completed in {elapsed_time:.2f} seconds")
        return self.results

# Usage example
if __name__ == "__main__":
    analyzer = REVENAnalyzer()

    # Load recording
    recording_path = "/data/reven/recordings/malware-001"
    if analyzer.load_recording(recording_path):
        # Run comprehensive analysis
        results = analyzer.run_comprehensive_analysis(0, 5000)

        # Display summary
        print("\n=== Analysis Summary ===")
        if 'execution_flow' in results:
            flow = results['execution_flow']
            print(f"Instructions analyzed: {flow['total_instructions']}")
            print(f"Function calls: {flow['function_calls']}")
            print(f"Unique addresses: {flow['unique_addresses']}")
        if 'anomalies' in results:
            anomalies = results['anomalies']
            total_anomalies = sum(len(items) for items in anomalies.values())
            print(f"Total anomalies detected: {total_anomalies}")
            for category, items in anomalies.items():
                if items:
                    print(f"  {category}: {len(items)}")
Batch Processing and Automation
# Batch processing scripts for REVEN
#!/bin/bash
# Batch analysis script
REVEN_BATCH_ANALYSIS_SCRIPT="batch_analysis.sh"
cat > $REVEN_BATCH_ANALYSIS_SCRIPT << 'EOF'
#!/bin/bash
# REVEN Batch Analysis Script
set -e
RECORDINGS_DIR="/data/reven/recordings"
RESULTS_DIR="/data/reven/analysis-results"
PYTHON_SCRIPT="/opt/reven/scripts/batch_analyzer.py"

# Create results directory
mkdir -p "$RESULTS_DIR"

# Function to analyze a single recording
analyze_recording() {
    local recording_path="$1"
    local recording_name=$(basename "$recording_path")
    local result_dir="$RESULTS_DIR/$recording_name"
    echo "Analyzing recording: $recording_name"

    # Create result directory
    mkdir -p "$result_dir"

    # Run Python analysis
    python3 "$PYTHON_SCRIPT" \
        --recording "$recording_path" \
        --output "$result_dir" \
        --format json,html \
        --timeout 3600

    # Generate summary report
    reven report generate \
        --recording "$recording_path" \
        --output "$result_dir/summary.pdf" \
        --template security-analysis

    echo "Analysis completed: $recording_name"
}

# Process all recordings
for recording in "$RECORDINGS_DIR"/*; do
    if [ -d "$recording" ]; then
        analyze_recording "$recording"
    fi
done

# Generate consolidated report
python3 /opt/reven/scripts/consolidate_results.py \
    --input "$RESULTS_DIR" \
    --output "$RESULTS_DIR/consolidated_report.html"

echo "Batch analysis completed"
EOF
chmod +x $REVEN_BATCH_ANALYSIS_SCRIPT
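The consolidation step above calls a `consolidate_results.py` helper. A hedged sketch of what such a script might do, assuming each per-recording result directory contains a `summary.json` (that filename is an assumption, not something the batch script above guarantees):

```python
import json
import pathlib

def consolidate(results_dir):
    """Merge per-recording JSON summaries into one list of records.

    Assumes a layout of <results_dir>/<recording_name>/summary.json;
    adapt the glob pattern to whatever batch_analyzer.py actually emits.
    """
    merged = []
    for path in sorted(pathlib.Path(results_dir).glob('*/summary.json')):
        with open(path) as f:
            # Tag each summary with the recording it came from
            merged.append({'recording': path.parent.name, **json.load(f)})
    return merged
```

Rendering `merged` to HTML (or CSV) is then a separate templating concern.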
# Automated recording script
REVEN_AUTO_RECORD_SCRIPT="auto_record.sh"
cat > $REVEN_AUTO_RECORD_SCRIPT << 'EOF'
#!/bin/bash
# REVEN Automated Recording Script
set -e
VM_NAME="analysis-vm"
SNAPSHOT_NAME="clean-state"
SAMPLES_DIR="/data/malware-samples"
RECORDINGS_DIR="/data/reven/recordings"

# Function to record a malware sample
record_sample() {
    local sample_path="$1"
    local sample_name=$(basename "$sample_path" .exe)
    local recording_name="recording-$sample_name-$(date +%Y%m%d-%H%M%S)"
    local recording_path="$RECORDINGS_DIR/$recording_name"
    echo "Recording sample: $sample_name"

    # Reset VM to clean state
    reven vm reset "$VM_NAME" --snapshot "$SNAPSHOT_NAME"

    # Start recording
    reven record start \
        --vm "$VM_NAME" \
        --snapshot "$SNAPSHOT_NAME" \
        --name "$recording_name" \
        --duration 300 \
        --max-size 5G \
        --output-dir "$recording_path"

    # Wait for VM to boot
    sleep 30

    # Copy sample to VM
    reven vm copy-to "$VM_NAME" "$sample_path" "C:\\malware.exe"

    # Execute sample
    reven vm execute "$VM_NAME" "C:\\malware.exe"

    # Wait for execution
    sleep 120

    # Stop recording
    reven record stop "$recording_name"
    echo "Recording completed: $recording_name"
}

# Process all samples
for sample in "$SAMPLES_DIR"/*.exe; do
    if [ -f "$sample" ]; then
        record_sample "$sample"
    fi
done

# Analyze the new recordings with the batch script created above
# (analyze_recording is defined in batch_analysis.sh, not in this script)
if [ -x "./batch_analysis.sh" ]; then
    ./batch_analysis.sh
fi

echo "Automated recording completed"
EOF
chmod +x $REVEN_AUTO_RECORD_SCRIPT
# Monitoring and alerting script
# Write it where the systemd unit below expects it
REVEN_MONITOR_SCRIPT="/opt/reven/scripts/monitor.sh"
cat > $REVEN_MONITOR_SCRIPT << 'EOF'
#!/bin/bash
# REVEN Monitoring Script
set -e
LOG_FILE="/var/log/reven-monitor.log"
ALERT_EMAIL="security@company.com"
THRESHOLD_CPU=80
THRESHOLD_MEMORY=90
THRESHOLD_DISK=85

# Function to log with timestamp
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

# Function to send alert
send_alert() {
    local subject="$1"
    local message="$2"
    echo "$message" | mail -s "$subject" "$ALERT_EMAIL"
    log_message "ALERT: $subject"
}

# Check REVEN server status
check_reven_status() {
    if ! systemctl is-active --quiet reven-server; then
        send_alert "REVEN Server Down" "REVEN server is not running"
        return 1
    fi
    return 0
}

# Check system resources
check_resources() {
    # Check CPU usage
    cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
    if (( $(echo "$cpu_usage > $THRESHOLD_CPU" | bc -l) )); then
        send_alert "High CPU Usage" "CPU usage is ${cpu_usage}%"
    fi

    # Check memory usage
    memory_usage=$(free | grep Mem | awk '{printf("%.1f", $3/$2 * 100.0)}')
    if (( $(echo "$memory_usage > $THRESHOLD_MEMORY" | bc -l) )); then
        send_alert "High Memory Usage" "Memory usage is ${memory_usage}%"
    fi

    # Check disk usage
    disk_usage=$(df /data | tail -1 | awk '{print $5}' | cut -d'%' -f1)
    if [ "$disk_usage" -gt "$THRESHOLD_DISK" ]; then
        send_alert "High Disk Usage" "Disk usage is ${disk_usage}%"
    fi
}

# Check recording status
check_recordings() {
    active_recordings=$(reven record list --status active | wc -l)
    if [ "$active_recordings" -gt 5 ]; then
        send_alert "Too Many Active Recordings" "Currently $active_recordings active recordings"
    fi

    # Check for failed recordings
    failed_recordings=$(reven record list --status failed | wc -l)
    if [ "$failed_recordings" -gt 0 ]; then
        send_alert "Failed Recordings Detected" "$failed_recordings recordings have failed"
    fi
}

# Main monitoring loop
log_message "Starting REVEN monitoring"
while true; do
    # "|| true" keeps set -e from killing the loop when the server is down
    check_reven_status || true
    check_resources
    check_recordings
    sleep 300  # Check every 5 minutes
done
EOF
chmod +x $REVEN_MONITOR_SCRIPT
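The same threshold logic can be mirrored in Python when the monitor needs to run on hosts without `bc`/`mail`. A minimal sketch of the disk check only; the thresholds mirror the bash script above, and the alert transport is left out:

```python
import shutil

def disk_usage_percent(path="/"):
    """Return used-space percentage for the filesystem containing path."""
    usage = shutil.disk_usage(path)
    return usage.used / usage.total * 100

def check_disk(disk_pct, threshold=85):
    """Mirror the bash monitor's disk check: return an alert string or None."""
    if disk_pct > threshold:
        return f"High Disk Usage: {disk_pct:.1f}%"
    return None

if __name__ == "__main__":
    pct = disk_usage_percent("/")
    print(check_disk(pct) or f"Disk OK ({pct:.1f}%)")
```

Separating measurement (`disk_usage_percent`) from policy (`check_disk`) keeps the threshold logic unit-testable.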
# Create systemd service for monitoring
sudo tee /etc/systemd/system/reven-monitor.service > /dev/null << 'EOF'
[Unit]
Description=REVEN Monitoring Service
After=network.target
[Service]
Type=simple
User=reven
ExecStart=/opt/reven/scripts/monitor.sh
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
# Enable and start monitoring service
sudo systemctl daemon-reload
sudo systemctl enable reven-monitor
sudo systemctl start reven-monitor
echo "REVEN automation scripts created and monitoring service started"
Best Practices and Tips
Performance Optimization
# REVEN performance optimization
# System tuning for optimal REVEN performance
# Kernel parameters for large memory systems
echo 'vm.swappiness=1' | sudo tee -a /etc/sysctl.conf
echo 'vm.dirty_ratio=5' | sudo tee -a /etc/sysctl.conf
echo 'vm.dirty_background_ratio=2' | sudo tee -a /etc/sysctl.conf
echo 'kernel.shmmax=68719476736' | sudo tee -a /etc/sysctl.conf # 64GB
echo 'kernel.shmall=16777216' | sudo tee -a /etc/sysctl.conf
# Apply kernel parameters
sudo sysctl -p
# CPU governor for performance
echo 'performance' | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
# Disable CPU frequency scaling
sudo systemctl disable ondemand
# Storage optimization
# Use the deadline I/O scheduler for SSDs ('mq-deadline' on newer, blk-mq kernels)
echo 'deadline' | sudo tee /sys/block/sda/queue/scheduler
# Increase I/O queue depth
echo '32' | sudo tee /sys/block/sda/queue/nr_requests
# REVEN-specific optimizations
# Increase file descriptor limits
echo 'reven soft nofile 65536' | sudo tee -a /etc/security/limits.conf
echo 'reven hard nofile 65536' | sudo tee -a /etc/security/limits.conf
# Optimize REVEN configuration
cat > /opt/reven/etc/performance.conf << 'EOF'
# REVEN Performance Configuration
[recording]
# Use compression for storage efficiency
compression_level = 6
compression_algorithm = lz4
# Memory management
memory_pool_size = 16G
max_memory_usage = 32G
# I/O optimization
async_io = true
io_threads = 8
buffer_size = 64M
[analysis]
# Parallel processing
max_worker_threads = 16
enable_parallel_analysis = true
# Caching
cache_size = 8G
enable_instruction_cache = true
enable_memory_cache = true
[storage]
# Storage paths
recordings_path = /data/reven/recordings
cache_path = /data/reven/cache
temp_path = /tmp/reven
# Cleanup policies
auto_cleanup = true
max_recording_age = 30d
max_cache_size = 100G
EOF
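Because a typo like `memory_pool_size = 16T` would only surface at runtime, it can be worth validating `performance.conf` before restarting the server. A hedged sketch using the standard-library `configparser`; the size-suffix convention (K/M/G) is an assumption about how REVEN parses these values:

```python
import configparser

# Binary size suffixes assumed for values like '16G' and '64M'
SIZE_UNITS = {'K': 1024, 'M': 1024**2, 'G': 1024**3}

def parse_size(value):
    """Parse '16G', '64M', or a bare byte count into bytes."""
    value = value.strip()
    if value and value[-1].upper() in SIZE_UNITS:
        return int(value[:-1]) * SIZE_UNITS[value[-1].upper()]
    return int(value)

def validate_performance_conf(text):
    """Check that the recording memory pool fits under the memory cap."""
    cp = configparser.ConfigParser()
    cp.read_string(text)
    pool = parse_size(cp['recording']['memory_pool_size'])
    max_mem = parse_size(cp['recording']['max_memory_usage'])
    if pool > max_mem:
        raise ValueError("memory_pool_size must not exceed max_memory_usage")
    return pool, max_mem
```

Running `validate_performance_conf(open('/opt/reven/etc/performance.conf').read())` in a pre-restart hook catches inverted limits early.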
Security Best Practices
# REVEN security hardening
# Secure REVEN deployment practices
# Create dedicated REVEN user
sudo useradd -r -s /bin/bash -d /opt/reven -m reven
# Set proper permissions
sudo chown -R reven:reven /opt/reven
sudo chown -R reven:reven /data/reven
sudo chmod 750 /opt/reven
sudo chmod 750 /data/reven
# Network security
# Configure firewall for REVEN
sudo ufw allow from 192.168.1.0/24 to any port 13370 comment 'REVEN Server'
sudo ufw allow from 192.168.1.0/24 to any port 13371 comment 'REVEN Web UI'
# SSL/TLS configuration
# Generate SSL certificates
sudo mkdir -p /opt/reven/etc/ssl
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
    -keyout /opt/reven/etc/ssl/reven.key \
    -out /opt/reven/etc/ssl/reven.crt \
    -subj "/C=US/ST=State/L=City/O=Organization/CN=reven.company.com"
# Configure REVEN for SSL
cat > /opt/reven/etc/ssl.conf << 'EOF'
[ssl]
enabled = true
certificate = /opt/reven/etc/ssl/reven.crt
private_key = /opt/reven/etc/ssl/reven.key
protocols = TLSv1.2,TLSv1.3
ciphers = ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256
[authentication]
enabled = true
method = ldap
ldap_server = ldap://ldap.company.com
ldap_base_dn = dc=company,dc=com
ldap_user_filter = (uid=%s)
EOF
# Backup and recovery
# Create backup script
cat > /opt/reven/scripts/backup.sh << 'EOF'
#!/bin/bash
BACKUP_DIR="/backup/reven"
DATE=$(date +%Y%m%d-%H%M%S)
BACKUP_NAME="reven-backup-$DATE"
# Create backup directory
mkdir -p "$BACKUP_DIR"
# Backup configuration
tar -czf "$BACKUP_DIR/$BACKUP_NAME-config.tar.gz" /opt/reven/etc/
# Backup critical recordings (last 7 days); pipe the file list to tar so a
# single archive is produced even when find batches its arguments
find /data/reven/recordings -mtime -7 -type f -name "*.reven" -print0 |
    tar -czf "$BACKUP_DIR/$BACKUP_NAME-recordings.tar.gz" --null -T -
# Backup database
reven db export --output "$BACKUP_DIR/$BACKUP_NAME-database.sql"
# Cleanup old backups (keep 30 days)
find "$BACKUP_DIR" -name "reven-backup-*" -mtime +30 -delete
echo "Backup completed: $BACKUP_NAME"
EOF
chmod +x /opt/reven/scripts/backup.sh
# Schedule daily backups
echo "0 2 * * * /opt/reven/scripts/backup.sh" | sudo crontab -u reven -
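The 30-day retention rule enforced with `find -mtime +30` can also be computed from the backup names themselves, which is useful when backups live on object storage without reliable mtimes. A sketch that parses the `reven-backup-YYYYmmdd-HHMMSS` naming scheme used above:

```python
import re
from datetime import datetime, timedelta

def stale_backups(names, now=None, keep_days=30):
    """Return names older than keep_days, based on the embedded timestamp.

    Names without a parseable reven-backup-YYYYmmdd-HHMMSS stamp are kept.
    """
    now = now or datetime.now()
    cutoff = now - timedelta(days=keep_days)
    stale = []
    for name in names:
        m = re.search(r'reven-backup-(\d{8}-\d{6})', name)
        if m and datetime.strptime(m.group(1), '%Y%m%d-%H%M%S') < cutoff:
            stale.append(name)
    return stale
```

Passing a fixed `now` makes the retention decision deterministic and easy to test.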
# Audit logging
# Configure audit logging for REVEN
cat > /opt/reven/etc/audit.conf << 'EOF'
[audit]
enabled = true
log_file = /var/log/reven/audit.log
log_level = INFO
log_rotation = daily
max_log_size = 100M
[events]
log_user_actions = true
log_recording_operations = true
log_analysis_operations = true
log_system_events = true
log_security_events = true
EOF
# Create log directory
sudo mkdir -p /var/log/reven
sudo chown reven:reven /var/log/reven
# Configure logrotate
sudo tee /etc/logrotate.d/reven > /dev/null << 'EOF'
/var/log/reven/*.log {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
    create 644 reven reven
    postrotate
        systemctl reload reven-server
    endscript
}
EOF
Resources
Documentation and Learning
- [REVEN Documentation](LINK_16) - Official documentation
- [REVEN User Guide](LINK_16) - Complete user guide
- REVEN Python API - Python API documentation
- REVEN Blog - Latest updates and case studies
Training and Certification
- [REVEN Training](LINK_16) - Official training courses
- [Reverse Engineering Training](LINK_16) - Advanced RE techniques
- [Vulnerability Research](LINK_16) - Vulnerability research methods
- [Malware Analysis](LINK_16) - Malware analysis techniques
Community and Support
- REVEN Community - User community and forums
- [REVEN Support](LINK_16) - Technical support portal
- REVEN GitHub - Open source tools and scripts
- Research Papers - Academic research and publications
Related Tools and Integration
- [GDB Integration](LINK_16) - GDB integration
- [IDA Pro Plugin](LINK_16) - IDA Pro integration
- [Ghidra Integration](LINK_16) - Ghidra integration
- [YARA Integration](LINK_16) - YARA rules integration