Aller au contenu

RECETTES Feuille de chaleur

Copier toutes les commandes Générer PDF

Aperçu général

REVEN by Tetrane est une plate-forme avancée d'analyse binaire dynamique et de débogage inverse qui fournit des capacités d'enregistrement et de rejouage de tout le système. Il permet le débogage temps-voyage, permettant aux analystes d'enregistrer l'exécution de programme, puis de naviguer en arrière et en avant à travers le calendrier d'exécution pour comprendre les comportements complexes, trouver des vulnérabilités et analyser les logiciels malveillants.

C'est-à-dire Principales forces: Débogage des voyages dans le temps, enregistrement du système entier, visualisation de la mémoire, traçage de l'exécution, recherche sur la vulnérabilité et capacités avancées d'analyse dynamique.

Installation et configuration

Exigences du système et installation

# System Requirements:
# - Linux (Ubuntu 18.04+ or CentOS 7+)
# - Intel CPU with VT-x support
# - 32GB RAM minimum (64GB+ recommended)
# - 1TB+ SSD storage for recordings
# - NVIDIA GPU (optional, for acceleration)

# REVEN Installation (Enterprise License Required)
# Contact Tetrane for licensing: https://www.tetrane.com/

# Download REVEN installer
wget https://download.tetrane.com/reven-installer.run

# Make installer executable
chmod +x reven-installer.run

# Run installer as root
sudo ./reven-installer.run

# Follow installation wizard
# - Choose installation directory (/opt/reven recommended)
# - Configure license server
# - Set up user permissions
# - Configure storage paths

# Post-installation setup
sudo usermod -a -G reven $USER
sudo systemctl enable reven-server
sudo systemctl start reven-server

# Verify installation
reven --version
reven-server status

Configuration de la licence

# License Server Configuration
# Edit license configuration
sudo nano /opt/reven/etc/license.conf

# Example license configuration:
[license]
server_host = license.company.com
server_port = 27000
license_file = /opt/reven/etc/reven.lic

# Floating license configuration
[floating_license]
enabled = true
server_host = 192.168.1.100
server_port = 27000
checkout_timeout = 3600

# Verify license
reven license status
reven license info

# Test license connectivity
reven license test-connection
```_

### Aménagement de l'environnement
```bash
# REVEN Environment Variables
export REVEN_HOME=/opt/reven
export REVEN_PROJECTS=/data/reven/projects
export REVEN_RECORDINGS=/data/reven/recordings
export PATH=$REVEN_HOME/bin:$PATH

# Add to ~/.bashrc for persistence
echo 'export REVEN_HOME=/opt/reven' >> ~/.bashrc
echo 'export REVEN_PROJECTS=/data/reven/projects' >> ~/.bashrc
echo 'export REVEN_RECORDINGS=/data/reven/recordings' >> ~/.bashrc
echo 'export PATH=$REVEN_HOME/bin:$PATH' >> ~/.bashrc

# Create project directories
mkdir -p $REVEN_PROJECTS
mkdir -p $REVEN_RECORDINGS
sudo chown -R $USER:reven $REVEN_PROJECTS
sudo chown -R $USER:reven $REVEN_RECORDINGS

# Configure storage quotas
sudo setquota -u $USER 100G 120G 0 0 /data

# Set up VM templates directory
mkdir -p $REVEN_HOME/vm-templates
```_

## Enregistrement et rediffusion

### Configuration et configuration de VM
```bash
# Create VM for recording
reven vm create --name "analysis-vm" \
    --os windows10 \
    --memory 4096 \
    --disk 50G \
    --iso /path/to/windows10.iso

# Configure VM settings
reven vm configure analysis-vm \
    --cpu-count 2 \
    --enable-nested-virtualization \
    --disable-aslr \
    --enable-debug-symbols

# Install VM operating system
reven vm install analysis-vm \
    --unattended \
    --admin-password "Password123!" \
    --timezone "UTC"

# Create VM snapshot for recording
reven vm snapshot analysis-vm \
    --name "clean-state" \
    --description "Clean Windows 10 installation"

# List available VMs
reven vm list

# VM management commands
reven vm start analysis-vm
reven vm stop analysis-vm
reven vm reset analysis-vm
reven vm delete analysis-vm

Enregistrement de l'exécution

# Start recording session
reven record start \
    --vm analysis-vm \
    --snapshot clean-state \
    --name "malware-analysis-001" \
    --duration 300 \
    --output-dir $REVEN_RECORDINGS/malware-001

# Recording with specific triggers
reven record start \
    --vm analysis-vm \
    --snapshot clean-state \
    --name "exploit-analysis" \
    --trigger-on-process "notepad.exe" \
    --trigger-on-module "kernel32.dll" \
    --max-size 10G

# Interactive recording session
reven record interactive \
    --vm analysis-vm \
    --snapshot clean-state \
    --name "interactive-session"

# Recording with custom configuration
reven record start \
    --vm analysis-vm \
    --config recording-config.json \
    --name "custom-recording"

# Example recording configuration (recording-config.json)
cat > recording-config.json << 'EOF'
{
    "recording": {
        "max_duration": 600,
        "max_size": "20GB",
        "compression": "lz4",
        "memory_tracking": true,
        "syscall_tracking": true,
        "network_tracking": true
    },
    "triggers": {
        "start_triggers": [
            {"type": "process", "name": "malware.exe"},
            {"type": "file_access", "path": "C:\\Windows\\System32\\"}
        ],
        "stop_triggers": [
            {"type": "process_exit", "name": "malware.exe"},
            {"type": "timeout", "seconds": 300}
        ]
    },
    "filters": {
        "exclude_processes": ["dwm.exe", "winlogon.exe"],
        "include_modules": ["ntdll.dll", "kernel32.dll", "user32.dll"]
    }
}
EOF

# Monitor recording progress
reven record status malware-analysis-001
reven record list

# Stop recording
reven record stop malware-analysis-001

# Recording management
reven record pause malware-analysis-001
reven record resume malware-analysis-001
reven record cancel malware-analysis-001

Rejouer et analyser la configuration

