angr Cheat Sheet

Overview

angr is a powerful Python-based binary analysis platform that provides symbolic execution, static analysis, and dynamic analysis capabilities. It is designed for automated vulnerability discovery, reverse engineering, and program analysis. angr can analyze binaries across multiple architectures and offers a rich API for building custom analysis tools.

**Key Strengths:** Symbolic execution, multi-architecture support, automated vulnerability discovery, constraint solving, and an extensive Python API for custom analysis workflows.
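
To make the workflow concrete before diving into installation, here is a minimal, hedged sketch of the classic angr "crackme" pattern: load a binary, explore symbolically until stdout contains a success marker, then concretize the stdin that got there. The binary `./crackme` and the marker string are placeholders; the API calls themselves are angr's standard public interface.

```python
# Minimal angr workflow sketch (./crackme and b"Correct" are placeholders)
import angr

proj = angr.Project("./crackme", auto_load_libs=False)
state = proj.factory.entry_state()              # symbolic state at the entry point
simgr = proj.factory.simulation_manager(state)

# Explore until some state's stdout contains the success marker
simgr.explore(find=lambda s: b"Correct" in s.posix.dumps(1))

if simgr.found:
    # Solve the path constraints for concrete stdin bytes
    print(simgr.found[0].posix.dumps(0))
```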

Installation and Setup

Basic Installation

```bash
# Install angr via pip (recommended)
pip install angr

# Install with additional dependencies
pip install angr[all]

# Install development version
pip install git+https://github.com/angr/angr.git

# Install in virtual environment (recommended)
python -m venv angr-env
source angr-env/bin/activate   # Linux/macOS
angr-env\Scripts\activate      # Windows
pip install angr

# Verify installation
python -c "import angr; print(angr.__version__)"

# Install additional tools
pip install angr-management    # GUI for angr
pip install angr-doc           # Documentation
pip install angr-utils         # Utility functions
```

Dependencies and Requirements

```bash
# System dependencies (Ubuntu/Debian)
sudo apt-get update
sudo apt-get install -y \
    python3-dev \
    build-essential \
    libffi-dev \
    libssl-dev \
    libtool \
    pkg-config \
    cmake \
    libgmp-dev \
    libmpfr-dev \
    libmpc-dev

# System dependencies (CentOS/RHEL)
sudo yum groupinstall -y "Development Tools"
sudo yum install -y \
    python3-devel \
    libffi-devel \
    openssl-devel \
    libtool \
    pkgconfig \
    cmake \
    gmp-devel \
    mpfr-devel \
    libmpc-devel

# macOS dependencies (with Homebrew)
brew install \
    libffi \
    openssl \
    libtool \
    pkg-config \
    cmake \
    gmp \
    mpfr \
    libmpc

# Install Z3 solver (recommended)
pip install z3-solver

# Install additional solvers
pip install pysmt
pysmt-install --z3 --cvc4 --boolector

# Verify solver installation
python -c "import z3; print('Z3 version:', z3.get_version_string())"
```

Configuration and Environment

```python
# angr configuration and environment setup
import angr
import logging
import os

# Configure logging
logging.getLogger('angr').setLevel(logging.INFO)
logging.getLogger('cle').setLevel(logging.WARNING)
logging.getLogger('pyvex').setLevel(logging.WARNING)

# Set environment variables
os.environ['ANGR_CACHE_DIR'] = '/tmp/angr_cache'
os.environ['ANGR_LOG_LEVEL'] = 'INFO'

# Memory and performance settings: angr options are constants that get
# added to a state's option set, e.g. factory.entry_state(add_options={...})
import angr.options as o

# Zero-fill uninitialized memory/registers instead of leaving them symbolic
zero_fill_options = {
    o.ZERO_FILL_UNCONSTRAINED_MEMORY,
    o.ZERO_FILL_UNCONSTRAINED_REGISTERS
}

# Common option sets
common_options = {
    o.ABSTRACT_SOLVER,
    o.ABSTRACT_MEMORY,
    o.APPROXIMATE_FIRST,
    o.APPROXIMATE_GUARDS,
    o.APPROXIMATE_SATISFIABILITY
}

# Debugging options
debug_options = {
    o.TRACK_MEMORY_ACTIONS,
    o.TRACK_REGISTER_ACTIONS,
    o.TRACK_JMP_ACTIONS,
    o.TRACK_CONSTRAINT_ACTIONS
}

# Performance options
performance_options = {
    o.EFFICIENT_STATE_MERGING,
    o.LAZY_SOLVES,
    o.FAST_MEMORY,
    o.FAST_REGISTERS
}

print("angr environment configured successfully")
```

Basic Binary Analysis

Loading and Analyzing Binaries

```python
# Basic binary loading and analysis
import angr
import archinfo

def load_binary(binary_path, **kwargs):
    """Load binary with angr"""

    # Basic loading
    project = angr.Project(binary_path, auto_load_libs=False)

    print(f"Binary: {binary_path}")
    print(f"Architecture: {project.arch}")
    print(f"Entry point: {hex(project.entry)}")
    print(f"Base address: {hex(project.loader.main_object.min_addr)}")
    print(f"Binary type: {project.loader.main_object.os}")

    return project

# Load binary with custom options
def load_binary_advanced(binary_path):
    """Load binary with advanced options"""

    # Custom loading options
    project = angr.Project(
        binary_path,
        auto_load_libs=True,          # Load shared libraries
        use_sim_procedures=True,      # Use SimProcedures for library functions
        exclude_sim_procedures_func=['malloc', 'free'],  # Exclude specific functions
        force_load_libs=['libc.so.6'],  # Force load specific libraries
        main_opts={
            'base_addr': 0x400000,    # Custom base address
            'backend': 'elf'          # Force backend type
        }
    )

    # Print loaded objects
    print("Loaded objects:")
    for obj in project.loader.all_objects:
        print(f"  {obj.binary}: {hex(obj.min_addr)}-{hex(obj.max_addr)}")

    # Print symbols (CLE exposes symbols as a collection of Symbol objects)
    print(f"\nSymbols: {len(project.loader.main_object.symbols)}")
    for symbol in list(project.loader.main_object.symbols)[:10]:
        print(f"  {symbol.name}: {hex(symbol.rebased_addr)}")

    return project

# Analyze binary structure
def analyze_binary_structure(project):
    """Analyze binary structure and sections"""

    main_object = project.loader.main_object

    print("Binary Structure Analysis:")
    print(f"  Entry point: {hex(project.entry)}")
    print(f"  Architecture: {project.arch.name}")
    print(f"  Word size: {project.arch.bits} bits")
    print(f"  Endianness: {project.arch.memory_endness}")
    print(f"  Address space: {hex(main_object.min_addr)} - {hex(main_object.max_addr)}")

    # Analyze sections
    if hasattr(main_object, 'sections'):
        print(f"\nSections ({len(main_object.sections)}):")
        for section in main_object.sections:
            print(f"  {section.name}: {hex(section.vaddr)} - {hex(section.vaddr + section.memsize)} "
                  f"({section.memsize} bytes)")

    # Analyze segments
    if hasattr(main_object, 'segments'):
        print(f"\nSegments ({len(main_object.segments)}):")
        for segment in main_object.segments:
            print(f"  {hex(segment.vaddr)} - {hex(segment.vaddr + segment.memsize)} "
                  f"({segment.memsize} bytes)")

    # Find functions
    cfg = project.analyses.CFGFast()
    functions = cfg.functions

    print(f"\nFunctions found: {len(functions)}")
    for addr, func in list(functions.items())[:10]:
        print(f"  {func.name}: {hex(addr)} ({func.size} bytes)")

    return cfg

# Example usage
if __name__ == "__main__":
    # Load a binary
    binary_path = "/bin/ls"  # Example binary
    project = load_binary_advanced(binary_path)

    # Analyze structure
    cfg = analyze_binary_structure(project)
```
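
For quick one-off queries you rarely need the full helper functions above; CLE's loader can be queried directly. A short sketch, again using /bin/ls as an example target:

```python
# Direct CLE loader queries
import angr

project = angr.Project("/bin/ls", auto_load_libs=False)

# Resolve a symbol and read raw bytes from the loaded image
sym = project.loader.find_symbol("main")
if sym is not None:
    data = project.loader.memory.load(sym.rebased_addr, 16)
    print(sym.name, hex(sym.rebased_addr), data.hex())
```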

Control Flow Graph Analysis

```python

# Control Flow Graph (CFG) analysis with angr
import angr
import networkx as nx
import matplotlib.pyplot as plt

class CFGAnalyzer:
    def __init__(self, project):
        self.project = project
        self.cfg = None
        self.functions = {}

def generate_cfg(self, normalize=True, resolve_indirect_jumps=True):
    """Generate Control Flow Graph"""

    print("Generating CFG...")

    # Generate CFG with options
    self.cfg = self.project.analyses.CFGFast(
        normalize=normalize,
        resolve_indirect_jumps=resolve_indirect_jumps,
        force_complete_scan=False,
        show_progressbar=True
    )

    print(f"CFG generated: {len(self.cfg.nodes())} nodes, {len(self.cfg.edges())} edges")
    print(f"Functions discovered: {len(self.cfg.functions)}")

    return self.cfg

def analyze_function(self, function_addr):
    """Analyze specific function"""

    if self.cfg is None:
        self.generate_cfg()

    if function_addr not in self.cfg.functions:
        print(f"Function at {hex(function_addr)} not found")
        return None

    func = self.cfg.functions[function_addr]

    analysis = {
        'name': func.name,
        'address': hex(func.addr),
        'size': func.size,
        'blocks': len(func.blocks),
        'calling_convention': func.calling_convention,
        'is_plt': func.is_plt,
        'is_simprocedure': func.is_simprocedure,
        'endpoints': [hex(ep.addr) for ep in func.endpoints],
        'callsites': [hex(cs) for cs in func.get_call_sites()],  # get_call_sites() yields addresses
        'callers': [hex(caller.addr) for caller in func.callers],
        'callees': [hex(callee.addr) for callee in func.callees]
    }

    print(f"Function Analysis: {func.name}")
    for key, value in analysis.items():
        if isinstance(value, list) and len(value) > 5:
            print(f"  {key}: {value[:5]}... ({len(value)} total)")
        else:
            print(f"  {key}: {value}")

    return analysis

def find_interesting_functions(self):
    """Find potentially interesting functions"""

    if self.cfg is None:
        self.generate_cfg()

    interesting = {
        'large_functions': [],
        'complex_functions': [],
        'leaf_functions': [],
        'recursive_functions': [],
        'plt_functions': []
    }

    for addr, func in self.cfg.functions.items():
        # Large functions (>1000 bytes)
        if func.size > 1000:
            interesting['large_functions'].append((func.name, hex(addr), func.size))

        # Complex functions (>20 basic blocks)
        if len(func.blocks) > 20:
            interesting['complex_functions'].append((func.name, hex(addr), len(func.blocks)))

        # Leaf functions (no callees)
        if not func.callees:
            interesting['leaf_functions'].append((func.name, hex(addr)))

        # Recursive functions
        if addr in [callee.addr for callee in func.callees]:
            interesting['recursive_functions'].append((func.name, hex(addr)))

        # PLT functions
        if func.is_plt:
            interesting['plt_functions'].append((func.name, hex(addr)))

    # Print results
    for category, functions in interesting.items():
        print(f"\n{category.replace('_', ' ').title()}: {len(functions)}")
        for func_info in functions[:5]:  # Show first 5
            print(f"  {func_info}")

    return interesting

def analyze_call_graph(self):
    """Analyze call graph relationships"""

    if self.cfg is None:
        self.generate_cfg()

    # Build call graph
    call_graph = nx.DiGraph()

    for addr, func in self.cfg.functions.items():
        call_graph.add_node(addr, name=func.name, size=func.size)

        for callee in func.callees:
            call_graph.add_edge(addr, callee.addr)

    # Analyze call graph properties
    analysis = {
        'total_functions': len(call_graph.nodes()),
        'total_calls': len(call_graph.edges()),
        'strongly_connected_components': len(list(nx.strongly_connected_components(call_graph))),
        'weakly_connected_components': len(list(nx.weakly_connected_components(call_graph))),
        'cycles': len(list(nx.simple_cycles(call_graph))),
        'average_degree': sum(dict(call_graph.degree()).values()) / len(call_graph.nodes()) if call_graph.nodes() else 0
    }

    # Find central functions (high degree)
    degree_centrality = nx.degree_centrality(call_graph)
    central_functions = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)[:10]

    print("Call Graph Analysis:")
    for key, value in analysis.items():
        print(f"  {key}: {value}")

    print("\nMost Central Functions:")
    for addr, centrality in central_functions:
        func_name = self.cfg.functions[addr].name
        print(f"  {func_name} ({hex(addr)}): {centrality:.3f}")

    return call_graph, analysis

def visualize_function_cfg(self, function_addr, output_file="function_cfg.png"):
    """Visualize function CFG"""

    if self.cfg is None:
        self.generate_cfg()

    if function_addr not in self.cfg.functions:
        print(f"Function at {hex(function_addr)} not found")
        return

    func = self.cfg.functions[function_addr]

    # Create graph for function
    G = nx.DiGraph()

    # Add nodes (basic blocks)
    for block in func.blocks:
        G.add_node(block.addr, label=f"{hex(block.addr)}\n{block.size} bytes")

    # Add edges
    for block in func.blocks:
        for successor in block.successors:
            G.add_edge(block.addr, successor.addr)

    # Create visualization
    plt.figure(figsize=(12, 8))
    pos = nx.spring_layout(G, k=1, iterations=50)

    # Draw nodes
    nx.draw_networkx_nodes(G, pos, node_color='lightblue', 
                          node_size=1000, alpha=0.7)

    # Draw edges
    nx.draw_networkx_edges(G, pos, edge_color='gray', 
                          arrows=True, arrowsize=20)

    # Draw labels
    labels = nx.get_node_attributes(G, 'label')
    nx.draw_networkx_labels(G, pos, labels, font_size=8)

    plt.title(f"CFG for function {func.name} ({hex(function_addr)})")
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.close()

    print(f"Function CFG saved to {output_file}")

# Example usage
def analyze_binary_cfg(binary_path):
    """Complete CFG analysis example"""

    # Load binary
    project = angr.Project(binary_path, auto_load_libs=False)

    # Create CFG analyzer
    analyzer = CFGAnalyzer(project)

    # Generate and analyze CFG
    cfg = analyzer.generate_cfg()

    # Find interesting functions
    interesting = analyzer.find_interesting_functions()

    # Analyze call graph
    call_graph, cg_analysis = analyzer.analyze_call_graph()

    # Analyze main function if available
    main_symbol = project.loader.main_object.get_symbol('main')
    if main_symbol:
        analyzer.analyze_function(main_symbol.rebased_addr)
        analyzer.visualize_function_cfg(main_symbol.rebased_addr)

    return analyzer

# Run analysis
if __name__ == "__main__":
    binary_path = "/bin/ls"
    analyzer = analyze_binary_cfg(binary_path)
```
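
If the full CFGAnalyzer class is more than you need, the core recipe fits in a few lines. This sketch just recovers the function list and looks up main by name:

```python
# Compact CFG recipe
import angr

project = angr.Project("/bin/ls", auto_load_libs=False)
cfg = project.analyses.CFGFast(normalize=True)

main_func = cfg.functions.function(name="main")  # None if not present
if main_func is not None:
    print(main_func.name, hex(main_func.addr), len(list(main_func.blocks)))
```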

Symbolic Execution

Basic Symbolic Execution

```python

# Symbolic execution with angr
import angr
import claripy

class SymbolicExecutor:
    def __init__(self, project):
        self.project = project
        self.simulation_manager = None
        self.initial_state = None

def create_initial_state(self, addr=None, **kwargs):
    """Create initial state for symbolic execution"""

    if addr is None:
        addr = self.project.entry

    # Create initial state
    self.initial_state = self.project.factory.entry_state(
        addr=addr,
        add_options={
            angr.options.ABSTRACT_SOLVER,
            angr.options.ABSTRACT_MEMORY,
            angr.options.APPROXIMATE_FIRST
        },
        **kwargs
    )

    print(f"Initial state created at {hex(addr)}")
    return self.initial_state

def setup_symbolic_input(self, input_size=100, input_name="symbolic_input"):
    """Setup symbolic input for analysis"""

    if self.initial_state is None:
        self.create_initial_state()

    # Create symbolic bitvector
    symbolic_input = claripy.BVS(input_name, input_size * 8)

    # Add constraints (printable characters)
    for i in range(input_size):
        byte = symbolic_input.get_byte(i)
        self.initial_state.solver.add(byte >= 0x20)  # Printable ASCII
        self.initial_state.solver.add(byte <= 0x7e)

    print(f"Symbolic input created: {input_name} ({input_size} bytes)")
    return symbolic_input

def setup_symbolic_arguments(self, argc=2, argv_size=100):
    """Setup symbolic command line arguments"""

    if self.initial_state is None:
        self.create_initial_state()

    # Create symbolic argv
    argv = []
    for i in range(argc):
        arg = claripy.BVS(f"argv_{i}", argv_size * 8)

        # Add null terminator constraint
        for j in range(argv_size - 1):
            byte = arg.get_byte(j)
            self.initial_state.solver.add(byte >= 0x20)
            self.initial_state.solver.add(byte <= 0x7e)

        # Null terminate
        self.initial_state.solver.add(arg.get_byte(argv_size - 1) == 0)
        argv.append(arg)

    # Set up argv in memory
    argv_ptrs = []
    base_addr = 0x7fff0000

    for i, arg in enumerate(argv):
        arg_addr = base_addr + i * argv_size
        self.initial_state.memory.store(arg_addr, arg)
        argv_ptrs.append(arg_addr)

    # Set up argv pointer array
    argv_array_addr = base_addr + len(argv) * argv_size
    for i, ptr in enumerate(argv_ptrs):
        self.initial_state.memory.store(
            argv_array_addr + i * self.project.arch.bytes,
            ptr,
            size=self.project.arch.bytes
        )

    # Set argc and argv in registers/stack
    if self.project.arch.name == 'AMD64':
        self.initial_state.regs.rdi = argc
        self.initial_state.regs.rsi = argv_array_addr
    elif self.project.arch.name == 'X86':
        # Push argv and argc onto stack
        self.initial_state.stack_push(argv_array_addr)
        self.initial_state.stack_push(argc)

    print(f"Symbolic arguments setup: argc={argc}, argv at {hex(argv_array_addr)}")
    return argv

def run_symbolic_execution(self, find_addrs=None, avoid_addrs=None, max_steps=1000):
    """Run symbolic execution"""

    if self.initial_state is None:
        self.create_initial_state()

    # Create simulation manager
    self.simulation_manager = self.project.factory.simulation_manager(self.initial_state)

    print("Starting symbolic execution...")

    # Run exploration
    if find_addrs or avoid_addrs:
        self.simulation_manager.explore(
            find=find_addrs,
            avoid=avoid_addrs,
            step_func=self.step_callback,
            num_find=10  # Limit number of solutions
        )
    else:
        # Step-by-step execution
        for step in range(max_steps):
            if not self.simulation_manager.active:
                break

            self.simulation_manager.step()

            if step % 100 == 0:
                print(f"Step {step}: {len(self.simulation_manager.active)} active states")

            # Check for interesting states
            if self.simulation_manager.deadended:
                print(f"Found {len(self.simulation_manager.deadended)} deadended states")

            if self.simulation_manager.errored:
                print(f"Found {len(self.simulation_manager.errored)} errored states")

    print("Symbolic execution completed")
    return self.simulation_manager

def step_callback(self, sm):
    """Callback function for each step"""

    # Limit number of active states to prevent state explosion
    if len(sm.active) > 50:
        sm.active = sm.active[:50]

    return sm

def analyze_found_states(self):
    """Analyze states that reached target"""

    if not self.simulation_manager or not self.simulation_manager.found:
        print("No found states to analyze")
        return []

    solutions = []

    for i, state in enumerate(self.simulation_manager.found):
        print(f"\n--- Found State {i+1} ---")
        print(f"Address: {hex(state.addr)}")

        # Get concrete values for symbolic variables
        solution = {}

        for var_name in state.solver.variables:
            if state.solver.satisfiable():
                concrete_value = state.solver.eval(var_name, cast_to=bytes)
                solution[str(var_name)] = concrete_value
                print(f"  {var_name}: {concrete_value}")

        solutions.append(solution)

    return solutions

def find_path_to_address(self, target_addr, avoid_addrs=None):
    """Find execution path to specific address"""

    if self.initial_state is None:
        self.create_initial_state()

    print(f"Finding path to {hex(target_addr)}")

    # Run exploration
    sm = self.project.factory.simulation_manager(self.initial_state)
    sm.explore(find=target_addr, avoid=avoid_addrs)

    if sm.found:
        found_state = sm.found[0]

        # Get execution trace
        trace = []
        state = found_state

        while state.history.parent is not None:
            trace.append({
                'address': hex(state.addr),
                'instruction': state.block().capstone.insns[0] if state.block().capstone.insns else None,
                'constraints': len(state.solver.constraints)
            })
            state = state.history.parent

        trace.reverse()

        print(f"Path found with {len(trace)} steps:")
        for i, step in enumerate(trace[:10]):  # Show first 10 steps
            print(f"  {i}: {step['address']} - {step.get('instruction', 'N/A')}")

        return trace, found_state
    else:
        print("No path found to target address")
        return None, None

# Vulnerability discovery with symbolic execution
class VulnerabilityFinder:
    def __init__(self, project):
        self.project = project
        self.vulnerabilities = []

def find_buffer_overflows(self, function_addr, buffer_size=100):
    """Find buffer overflow vulnerabilities"""

    print(f"Searching for buffer overflows in function {hex(function_addr)}")

    # Create initial state at function entry
    initial_state = self.project.factory.call_state(function_addr)

    # Create symbolic input buffer
    symbolic_buffer = claripy.BVS("input_buffer", buffer_size * 8)

    # Store symbolic buffer in memory
    buffer_addr = 0x7fff0000
    initial_state.memory.store(buffer_addr, symbolic_buffer)

    # Set up function arguments to point to buffer
    if self.project.arch.name == 'AMD64':
        initial_state.regs.rdi = buffer_addr
        initial_state.regs.rsi = buffer_size

    # Run symbolic execution
    sm = self.project.factory.simulation_manager(initial_state)

    # Look for crashes or overwrites
    crash_addrs = []

    try:
        sm.run(until=lambda sm: len(sm.active) == 0 or len(sm.errored) > 0)

        # Check for errors that might indicate buffer overflow
        for errored_state in sm.errored:
            error = errored_state.error

            if "Segmentation fault" in str(error) or "Invalid memory access" in str(error):
                # Potential buffer overflow
                vulnerability = {
                    'type': 'buffer_overflow',
                    'function': hex(function_addr),
                    'error_addr': hex(errored_state.state.addr),
                    'error': str(error),
                    'input_constraints': errored_state.state.solver.constraints
                }

                self.vulnerabilities.append(vulnerability)
                print(f"Potential buffer overflow found at {hex(errored_state.state.addr)}")

    except Exception as e:
        print(f"Error during buffer overflow analysis: {e}")

    return self.vulnerabilities

def find_format_string_bugs(self, printf_addr):
    """Find format string vulnerabilities"""

    print(f"Searching for format string bugs at {hex(printf_addr)}")

    # Create state at printf call
    initial_state = self.project.factory.call_state(printf_addr)

    # Create symbolic format string
    format_string = claripy.BVS("format_string", 100 * 8)

    # Add constraints for format string characters
    for i in range(100):
        byte = format_string.get_byte(i)
        initial_state.solver.add(claripy.Or(
            claripy.And(byte >= 0x20, byte <= 0x7e),  # Printable
            byte == 0  # Null terminator
        ))

    # Set format string as first argument
    format_addr = 0x7fff1000
    initial_state.memory.store(format_addr, format_string)

    if self.project.arch.name == 'AMD64':
        initial_state.regs.rdi = format_addr

    # Look for format string specifiers that could be dangerous
    dangerous_patterns = [b'%n', b'%s', b'%x']

    for pattern in dangerous_patterns:
        # Check if format string can contain dangerous pattern
        pattern_bv = claripy.BVV(pattern)

        # This is a simplified check - in practice, you'd need more sophisticated analysis
        if initial_state.solver.satisfiable(extra_constraints=[
            format_string.get_bytes(0, len(pattern)) == pattern_bv
        ]):
            vulnerability = {
                'type': 'format_string',
                'function': hex(printf_addr),
                'pattern': pattern.decode(),
                'description': f"Format string can contain {pattern.decode()}"
            }

            self.vulnerabilities.append(vulnerability)
            print(f"Potential format string vulnerability: {pattern.decode()}")

    return self.vulnerabilities

def find_integer_overflows(self, function_addr):
    """Find integer overflow vulnerabilities"""

    print(f"Searching for integer overflows in function {hex(function_addr)}")

    # Create initial state
    initial_state = self.project.factory.call_state(function_addr)

    # Create symbolic integers
    int1 = claripy.BVS("int1", 32)
    int2 = claripy.BVS("int2", 32)

    # Set up function arguments
    if self.project.arch.name == 'AMD64':
        initial_state.regs.rdi = int1.zero_extend(32)
        initial_state.regs.rsi = int2.zero_extend(32)

    # Run symbolic execution
    sm = self.project.factory.simulation_manager(initial_state)

    try:
        sm.run(until=lambda sm: len(sm.active) == 0)

        # Check for integer overflow conditions
        for state in sm.deadended:
            # Look for arithmetic operations that could overflow
            for action in state.history.actions:
                if hasattr(action, 'op') and action.op in ['__add__', '__mul__', '__sub__']:
                    # Check if result could overflow
                    if state.solver.satisfiable(extra_constraints=[
                        action.result > 0xffffffff  # 32-bit overflow
                    ]):
                        vulnerability = {
                            'type': 'integer_overflow',
                            'function': hex(function_addr),
                            'operation': action.op,
                            'address': hex(state.addr)
                        }

                        self.vulnerabilities.append(vulnerability)
                        print(f"Potential integer overflow in {action.op} at {hex(state.addr)}")

    except Exception as e:
        print(f"Error during integer overflow analysis: {e}")

    return self.vulnerabilities

# Example usage
def symbolic_execution_example(binary_path):
    """Complete symbolic execution example"""

    # Load binary
    project = angr.Project(binary_path, auto_load_libs=False)

    # Create symbolic executor
    executor = SymbolicExecutor(project)

    # Setup symbolic input
    executor.create_initial_state()
    symbolic_input = executor.setup_symbolic_input(input_size=50)

    # Find main function
    cfg = project.analyses.CFGFast()
    main_func = None

    for addr, func in cfg.functions.items():
        if func.name == 'main':
            main_func = func
            break

    if main_func:
        print(f"Found main function at {hex(main_func.addr)}")

        # Find interesting addresses in main
        interesting_addrs = []
        for block in main_func.blocks:
            # Look for calls to interesting functions
            for insn in block.capstone.insns:
                if insn.mnemonic == 'call':
                    interesting_addrs.append(insn.address)

        if interesting_addrs:
            # Run symbolic execution to reach interesting addresses
            sm = executor.run_symbolic_execution(
                find_addrs=interesting_addrs[:3],  # First 3 interesting addresses
                max_steps=500
            )

            # Analyze results
            solutions = executor.analyze_found_states()

            return executor, solutions

    return executor, []

# Run example
if __name__ == "__main__":
    binary_path = "/bin/ls"
    executor, solutions = symbolic_execution_example(binary_path)
    print(f"Found {len(solutions)} solutions")
```
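
All of the constraint handling above bottoms out in claripy. A standalone sketch of the solver interface, independent of any binary:

```python
# claripy in isolation: symbolic bitvector, constraint, concrete solution
import claripy

solver = claripy.Solver()
x = claripy.BVS("x", 32)         # 32-bit symbolic value
solver.add(x * 3 + 7 == 0x25)    # 3x + 7 == 37  =>  x == 10
print(solver.eval(x, 1))         # -> (10,)
```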

Advanced Symbolic Execution Techniques

```python

# Advanced symbolic execution techniques
import angr
import claripy
import itertools

class AdvancedSymbolicAnalysis:
    def __init__(self, project):
        self.project = project
        self.custom_hooks = {}
        self.analysis_results = {}

def setup_custom_hooks(self):
    """Setup custom hooks for library functions"""

    # Hook malloc to track allocations
    @self.project.hook_symbol('malloc')
    def malloc_hook(state):
        size = state.solver.eval(state.regs.rdi)  # First argument

        # Allocate memory
        addr = state.heap.allocate(size)

        # Track allocation
        if 'allocations' not in state.globals:  # state.globals is dict-like
            state.globals['allocations'] = {}

        state.globals['allocations'][addr] = {
            'size': size,
            'allocated_at': state.addr
        }

        state.regs.rax = addr  # Return address
        print(f"malloc({size}) = {hex(addr)}")

    # Hook free to track deallocations
    @self.project.hook_symbol('free')
    def free_hook(state):
        addr = state.solver.eval(state.regs.rdi)

        if 'allocations' in state.globals and addr in state.globals['allocations']:
            del state.globals['allocations'][addr]
            print(f"free({hex(addr)})")

        # Mark memory as freed (for use-after-free detection)
        if 'freed_memory' not in state.globals:
            state.globals['freed_memory'] = set()

        state.globals['freed_memory'].add(addr)

    # Hook strcpy to detect buffer overflows
    @self.project.hook_symbol('strcpy')
    def strcpy_hook(state):
        dest = state.solver.eval(state.regs.rdi)
        src = state.solver.eval(state.regs.rsi)

        # Read source string
        src_data = state.memory.load(src, 1000)  # Max 1000 bytes

        # Find null terminator
        null_pos = None
        for i in range(1000):
            byte = src_data.get_byte(i)
            if state.solver.eval(byte) == 0:
                null_pos = i
                break

        if null_pos is not None:
            # Copy string including null terminator
            string_data = src_data.get_bytes(0, null_pos + 1)
            state.memory.store(dest, string_data)

            print(f"strcpy({hex(dest)}, {hex(src)}) - copied {null_pos + 1} bytes")

            # Check for potential buffer overflow
            if 'allocations' in state.globals and dest in state.globals['allocations']:
                allocated_size = state.globals['allocations'][dest]['size']
                if null_pos + 1 > allocated_size:
                    print(f"WARNING: Potential buffer overflow! Copied {null_pos + 1} bytes to {allocated_size} byte buffer")

        state.regs.rax = dest  # Return destination

    print("Custom hooks installed")

def concolic_execution(self, initial_inputs, target_function):
    """Perform concolic execution with concrete and symbolic inputs"""

    results = []

    for concrete_input in initial_inputs:
        print(f"Concolic execution with input: {concrete_input}")

        # Create initial state
        state = self.project.factory.entry_state()

        # Set up concrete input
        input_addr = 0x7fff0000
        state.memory.store(input_addr, concrete_input)

        # Create symbolic version of input
        symbolic_input = claripy.BVS("symbolic_input", len(concrete_input) * 8)

        # Add constraint that symbolic input equals concrete input initially
        state.solver.add(symbolic_input == claripy.BVV(concrete_input))

        # Run execution
        sm = self.project.factory.simulation_manager(state)
        sm.explore(find=target_function)

        if sm.found:
            found_state = sm.found[0]

            # Generate new inputs by negating path constraints
            new_inputs = self.generate_new_inputs(found_state, symbolic_input)
            results.extend(new_inputs)

    return results

def generate_new_inputs(self, state, symbolic_var):
    """Generate new inputs by negating path constraints"""

    new_inputs = []
    constraints = state.solver.constraints

    # Try negating each constraint to explore different paths
    for i, constraint in enumerate(constraints):
        # Create new constraint set with negated constraint
        new_constraints = constraints[:i] + [claripy.Not(constraint)] + constraints[i+1:]

        # Check if new constraint set is satisfiable
        if state.solver.satisfiable(extra_constraints=new_constraints):
            # Generate concrete input
            new_input = state.solver.eval(symbolic_var, extra_constraints=new_constraints, cast_to=bytes)
            new_inputs.append(new_input)

    return new_inputs

def taint_analysis(self, taint_sources, sink_functions):
    """Perform taint analysis to track data flow"""

    print("Starting taint analysis...")

    # Create initial state with taint tracking
    initial_state = self.project.factory.entry_state(
        add_options={angr.options.TRACK_MEMORY_ACTIONS}
    )

    # Mark taint sources
    for source_addr in taint_sources:
        # Create tainted symbolic variable
        tainted_data = claripy.BVS(f"tainted_{hex(source_addr)}", 64)
        initial_state.memory.store(source_addr, tainted_data)

        # Track taint
        if 'tainted_data' not in initial_state.globals:
            initial_state.globals['tainted_data'] = set()

        initial_state.globals['tainted_data'].add(tainted_data)

    # Run symbolic execution
    sm = self.project.factory.simulation_manager(initial_state)

    taint_flows = []

    def check_taint_at_sinks(state):
        """Check if tainted data reaches sink functions"""

        if state.addr in sink_functions:
            sink_name = sink_functions[state.addr]

            # Check function arguments for taint
            if self.project.arch.name == 'AMD64':
                args = [state.regs.rdi, state.regs.rsi, state.regs.rdx, state.regs.rcx]
            else:
                # Get arguments from stack for x86
                args = []
                for i in range(4):
                    arg = state.memory.load(state.regs.esp + (i + 1) * 4, 4)
                    args.append(arg)

            for i, arg in enumerate(args):
                if 'tainted_data' in state.globals:
                    for tainted_var in state.globals['tainted_data']:
                        if state.solver.satisfiable(extra_constraints=[arg == tainted_var]):
                            taint_flow = {
                                'sink_function': sink_name,
                                'sink_address': hex(state.addr),
                                'argument_index': i,
                                'tainted_variable': str(tainted_var)
                            }
                            taint_flows.append(taint_flow)
                            print(f"Taint flow detected: {tainted_var} -> {sink_name} arg {i}")

    # Step through execution and check for taint at each step
    for _ in range(1000):  # Limit steps
        if not sm.active:
            break

        sm.step()

        for state in sm.active:
            check_taint_at_sinks(state)

    return taint_flows

def constraint_solving_optimization(self, state, optimization_target):
    """Optimize constraint solving for specific targets"""

    # Use different solvers for different types of constraints
    if 'crypto' in optimization_target.lower():
        # Use specialized solver for cryptographic constraints
        state.solver._solver = claripy.SolverCacheless(backend=claripy.backends.z3)

    elif 'arithmetic' in optimization_target.lower():
        # Use arithmetic-optimized solver
        state.solver._solver = claripy.SolverCacheless(backend=claripy.backends.vsa)

    # Add solver optimizations
    state.solver.simplify()

    # Cache frequently used constraints
    if 'constraint_cache' not in state.globals:
        state.globals['constraint_cache'] = {}

    return state

def path_explosion_mitigation(self, sm, max_active_states=50):
    """Mitigate path explosion in symbolic execution"""

    if len(sm.active) > max_active_states:
        # Prioritize states based on different criteria
        scored_states = []

        for state in sm.active:
            score = 0

            # Prefer states with fewer constraints (simpler paths)
            score += 1000 / (len(state.solver.constraints) + 1)

            # Prefer states that have made more progress
            score += state.history.depth

            # Prefer states in interesting code regions
            if self.is_interesting_address(state.addr):
                score += 500

            scored_states.append((score, state))

        # Keep top-scored states
        scored_states.sort(key=lambda x: x[0], reverse=True)
        sm.active = [state for score, state in scored_states[:max_active_states]]

        print(f"Pruned to {len(sm.active)} active states")

    return sm

def is_interesting_address(self, addr):
    """Check if address is in interesting code region"""

    # Check if address is in main executable (not libraries)
    main_object = self.project.loader.main_object
    return main_object.min_addr <= addr <= main_object.max_addr

def symbolic_memory_modeling(self, state, memory_model='flat'):
    """Configure symbolic memory modeling"""

    if memory_model == 'flat':
        # Simple flat memory model
        state.options.add(angr.options.ABSTRACT_MEMORY)

    elif memory_model == 'paged':
        # Paged memory model for better performance
        state.options.add(angr.options.FAST_MEMORY)
        state.options.add(angr.options.ABSTRACT_MEMORY)

    elif memory_model == 'concrete':
        # Concrete memory with symbolic overlay
        state.options.add(angr.options.CONCRETE_MEMORY)
        state.options.add(angr.options.SYMBOLIC_INITIAL_VALUES)

    return state

def run_comprehensive_analysis(self, target_function, analysis_config):
    """Run comprehensive symbolic analysis"""

    print("Starting comprehensive symbolic analysis...")

    # Setup hooks
    self.setup_custom_hooks()

    # Create initial state
    initial_state = self.project.factory.entry_state()

    # Configure memory model
    initial_state = self.symbolic_memory_modeling(
        initial_state, 
        analysis_config.get('memory_model', 'flat')
    )

    # Setup symbolic inputs
    if 'symbolic_inputs' in analysis_config:
        for input_config in analysis_config['symbolic_inputs']:
            symbolic_var = claripy.BVS(
                input_config['name'], 
                input_config['size'] * 8
            )
            initial_state.memory.store(input_config['address'], symbolic_var)

    # Run symbolic execution
    sm = self.project.factory.simulation_manager(initial_state)

    # Configure exploration
    find_addrs = analysis_config.get('find_addresses', [])
    avoid_addrs = analysis_config.get('avoid_addresses', [])

    step_count = 0
    max_steps = analysis_config.get('max_steps', 1000)

    while sm.active and step_count < max_steps:
        sm.step()
        step_count += 1

        # Mitigate path explosion
        sm = self.path_explosion_mitigation(sm)

        # Check for target addresses
        if find_addrs:
            found_states = [s for s in sm.active if s.addr in find_addrs]
            if found_states:
                sm.found.extend(found_states)
                sm.active = [s for s in sm.active if s.addr not in find_addrs]

        # Avoid specified addresses
        if avoid_addrs:
            sm.active = [s for s in sm.active if s.addr not in avoid_addrs]

        if step_count % 100 == 0:
            print(f"Step {step_count}: {len(sm.active)} active, {len(sm.found)} found")

    # Analyze results
    results = {
        'found_states': len(sm.found),
        'deadended_states': len(sm.deadended),
        'errored_states': len(sm.errored),
        'total_steps': step_count,
        'solutions': []
    }

    # Extract solutions from found states
    for state in sm.found:
        solution = {}
        for var_name in state.solver.variables:
            if state.solver.satisfiable():
                concrete_value = state.solver.eval(var_name, cast_to=bytes)
                solution[str(var_name)] = concrete_value

        results['solutions'].append(solution)

    self.analysis_results[target_function] = results
    return results

# Example usage
def advanced_symbolic_analysis_example():
    """Example of advanced symbolic analysis"""

    binary_path = "/bin/ls"
    project = angr.Project(binary_path, auto_load_libs=False)

    analyzer = AdvancedSymbolicAnalysis(project)

    # Configuration for analysis
    analysis_config = {
        'memory_model': 'paged',
        'max_steps': 500,
        'symbolic_inputs': [
            {
                'name': 'user_input',
                'address': 0x7fff0000,
                'size': 100
            }
        ],
        'find_addresses': [],  # Add target addresses
        'avoid_addresses': []  # Add addresses to avoid
    }

    # Run analysis
    results = analyzer.run_comprehensive_analysis(
        target_function=project.entry,
        analysis_config=analysis_config
    )

    print(f"Analysis completed: {results}")
    return analyzer, results

if __name__ == "__main__":
    analyzer, results = advanced_symbolic_analysis_example()
```
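
One caveat on the hooks above: the decorator-style `hook_symbol` calls are a sketch, and on current angr versions `project.hook_symbol` takes the hook object as its second argument. The documented pattern uses a SimProcedure, roughly like this (with /bin/ls as a stand-in target):

```python
# SimProcedure-based hook for malloc (standard angr hooking API)
import angr

class TrackedMalloc(angr.SimProcedure):
    def run(self, size):
        # Delegate to angr's built-in malloc model, then log the call
        addr = self.inline_call(angr.SIM_PROCEDURES['libc']['malloc'], size).ret_expr
        print(f"malloc({self.state.solver.eval(size)}) -> {addr}")
        return addr

project = angr.Project("/bin/ls", auto_load_libs=False)
project.hook_symbol('malloc', TrackedMalloc())
```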

Static Analysis and Code Discovery

Function and Code Analysis

```python

# Static analysis and code discovery with angr
import angr
import networkx as nx
from collections import defaultdict

class StaticAnalyzer:
    def __init__(self, project):
        self.project = project
        self.cfg = None
        self.functions = {}
        self.call_graph = None
        self.analysis_cache = {}

def generate_comprehensive_cfg(self):
    """Generate comprehensive CFG with multiple techniques"""

    print("Generating comprehensive CFG...")

    # Generate fast CFG first
    cfg_fast = self.project.analyses.CFGFast(
        normalize=True,
        resolve_indirect_jumps=True,
        force_complete_scan=False
    )

    # Generate accurate CFG for better precision
    cfg_accurate = self.project.analyses.CFGEmulated(
        context_sensitivity_level=1,
        keep_state=True,
        state_add_options=angr.sim_options.refs,
        state_remove_options=angr.sim_options.simplification
    )

    # Combine results
    self.cfg = cfg_accurate if cfg_accurate.functions else cfg_fast
    self.functions = self.cfg.functions

    print(f"CFG generated: {len(self.cfg.nodes())} nodes, {len(self.functions)} functions")
    return self.cfg

def analyze_function_complexity(self, function_addr):
    """Analyze function complexity metrics"""

    if function_addr not in self.functions:
        return None

    func = self.functions[function_addr]

    # Basic metrics
    complexity = {
        'name': func.name,
        'address': hex(func.addr),
        'size': func.size,
        'basic_blocks': len(func.blocks),
        'instructions': sum(block.instructions for block in func.blocks),
        'cyclomatic_complexity': 0,
        'call_sites': len(func.get_call_sites()),
        'callers': len(func.callers),
        'callees': len(func.callees),
        'loops': 0,
        'depth': 0
    }

    # Calculate cyclomatic complexity
    # V(G) = E - N + 2P (where E=edges, N=nodes, P=connected components)
    if func.graph:
        edges = len(func.graph.edges())
        nodes = len(func.graph.nodes())
        complexity['cyclomatic_complexity'] = edges - nodes + 2

    # Detect loops
    if func.graph:
        try:
            cycles = list(nx.simple_cycles(func.graph))
            complexity['loops'] = len(cycles)
        except:
            complexity['loops'] = 0

    # Calculate call depth
    complexity['depth'] = self.calculate_call_depth(func)

    # Analyze instruction types
    instruction_types = defaultdict(int)
    for block in func.blocks:
        for insn in block.capstone.insns:
            instruction_types[insn.mnemonic] += 1

    complexity['instruction_distribution'] = dict(instruction_types)

    # Complexity classification
    if complexity['cyclomatic_complexity'] > 20:
        complexity['complexity_level'] = 'very_high'
    elif complexity['cyclomatic_complexity'] > 10:
        complexity['complexity_level'] = 'high'
    elif complexity['cyclomatic_complexity'] > 5:
        complexity['complexity_level'] = 'medium'
    else:
        complexity['complexity_level'] = 'low'

    return complexity

def calculate_call_depth(self, func, visited=None, depth=0):
    """Calculate maximum call depth from function"""

    if visited is None:
        visited = set()

    if func.addr in visited or depth > 20:  # Prevent infinite recursion
        return depth

    visited.add(func.addr)
    max_depth = depth

    for callee in func.callees:
        if callee.addr in self.functions:
            callee_func = self.functions[callee.addr]
            callee_depth = self.calculate_call_depth(callee_func, visited.copy(), depth + 1)
            max_depth = max(max_depth, callee_depth)

    return max_depth

def find_code_patterns(self):
    """Find interesting code patterns"""

    patterns = {
        'crypto_functions': [],
        'string_functions': [],
        'file_operations': [],
        'network_operations': [],
        'system_calls': [],
        'anti_analysis': [],
        'obfuscation': []
    }

    for addr, func in self.functions.items():
        func_name = func.name.lower()

        # Cryptographic functions
        crypto_keywords = ['crypt', 'hash', 'md5', 'sha', 'aes', 'des', 'rsa', 'cipher']
        if any(keyword in func_name for keyword in crypto_keywords):
            patterns['crypto_functions'].append((func.name, hex(addr)))

        # String manipulation functions
        string_keywords = ['str', 'mem', 'copy', 'cmp', 'len', 'cat', 'chr']
        if any(keyword in func_name for keyword in string_keywords):
            patterns['string_functions'].append((func.name, hex(addr)))

        # File operations
        file_keywords = ['file', 'open', 'read', 'write', 'close', 'fopen', 'fread']
        if any(keyword in func_name for keyword in file_keywords):
            patterns['file_operations'].append((func.name, hex(addr)))

        # Network operations
        network_keywords = ['socket', 'connect', 'send', 'recv', 'bind', 'listen', 'accept']
        if any(keyword in func_name for keyword in network_keywords):
            patterns['network_operations'].append((func.name, hex(addr)))

        # System calls
        syscall_keywords = ['system', 'exec', 'fork', 'exit', 'kill', 'signal']
        if any(keyword in func_name for keyword in syscall_keywords):
            patterns['system_calls'].append((func.name, hex(addr)))

        # Anti-analysis techniques
        anti_keywords = ['debug', 'trace', 'ptrace', 'isdebuggerpresent', 'checkremotedebugger']
        if any(keyword in func_name for keyword in anti_keywords):
            patterns['anti_analysis'].append((func.name, hex(addr)))

        # Analyze function content for patterns
        self.analyze_function_content(func, patterns)

    # Print results
    for category, functions in patterns.items():
        if functions:
            print(f"\n{category.replace('_', ' ').title()}: {len(functions)}")
            for func_name, addr in functions[:5]:  # Show first 5
                print(f"  {func_name} at {addr}")

    return patterns

def analyze_function_content(self, func, patterns):
    """Analyze function content for specific patterns"""

    for block in func.blocks:
        for insn in block.capstone.insns:
            mnemonic = insn.mnemonic.lower()

            # Look for obfuscation patterns
            if mnemonic in ['nop', 'xor', 'rol', 'ror', 'shl', 'shr']:
                # Check for excessive use of these instructions
                obfuscation_count = sum(1 for b in func.blocks 
                                      for i in b.capstone.insns 
                                      if i.mnemonic.lower() in ['nop', 'xor', 'rol', 'ror'])

                if obfuscation_count > 10:  # Threshold for obfuscation
                    if (func.name, hex(func.addr)) not in patterns['obfuscation']:
                        patterns['obfuscation'].append((func.name, hex(func.addr)))

            # Look for anti-debugging instructions
            if mnemonic in ['rdtsc', 'cpuid', 'int']:
                if (func.name, hex(func.addr)) not in patterns['anti_analysis']:
                    patterns['anti_analysis'].append((func.name, hex(func.addr)))

def build_call_graph(self):
    """Build comprehensive call graph"""

    if self.cfg is None:
        self.generate_comprehensive_cfg()

    self.call_graph = nx.DiGraph()

    # Add nodes (functions)
    for addr, func in self.functions.items():
        self.call_graph.add_node(addr, 
                               name=func.name, 
                               size=func.size,
                               complexity=len(func.blocks))

    # Add edges (calls)
    for addr, func in self.functions.items():
        for callee in func.callees:
            if callee.addr in self.functions:
                self.call_graph.add_edge(addr, callee.addr)

    print(f"Call graph built: {len(self.call_graph.nodes())} functions, {len(self.call_graph.edges())} calls")
    return self.call_graph

def analyze_call_graph_properties(self):
    """Analyze call graph properties"""

    if self.call_graph is None:
        self.build_call_graph()

    analysis = {
        'total_functions': len(self.call_graph.nodes()),
        'total_calls': len(self.call_graph.edges()),
        'strongly_connected_components': len(list(nx.strongly_connected_components(self.call_graph))),
        'weakly_connected_components': len(list(nx.weakly_connected_components(self.call_graph))),
        'average_degree': sum(dict(self.call_graph.degree()).values()) / len(self.call_graph.nodes()) if self.call_graph.nodes() else 0,
        'density': nx.density(self.call_graph),
        'diameter': 0,
        'radius': 0
    }

    # Calculate diameter and radius (for connected components)
    try:
        if nx.is_weakly_connected(self.call_graph):
            undirected = self.call_graph.to_undirected()
            analysis['diameter'] = nx.diameter(undirected)
            analysis['radius'] = nx.radius(undirected)
    except:
        pass

    # Find central functions
    centrality_measures = {
        'degree_centrality': nx.degree_centrality(self.call_graph),
        'betweenness_centrality': nx.betweenness_centrality(self.call_graph),
        'closeness_centrality': nx.closeness_centrality(self.call_graph),
        'pagerank': nx.pagerank(self.call_graph)
    }

    # Get top functions for each centrality measure
    analysis['central_functions'] = {}
    for measure, values in centrality_measures.items():
        top_functions = sorted(values.items(), key=lambda x: x[1], reverse=True)[:5]
        analysis['central_functions'][measure] = [
            (self.functions[addr].name, hex(addr), score) 
            for addr, score in top_functions
        ]

    return analysis

def find_entry_points(self):
    """Find all possible entry points"""

    entry_points = {
        'main_entry': self.project.entry,
        'exported_functions': [],
        'constructor_functions': [],
        'exception_handlers': [],
        'signal_handlers': []
    }

    # Find exported functions
    for symbol in self.project.loader.main_object.symbols:
        if symbol.is_export and symbol.rebased_addr in self.functions:
            entry_points['exported_functions'].append({
                'name': symbol.name,
                'address': hex(symbol.rebased_addr),
                'type': 'export'
            })

    # Find constructor functions
    for addr, func in self.functions.items():
        if 'init' in func.name.lower() or 'constructor' in func.name.lower():
            entry_points['constructor_functions'].append({
                'name': func.name,
                'address': hex(addr),
                'type': 'constructor'
            })

    # Find exception handlers (simplified)
    try:
        if hasattr(self.project.loader.main_object, 'exception_handoff_table'):
            for handler in self.project.loader.main_object.exception_handoff_table:
                if handler in self.functions:
                    entry_points['exception_handlers'].append({
                        'name': self.functions[handler].name,
                        'address': hex(handler),
                        'type': 'exception_handler'
                    })
    except:
        pass

    return entry_points

def analyze_data_references(self):
    """Analyze data references and constants"""

    data_analysis = {
        'string_references': [],
        'numeric_constants': [],
        'function_pointers': [],
        'global_variables': []
    }

    # Analyze each function for data references
    for addr, func in self.functions.items():
        for block in func.blocks:
            for insn in block.capstone.insns:
                # Look for memory references
                if insn.operands:
                    for operand in insn.operands:
                        if operand.type == 3:  # Memory operand
                            mem_addr = operand.value.mem.disp

                            # Check if it's a string reference
                            try:
                                string_data = self.project.loader.memory.load(mem_addr, 100)
                                if self.is_printable_string(string_data):
                                    data_analysis['string_references'].append({
                                        'function': func.name,
                                        'address': hex(mem_addr),
                                        'string': string_data.decode('ascii', errors='ignore')[:50]
                                    })
                            except:
                                pass

                            # Check if it's a function pointer
                            if mem_addr in self.functions:
                                data_analysis['function_pointers'].append({
                                    'function': func.name,
                                    'target': self.functions[mem_addr].name,
                                    'address': hex(mem_addr)
                                })

                        elif operand.type == 2:  # Immediate operand
                            value = operand.value.imm

                            # Collect interesting constants
                            if self.is_interesting_constant(value):
                                data_analysis['numeric_constants'].append({
                                    'function': func.name,
                                    'value': hex(value),
                                    'decimal': value,
                                    'instruction': f"{insn.mnemonic} {insn.op_str}"
                                })

    return data_analysis

def is_printable_string(self, data):
    """Check if data represents a printable string"""

    if len(data) < 4:
        return False

    # Check for null terminator
    null_pos = data.find(b'\x00')
    if null_pos == -1:
        return False

    string_data = data[:null_pos]

    # Check if mostly printable ASCII
    printable_count = sum(1 for byte in string_data if 32 <= byte <= 126)
    return printable_count / len(string_data) > 0.8 if string_data else False

def is_interesting_constant(self, value):
    """Check if constant is potentially interesting"""

    # Common interesting constants
    interesting_values = {
        0x41414141,  # 'AAAA'
        0x42424242,  # 'BBBB'
        0xdeadbeef,
        0xcafebabe,
        0xfeedface,
        0x12345678,
        0x87654321
    }

    if value in interesting_values:
        return True

    # Large constants that might be addresses or keys
    if value > 0x10000000:
        return True

    # Powers of 2
    if value > 0 and (value & (value - 1)) == 0:
        return True

    return False

def generate_analysis_report(self, output_file="static_analysis_report.txt"):
    """Generate comprehensive static analysis report"""

    print("Generating static analysis report...")

    # Run all analyses
    if self.cfg is None:
        self.generate_comprehensive_cfg()

    patterns = self.find_code_patterns()
    call_graph_analysis = self.analyze_call_graph_properties()
    entry_points = self.find_entry_points()
    data_analysis = self.analyze_data_references()

    # Generate report
    with open(output_file, 'w') as f:
        f.write("STATIC ANALYSIS REPORT\n")
        f.write("=" * 50 + "\n\n")

        # Binary information
        f.write("BINARY INFORMATION\n")
        f.write("-" * 20 + "\n")
        f.write(f"Architecture: {self.project.arch.name}\n")
        f.write(f"Entry point: {hex(self.project.entry)}\n")
        f.write(f"Base address: {hex(self.project.loader.main_object.min_addr)}\n")
        f.write(f"Binary size: {self.project.loader.main_object.max_addr - self.project.loader.main_object.min_addr} bytes\n")
        f.write(f"Functions discovered: {len(self.functions)}\n\n")

        # Call graph analysis
        f.write("CALL GRAPH ANALYSIS\n")
        f.write("-" * 20 + "\n")
        for key, value in call_graph_analysis.items():
            if key != 'central_functions':
                f.write(f"{key}: {value}\n")

        f.write("\nCentral Functions:\n")
        for measure, functions in call_graph_analysis['central_functions'].items():
            f.write(f"  {measure}:\n")
            for name, addr, score in functions:
                f.write(f"    {name} ({addr}): {score:.3f}\n")
        f.write("\n")

        # Code patterns
        f.write("CODE PATTERNS\n")
        f.write("-" * 20 + "\n")
        for category, functions in patterns.items():
            if functions:
                f.write(f"{category}: {len(functions)} functions\n")
                for name, addr in functions[:3]:  # Show first 3
                    f.write(f"  {name} at {addr}\n")
        f.write("\n")

        # Entry points
        f.write("ENTRY POINTS\n")
        f.write("-" * 20 + "\n")
        f.write(f"Main entry: {hex(entry_points['main_entry'])}\n")
        f.write(f"Exported functions: {len(entry_points['exported_functions'])}\n")
        f.write(f"Constructor functions: {len(entry_points['constructor_functions'])}\n")
        f.write("\n")

        # Data analysis
        f.write("DATA ANALYSIS\n")
        f.write("-" * 20 + "\n")
        f.write(f"String references: {len(data_analysis['string_references'])}\n")
        f.write(f"Numeric constants: {len(data_analysis['numeric_constants'])}\n")
        f.write(f"Function pointers: {len(data_analysis['function_pointers'])}\n")
        f.write("\n")

        # Top complex functions
        f.write("COMPLEX FUNCTIONS\n")
        f.write("-" * 20 + "\n")
        complex_functions = []
        for addr, func in self.functions.items():
            complexity = self.analyze_function_complexity(addr)
            if complexity and complexity['cyclomatic_complexity'] > 10:
                complex_functions.append(complexity)

        complex_functions.sort(key=lambda x: x['cyclomatic_complexity'], reverse=True)

        for func in complex_functions[:10]:  # Top 10
            f.write(f"{func['name']} ({func['address']}): "
                   f"CC={func['cyclomatic_complexity']}, "
                   f"Size={func['size']}, "
                   f"Blocks={func['basic_blocks']}\n")

    print(f"Static analysis report saved to {output_file}")
    return output_file

# Example usage

def comprehensive_static_analysis(binary_path):
    """Perform comprehensive static analysis"""

    # Load binary
    project = angr.Project(binary_path, auto_load_libs=False)

    # Create analyzer
    analyzer = StaticAnalyzer(project)

    # Generate CFG
    cfg = analyzer.generate_comprehensive_cfg()

    # Analyze main() if its symbol is present
    main_symbol = project.loader.main_object.get_symbol('main')
    if main_symbol:
        main_complexity = analyzer.analyze_function_complexity(main_symbol.rebased_addr)
        print(f"Main function complexity: {main_complexity}")

    # Generate comprehensive report
    report_file = analyzer.generate_analysis_report()

    return analyzer, report_file

if __name__ == "__main__":
    binary_path = "/bin/ls"
    analyzer, report = comprehensive_static_analysis(binary_path)
    print(f"Analysis completed. Report: {report}")
```
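
For a quicker check than the full report above, the following sketch ranks functions by cyclomatic complexity (CC = E - N + 2) computed directly from each function's basic-block graph. It is a minimal sketch; the `./target` path is a placeholder for your own binary.

```python
# Minimal sketch: rank functions by cyclomatic complexity.
# "./target" is a placeholder; point it at any local binary.
import angr

project = angr.Project("./target", auto_load_libs=False)
cfg = project.analyses.CFGFast()

for addr, func in cfg.functions.items():
    graph = func.graph  # networkx DiGraph of the function's basic blocks
    cc = len(graph.edges) - len(graph.nodes) + 2  # CC = E - N + 2
    if cc > 10:
        print(f"{func.name} @ {hex(addr)}: CC={cc}")
```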

Automation and Integration

Automated Analysis Pipelines

```python

# Automated analysis pipelines with angr

import angr
import json
import time
import logging
import multiprocessing
from pathlib import Path
from datetime import datetime

class AnalysisPipeline:
def __init__(self, config_file=None):
    self.config = self.load_config(config_file)
    self.setup_logging()
    self.results = {}
    self.analysis_queue = multiprocessing.Queue()
    self.result_queue = multiprocessing.Queue()

def load_config(self, config_file):
    """Load pipeline configuration"""

    default_config = {
        "analysis_modules": [
            "cfg_analysis",
            "function_analysis",
            "symbolic_execution",
            "vulnerability_detection",
            "static_analysis"
        ],
        "symbolic_execution": {
            "max_steps": 1000,
            "max_states": 50,
            "timeout": 300
        },
        "vulnerability_detection": {
            "buffer_overflow": True,
            "format_string": True,
            "integer_overflow": True,
            "use_after_free": True
        },
        "output": {
            "formats": ["json", "html", "txt"],
            "directory": "./analysis_results"
        },
        "performance": {
            "parallel_workers": 4,
            "memory_limit": "4GB",
            "timeout_per_binary": 1800
        }
    }

    if config_file and Path(config_file).exists():
        with open(config_file, 'r') as f:
            user_config = json.load(f)
            # Merge configurations
            for key, value in user_config.items():
                if isinstance(value, dict) and key in default_config:
                    default_config[key].update(value)
                else:
                    default_config[key] = value

    return default_config
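
# Example user config (hypothetical pipeline_config.json): only overridden
# keys need to appear; nested dicts are merged into the defaults above:
#   {"performance": {"parallel_workers": 2},
#    "output": {"directory": "./out"}}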

def setup_logging(self):
    """Setup logging for the pipeline"""

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('analysis_pipeline.log'),
            logging.StreamHandler()
        ]
    )

    self.logger = logging.getLogger(__name__)

    # Reduce angr logging verbosity
    logging.getLogger('angr').setLevel(logging.WARNING)
    logging.getLogger('cle').setLevel(logging.WARNING)
    logging.getLogger('pyvex').setLevel(logging.WARNING)

def analyze_binary(self, binary_path, analysis_id=None):
    """Analyze single binary"""

    if analysis_id is None:
        analysis_id = f"analysis_{int(time.time())}"

    self.logger.info(f"Starting analysis of {binary_path} (ID: {analysis_id})")

    start_time = time.time()
    analysis_result = {
        'binary_path': str(binary_path),
        'analysis_id': analysis_id,
        'start_time': datetime.now().isoformat(),
        'status': 'running',
        'modules': {},
        'errors': [],
        'warnings': []
    }

    try:
        # Load binary
        project = angr.Project(binary_path, auto_load_libs=False)
        analysis_result['binary_info'] = {
            'architecture': project.arch.name,
            'entry_point': hex(project.entry),
            'base_address': hex(project.loader.main_object.min_addr),
            'size': project.loader.main_object.max_addr - project.loader.main_object.min_addr
        }

        # Run analysis modules
        for module in self.config['analysis_modules']:
            try:
                self.logger.info(f"Running module: {module}")
                module_result = self.run_analysis_module(project, module)
                analysis_result['modules'][module] = module_result

            except Exception as e:
                error_msg = f"Error in module {module}: {str(e)}"
                self.logger.error(error_msg)
                analysis_result['errors'].append(error_msg)

        analysis_result['status'] = 'completed'

    except Exception as e:
        error_msg = f"Failed to analyze {binary_path}: {str(e)}"
        self.logger.error(error_msg)
        analysis_result['status'] = 'failed'
        analysis_result['errors'].append(error_msg)

    finally:
        analysis_result['end_time'] = datetime.now().isoformat()
        analysis_result['duration'] = time.time() - start_time

        self.results[analysis_id] = analysis_result
        self.save_results(analysis_result)

    return analysis_result

def run_analysis_module(self, project, module_name):
    """Run specific analysis module"""

    if module_name == "cfg_analysis":
        return self.run_cfg_analysis(project)
    elif module_name == "function_analysis":
        return self.run_function_analysis(project)
    elif module_name == "symbolic_execution":
        return self.run_symbolic_execution(project)
    elif module_name == "vulnerability_detection":
        return self.run_vulnerability_detection(project)
    elif module_name == "static_analysis":
        return self.run_static_analysis(project)
    else:
        raise ValueError(f"Unknown analysis module: {module_name}")

def run_cfg_analysis(self, project):
    """Run CFG analysis module"""

    cfg = project.analyses.CFGFast()

    result = {
        'total_functions': len(cfg.functions),
        'total_nodes': len(cfg.graph.nodes),
        'total_edges': len(cfg.graph.edges),
        'entry_point': hex(project.entry),
        'functions': []
    }

    # Analyze top 10 largest functions
    sorted_functions = sorted(cfg.functions.items(), 
                            key=lambda x: x[1].size, reverse=True)

    for addr, func in sorted_functions[:10]:
        func_info = {
            'name': func.name,
            'address': hex(addr),
            'size': func.size,
            'blocks': len(list(func.blocks)),
            'callers': len(func.callers),
            'callees': len(func.callees)
        }
        result['functions'].append(func_info)

    return result

def run_function_analysis(self, project):
    """Run function analysis module"""

    cfg = project.analyses.CFGFast()

    result = {
        'total_functions': len(cfg.functions),
        'complexity_distribution': {'low': 0, 'medium': 0, 'high': 0, 'very_high': 0},
        'interesting_functions': {
            'large_functions': [],
            'complex_functions': [],
            'recursive_functions': []
        }
    }

    for addr, func in cfg.functions.items():
        # Calculate complexity
        complexity = len(list(func.blocks))

        if complexity <= 5:
            result['complexity_distribution']['low'] += 1
        elif complexity <= 15:
            result['complexity_distribution']['medium'] += 1
        elif complexity <= 30:
            result['complexity_distribution']['high'] += 1
        else:
            result['complexity_distribution']['very_high'] += 1

        # Find interesting functions
        if func.size > 1000:
            result['interesting_functions']['large_functions'].append({
                'name': func.name,
                'address': hex(addr),
                'size': func.size
            })

        if complexity > 20:
            result['interesting_functions']['complex_functions'].append({
                'name': func.name,
                'address': hex(addr),
                'complexity': complexity
            })

        # Check for recursion
        if addr in [callee.addr for callee in func.callees]:
            result['interesting_functions']['recursive_functions'].append({
                'name': func.name,
                'address': hex(addr)
            })

    return result

def run_symbolic_execution(self, project):
    """Run symbolic execution module"""

    config = self.config['symbolic_execution']

    result = {
        'executed': False,
        'states_explored': 0,
        'paths_found': 0,
        'errors': [],
        'timeout': False
    }

    try:
        # Create initial state
        initial_state = project.factory.entry_state()

        # Run symbolic execution with timeout
        sm = project.factory.simulation_manager(initial_state)

        start_time = time.time()
        step_count = 0

        while (sm.active and 
               step_count < config['max_steps'] and 
               time.time() - start_time < config['timeout']):

            sm.step()
            step_count += 1

            # Limit active states
            if len(sm.active) > config['max_states']:
                sm.stashes['active'] = sm.active[:config['max_states']]

        result['executed'] = True
        result['states_explored'] = step_count
        result['paths_found'] = len(sm.deadended) + len(sm.found)

        if time.time() - start_time >= config['timeout']:
            result['timeout'] = True

    except Exception as e:
        result['errors'].append(str(e))

    return result

def run_vulnerability_detection(self, project):
    """Run vulnerability detection module"""

    config = self.config['vulnerability_detection']

    result = {
        'vulnerabilities_found': [],
        'checks_performed': []
    }

    cfg = project.analyses.CFGFast()

    # Buffer overflow detection
    if config.get('buffer_overflow', True):
        result['checks_performed'].append('buffer_overflow')

        for addr, func in cfg.functions.items():
            if 'strcpy' in func.name or 'gets' in func.name or 'sprintf' in func.name:
                result['vulnerabilities_found'].append({
                    'type': 'potential_buffer_overflow',
                    'function': func.name,
                    'address': hex(addr),
                    'description': f'Dangerous function {func.name} detected'
                })

    # Format string detection
    if config.get('format_string', True):
        result['checks_performed'].append('format_string')

        for addr, func in cfg.functions.items():
            if 'printf' in func.name and 'sprintf' not in func.name:
                result['vulnerabilities_found'].append({
                    'type': 'potential_format_string',
                    'function': func.name,
                    'address': hex(addr),
                    'description': f'Format string function {func.name} detected'
                })

    return result

def run_static_analysis(self, project):
    """Run static analysis module"""

    result = {
        'strings': [],
        'imports': [],
        'exports': [],
        'sections': []
    }

    # Extract strings
    try:
        for addr, string in project.loader.main_object.strings.items():
            if len(string) > 4 and string.isprintable():
                result['strings'].append({
                    'address': hex(addr),
                    'value': string[:100]  # Truncate long strings
                })
    except Exception:
        pass

    # Extract imports
    try:
        for symbol in project.loader.main_object.imports:
            result['imports'].append({
                'name': symbol.name,
                'address': hex(symbol.rebased_addr) if symbol.rebased_addr else None
            })
    except Exception:
        pass

    # Extract exports
    try:
        for symbol in project.loader.main_object.exports:
            result['exports'].append({
                'name': symbol.name,
                'address': hex(symbol.rebased_addr)
            })
    except Exception:
        pass

    return result

def batch_analyze(self, binary_paths, parallel=True):
    """Analyze multiple binaries"""

    self.logger.info(f"Starting batch analysis of {len(binary_paths)} binaries")

    if parallel and len(binary_paths) > 1:
        return self.parallel_batch_analyze(binary_paths)
    else:
        return self.sequential_batch_analyze(binary_paths)

def sequential_batch_analyze(self, binary_paths):
    """Sequential batch analysis"""

    results = []

    for i, binary_path in enumerate(binary_paths):
        self.logger.info(f"Analyzing binary {i+1}/{len(binary_paths)}: {binary_path}")

        try:
            result = self.analyze_binary(binary_path)
            results.append(result)
        except Exception as e:
            self.logger.error(f"Failed to analyze {binary_path}: {e}")
            results.append({
                'binary_path': str(binary_path),
                'status': 'failed',
                'error': str(e)
            })

    return results

def parallel_batch_analyze(self, binary_paths):
    """Parallel batch analysis"""

    num_workers = min(self.config['performance']['parallel_workers'], len(binary_paths))

    # Create worker processes
    workers = []
    for i in range(num_workers):
        worker = multiprocessing.Process(
            target=self.analysis_worker,
            args=(self.analysis_queue, self.result_queue)
        )
        workers.append(worker)
        worker.start()

    # Add tasks to queue
    for binary_path in binary_paths:
        self.analysis_queue.put(binary_path)

    # Add sentinel values to stop workers
    for _ in range(num_workers):
        self.analysis_queue.put(None)

    # Collect results
    results = []
    for _ in range(len(binary_paths)):
        result = self.result_queue.get()
        results.append(result)

    # Wait for workers to finish
    for worker in workers:
        worker.join()

    return results

def analysis_worker(self, task_queue, result_queue):
    """Worker process for parallel analysis"""

    while True:
        binary_path = task_queue.get()

        if binary_path is None:  # Sentinel value
            break

        try:
            result = self.analyze_binary(binary_path)
            result_queue.put(result)
        except Exception as e:
            result_queue.put({
                'binary_path': str(binary_path),
                'status': 'failed',
                'error': str(e)
            })

def save_results(self, analysis_result):
    """Save analysis results"""

    output_dir = Path(self.config['output']['directory'])
    output_dir.mkdir(exist_ok=True)

    analysis_id = analysis_result['analysis_id']

    # Save JSON result
    if 'json' in self.config['output']['formats']:
        json_file = output_dir / f"{analysis_id}.json"
        with open(json_file, 'w') as f:
            json.dump(analysis_result, f, indent=2)

    # Save text report
    if 'txt' in self.config['output']['formats']:
        txt_file = output_dir / f"{analysis_id}.txt"
        self.generate_text_report(analysis_result, txt_file)

    # Save HTML report
    if 'html' in self.config['output']['formats']:
        html_file = output_dir / f"{analysis_id}.html"
        self.generate_html_report(analysis_result, html_file)

def generate_text_report(self, analysis_result, output_file):
    """Generate text report"""

    with open(output_file, 'w') as f:
        f.write("ANGR ANALYSIS REPORT\n")
        f.write("=" * 50 + "\n\n")

        # Basic information
        f.write(f"Binary: {analysis_result['binary_path']}\n")
        f.write(f"Analysis ID: {analysis_result['analysis_id']}\n")
        f.write(f"Status: {analysis_result['status']}\n")
        f.write(f"Duration: {analysis_result.get('duration', 0):.2f} seconds\n\n")

        # Binary information
        if 'binary_info' in analysis_result:
            info = analysis_result['binary_info']
            f.write("BINARY INFORMATION\n")
            f.write("-" * 20 + "\n")
            for key, value in info.items():
                f.write(f"{key}: {value}\n")
            f.write("\n")

        # Module results
        for module, result in analysis_result.get('modules', {}).items():
            f.write(f"{module.upper().replace('_', ' ')}\n")
            f.write("-" * 20 + "\n")
            self.write_module_result(f, result)
            f.write("\n")

        # Errors and warnings
        if analysis_result.get('errors'):
            f.write("ERRORS\n")
            f.write("-" * 20 + "\n")
            for error in analysis_result['errors']:
                f.write(f"- {error}\n")
            f.write("\n")

def write_module_result(self, file_handle, result):
    """Write module result to file"""

    if isinstance(result, dict):
        for key, value in result.items():
            if isinstance(value, (list, dict)) and len(str(value)) > 100:
                file_handle.write(f"{key}: {type(value).__name__} with {len(value)} items\n")
            else:
                file_handle.write(f"{key}: {value}\n")
    else:
        file_handle.write(f"Result: {result}\n")

def generate_html_report(self, analysis_result, output_file):
    """Generate HTML report"""

    html_content = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>angr Analysis Report - {analysis_result['analysis_id']}</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .header {{ background-color: #f0f0f0; padding: 10px; border-radius: 5px; }}
            .section {{ margin: 20px 0; }}
            .error {{ color: red; }}
            .warning {{ color: orange; }}
            table {{ border-collapse: collapse; width: 100%; }}
            th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            th {{ background-color: #f2f2f2; }}
        </style>
    </head>
    <body>
        <div class="header">
            <h1>angr Analysis Report</h1>
            <p><strong>Binary:</strong> {analysis_result['binary_path']}</p>
            <p><strong>Analysis ID:</strong> {analysis_result['analysis_id']}</p>
            <p><strong>Status:</strong> {analysis_result['status']}</p>
            <p><strong>Duration:</strong> {analysis_result.get('duration', 0):.2f} seconds</p>
        </div>
    """

    # Add module results
    for module, result in analysis_result.get('modules', {}).items():
        html_content += f'<div class="section"><h2>{module.replace("_", " ").title()}</h2>'
        html_content += self.format_result_html(result)
        html_content += '</div>'

    html_content += """
    </body>
    </html>
    """

    with open(output_file, 'w') as f:
        f.write(html_content)

def format_result_html(self, result):
    """Format result as HTML"""

    if isinstance(result, dict):
        html = "<table>"
        for key, value in result.items():
            html += f"<tr><td><strong>{key}</strong></td><td>{value}</td></tr>"
        html += "</table>"
        return html
    else:
        return f"<p>{result}</p>"

# Example usage and configuration

def create_analysis_pipeline():
    """Create and configure analysis pipeline"""

    # Create configuration
    config = {
        "analysis_modules": [
            "cfg_analysis",
            "function_analysis",
            "static_analysis",
            "vulnerability_detection"
        ],
        "output": {
            "formats": ["json", "txt", "html"],
            "directory": "./angr_analysis_results"
        },
        "performance": {
            "parallel_workers": 2,
            "timeout_per_binary": 300
        }
    }

    # Save configuration
    with open('pipeline_config.json', 'w') as f:
        json.dump(config, f, indent=2)

    # Create pipeline
    pipeline = AnalysisPipeline('pipeline_config.json')

    return pipeline

def run_pipeline_example():
    """Example of running analysis pipeline"""

    # Create pipeline
    pipeline = create_analysis_pipeline()

    # Analyze single binary
    binary_path = "/bin/ls"
    result = pipeline.analyze_binary(binary_path)

    print(f"Analysis completed: {result['status']}")
    print(f"Duration: {result.get('duration', 0):.2f} seconds")

    # Batch analysis example
    binary_paths = ["/bin/ls", "/bin/cat", "/bin/echo"]
    batch_results = pipeline.batch_analyze(binary_paths, parallel=True)

    print(f"Batch analysis completed: {len(batch_results)} binaries")

    return pipeline, batch_results

if __name__ == "__main__":
    pipeline, results = run_pipeline_example()
    print("Pipeline example completed")
```
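
To run the pipeline over a whole directory instead of a fixed list, a thin wrapper is enough. This is a hedged sketch that assumes the `AnalysisPipeline` and `create_analysis_pipeline` definitions above are in scope; the directory argument is a placeholder.

```python
# Hedged sketch: feed every regular file in a directory into the pipeline.
import sys
from pathlib import Path

def analyze_directory(directory):
    binaries = [str(p) for p in Path(directory).iterdir() if p.is_file()]
    pipeline = create_analysis_pipeline()  # defined in the block above
    # Sequential mode keeps memory usage predictable on large directories
    return pipeline.batch_analyze(binaries, parallel=False)

if __name__ == "__main__":
    results = analyze_directory(sys.argv[1])
    failed = [r for r in results if r.get('status') != 'completed']
    print(f"{len(results)} binaries analyzed, {len(failed)} failed")
```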

Best Practices and Tips

Performance Optimization

```python

# angr performance optimization techniques

import angr
import angr.options as o

class PerformanceOptimizer:
def __init__(self):
    self.optimization_strategies = {
        'memory': self.optimize_memory,
        'solver': self.optimize_solver,
        'state_management': self.optimize_state_management,
        'analysis': self.optimize_analysis
    }

def optimize_memory(self, project, state=None):
    """Optimize memory usage"""

    # Memory optimization options
    memory_opts = {
        o.ABSTRACT_MEMORY,
        o.FAST_MEMORY,
        o.APPROXIMATE_MEMORY_SIZES,
        o.APPROXIMATE_MEMORY_INDICES
    }

    if state:
        state.options.update(memory_opts)

    # Note: concretization tuning lives in state.memory.read_strategies /
    # state.memory.write_strategies, but the exact strategy classes are
    # version-dependent; the option set above is usually sufficient.

    return memory_opts

def optimize_solver(self, project, state=None):
    """Optimize constraint solver"""

    # Solver optimization options
    solver_opts = {
        o.ABSTRACT_SOLVER,
        o.APPROXIMATE_FIRST,
        o.APPROXIMATE_GUARDS,
        o.APPROXIMATE_SATISFIABILITY
    }

    if state:
        state.options.update(solver_opts)

        # Note: there is no public angr.solvers module; the claripy
        # frontend behind state.solver is an internal detail. Rely on
        # the option set above and bound solving time instead.

        # Set solver timeout (milliseconds; attribute is version-dependent)
        state.solver.timeout = 5000  # 5 seconds

    return solver_opts

def optimize_state_management(self, simulation_manager):
    """Optimize state management"""

    # Limit number of active states
    max_active = 20
    if len(simulation_manager.active) > max_active:
        # Keep most promising states
        scored_states = []
        for state in simulation_manager.active:
            score = self.calculate_state_score(state)
            scored_states.append((score, state))

        scored_states.sort(key=lambda x: x[0], reverse=True)
        simulation_manager.stashes['active'] = [state for _, state in scored_states[:max_active]]

    # Merge similar states
    simulation_manager.merge()

    return simulation_manager

def calculate_state_score(self, state):
    """Calculate state priority score"""

    score = 0

    # Prefer states with fewer constraints
    score += 1000 / (len(state.solver.constraints) + 1)

    # Prefer states that have made more progress
    score += state.history.depth

    # Prefer states in main executable
    main_object = state.project.loader.main_object
    if main_object.min_addr <= state.addr <= main_object.max_addr:
        score += 100

    return score

def optimize_analysis(self, project):
    """Optimize analysis settings"""

    # CFG optimization
    cfg_opts = {
        'normalize': True,
        'resolve_indirect_jumps': False,  # Disable for speed
        'force_complete_scan': False,
        'show_progressbar': False
    }

    return cfg_opts

def create_optimized_state(self, project, addr=None):
    """Create optimized initial state"""

    if addr is None:
        addr = project.entry

    # Performance-oriented options
    perf_options = {
        o.ABSTRACT_SOLVER,
        o.ABSTRACT_MEMORY,
        o.FAST_MEMORY,
        o.FAST_REGISTERS,
        o.APPROXIMATE_FIRST,
        o.APPROXIMATE_GUARDS,
        o.LAZY_SOLVES
    }

    # Create state with optimizations
    state = project.factory.entry_state(
        addr=addr,
        add_options=perf_options
    )

    # Additional optimizations
    self.optimize_memory(project, state)
    self.optimize_solver(project, state)

    return state

# Error handling and debugging

class AngrDebugger:
def __init__(self, project):
    self.project = project
    self.debug_info = {}

def setup_debugging(self, state):
    """Setup debugging for state"""

    # Enable tracking options
    debug_options = {
        o.TRACK_MEMORY_ACTIONS,
        o.TRACK_REGISTER_ACTIONS,
        o.TRACK_JMP_ACTIONS,
        o.TRACK_CONSTRAINT_ACTIONS
    }

    state.options.update(debug_options)

    # Setup breakpoints
    state.inspect.b('mem_read', when=angr.BP_BEFORE, action=self.on_memory_read)
    state.inspect.b('mem_write', when=angr.BP_BEFORE, action=self.on_memory_write)
    state.inspect.b('call', when=angr.BP_BEFORE, action=self.on_function_call)

    return state

def on_memory_read(self, state):
    """Handle memory read events"""

    addr = state.inspect.mem_read_address
    size = state.inspect.mem_read_length

    if state.solver.symbolic(addr):
        print(f"Symbolic memory read at {addr} (size: {size})")

def on_memory_write(self, state):
    """Handle memory write events"""

    addr = state.inspect.mem_write_address
    data = state.inspect.mem_write_expr

    if state.solver.symbolic(addr) or state.solver.symbolic(data):
        print(f"Symbolic memory write at {addr}: {data}")

def on_function_call(self, state):
    """Handle function call events"""

    target = state.inspect.function_address
    print(f"Function call to {hex(target)}")

def analyze_state_history(self, state):
    """Analyze state execution history"""

    # Walk the history chain; each node is a SimStateHistory, not a state,
    # so state-level attributes (e.g. the solver) are not available here
    history = []
    node = state.history

    while node.parent is not None:
        history.append({
            'address': hex(node.addr) if node.addr is not None else None,
            'depth': node.depth,
            'actions': len(list(node.actions))
        })
        node = node.parent

    history.reverse()
    return history

def debug_constraint_solving(self, state):
    """Debug constraint solving issues"""

    print(f"Constraint count: {len(state.solver.constraints)}")
    print(f"Variables: {list(state.solver.variables)}")

    # Check satisfiability
    if not state.solver.satisfiable():
        print("State is unsatisfiable!")

        # Find the first conflicting constraint by re-adding them one at
        # a time to a fresh claripy solver (branching an already-unsat
        # solver would always report unsat)
        import claripy
        test_solver = claripy.Solver()
        for i, constraint in enumerate(state.solver.constraints):
            test_solver.add(constraint)
            if not test_solver.satisfiable():
                print(f"Constraint {i} makes the state unsatisfiable: {constraint}")
                break

    # Check solver performance
    import time
    start_time = time.time()
    state.solver.satisfiable()
    solve_time = time.time() - start_time

    if solve_time > 1.0:
        print(f"Slow constraint solving: {solve_time:.2f} seconds")

# Best practices guide

def angr_best_practices():
    """Guide to angr best practices"""

    practices = {
        "Performance": [
            "Use ABSTRACT_MEMORY and ABSTRACT_SOLVER options for better performance",
            "Limit the number of active states to prevent state explosion",
            "Use timeouts for constraint solving",
            "Disable unnecessary tracking options",
            "Use CFGFast instead of CFGEmulated when possible"
        ],

        "Memory Management": [
            "Use FAST_MEMORY option for large binaries",
            "Avoid storing large amounts of data in state globals",
            "Clear unused states regularly",
            "Use memory-mapped files for large inputs"
        ],

        "Symbolic Execution": [
            "Start with concrete inputs and gradually make them symbolic",
            "Use find/avoid addresses to guide exploration (see the sketch after this code block)",
            "Implement custom exploration techniques for complex targets",
            "Use state merging to reduce path explosion"
        ],

        "Debugging": [
            "Use state.inspect for debugging symbolic execution",
            "Enable tracking options only when needed",
            "Analyze constraint complexity regularly",
            "Use logging to track analysis progress"
        ],

        "Analysis Design": [
            "Break complex analysis into smaller modules",
            "Use caching for expensive computations",
            "Implement timeouts for all analysis phases",
            "Validate results with multiple techniques"
        ]
    }

    return practices

# Example of optimized analysis

def optimized_analysis_example(binary_path):
    """Example of optimized angr analysis"""

    # Load project
    project = angr.Project(binary_path, auto_load_libs=False)

    # Create optimizer
    optimizer = PerformanceOptimizer()

    # Create optimized state
    initial_state = optimizer.create_optimized_state(project)

    # Setup debugging if needed
    debugger = AngrDebugger(project)
    # initial_state = debugger.setup_debugging(initial_state)

    # Run optimized symbolic execution
    sm = project.factory.simulation_manager(initial_state)

    # Exploration with optimization
    step_count = 0
    max_steps = 100

    while sm.active and step_count < max_steps:
        sm.step()
        step_count += 1

        # Apply optimizations
        sm = optimizer.optimize_state_management(sm)

        if step_count % 10 == 0:
            print(f"Step {step_count}: {len(sm.active)} active states")

    print(f"Analysis completed: {step_count} steps")
    print(f"Final states: {len(sm.active)} active, {len(sm.deadended)} deadended")

    return sm

if __name__ == "__main__":
    binary_path = "/bin/ls"
    sm = optimized_analysis_example(binary_path)

# Print best practices
practices = angr_best_practices()
print("\nangr Best Practices:")
for category, tips in practices.items():
    print(f"\n{category}:")
    for tip in tips:
        print(f"  - {tip}")

```
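
The "find/avoid" advice in the best-practices list above deserves a concrete illustration. The sketch below steers exploration toward one basic block while pruning paths through another; the addresses and the binary path are placeholders you would take from your own disassembly.

```python
# Hedged sketch of guided exploration with find/avoid.
import angr

project = angr.Project("./target", auto_load_libs=False)  # placeholder path
state = project.factory.entry_state()
sm = project.factory.simulation_manager(state)

FIND_ADDR = 0x401234   # placeholder: address of the "success" block
AVOID_ADDR = 0x401250  # placeholder: address of the "failure" block

sm.explore(find=FIND_ADDR, avoid=AVOID_ADDR)

if sm.found:
    # Concretize stdin for the path that reached FIND_ADDR
    print("Input reaching target:", sm.found[0].posix.dumps(0))
```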

Resources

Documentation and Learning
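
- Official documentation: https://docs.angr.io
- API reference: https://api.angr.io
- Project homepage: https://angr.io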

Training and Tutorials
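
- angr_ctf practice exercises: https://github.com/jakespringer/angr_ctf
- Worked examples in the official documentation (docs.angr.io)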

Community and Support
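
- GitHub issues and discussions: https://github.com/angr/angr
- Community chat (invite linked from https://angr.io)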

Related Tools and Integration
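
- angr-management (GUI): https://github.com/angr/angr-management
- Core components, each usable standalone: claripy (solver), CLE (loader), pyvex (VEX IR lifter), archinfo (architecture database), all under https://github.com/angr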