# Load recording for analysis
reven replay load $REVEN_RECORDINGS/malware-001

# Create analysis project
reven project create \
    --name "malware-analysis" \
    --recording $REVEN_RECORDINGS/malware-001 \
    --description "Analysis of malware sample XYZ"

# Open project in REVEN GUI
reven gui --project malware-analysis

# Command-line replay navigation
reven replay goto --instruction 1000
reven replay goto --time 10.5
reven replay goto --address 0x401000

# Replay control commands
reven replay step-forward
reven replay step-backward
reven replay run-forward --count 100
reven replay run-backward --count 50

# Set breakpoints in replay
reven replay breakpoint set --address 0x401000
reven replay breakpoint set --function "CreateFileA"
reven replay breakpoint set --module "ntdll.dll"

# List and manage breakpoints
reven replay breakpoint list
reven replay breakpoint delete --id 1
reven replay breakpoint disable --id 2

Débogage temps-voyage

# REVEN Python API for time-travel debugging
import reven2

# Connect to REVEN server
server = reven2.RevenServer("localhost", 13370)

# Open recording
recording = server.open_recording("/data/reven/recordings/malware-001")

# Get execution trace
trace = recording.trace

# Navigate through execution
print(f"Total instructions: {len(trace)}")

# Go to specific instruction
instruction = trace[1000]
print(f"Instruction: {instruction}")
print(f"Address: {hex(instruction.address)}")
print(f"Mnemonic: {instruction.mnemonic}")

# Time-travel navigation
def navigate_execution(trace, start_idx, end_idx):
    """Navigate through execution range"""

    for i in range(start_idx, min(end_idx, len(trace))):
        instruction = trace[i]

        print(f"[{i:08d}] {hex(instruction.address):>10} {instruction.mnemonic}")

        # Check for interesting events
        if instruction.mnemonic.startswith("call"):
            print(f"  -> Function call detected")

        if instruction.mnemonic.startswith("ret"):
            print(f"  <- Function return detected")

# Navigate execution range
navigate_execution(trace, 1000, 1100)

# Find specific instructions
def find_instructions(trace, mnemonic_pattern):
    """Find instructions matching pattern"""

    matches = []
    for i, instruction in enumerate(trace):
        if mnemonic_pattern.lower() in instruction.mnemonic.lower():
            matches.append((i, instruction))

    return matches

# Find all call instructions
call_instructions = find_instructions(trace, "call")
print(f"Found {len(call_instructions)} call instructions")

# Display first 10 calls
for i, (idx, instruction) in enumerate(call_instructions[:10]):
    print(f"Call {i+1}: [{idx:08d}] {hex(instruction.address)} {instruction.mnemonic}")

Analyse et suivi de la mémoire

# Memory analysis with REVEN
def analyze_memory_access(trace, address_range):
    """Analyze memory accesses in address range"""

    memory_accesses = []

    for i, instruction in enumerate(trace):
        # Get memory accesses for this instruction
        for access in instruction.memory_accesses():
            if address_range[0] <= access.address <= address_range[1]:
                memory_accesses.append({
                    'instruction_index': i,
                    'address': access.address,
                    'size': access.size,
                    'type': access.type,  # read/write
                    'value': access.value
                })

    return memory_accesses

# Analyze heap memory accesses
heap_start = 0x00400000
heap_end = 0x00500000
heap_accesses = analyze_memory_access(trace, (heap_start, heap_end))

print(f"Heap memory accesses: {len(heap_accesses)}")

# Track specific memory location
def track_memory_location(trace, target_address):
    """Track all accesses to specific memory location"""

    accesses = []

    for i, instruction in enumerate(trace):
        for access in instruction.memory_accesses():
            if access.address == target_address:
                accesses.append({
                    'instruction_index': i,
                    'instruction': instruction,
                    'type': access.type,
                    'value': access.value,
                    'timestamp': instruction.timestamp
                })

    return accesses

# Track critical memory location
critical_address = 0x00401234
memory_timeline = track_memory_location(trace, critical_address)

print(f"Memory location {hex(critical_address)} accessed {len(memory_timeline)} times")

# Display memory timeline
for access in memory_timeline:
    print(f"[{access['instruction_index']:08d}] {access['type']}: {access['value']:08x}")

# Memory diff analysis
def memory_diff_analysis(trace, start_idx, end_idx, memory_range):
    """Analyze memory changes between two points"""

    start_instruction = trace[start_idx]
    end_instruction = trace[end_idx]

    # Get memory state at start
    start_memory = {}
    for addr in range(memory_range[0], memory_range[1], 4):
        try:
            start_memory[addr] = start_instruction.memory_read(addr, 4)
        except:
            pass

    # Get memory state at end
    end_memory = {}
    for addr in range(memory_range[0], memory_range[1], 4):
        try:
            end_memory[addr] = end_instruction.memory_read(addr, 4)
        except:
            pass

    # Find differences
    changes = []
    for addr in start_memory:
        if addr in end_memory and start_memory[addr] != end_memory[addr]:
            changes.append({
                'address': addr,
                'old_value': start_memory[addr],
                'new_value': end_memory[addr]
            })

    return changes

# Analyze memory changes during function execution
function_start = 1000
function_end = 2000
stack_range = (0x7fff0000, 0x7fff1000)

memory_changes = memory_diff_analysis(trace, function_start, function_end, stack_range)
print(f"Memory changes during function: {len(memory_changes)}")

for change in memory_changes[:10]:  # Show first 10 changes
    print(f"{hex(change['address'])}: {change['old_value']:08x} -> {change['new_value']:08x}")

Fonction Analyse des appels

# Function call analysis and tracing
def analyze_function_calls(trace, start_idx=0, end_idx=None):
    """Analyze function calls in execution trace"""

    if end_idx is None:
        end_idx = len(trace)

    call_stack = []
    function_calls = []

    for i in range(start_idx, min(end_idx, len(trace))):
        instruction = trace[i]

        if instruction.mnemonic.startswith("call"):
            # Function call
            call_info = {
                'instruction_index': i,
                'caller_address': instruction.address,
                'target_address': instruction.operands[0] if instruction.operands else None,
                'stack_depth': len(call_stack),
                'timestamp': instruction.timestamp
            }

            call_stack.append(call_info)
            function_calls.append(call_info)

        elif instruction.mnemonic.startswith("ret"):
            # Function return
            if call_stack:
                call_info = call_stack.pop()
                call_info['return_index'] = i
                call_info['duration'] = i - call_info['instruction_index']

    return function_calls, call_stack

# Analyze function calls
function_calls, remaining_stack = analyze_function_calls(trace, 0, 10000)

print(f"Function calls analyzed: {len(function_calls)}")
print(f"Unclosed calls: {len(remaining_stack)}")

# Display function call hierarchy
def display_call_hierarchy(function_calls, max_depth=5):
    """Display function call hierarchy"""

    for call in function_calls:
        if call['stack_depth'] <= max_depth:
            indent = "  " * call['stack_depth']
            target = hex(call['target_address']) if call['target_address'] else "unknown"
            duration = call.get('duration', 'ongoing')

            print(f"{indent}[{call['instruction_index']:08d}] Call to {target} (duration: {duration})")

display_call_hierarchy(function_calls[:20])

# API call tracking
def track_api_calls(trace, api_addresses):
    """Track specific API calls"""

    api_calls = []

    for i, instruction in enumerate(trace):
        if instruction.mnemonic.startswith("call"):
            target = instruction.operands[0] if instruction.operands else None

            if target in api_addresses:
                api_name = api_addresses[target]

                # Get function arguments (simplified)
                args = []
                try:
                    # Assume x86-64 calling convention
                    registers = instruction.registers
                    args = [
                        registers.get('rcx', 0),
                        registers.get('rdx', 0),
                        registers.get('r8', 0),
                        registers.get('r9', 0)
                    ]
                except:
                    pass

                api_calls.append({
                    'instruction_index': i,
                    'api_name': api_name,
                    'arguments': args,
                    'caller_address': instruction.address
                })

    return api_calls

# Define API addresses (example)
api_addresses = {
    0x77701234: "CreateFileA",
    0x77701456: "ReadFile",
    0x77701678: "WriteFile",
    0x77701890: "CloseHandle"
}

api_calls = track_api_calls(trace, api_addresses)

print(f"API calls tracked: {len(api_calls)}")

for call in api_calls[:10]:
    print(f"[{call['instruction_index']:08d}] {call['api_name']} called from {hex(call['caller_address'])}")
    print(f"  Arguments: {[hex(arg) for arg in call['arguments']]}")

Techniques d'analyse avancées

Analyse du flux de données

# Data flow analysis with REVEN
def trace_data_flow(trace, source_address, target_address):
    """Trace data flow from source to target"""

    data_flow = []
    tracked_values = set()

    for i, instruction in enumerate(trace):
        # Check if instruction reads from source
        for access in instruction.memory_accesses():
            if access.address == source_address and access.type == 'read':
                tracked_values.add(access.value)
                data_flow.append({
                    'instruction_index': i,
                    'type': 'source_read',
                    'address': access.address,
                    'value': access.value
                })

        # Check if instruction writes tracked value to target
        for access in instruction.memory_accesses():
            if (access.address == target_address and 
                access.type == 'write' and 
                access.value in tracked_values):

                data_flow.append({
                    'instruction_index': i,
                    'type': 'target_write',
                    'address': access.address,
                    'value': access.value
                })

    return data_flow

# Trace data flow from input buffer to output
input_buffer = 0x00401000
output_buffer = 0x00402000

data_flow = trace_data_flow(trace, input_buffer, output_buffer)

print(f"Data flow events: {len(data_flow)}")

for event in data_flow:
    event_type = event['type']
    addr = hex(event['address'])
    value = hex(event['value'])
    print(f"[{event['instruction_index']:08d}] {event_type}: {addr} = {value}")

# Taint analysis
def taint_analysis(trace, taint_sources, max_instructions=10000):
    """Perform taint analysis on execution trace"""

    tainted_memory = set()
    tainted_registers = set()
    taint_events = []

    for i, instruction in enumerate(trace[:max_instructions]):
        # Check for taint sources
        for access in instruction.memory_accesses():
            if access.address in taint_sources:
                tainted_memory.add(access.address)
                taint_events.append({
                    'instruction_index': i,
                    'type': 'taint_source',
                    'address': access.address,
                    'value': access.value
                })

        # Propagate taint through memory operations
        for access in instruction.memory_accesses():
            if access.type == 'read' and access.address in tainted_memory:
                # Taint spreads to destination
                if instruction.destination:
                    if instruction.destination.type == 'memory':
                        tainted_memory.add(instruction.destination.address)
                    elif instruction.destination.type == 'register':
                        tainted_registers.add(instruction.destination.name)

                taint_events.append({
                    'instruction_index': i,
                    'type': 'taint_propagation',
                    'source': access.address,
                    'destination': instruction.destination
                })

        # Check for tainted data usage in critical operations
        if instruction.mnemonic in ['call', 'jmp', 'cmp']:
            for operand in instruction.operands:
                if (operand.type == 'memory' and operand.address in tainted_memory) or \
                   (operand.type == 'register' and operand.name in tainted_registers):

                    taint_events.append({
                        'instruction_index': i,
                        'type': 'tainted_control_flow',
                        'instruction': instruction,
                        'tainted_operand': operand
                    })

    return taint_events, tainted_memory, tainted_registers

# Perform taint analysis
taint_sources = {0x00401000, 0x00401004, 0x00401008}  # Input buffer addresses
taint_events, tainted_mem, tainted_regs = taint_analysis(trace, taint_sources)

print(f"Taint events: {len(taint_events)}")
print(f"Tainted memory locations: {len(tainted_mem)}")
print(f"Tainted registers: {len(tainted_regs)}")

# Display critical taint events
critical_events = [e for e in taint_events if e['type'] == 'tainted_control_flow']
print(f"Critical taint events (control flow): {len(critical_events)}")

for event in critical_events[:5]:
    idx = event['instruction_index']
    instr = event['instruction']
    print(f"[{idx:08d}] Tainted control flow: {hex(instr.address)} {instr.mnemonic}")

Analyse de vulnérabilité

# Vulnerability detection and analysis
def detect_buffer_overflows(trace, buffer_ranges):
    """Detect potential buffer overflow vulnerabilities"""

    potential_overflows = []

    for i, instruction in enumerate(trace):
        for access in instruction.memory_accesses():
            if access.type == 'write':
                # Check if write is outside buffer bounds
                for buffer_start, buffer_end in buffer_ranges:
                    if buffer_start <= access.address < buffer_end:
                        # Write within buffer - check for boundary violations
                        if access.address + access.size > buffer_end:
                            potential_overflows.append({
                                'instruction_index': i,
                                'instruction': instruction,
                                'buffer_start': buffer_start,
                                'buffer_end': buffer_end,
                                'write_address': access.address,
                                'write_size': access.size,
                                'overflow_bytes': (access.address + access.size) - buffer_end
                            })

    return potential_overflows

# Define buffer ranges to monitor
buffer_ranges = [
    (0x00401000, 0x00401100),  # 256-byte buffer
    (0x7fff0000, 0x7fff1000),  # Stack buffer
]

overflows = detect_buffer_overflows(trace, buffer_ranges)

print(f"Potential buffer overflows detected: {len(overflows)}")

for overflow in overflows:
    idx = overflow['instruction_index']
    addr = hex(overflow['write_address'])
    size = overflow['write_size']
    overflow_bytes = overflow['overflow_bytes']

    print(f"[{idx:08d}] Buffer overflow: write to {addr} (size {size}, overflow {overflow_bytes} bytes)")

# Use-after-free detection
def detect_use_after_free(trace, heap_operations):
    """Detect use-after-free vulnerabilities"""

    freed_memory = set()
    uaf_violations = []

    for i, instruction in enumerate(trace):
        # Track heap operations
        if instruction.address in heap_operations:
            operation = heap_operations[instruction.address]

            if operation['type'] == 'free':
                # Get freed address from function argument
                freed_address = operation['address']
                freed_memory.add(freed_address)

            elif operation['type'] == 'malloc':
                # Remove from freed set if reallocated
                allocated_address = operation['address']
                freed_memory.discard(allocated_address)

        # Check for accesses to freed memory
        for access in instruction.memory_accesses():
            if access.address in freed_memory:
                uaf_violations.append({
                    'instruction_index': i,
                    'instruction': instruction,
                    'freed_address': access.address,
                    'access_type': access.type,
                    'access_size': access.size
                })

    return uaf_violations

# Define heap operations (simplified)
heap_operations = {
    0x77701234: {'type': 'malloc', 'address': 0x00500000},
    0x77701456: {'type': 'free', 'address': 0x00500000},
}

uaf_violations = detect_use_after_free(trace, heap_operations)

print(f"Use-after-free violations: {len(uaf_violations)}")

for violation in uaf_violations:
    idx = violation['instruction_index']
    addr = hex(violation['freed_address'])
    access_type = violation['access_type']

    print(f"[{idx:08d}] Use-after-free: {access_type} access to freed memory {addr}")

# Format string vulnerability detection
def detect_format_string_vulns(trace, printf_functions):
    """Detect format string vulnerabilities"""

    format_vulns = []

    for i, instruction in enumerate(trace):
        if instruction.mnemonic.startswith("call"):
            target = instruction.operands[0] if instruction.operands else None

            if target in printf_functions:
                # Analyze format string argument
                try:
                    # Get format string (first argument)
                    format_string_addr = instruction.registers.get('rcx', 0)  # x86-64
                    format_string = instruction.memory_read_string(format_string_addr)

                    # Check for user-controlled format string
                    if '%n' in format_string or format_string.count('%') > 10:
                        format_vulns.append({
                            'instruction_index': i,
                            'function': printf_functions[target],
                            'format_string': format_string,
                            'format_string_address': format_string_addr
                        })

                except:
                    pass

    return format_vulns

# Define printf-family functions
printf_functions = {
    0x77701234: "printf",
    0x77701456: "sprintf",
    0x77701678: "fprintf",
}

format_vulns = detect_format_string_vulns(trace, printf_functions)

print(f"Format string vulnerabilities: {len(format_vulns)}")

for vuln in format_vulns:
    idx = vuln['instruction_index']
    func = vuln['function']
    fmt_str = vuln['format_string'][:50]  # Truncate for display

    print(f"[{idx:08d}] Format string vuln in {func}: '{fmt_str}'")

Analyse cryptographique

# Cryptographic algorithm detection and analysis
def detect_crypto_algorithms(trace, crypto_patterns):
    """Detect cryptographic algorithms in execution"""

    crypto_detections = []

    for i, instruction in enumerate(trace):
        # Check for crypto-specific instruction patterns
        if instruction.mnemonic in ['aes', 'sha', 'xor']:
            crypto_detections.append({
                'instruction_index': i,
                'type': 'crypto_instruction',
                'algorithm': instruction.mnemonic,
                'address': instruction.address
            })

        # Check for crypto constants
        for access in instruction.memory_accesses():
            if access.type == 'read':
                value = access.value

                # Check against known crypto constants
                for algorithm, constants in crypto_patterns.items():
                    if value in constants:
                        crypto_detections.append({
                            'instruction_index': i,
                            'type': 'crypto_constant',
                            'algorithm': algorithm,
                            'constant': value,
                            'address': access.address
                        })

    return crypto_detections

# Define crypto patterns and constants
crypto_patterns = {
    'AES': [0x63636363, 0x7c7c7c7c, 0x77777777, 0x7b7b7b7b],  # AES S-box constants
    'SHA1': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],  # SHA1 initial values
    'SHA256': [0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a],  # SHA256 initial values
    'MD5': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],  # MD5 initial values
}

crypto_detections = detect_crypto_algorithms(trace, crypto_patterns)

print(f"Cryptographic algorithm detections: {len(crypto_detections)}")

# Group by algorithm
crypto_by_algorithm = {}
for detection in crypto_detections:
    algorithm = detection['algorithm']
    if algorithm not in crypto_by_algorithm:
        crypto_by_algorithm[algorithm] = []
    crypto_by_algorithm[algorithm].append(detection)

for algorithm, detections in crypto_by_algorithm.items():
    print(f"{algorithm}: {len(detections)} detections")

# Analyze crypto key usage
def analyze_crypto_keys(trace, key_addresses):
    """Analyze cryptographic key usage"""

    key_usage = []

    for i, instruction in enumerate(trace):
        for access in instruction.memory_accesses():
            if access.address in key_addresses:
                key_usage.append({
                    'instruction_index': i,
                    'key_address': access.address,
                    'access_type': access.type,
                    'instruction': instruction,
                    'context': 'crypto_operation'
                })

    return key_usage

# Define crypto key addresses
key_addresses = {0x00401200, 0x00401210, 0x00401220}

key_usage = analyze_crypto_keys(trace, key_addresses)

print(f"Cryptographic key usage events: {len(key_usage)}")

for usage in key_usage[:10]:
    idx = usage['instruction_index']
    addr = hex(usage['key_address'])
    access_type = usage['access_type']

    print(f"[{idx:08d}] Key {access_type}: {addr}")

Scénario et automatisation

API Python REVEN

# Advanced REVEN Python scripting
import reven2
import json
import time
from datetime import datetime

class REVENAnalyzer:
    def __init__(self, server_host="localhost", server_port=13370):
        self.server = reven2.RevenServer(server_host, server_port)
        self.recording = None
        self.trace = None
        self.results = {}

    def load_recording(self, recording_path):
        """Load recording for analysis"""

        try:
            self.recording = self.server.open_recording(recording_path)
            self.trace = self.recording.trace
            print(f"Loaded recording: {recording_path}")
            print(f"Total instructions: {len(self.trace)}")
            return True

        except Exception as e:
            print(f"Error loading recording: {e}")
            return False

    def analyze_execution_flow(self, start_idx=0, end_idx=None):
        """Analyze execution flow patterns"""

        if not self.trace:
            print("No trace loaded")
            return None

        if end_idx is None:
            end_idx = min(len(self.trace), start_idx + 10000)  # Limit for performance

        flow_analysis = {
            'total_instructions': end_idx - start_idx,
            'function_calls': 0,
            'function_returns': 0,
            'jumps': 0,
            'conditional_jumps': 0,
            'loops_detected': 0,
            'call_stack_depth': 0,
            'unique_addresses': set()
        }

        call_stack = []

        for i in range(start_idx, end_idx):
            instruction = self.trace[i]
            flow_analysis['unique_addresses'].add(instruction.address)

            mnemonic = instruction.mnemonic.lower()

            if mnemonic.startswith('call'):
                flow_analysis['function_calls'] += 1
                call_stack.append(instruction.address)
                flow_analysis['call_stack_depth'] = max(
                    flow_analysis['call_stack_depth'], 
                    len(call_stack)
                )

            elif mnemonic.startswith('ret'):
                flow_analysis['function_returns'] += 1
                if call_stack:
                    call_stack.pop()

            elif mnemonic.startswith('j'):
                flow_analysis['jumps'] += 1
                if mnemonic in ['je', 'jne', 'jz', 'jnz', 'jl', 'jg', 'jle', 'jge']:
                    flow_analysis['conditional_jumps'] += 1

        flow_analysis['unique_addresses'] = len(flow_analysis['unique_addresses'])
        flow_analysis['final_call_stack_depth'] = len(call_stack)

        return flow_analysis

    def detect_anomalies(self, start_idx=0, end_idx=None):
        """Detect execution anomalies"""

        if not self.trace:
            return None

        if end_idx is None:
            end_idx = min(len(self.trace), start_idx + 10000)

        anomalies = {
            'suspicious_instructions': [],
            'unusual_memory_patterns': [],
            'potential_exploits': [],
            'anti_analysis_techniques': []
        }

        for i in range(start_idx, end_idx):
            instruction = self.trace[i]

            # Detect suspicious instructions
            if self.is_suspicious_instruction(instruction):
                anomalies['suspicious_instructions'].append({
                    'index': i,
                    'address': instruction.address,
                    'mnemonic': instruction.mnemonic,
                    'reason': self.get_suspicion_reason(instruction)
                })

            # Detect unusual memory patterns
            memory_anomaly = self.check_memory_anomaly(instruction)
            if memory_anomaly:
                anomalies['unusual_memory_patterns'].append({
                    'index': i,
                    'address': instruction.address,
                    'anomaly': memory_anomaly
                })

            # Detect potential exploits
            exploit_indicator = self.check_exploit_indicators(instruction)
            if exploit_indicator:
                anomalies['potential_exploits'].append({
                    'index': i,
                    'address': instruction.address,
                    'indicator': exploit_indicator
                })

        return anomalies

    def is_suspicious_instruction(self, instruction):
        """Check if instruction is suspicious"""

        suspicious_patterns = [
            'int 0x80',  # System call
            'sysenter',  # System call
            'syscall',   # System call
            'rdtsc',     # Timestamp counter (anti-debugging)
            'cpuid',     # CPU identification (anti-VM)
        ]

        return any(pattern in instruction.mnemonic.lower() for pattern in suspicious_patterns)

    def get_suspicion_reason(self, instruction):
        """Get reason for instruction suspicion"""

        mnemonic = instruction.mnemonic.lower()

        if 'int' in mnemonic or 'syscall' in mnemonic or 'sysenter' in mnemonic:
            return "System call detected"
        elif 'rdtsc' in mnemonic:
            return "Timestamp counter access (anti-debugging)"
        elif 'cpuid' in mnemonic:
            return "CPU identification (anti-VM)"
        else:
            return "Unknown suspicious pattern"

    def check_memory_anomaly(self, instruction):
        """Check for memory access anomalies"""

        for access in instruction.memory_accesses():
            # Check for executable memory writes
            if access.type == 'write' and self.is_executable_memory(access.address):
                return "Write to executable memory (code injection)"

            # Check for large memory operations
            if access.size > 1024:
                return f"Large memory operation ({access.size} bytes)"

            # Check for null pointer dereference
            if access.address < 0x1000:
                return "Potential null pointer dereference"

        return None

    def is_executable_memory(self, address):
        """Check if memory address is in executable region"""

        # Simplified check - in practice would use memory map
        executable_ranges = [
            (0x00400000, 0x00500000),  # Typical executable range
            (0x77700000, 0x77800000),  # DLL range
        ]

        return any(start <= address < end for start, end in executable_ranges)

    def check_exploit_indicators(self, instruction):
        """Check for exploit indicators"""

        # Check for ROP gadgets
        if instruction.mnemonic.startswith('ret') and len(instruction.operands) > 0:
            return "Potential ROP gadget"

        # Check for stack pivot
        if 'esp' in instruction.mnemonic or 'rsp' in instruction.mnemonic:
            if instruction.mnemonic.startswith('mov') or instruction.mnemonic.startswith('xchg'):
                return "Potential stack pivot"

        # Check for shellcode patterns
        if instruction.mnemonic in ['nop', 'inc', 'dec'] and self.is_shellcode_context(instruction):
            return "Potential shellcode pattern"

        return None

    def is_shellcode_context(self, instruction):
        """Check if instruction is in shellcode context"""

        # Simplified heuristic - check for executable heap/stack
        return 0x00400000 <= instruction.address <= 0x00500000

    def generate_timeline(self, events, output_file="timeline.json"):
        """Generate execution timeline"""

        timeline = {
            'metadata': {
                'generated_at': datetime.now().isoformat(),
                'total_events': len(events),
                'recording_path': str(self.recording.path) if self.recording else None
            },
            'events': []
        }

        for event in events:
            timeline_event = {
                'timestamp': event.get('timestamp', 0),
                'instruction_index': event.get('instruction_index', 0),
                'address': hex(event.get('address', 0)),
                'type': event.get('type', 'unknown'),
                'description': event.get('description', ''),
                'severity': event.get('severity', 'info')
            }

            timeline['events'].append(timeline_event)

        # Sort by instruction index
        timeline['events'].sort(key=lambda x: x['instruction_index'])

        with open(output_file, 'w') as f:
            json.dump(timeline, f, indent=2)

        print(f"Timeline saved to {output_file}")
        return timeline

    def export_analysis_results(self, output_file="reven_analysis.json"):
        """Export analysis results"""

        export_data = {
            'analysis_metadata': {
                'timestamp': datetime.now().isoformat(),
                'recording_path': str(self.recording.path) if self.recording else None,
                'trace_length': len(self.trace) if self.trace else 0
            },
            'results': self.results
        }

        with open(output_file, 'w') as f:
            json.dump(export_data, f, indent=2)

        print(f"Analysis results exported to {output_file}")

    def run_comprehensive_analysis(self, start_idx=0, end_idx=None):
        """Run comprehensive analysis"""

        print("Starting comprehensive REVEN analysis...")
        start_time = time.time()

        # Execution flow analysis
        print("Analyzing execution flow...")
        flow_results = self.analyze_execution_flow(start_idx, end_idx)
        self.results['execution_flow'] = flow_results

        # Anomaly detection
        print("Detecting anomalies...")
        anomaly_results = self.detect_anomalies(start_idx, end_idx)
        self.results['anomalies'] = anomaly_results

        # Generate timeline
        print("Generating timeline...")
        timeline_events = []

        if anomaly_results:
            for category, items in anomaly_results.items():
                for item in items:
                    timeline_events.append({
                        'instruction_index': item['index'],
                        'address': item['address'],
                        'type': category,
                        'description': item.get('reason', item.get('anomaly', item.get('indicator', ''))),
                        'severity': 'warning'
                    })

        timeline = self.generate_timeline(timeline_events)
        self.results['timeline'] = timeline

        # Export results
        self.export_analysis_results()

        elapsed_time = time.time() - start_time
        print(f"Comprehensive analysis completed in {elapsed_time:.2f} seconds")

        return self.results

# Usage example
if __name__ == "__main__":
    analyzer = REVENAnalyzer()

    # Load recording
    recording_path = "/data/reven/recordings/malware-001"
    if analyzer.load_recording(recording_path):
        # Run comprehensive analysis
        results = analyzer.run_comprehensive_analysis(0, 5000)

        # Display summary
        print("\n=== Analysis Summary ===")
        if 'execution_flow' in results:
            flow = results['execution_flow']
            print(f"Instructions analyzed: {flow['total_instructions']}")
            print(f"Function calls: {flow['function_calls']}")
            print(f"Unique addresses: {flow['unique_addresses']}")

        if 'anomalies' in results:
            anomalies = results['anomalies']
            total_anomalies = sum(len(items) for items in anomalies.values())
            print(f"Total anomalies detected: {total_anomalies}")

            for category, items in anomalies.items():
                if items:
                    print(f"  {category}: {len(items)}")

Traitement par lots et automatisation

# Batch processing scripts for REVEN
#!/bin/bash

# Batch analysis script
REVEN_BATCH_ANALYSIS_SCRIPT="batch_analysis.sh"

cat > $REVEN_BATCH_ANALYSIS_SCRIPT << 'EOF'
#!/bin/bash

# REVEN Batch Analysis Script
set -e

RECORDINGS_DIR="/data/reven/recordings"
RESULTS_DIR="/data/reven/analysis-results"
PYTHON_SCRIPT="/opt/reven/scripts/batch_analyzer.py"

# Create results directory
mkdir -p "$RESULTS_DIR"

# Function to analyze single recording
analyze_recording() {
    local recording_path="$1"
    local recording_name=$(basename "$recording_path")
    local result_dir="$RESULTS_DIR/$recording_name"

    echo "Analyzing recording: $recording_name"

    # Create result directory
    mkdir -p "$result_dir"

    # Run Python analysis
    python3 "$PYTHON_SCRIPT" \
        --recording "$recording_path" \
        --output "$result_dir" \
        --format json,html \
        --timeout 3600

    # Generate summary report
    reven report generate \
        --recording "$recording_path" \
        --output "$result_dir/summary.pdf" \
        --template security-analysis

    echo "Analysis completed: $recording_name"
}

# Process all recordings
for recording in "$RECORDINGS_DIR"/*; do
    if [ -d "$recording" ]; then
        analyze_recording "$recording"
    fi
done

# Generate consolidated report
python3 /opt/reven/scripts/consolidate_results.py \
    --input "$RESULTS_DIR" \
    --output "$RESULTS_DIR/consolidated_report.html"

echo "Batch analysis completed"
EOF

chmod +x $REVEN_BATCH_ANALYSIS_SCRIPT

# Automated recording script
REVEN_AUTO_RECORD_SCRIPT="auto_record.sh"

cat > $REVEN_AUTO_RECORD_SCRIPT << 'EOF'
#!/bin/bash

# REVEN Automated Recording Script
set -e

VM_NAME="analysis-vm"
SNAPSHOT_NAME="clean-state"
SAMPLES_DIR="/data/malware-samples"
RECORDINGS_DIR="/data/reven/recordings"

# Function to record malware sample
record_sample() {
    local sample_path="$1"
    local sample_name=$(basename "$sample_path" .exe)
    local recording_name="recording-$sample_name-$(date +%Y%m%d-%H%M%S)"
    local recording_path="$RECORDINGS_DIR/$recording_name"

    echo "Recording sample: $sample_name"

    # Reset VM to clean state
    reven vm reset "$VM_NAME" --snapshot "$SNAPSHOT_NAME"

    # Start recording
    reven record start \
        --vm "$VM_NAME" \
        --snapshot "$SNAPSHOT_NAME" \
        --name "$recording_name" \
        --duration 300 \
        --max-size 5G \
        --output-dir "$recording_path"

    # Wait for VM to boot
    sleep 30

    # Copy sample to VM
    reven vm copy-to "$VM_NAME" "$sample_path" "C:\\malware.exe"

    # Execute sample
    reven vm execute "$VM_NAME" "C:\\malware.exe"

    # Wait for execution
    sleep 120

    # Stop recording
    reven record stop "$recording_name"

    echo "Recording completed: $recording_name"

    # Trigger analysis
    if [ -f "$REVEN_BATCH_ANALYSIS_SCRIPT" ]; then
        analyze_recording "$recording_path"
    fi
}

# Process all samples
for sample in "$SAMPLES_DIR"/*.exe; do
    if [ -f "$sample" ]; then
        record_sample "$sample"
    fi
done

echo "Automated recording completed"
EOF

chmod +x $REVEN_AUTO_RECORD_SCRIPT

# Monitoring and alerting script
REVEN_MONITOR_SCRIPT="monitor.sh"

cat > $REVEN_MONITOR_SCRIPT << 'EOF'
#!/bin/bash

# REVEN Monitoring Script
set -e

LOG_FILE="/var/log/reven-monitor.log"
ALERT_EMAIL="security@company.com"
THRESHOLD_CPU=80
THRESHOLD_MEMORY=90
THRESHOLD_DISK=85

# Function to log with timestamp
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

# Function to send alert
send_alert() {
    local subject="$1"
    local message="$2"

    echo "$message" | mail -s "$subject" "$ALERT_EMAIL"
    log_message "ALERT: $subject"
}

# Check REVEN server status
check_reven_status() {
    if ! systemctl is-active --quiet reven-server; then
        send_alert "REVEN Server Down" "REVEN server is not running"
        return 1
    fi

    return 0
}

# Check system resources
check_resources() {
    # Check CPU usage
    cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
    if (( $(echo "$cpu_usage > $THRESHOLD_CPU" | bc -l) )); then
        send_alert "High CPU Usage" "CPU usage is ${cpu_usage}%"
    fi

    # Check memory usage
    memory_usage=$(free | grep Mem | awk '{printf("%.1f", $3/$2 * 100.0)}')
    if (( $(echo "$memory_usage > $THRESHOLD_MEMORY" | bc -l) )); then
        send_alert "High Memory Usage" "Memory usage is ${memory_usage}%"
    fi

    # Check disk usage
    disk_usage=$(df /data | tail -1 | awk '{print $5}' | cut -d'%' -f1)
    if [ "$disk_usage" -gt "$THRESHOLD_DISK" ]; then
        send_alert "High Disk Usage" "Disk usage is ${disk_usage}%"
    fi
}

# Check recording status
check_recordings() {
    active_recordings=$(reven record list --status active | wc -l)

    if [ "$active_recordings" -gt 5 ]; then
        send_alert "Too Many Active Recordings" "Currently $active_recordings active recordings"
    fi

    # Check for failed recordings
    failed_recordings=$(reven record list --status failed | wc -l)
    if [ "$failed_recordings" -gt 0 ]; then
        send_alert "Failed Recordings Detected" "$failed_recordings recordings have failed"
    fi
}

# Main monitoring loop
log_message "Starting REVEN monitoring"

while true; do
    check_reven_status
    check_resources
    check_recordings

    sleep 300  # Check every 5 minutes
done
EOF

chmod +x $REVEN_MONITOR_SCRIPT

# Create systemd service for monitoring
sudo tee /etc/systemd/system/reven-monitor.service > /dev/null << 'EOF'
[Unit]
Description=REVEN Monitoring Service
After=network.target

[Service]
Type=simple
User=reven
ExecStart=/opt/reven/scripts/monitor.sh
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

# Enable and start monitoring service
sudo systemctl daemon-reload
sudo systemctl enable reven-monitor
sudo systemctl start reven-monitor

echo "REVEN automation scripts created and monitoring service started"

Pratiques exemplaires et conseils

Optimisation des performances

# REVEN performance optimization
# System tuning for optimal REVEN performance

# Kernel parameters for large memory systems
echo 'vm.swappiness=1' | sudo tee -a /etc/sysctl.conf
echo 'vm.dirty_ratio=5' | sudo tee -a /etc/sysctl.conf
echo 'vm.dirty_background_ratio=2' | sudo tee -a /etc/sysctl.conf
echo 'kernel.shmmax=68719476736' | sudo tee -a /etc/sysctl.conf  # 64GB
echo 'kernel.shmall=16777216' | sudo tee -a /etc/sysctl.conf

# Apply kernel parameters
sudo sysctl -p

# CPU governor for performance
echo 'performance' | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor

# Disable CPU frequency scaling
sudo systemctl disable ondemand

# Storage optimization
# Use deadline scheduler for SSDs
echo 'deadline' | sudo tee /sys/block/sda/queue/scheduler

# Increase I/O queue depth
echo '32' | sudo tee /sys/block/sda/queue/nr_requests

# REVEN-specific optimizations
# Increase file descriptor limits
echo 'reven soft nofile 65536' | sudo tee -a /etc/security/limits.conf
echo 'reven hard nofile 65536' | sudo tee -a /etc/security/limits.conf

# Optimize REVEN configuration
cat > /opt/reven/etc/performance.conf << 'EOF'
# REVEN Performance Configuration

[recording]
# Use compression for storage efficiency
compression_level = 6
compression_algorithm = lz4

# Memory management
memory_pool_size = 16G
max_memory_usage = 32G

# I/O optimization
async_io = true
io_threads = 8
buffer_size = 64M

[analysis]
# Parallel processing
max_worker_threads = 16
enable_parallel_analysis = true

# Caching
cache_size = 8G
enable_instruction_cache = true
enable_memory_cache = true

[storage]
# Storage paths
recordings_path = /data/reven/recordings
cache_path = /data/reven/cache
temp_path = /tmp/reven

# Cleanup policies
auto_cleanup = true
max_recording_age = 30d
max_cache_size = 100G
EOF

Sécurité et pratiques exemplaires

# REVEN security hardening
# Secure REVEN deployment practices

# Create dedicated REVEN user
sudo useradd -r -s /bin/bash -d /opt/reven -m reven

# Set proper permissions
sudo chown -R reven:reven /opt/reven
sudo chown -R reven:reven /data/reven
sudo chmod 750 /opt/reven
sudo chmod 750 /data/reven

# Network security
# Configure firewall for REVEN
sudo ufw allow from 192.168.1.0/24 to any port 13370 comment 'REVEN Server'
sudo ufw allow from 192.168.1.0/24 to any port 13371 comment 'REVEN Web UI'

# SSL/TLS configuration
# Generate SSL certificates
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
    -keyout /opt/reven/etc/ssl/reven.key \
    -out /opt/reven/etc/ssl/reven.crt \
    -subj "/C=US/ST=State/L=City/O=Organization/CN=reven.company.com"

# Configure REVEN for SSL
cat > /opt/reven/etc/ssl.conf << 'EOF'
[ssl]
enabled = true
certificate = /opt/reven/etc/ssl/reven.crt
private_key = /opt/reven/etc/ssl/reven.key
protocols = TLSv1.2,TLSv1.3
ciphers = ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256

[authentication]
enabled = true
method = ldap
ldap_server = ldap://ldap.company.com
ldap_base_dn = dc=company,dc=com
ldap_user_filter = (uid=%s)
EOF

# Backup and recovery
# Create backup script
cat > /opt/reven/scripts/backup.sh << 'EOF'
#!/bin/bash

BACKUP_DIR="/backup/reven"
DATE=$(date +%Y%m%d-%H%M%S)
BACKUP_NAME="reven-backup-$DATE"

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Backup configuration
tar -czf "$BACKUP_DIR/$BACKUP_NAME-config.tar.gz" /opt/reven/etc/

# Backup critical recordings (last 7 days)
find /data/reven/recordings -mtime -7 -type f -name "*.reven" \
    -exec tar -czf "$BACKUP_DIR/$BACKUP_NAME-recordings.tar.gz" {} +

# Backup database
reven db export --output "$BACKUP_DIR/$BACKUP_NAME-database.sql"

# Cleanup old backups (keep 30 days)
find "$BACKUP_DIR" -name "reven-backup-*" -mtime +30 -delete

echo "Backup completed: $BACKUP_NAME"
EOF

chmod +x /opt/reven/scripts/backup.sh

# Schedule daily backups
echo "0 2 * * * /opt/reven/scripts/backup.sh" | sudo crontab -u reven -

# Audit logging
# Configure audit logging for REVEN
cat > /opt/reven/etc/audit.conf << 'EOF'
[audit]
enabled = true
log_file = /var/log/reven/audit.log
log_level = INFO
log_rotation = daily
max_log_size = 100M

[events]
log_user_actions = true
log_recording_operations = true
log_analysis_operations = true
log_system_events = true
log_security_events = true
EOF

# Create log directory
sudo mkdir -p /var/log/reven
sudo chown reven:reven /var/log/reven

# Configure logrotate
sudo tee /etc/logrotate.d/reven > /dev/null << 'EOF'
/var/log/reven/*.log {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
    create 644 reven reven
    postrotate
        systemctl reload reven-server
    endscript
}
EOF

Ressources

Documentation et apprentissage

  • [Documentation sur le REVEN] (LINK_16) - Documentation officielle
  • [Guide de l'utilisateur REVEN] (LINK_16) - Guide de l'utilisateur complet
  • API Python REVEN - Documentation de l'API Python
  • Blogue REVEN - Dernières mises à jour et études de cas

Formation et certification

  • [Formation REVEN] (LINK_16) - Cours de formation officiels
  • [Entraînement en génie inverse] (LINK_16) - Techniques de RE avancées
  • [Recherche sur la vulnérabilité] (LINK_16) - Méthodes de recherche sur la vulnérabilité
  • [Analyse des logiciels malveillants] (LINK_16) - Techniques d'analyse des logiciels malveillants

Communauté et soutien

Outils connexes et intégration

  • [Intégration GDB] (LINK_16) - Intégration GDB
  • [IDA Pro Plugin] (LINK_16) - IDA Intégration pro
  • [Intégration Ghidra] (LINK_16) - Intégration Ghidra
  • [Intégration YARA] (LINK_16) - Intégration des règles YARA