Saltar a contenido

angr Cheat Sheet

<div class="cheat-sheet-actions"><button id="copy-btn" class="copy-btn" onclick="copyAllCommands()">Copiar todos los comandos</button> <button id="pdf-btn" class="pdf-btn" onclick="generatePDF()">Generar PDF</button></div>

Sinopsis

angr es una potente plataforma de análisis binario basada en Python que proporciona ejecución simbólica, análisis estático y análisis dinámico. Está diseñada para el descubrimiento automatizado de vulnerabilidades, la ingeniería inversa y el análisis de programas. angr puede analizar binarios de múltiples arquitecturas y ofrece una API rica para construir herramientas de análisis personalizadas.

Puntos fuertes: ejecución simbólica, soporte multiarquitectura, detección automatizada de vulnerabilidades, resolución de restricciones y una extensa API de Python para flujos de trabajo de análisis personalizados.

Instalación y configuración

Instalación básica

# Install angr via pip (recommended)
pip install angr

# Install with additional dependencies
# The requirement is quoted so shells that glob on [...] (e.g. zsh) pass it
# to pip unchanged.
# NOTE(review): confirm the "all" extra exists for the angr release you target.
pip install "angr[all]"

# Install development version
pip install git+https://github.com/angr/angr.git

# Install in virtual environment (recommended)
python -m venv angr-env
source angr-env/bin/activate  # Linux/macOS
# angr-env\Scripts\activate  # Windows
pip install angr

# Verify installation
python -c "import angr; print(angr.__version__)"

# Install additional tools
pip install angr-management  # GUI for angr
pip install angr-doc        # Documentation
pip install angr-utils      # Utility functions

Dependencias y requisitos

# System dependencies (Ubuntu/Debian)
sudo apt-get update
sudo apt-get install -y \
    python3-dev \
    build-essential \
    libffi-dev \
    libssl-dev \
    libtool \
    pkg-config \
    cmake \
    libgmp-dev \
    libmpfr-dev \
    libmpc-dev

# System dependencies (CentOS/RHEL)
sudo yum groupinstall -y "Development Tools"
sudo yum install -y \
    python3-devel \
    libffi-devel \
    openssl-devel \
    libtool \
    pkgconfig \
    cmake \
    gmp-devel \
    mpfr-devel \
    libmpc-devel

# macOS dependencies (with Homebrew)
brew install \
    libffi \
    openssl \
    libtool \
    pkg-config \
    cmake \
    gmp \
    mpfr \
    libmpc

# Install Z3 solver (recommended)
pip install z3-solver

# Install additional solvers
pip install pysmt
# pysmt-install names Boolector with the --btor flag.
pysmt-install --z3 --cvc4 --btor

# Verify solver installation
python -c "import z3; print('Z3 version:', z3.get_version_string())"

Configuración y entorno

# angr configuration and environment setup
import angr
import logging
import os

# Configure logging: keep angr itself at INFO and quieten the loader (cle)
# and lifter (pyvex) loggers.
logging.getLogger('angr').setLevel(logging.INFO)
logging.getLogger('cle').setLevel(logging.WARNING)
logging.getLogger('pyvex').setLevel(logging.WARNING)

# Set environment variables
# NOTE(review): nothing in this snippet reads these variables; confirm that
# the angr version in use actually honours them.
os.environ['ANGR_CACHE_DIR'] = '/tmp/angr_cache'
os.environ['ANGR_LOG_LEVEL'] = 'INFO'

# Memory and performance settings
import angr.options as o

# Zero-fill options.
# The names in angr.options are option identifiers, not switches: assigning
# True to them only overwrites the identifier. Collect them in a set and pass
# that set as add_options= when creating a state.
zero_fill_options = {
    o.ZERO_FILL_UNCONSTRAINED_MEMORY,
    o.ZERO_FILL_UNCONSTRAINED_REGISTERS,
}

# Common option sets
common_options = {
    o.ABSTRACT_SOLVER,
    o.ABSTRACT_MEMORY,
    o.APPROXIMATE_FIRST,
    o.APPROXIMATE_GUARDS,
    o.APPROXIMATE_SATISFIABILITY,
}

# Debugging options
debug_options = {
    o.TRACK_MEMORY_ACTIONS,
    o.TRACK_REGISTER_ACTIONS,
    o.TRACK_JMP_ACTIONS,
    o.TRACK_CONSTRAINT_ACTIONS,
}

# Performance options
performance_options = {
    o.EFFICIENT_STATE_MERGING,
    o.LAZY_SOLVES,
    o.FAST_MEMORY,
    o.FAST_REGISTERS,
}

print("angr environment configured successfully")

Análisis binario básico

Carga y análisis de binarios

# Basic binary loading and analysis
import angr
import archinfo

def load_binary(binary_path, **kwargs):
    """Load a binary into an angr project and print a short summary.

    Args:
        binary_path: Path of the binary to load.
        **kwargs: Extra keyword arguments forwarded to ``angr.Project``.
            ``auto_load_libs`` defaults to ``False`` unless the caller
            supplies it.

    Returns:
        The created ``angr.Project``.
    """
    # Keep the original default while letting callers override it; previously
    # every keyword argument was silently ignored.
    kwargs.setdefault('auto_load_libs', False)
    project = angr.Project(binary_path, **kwargs)

    main_object = project.loader.main_object
    print(f"Binary: {binary_path}")
    print(f"Architecture: {project.arch}")
    print(f"Entry point: {hex(project.entry)}")
    print(f"Base address: {hex(main_object.min_addr)}")
    print(f"Binary type: {main_object.os}")

    return project

# Load binary with custom options
def load_binary_advanced(binary_path):
    """Load a binary with explicit loader options and list what was loaded.

    Args:
        binary_path: Path of the binary to load.

    Returns:
        The created ``angr.Project``.
    """

    # Custom loading options
    project = angr.Project(
        binary_path,
        auto_load_libs=True,          # Load shared libraries
        use_sim_procedures=True,      # Use SimProcedures for library functions
        # A list of names belongs in exclude_sim_procedures_list;
        # exclude_sim_procedures_func expects a callable predicate.
        exclude_sim_procedures_list=['malloc', 'free'],
        force_load_libs=['libc.so.6'],  # Force load specific libraries
        main_opts={
            'base_addr': 0x400000,    # Custom base address
            'backend': 'elf'          # Force backend type
        }
    )

    # Print loaded objects
    print("Loaded objects:")
    for obj in project.loader.all_objects:
        print(f"  {obj.binary}: {hex(obj.min_addr)}-{hex(obj.max_addr)}")

    # Print symbols. The loader exposes symbols as a sequence of symbol
    # objects (not a name -> symbol mapping), so read the name from each one.
    symbols = project.loader.main_object.symbols
    print(f"\nSymbols: {len(symbols)}")
    for symbol in list(symbols)[:10]:
        print(f"  {symbol.name}: {hex(symbol.rebased_addr)}")

    return project

# Analyze binary structure
def analyze_binary_structure(project):
    """Print a structural overview of the project's main object.

    Reports entry point, architecture details, sections and segments (when the
    main object exposes them) and the first ten functions recovered by a
    ``CFGFast`` run.

    Returns:
        The ``CFGFast`` analysis result.
    """
    binary = project.loader.main_object
    arch = project.arch

    print("Binary Structure Analysis:")
    print(f"  Entry point: {hex(project.entry)}")
    print(f"  Architecture: {arch.name}")
    print(f"  Word size: {arch.bits} bits")
    print(f"  Endianness: {arch.memory_endness}")
    print(f"  Address space: {hex(binary.min_addr)} - {hex(binary.max_addr)}")

    # Section table, if the backend provides one
    if hasattr(binary, 'sections'):
        section_table = binary.sections
        print(f"\nSections ({len(section_table)}):")
        for sec in section_table:
            sec_end = sec.vaddr + sec.memsize
            print(f"  {sec.name}: {hex(sec.vaddr)} - {hex(sec_end)} "
                  f"({sec.memsize} bytes, {sec.flags})")

    # Segment table, if the backend provides one
    if hasattr(binary, 'segments'):
        segment_table = binary.segments
        print(f"\nSegments ({len(segment_table)}):")
        for seg in segment_table:
            seg_end = seg.vaddr + seg.memsize
            print(f"  {hex(seg.vaddr)} - {hex(seg_end)} "
                  f"({seg.memsize} bytes, flags: {seg.flags})")

    # Function recovery
    cfg = project.analyses.CFGFast()
    recovered = cfg.functions

    print(f"\nFunctions found: {len(recovered)}")
    for position, (addr, func) in enumerate(recovered.items()):
        if position == 10:  # only the first ten are listed
            break
        print(f"  {func.name}: {hex(addr)} ({func.size} bytes)")

    return cfg

# Example usage
# Script entry point: runs the loader and structure report only when this
# snippet is executed directly, not when it is imported.
if __name__ == "__main__":
    # Load a binary
    binary_path = "/bin/ls"  # Example binary
    project = load_binary_advanced(binary_path)

    # Analyze structure (prints the report and keeps the CFG for later use)
    cfg = analyze_binary_structure(project)

Análisis del flujo de control

# Control Flow Graph (CFG) analysis with angr
import angr
import networkx as nx
import matplotlib.pyplot as plt

class CFGAnalyzer:
    """Build a ``CFGFast`` for a project and report on functions and calls.

    Attributes:
        project: The angr project being analysed.
        cfg: The ``CFGFast`` result, or ``None`` until ``generate_cfg`` runs.
        functions: Empty dict kept for callers; nothing here populates it.
    """

    def __init__(self, project):
        self.project = project
        self.cfg = None
        self.functions = {}

    @staticmethod
    def _block_count(func):
        """Count a function's basic blocks; ``blocks`` is iterated, not sized."""
        return sum(1 for _ in func.blocks)

    @staticmethod
    def _address_of(item):
        """Return ``item`` if it is an int address, otherwise ``item.addr``."""
        return item if isinstance(item, int) else item.addr

    def generate_cfg(self, normalize=True, resolve_indirect_jumps=True):
        """Run ``CFGFast`` and cache the result on ``self.cfg``.

        Args:
            normalize: Forwarded to ``CFGFast``.
            resolve_indirect_jumps: Forwarded to ``CFGFast``.

        Returns:
            The generated CFG analysis object.
        """

        print("Generating CFG...")

        # Generate CFG with options
        self.cfg = self.project.analyses.CFGFast(
            normalize=normalize,
            resolve_indirect_jumps=resolve_indirect_jumps,
            force_complete_scan=False,
            show_progressbar=True
        )

        # Node and edge counts come from the underlying graph object.
        graph = self.cfg.graph
        print(f"CFG generated: {len(graph.nodes())} nodes, {len(graph.edges())} edges")
        print(f"Functions discovered: {len(self.cfg.functions)}")

        return self.cfg

    def analyze_function(self, function_addr):
        """Print and return a summary of the function at ``function_addr``.

        Returns:
            A dict of function properties, or ``None`` when no function is
            known at that address.
        """

        if self.cfg is None:
            self.generate_cfg()

        if function_addr not in self.cfg.functions:
            print(f"Function at {hex(function_addr)} not found")
            return None

        func = self.cfg.functions[function_addr]
        addr_of = self._address_of

        # NOTE(review): ``callers``/``callees`` are read as attributes of the
        # function object; confirm the angr version in use provides them.
        analysis = {
            'name': func.name,
            'address': hex(func.addr),
            'size': func.size,
            # ``blocks`` is a generator, so it has to be counted by iteration.
            'blocks': self._block_count(func),
            'calling_convention': func.calling_convention,
            'is_plt': func.is_plt,
            'is_simprocedure': func.is_simprocedure,
            'endpoints': [hex(ep.addr) for ep in func.endpoints],
            # Call sites are reported as plain addresses, not node objects.
            'callsites': [hex(addr_of(cs)) for cs in func.get_call_sites()],
            'callers': [hex(addr_of(caller)) for caller in func.callers],
            'callees': [hex(addr_of(callee)) for callee in func.callees]
        }

        print(f"Function Analysis: {func.name}")
        for key, value in analysis.items():
            if isinstance(value, list) and len(value) > 5:
                print(f"  {key}: {value[:5]}... ({len(value)} total)")
            else:
                print(f"  {key}: {value}")

        return analysis

    def find_interesting_functions(self):
        """Bucket functions into large, complex, leaf, recursive and PLT.

        Returns:
            A dict mapping category name to a list of tuples describing the
            matching functions.
        """

        if self.cfg is None:
            self.generate_cfg()

        interesting = {
            'large_functions': [],
            'complex_functions': [],
            'leaf_functions': [],
            'recursive_functions': [],
            'plt_functions': []
        }

        for addr, func in self.cfg.functions.items():
            where = hex(addr)

            # Large functions (>1000 bytes)
            if func.size > 1000:
                interesting['large_functions'].append((func.name, where, func.size))

            # Complex functions (>20 basic blocks)
            block_count = self._block_count(func)
            if block_count > 20:
                interesting['complex_functions'].append((func.name, where, block_count))

            # Leaf functions (no callees)
            if not func.callees:
                interesting['leaf_functions'].append((func.name, where))

            # Recursive functions (the function appears among its own callees)
            if any(self._address_of(callee) == addr for callee in func.callees):
                interesting['recursive_functions'].append((func.name, where))

            # PLT functions
            if func.is_plt:
                interesting['plt_functions'].append((func.name, where))

        # Print results
        for category, functions in interesting.items():
            print(f"\n{category.replace('_', ' ').title()}: {len(functions)}")
            for func_info in functions[:5]:  # Show first 5
                print(f"  {func_info}")

        return interesting

    def analyze_call_graph(self):
        """Build a call graph with networkx and print summary statistics.

        Returns:
            A ``(call_graph, analysis)`` tuple: the ``nx.DiGraph`` keyed by
            function address and a dict of graph statistics.
        """

        if self.cfg is None:
            self.generate_cfg()

        # Build call graph
        call_graph = nx.DiGraph()

        for addr, func in self.cfg.functions.items():
            call_graph.add_node(addr, name=func.name, size=func.size)

            for callee in func.callees:
                call_graph.add_edge(addr, self._address_of(callee))

        node_count = call_graph.number_of_nodes()

        # Analyze call graph properties
        analysis = {
            'total_functions': node_count,
            'total_calls': call_graph.number_of_edges(),
            'strongly_connected_components': len(list(nx.strongly_connected_components(call_graph))),
            'weakly_connected_components': len(list(nx.weakly_connected_components(call_graph))),
            'cycles': len(list(nx.simple_cycles(call_graph))),
            # Guard against an empty graph to avoid dividing by zero.
            'average_degree': sum(dict(call_graph.degree()).values()) / node_count if node_count else 0
        }

        # Find central functions (high degree)
        degree_centrality = nx.degree_centrality(call_graph)
        central_functions = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)[:10]

        print("Call Graph Analysis:")
        for key, value in analysis.items():
            print(f"  {key}: {value}")

        print("\nMost Central Functions:")
        for addr, centrality in central_functions:
            func_name = self.cfg.functions[addr].name
            print(f"  {func_name} ({hex(addr)}): {centrality:.3f}")

        return call_graph, analysis

    def visualize_function_cfg(self, function_addr, output_file="function_cfg.png"):
        """Render one function's CFG to an image file with matplotlib.

        Args:
            function_addr: Address of the function to draw.
            output_file: Path of the image to write.
        """

        if self.cfg is None:
            self.generate_cfg()

        if function_addr not in self.cfg.functions:
            print(f"Function at {hex(function_addr)} not found")
            return

        func = self.cfg.functions[function_addr]

        # Create graph for function
        G = nx.DiGraph()

        # The function's own graph carries both the block nodes and the
        # transitions between them; block objects have no successor list.
        func_graph = func.graph

        # Add nodes (basic blocks)
        for node in func_graph.nodes():
            G.add_node(node.addr, label=f"{hex(node.addr)}\n{node.size} bytes")

        # Add edges
        for source, target in func_graph.edges():
            G.add_edge(source.addr, target.addr)

        # Create visualization
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(G, k=1, iterations=50)

        # Draw nodes
        nx.draw_networkx_nodes(G, pos, node_color='lightblue',
                              node_size=1000, alpha=0.7)

        # Draw edges
        nx.draw_networkx_edges(G, pos, edge_color='gray',
                              arrows=True, arrowsize=20)

        # Draw labels
        labels = nx.get_node_attributes(G, 'label')
        nx.draw_networkx_labels(G, pos, labels, font_size=8)

        plt.title(f"CFG for function {func.name} ({hex(function_addr)})")
        plt.axis('off')
        plt.tight_layout()
        plt.savefig(output_file, dpi=300, bbox_inches='tight')
        plt.close()

        print(f"Function CFG saved to {output_file}")

# Example usage
def analyze_binary_cfg(binary_path):
    """Run the full CFGAnalyzer workflow on one binary.

    Loads the binary without shared libraries, builds the CFG, reports
    interesting functions and call-graph statistics and, when a ``main``
    symbol exists, analyses and renders that function.

    Returns:
        The ``CFGAnalyzer`` used for the run.
    """
    target = angr.Project(binary_path, auto_load_libs=False)
    analyzer = CFGAnalyzer(target)

    # The individual reports are printed by the analyzer; the return values
    # are not needed here.
    analyzer.generate_cfg()
    analyzer.find_interesting_functions()
    _graph, _stats = analyzer.analyze_call_graph()

    main_symbol = target.loader.main_object.get_symbol('main')
    if not main_symbol:
        return analyzer

    analyzer.analyze_function(main_symbol.rebased_addr)
    analyzer.visualize_function_cfg(main_symbol.rebased_addr)
    return analyzer

# Run analysis
# Script entry point: run the CFG workflow on a sample system binary when
# this snippet is executed directly.
if __name__ == "__main__":
    binary_path = "/bin/ls"
    analyzer = analyze_binary_cfg(binary_path)

Ejecución simbólica

Ejecución simbólica básica

# Symbolic execution with angr
import angr
import claripy

class SymbolicExecutor:
    """Drive angr symbolic execution for a single project.

    Attributes:
        project: The angr project being explored.
        simulation_manager: Manager created by ``run_symbolic_execution``.
        initial_state: State created by ``create_initial_state``.
        symbolic_vars: Maps variable name to the claripy bitvector created by
            the ``setup_*`` helpers, so found states can be concretized.
    """

    # Cap on the active stash applied by step_callback to curb state explosion.
    MAX_ACTIVE_STATES = 50

    def __init__(self, project):
        self.project = project
        self.simulation_manager = None
        self.initial_state = None
        self.symbolic_vars = {}

    def create_initial_state(self, addr=None, **kwargs):
        """Create and cache the initial state.

        Args:
            addr: Start address; defaults to the project's entry point.
            **kwargs: Forwarded to ``factory.entry_state``. A caller-supplied
                ``add_options`` is merged with the defaults used here.

        Returns:
            The created state.
        """

        if addr is None:
            addr = self.project.entry

        # NOTE(review): these options select abstract/approximating solver and
        # memory behaviour; confirm they are wanted for path exploration.
        add_options = {
            angr.options.ABSTRACT_SOLVER,
            angr.options.ABSTRACT_MEMORY,
            angr.options.APPROXIMATE_FIRST
        }
        # Merge rather than pass add_options twice, which raises TypeError.
        add_options.update(kwargs.pop('add_options', None) or ())

        # Create initial state
        self.initial_state = self.project.factory.entry_state(
            addr=addr,
            add_options=add_options,
            **kwargs
        )

        print(f"Initial state created at {hex(addr)}")
        return self.initial_state

    def setup_symbolic_input(self, input_size=100, input_name="symbolic_input"):
        """Create a symbolic buffer constrained to printable ASCII.

        The buffer is only created, constrained and remembered; this method
        does not place it in memory or on an input stream.

        Returns:
            The symbolic bitvector of ``input_size`` bytes.
        """

        if self.initial_state is None:
            self.create_initial_state()

        # Create symbolic bitvector
        symbolic_input = claripy.BVS(input_name, input_size * 8)

        # Add constraints (printable characters)
        solver = self.initial_state.solver
        for i in range(input_size):
            byte = symbolic_input.get_byte(i)
            solver.add(byte >= 0x20)  # Printable ASCII
            solver.add(byte <= 0x7e)

        # Remember the variable so analyze_found_states can evaluate it.
        self.symbolic_vars[input_name] = symbolic_input

        print(f"Symbolic input created: {input_name} ({input_size} bytes)")
        return symbolic_input

    def setup_symbolic_arguments(self, argc=2, argv_size=100):
        """Place ``argc`` symbolic, NUL-terminated argument strings in memory.

        Strings and the pointer array are written at a fixed address and
        argc/argv are passed in registers (AMD64) or pushed on the stack (X86).

        Returns:
            The list of symbolic argument bitvectors.
        """

        if self.initial_state is None:
            self.create_initial_state()

        state = self.initial_state
        arch = self.project.arch

        # Create symbolic argv
        argv = []
        for i in range(argc):
            name = f"argv_{i}"
            arg = claripy.BVS(name, argv_size * 8)

            # Every byte but the last is printable ASCII
            for j in range(argv_size - 1):
                byte = arg.get_byte(j)
                state.solver.add(byte >= 0x20)
                state.solver.add(byte <= 0x7e)

            # Null terminate
            state.solver.add(arg.get_byte(argv_size - 1) == 0)
            argv.append(arg)
            self.symbolic_vars[name] = arg

        # Set up argv in memory
        argv_ptrs = []
        base_addr = 0x7fff0000

        for i, arg in enumerate(argv):
            arg_addr = base_addr + i * argv_size
            state.memory.store(arg_addr, arg)
            argv_ptrs.append(arg_addr)

        # Set up argv pointer array. Pointers are integers, so they must be
        # written in the architecture's byte order; the store default is
        # big-endian.
        argv_array_addr = base_addr + len(argv) * argv_size
        for i, ptr in enumerate(argv_ptrs):
            state.memory.store(
                argv_array_addr + i * arch.bytes,
                ptr,
                size=arch.bytes,
                endness=arch.memory_endness
            )

        # Set argc and argv in registers/stack
        if arch.name == 'AMD64':
            state.regs.rdi = argc
            state.regs.rsi = argv_array_addr
        elif arch.name == 'X86':
            # Push argv and argc onto stack
            state.stack_push(argv_array_addr)
            state.stack_push(argc)

        print(f"Symbolic arguments setup: argc={argc}, argv at {hex(argv_array_addr)}")
        return argv

    def run_symbolic_execution(self, find_addrs=None, avoid_addrs=None, max_steps=1000):
        """Explore from the initial state.

        Args:
            find_addrs: Address(es) to reach; enables targeted exploration.
            avoid_addrs: Address(es) to avoid.
            max_steps: Upper bound on the number of steps in either mode.

        Returns:
            The simulation manager after exploration.
        """

        if self.initial_state is None:
            self.create_initial_state()

        # Create simulation manager
        self.simulation_manager = self.project.factory.simulation_manager(self.initial_state)
        manager = self.simulation_manager

        print("Starting symbolic execution...")

        # Run exploration
        if find_addrs or avoid_addrs:
            manager.explore(
                find=find_addrs,
                avoid=avoid_addrs,
                step_func=self.step_callback,
                n=max_steps,  # honour the step budget in targeted mode too
                num_find=10  # Limit number of solutions
            )
        else:
            # Step-by-step execution
            for step in range(max_steps):
                if not manager.active:
                    break

                manager.step()

                if step % 100 == 0:
                    print(f"Step {step}: {len(manager.active)} active states")

                # Check for interesting states
                if manager.deadended:
                    print(f"Found {len(manager.deadended)} deadended states")

                if manager.errored:
                    print(f"Found {len(manager.errored)} errored states")

        print("Symbolic execution completed")
        return self.simulation_manager

    def step_callback(self, sm):
        """Trim the active stash to ``MAX_ACTIVE_STATES`` after each step.

        Returns:
            The same simulation manager.
        """

        # Truncate in place so the list object the manager already holds is
        # the one that shrinks, rather than rebinding an attribute.
        if len(sm.active) > self.MAX_ACTIVE_STATES:
            del sm.active[self.MAX_ACTIVE_STATES:]

        return sm

    def analyze_found_states(self):
        """Concretize the tracked symbolic variables in every found state.

        Returns:
            One dict per found state mapping variable name to concrete bytes;
            an empty list when nothing was found.
        """

        if self.simulation_manager is None or not self.simulation_manager.found:
            print("No found states to analyze")
            return []

        solutions = []

        for i, state in enumerate(self.simulation_manager.found):
            print(f"\n--- Found State {i+1} ---")
            print(f"Address: {hex(state.addr)}")

            # Evaluate the bitvectors themselves; a variable name cannot be
            # handed to the solver.
            solution = {}
            if state.solver.satisfiable():
                for var_name, bitvector in self.symbolic_vars.items():
                    concrete_value = state.solver.eval(bitvector, cast_to=bytes)
                    solution[str(var_name)] = concrete_value
                    print(f"  {var_name}: {concrete_value}")

            solutions.append(solution)

        return solutions

    def _first_instruction(self, addr):
        """Return the first disassembled instruction at ``addr``, or ``None``."""
        try:
            insns = self.project.factory.block(addr).capstone.insns
        except Exception:
            # Best effort: some addresses on a path cannot be lifted.
            return None
        return insns[0] if insns else None

    def find_path_to_address(self, target_addr, avoid_addrs=None):
        """Find one execution path from the initial state to ``target_addr``.

        Returns:
            ``(trace, found_state)``, or ``(None, None)`` when no path exists.
            ``trace`` lists the path in execution order as dicts with keys
            ``address``, ``instruction`` and ``constraints``. The constraint
            count is only known for the final state, so earlier steps carry
            ``None``.
        """

        if self.initial_state is None:
            self.create_initial_state()

        print(f"Finding path to {hex(target_addr)}")

        # Run exploration
        sm = self.project.factory.simulation_manager(self.initial_state)
        sm.explore(find=target_addr, avoid=avoid_addrs)

        if not sm.found:
            print("No path found to target address")
            return None, None

        found_state = sm.found[0]

        # The state's history records the block addresses already executed;
        # the found state's own address is the final step.
        trace = [
            {
                'address': hex(block_addr),
                'instruction': self._first_instruction(block_addr),
                'constraints': None
            }
            for block_addr in found_state.history.bbl_addrs
        ]
        trace.append({
            'address': hex(found_state.addr),
            'instruction': self._first_instruction(found_state.addr),
            'constraints': len(found_state.solver.constraints)
        })

        print(f"Path found with {len(trace)} steps:")
        for i, step in enumerate(trace[:10]):  # Show first 10 steps
            print(f"  {i}: {step['address']} - {step['instruction'] or 'N/A'}")

        return trace, found_state

# Vulnerability discovery with symbolic execution
class VulnerabilityFinder:
    """Heuristic vulnerability checks built on angr symbolic execution.

    Every check appends its findings to ``self.vulnerabilities`` and returns
    that same, cumulative list.
    """

    def __init__(self, project):
        self.project = project
        self.vulnerabilities = []

    def find_buffer_overflows(self, function_addr, buffer_size=100):
        """Call a function on a symbolic buffer and flag memory-fault errors.

        Args:
            function_addr: Address of the function to call.
            buffer_size: Size in bytes of the symbolic input buffer.

        Returns:
            The cumulative list of recorded vulnerabilities.
        """

        print(f"Searching for buffer overflows in function {hex(function_addr)}")

        # Create initial state at function entry
        initial_state = self.project.factory.call_state(function_addr)

        # Create symbolic input buffer
        symbolic_buffer = claripy.BVS("input_buffer", buffer_size * 8)

        # Store symbolic buffer in memory
        buffer_addr = 0x7fff0000
        initial_state.memory.store(buffer_addr, symbolic_buffer)

        # Set up function arguments to point to buffer.
        # Only AMD64 is handled; on other architectures no arguments are set.
        if self.project.arch.name == 'AMD64':
            initial_state.regs.rdi = buffer_addr
            initial_state.regs.rsi = buffer_size

        # Run symbolic execution
        sm = self.project.factory.simulation_manager(initial_state)

        try:
            sm.run(until=lambda sm: len(sm.active) == 0 or len(sm.errored) > 0)

            # Check for errors that might indicate buffer overflow
            for errored_state in sm.errored:
                message = str(errored_state.error)

                # NOTE(review): this matches on error text; confirm these
                # substrings appear in the errors angr actually raises.
                if "Segmentation fault" in message or "Invalid memory access" in message:
                    crash_addr = hex(errored_state.state.addr)

                    # Potential buffer overflow
                    vulnerability = {
                        'type': 'buffer_overflow',
                        'function': hex(function_addr),
                        'error_addr': crash_addr,
                        'error': message,
                        'input_constraints': errored_state.state.solver.constraints
                    }

                    self.vulnerabilities.append(vulnerability)
                    print(f"Potential buffer overflow found at {crash_addr}")

        except Exception as e:
            # Best-effort analysis: report the failure and return what exists.
            print(f"Error during buffer overflow analysis: {e}")

        return self.vulnerabilities

    def find_format_string_bugs(self, printf_addr):
        """Check whether a symbolic format string can start with %n, %s or %x.

        This only asks the solver whether the first bytes of an otherwise
        unconstrained printable buffer can equal each pattern; it does not
        execute the target.

        Returns:
            The cumulative list of recorded vulnerabilities.
        """

        print(f"Searching for format string bugs at {hex(printf_addr)}")

        # Create state at printf call
        initial_state = self.project.factory.call_state(printf_addr)

        # Create symbolic format string
        format_string = claripy.BVS("format_string", 100 * 8)

        # Add constraints for format string characters
        for i in range(100):
            byte = format_string.get_byte(i)
            initial_state.solver.add(claripy.Or(
                claripy.And(byte >= 0x20, byte <= 0x7e),  # Printable
                byte == 0  # Null terminator
            ))

        # Set format string as first argument
        format_addr = 0x7fff1000
        initial_state.memory.store(format_addr, format_string)

        if self.project.arch.name == 'AMD64':
            initial_state.regs.rdi = format_addr

        # Look for format string specifiers that could be dangerous
        dangerous_patterns = [b'%n', b'%s', b'%x']

        for pattern in dangerous_patterns:
            # Check if format string can contain dangerous pattern
            pattern_bv = claripy.BVV(pattern)
            pattern_text = pattern.decode()

            # This is a simplified check - in practice, you'd need more sophisticated analysis
            if initial_state.solver.satisfiable(extra_constraints=[
                format_string.get_bytes(0, len(pattern)) == pattern_bv
            ]):
                vulnerability = {
                    'type': 'format_string',
                    'function': hex(printf_addr),
                    'pattern': pattern_text,
                    'description': f"Format string can contain {pattern_text}"
                }

                self.vulnerabilities.append(vulnerability)
                print(f"Potential format string vulnerability: {pattern_text}")

        return self.vulnerabilities

    def find_integer_overflows(self, function_addr):
        """Call a function on two symbolic 32-bit integers and flag overflows.

        Returns:
            The cumulative list of recorded vulnerabilities.
        """

        print(f"Searching for integer overflows in function {hex(function_addr)}")

        # Create initial state
        initial_state = self.project.factory.call_state(function_addr)

        # Create symbolic integers
        int1 = claripy.BVS("int1", 32)
        int2 = claripy.BVS("int2", 32)

        # Set up function arguments (zero-extended to the 64-bit registers)
        if self.project.arch.name == 'AMD64':
            initial_state.regs.rdi = int1.zero_extend(32)
            initial_state.regs.rsi = int2.zero_extend(32)

        # Run symbolic execution
        sm = self.project.factory.simulation_manager(initial_state)

        arithmetic_ops = ('__add__', '__mul__', '__sub__')

        try:
            sm.run(until=lambda sm: len(sm.active) == 0)

            # Check for integer overflow conditions
            for state in sm.deadended:
                # NOTE(review): presumably history.actions is only populated
                # when action tracking is enabled on the state, and the
                # recorded actions may not expose op/result - confirm.
                for action in state.history.actions:
                    if hasattr(action, 'op') and action.op in arithmetic_ops:
                        # Check if result could overflow
                        if state.solver.satisfiable(extra_constraints=[
                            action.result > 0xffffffff  # 32-bit overflow
                        ]):
                            vulnerability = {
                                'type': 'integer_overflow',
                                'function': hex(function_addr),
                                'operation': action.op,
                                'address': hex(state.addr)
                            }

                            self.vulnerabilities.append(vulnerability)
                            print(f"Potential integer overflow in {action.op} at {hex(state.addr)}")

        except Exception as e:
            # Best-effort analysis: report the failure and return what exists.
            print(f"Error during integer overflow analysis: {e}")

        return self.vulnerabilities

# Example usage
def symbolic_execution_example(binary_path):
    """Run an end-to-end symbolic execution demo on one binary.

    Loads the binary, prepares a 50-byte symbolic input, locates ``main`` via
    ``CFGFast`` and explores towards the first three ``call`` instructions
    found in it.

    Returns:
        ``(executor, solutions)``; ``solutions`` is an empty list when
        ``main`` or its call instructions cannot be found.
    """
    project = angr.Project(binary_path, auto_load_libs=False)

    executor = SymbolicExecutor(project)
    executor.create_initial_state()
    executor.setup_symbolic_input(input_size=50)

    # Locate main among the recovered functions (first match wins)
    cfg = project.analyses.CFGFast()
    main_func = next(
        (func for _addr, func in cfg.functions.items() if func.name == 'main'),
        None,
    )
    if not main_func:
        return executor, []

    print(f"Found main function at {hex(main_func.addr)}")

    # Addresses of every call instruction inside main, in block order
    call_addrs = [
        insn.address
        for block in main_func.blocks
        for insn in block.capstone.insns
        if insn.mnemonic == 'call'
    ]
    if not call_addrs:
        return executor, []

    # Explore towards the first three call sites, then concretize the results
    executor.run_symbolic_execution(find_addrs=call_addrs[:3], max_steps=500)
    solutions = executor.analyze_found_states()

    return executor, solutions

# Run example
# Script entry point: run the symbolic execution demo on a sample system
# binary and report how many solutions were produced.
if __name__ == "__main__":
    binary_path = "/bin/ls"
    executor, solutions = symbolic_execution_example(binary_path)
    print(f"Found {len(solutions)} solutions")

Técnicas avanzadas de ejecución simbólica

# Advanced symbolic execution techniques
import angr
import claripy
import itertools

class AdvancedSymbolicAnalysis:
    """Advanced symbolic-execution helpers built around a single angr project.

    Attributes:
        project: the angr project every helper operates on.
        custom_hooks: symbol name -> SimProcedure instance installed by
            :meth:`setup_custom_hooks`.
        analysis_results: target -> result dict stored by
            :meth:`run_comprehensive_analysis`.
    """

    def __init__(self, project):
        """Remember ``project`` and start with empty hook/result registries."""
        self.project = project
        self.custom_hooks = {}
        self.analysis_results = {}

    def setup_custom_hooks(self):
        """Hook ``malloc``, ``free`` and ``strcpy`` with tracking SimProcedures.

        ``Project.hook_symbol`` requires a SimProcedure argument and cannot be
        used as a bare decorator, so each hook is a small ``angr.SimProcedure``
        subclass; arguments and return values then follow the calling
        convention angr selects for the binary instead of hard-coded
        registers.

        Bookkeeping lives in ``state.globals`` under the keys
        ``'allocations'`` (address -> ``{'size', 'allocated_at'}``) and
        ``'freed_memory'`` (set of freed addresses). ``state.globals`` is a
        mapping, so presence is tested with ``in``/``get``; ``hasattr`` never
        sees its keys. Containers are re-bound to fresh copies on every update
        so states that share a common ancestor do not mutate each other's
        bookkeeping (presumably the plugin copy is shallow; verify against
        your angr version).
        """
        max_copy = 1000  # upper bound on bytes scanned for the NUL terminator

        class MallocHook(angr.SimProcedure):
            """Allocate from the state's heap and record the allocation."""

            def run(self, size):
                state = self.state
                concrete_size = state.solver.eval(size)

                addr = state.heap.allocate(concrete_size)
                if not isinstance(addr, int):
                    # Some heap plugins may hand back an AST; concretize it.
                    addr = state.solver.eval(addr)

                allocations = dict(state.globals.get('allocations', {}))
                allocations[addr] = {
                    'size': concrete_size,
                    'allocated_at': state.addr
                }
                state.globals['allocations'] = allocations

                print(f"malloc({concrete_size}) = {hex(addr)}")
                return addr

        class FreeHook(angr.SimProcedure):
            """Forget the allocation and remember the pointer as freed."""

            def run(self, ptr):
                state = self.state
                addr = state.solver.eval(ptr)

                allocations = state.globals.get('allocations', {})
                if addr in allocations:
                    remaining = dict(allocations)
                    del remaining[addr]
                    state.globals['allocations'] = remaining
                    print(f"free({hex(addr)})")

                # Mark memory as freed (for use-after-free detection)
                freed = set(state.globals.get('freed_memory', ()))
                freed.add(addr)
                state.globals['freed_memory'] = freed

        class StrcpyHook(angr.SimProcedure):
            """Copy a NUL-terminated string and warn on tracked-buffer overflow."""

            def run(self, dst, src):
                state = self.state
                dest = state.solver.eval(dst)
                source = state.solver.eval(src)

                # Read a bounded window and look for the first byte that
                # evaluates to zero.
                src_data = state.memory.load(source, max_copy)
                null_pos = None
                for i in range(max_copy):
                    if state.solver.eval(src_data.get_byte(i)) == 0:
                        null_pos = i
                        break

                if null_pos is not None:
                    copied = null_pos + 1  # include the terminator
                    state.memory.store(dest, src_data.get_bytes(0, copied))
                    print(f"strcpy({hex(dest)}, {hex(source)}) - copied {copied} bytes")

                    # Only destinations that are the start of a tracked
                    # allocation can be checked.
                    allocations = state.globals.get('allocations', {})
                    if dest in allocations:
                        allocated_size = allocations[dest]['size']
                        if copied > allocated_size:
                            print(f"WARNING: Potential buffer overflow! Copied {copied} bytes to {allocated_size} byte buffer")

                return dst  # strcpy returns its destination pointer

        for symbol_name, procedure in (
            ('malloc', MallocHook()),
            ('free', FreeHook()),
            ('strcpy', StrcpyHook()),
        ):
            # replace=True: these symbols may already carry angr's own stubs.
            self.project.hook_symbol(symbol_name, procedure, replace=True)
            self.custom_hooks[symbol_name] = procedure

        print("Custom hooks installed")

    def concolic_execution(self, initial_inputs, target_function):
        """Explore towards ``target_function`` once per concrete seed input.

        Args:
            initial_inputs: iterable of ``bytes`` seeds.
            target_function: address (or other ``find`` argument) passed to
                ``SimulationManager.explore``.

        Returns:
            Flat list of the inputs produced by :meth:`generate_new_inputs`
            for the first found state of every seed.
        """
        results = []

        for concrete_input in initial_inputs:
            print(f"Concolic execution with input: {concrete_input}")

            state = self.project.factory.entry_state()

            # The concrete bytes are written to a fixed scratch address.
            input_addr = 0x7fff0000
            state.memory.store(input_addr, concrete_input)

            # A symbolic twin of the input, pinned to the concrete value.
            # NOTE(review): the twin is only constrained, never written to
            # memory, so path constraints will not mention it unless the
            # target reads it some other way - confirm this is intended.
            symbolic_input = claripy.BVS("symbolic_input", len(concrete_input) * 8)
            state.solver.add(symbolic_input == claripy.BVV(concrete_input))

            sm = self.project.factory.simulation_manager(state)
            sm.explore(find=target_function)

            if sm.found:
                new_inputs = self.generate_new_inputs(sm.found[0], symbolic_input)
                results.extend(new_inputs)

        return results

    def generate_new_inputs(self, state, symbolic_var):
        """Derive alternative inputs by negating one path constraint at a time.

        Each candidate constraint set is checked in a fresh solver. Passing
        the negation as ``extra_constraints`` to ``state.solver`` would leave
        the original constraint in force as well, making every query
        unsatisfiable.

        Args:
            state: state whose path constraints are used.
            symbolic_var: bitvector to solve for.

        Returns:
            List of ``bytes`` values (big-endian, as wide as
            ``symbolic_var``), one per satisfiable negation.
        """
        new_inputs = []
        constraints = list(state.solver.constraints)
        byte_width = symbolic_var.size() // 8

        for i, constraint in enumerate(constraints):
            flipped = constraints[:i] + [claripy.Not(constraint)] + constraints[i + 1:]

            solver = claripy.Solver()
            solver.add(flipped)
            if solver.satisfiable():
                value = solver.eval(symbolic_var, 1)[0]
                new_inputs.append(value.to_bytes(byte_width, 'big'))

        return new_inputs

    def taint_analysis(self, taint_sources, sink_functions):
        """Report sink-function arguments that depend on tainted memory.

        Args:
            taint_sources: iterable of addresses; a fresh 64-bit symbolic
                value is stored at each one.
            sink_functions: mapping of sink address -> sink name.

        Returns:
            List of dicts with keys ``sink_function``, ``sink_address``,
            ``argument_index`` and ``tainted_variable``.
        """
        print("Starting taint analysis...")

        initial_state = self.project.factory.entry_state(
            add_options={angr.options.TRACK_MEMORY_ACTIONS}
        )

        # Collect every source before storing, so all of them stay tracked.
        tainted = set(initial_state.globals.get('tainted_data', ()))
        for source_addr in taint_sources:
            tainted_data = claripy.BVS(f"tainted_{hex(source_addr)}", 64)
            initial_state.memory.store(source_addr, tainted_data)
            tainted.add(tainted_data)
        if tainted:
            initial_state.globals['tainted_data'] = tainted

        sm = self.project.factory.simulation_manager(initial_state)
        taint_flows = []

        def check_taint_at_sinks(state):
            """Record a flow for each sink argument built from a tainted variable."""
            if state.addr not in sink_functions:
                return
            sink_name = sink_functions[state.addr]

            if self.project.arch.name == 'AMD64':
                args = [state.regs.rdi, state.regs.rsi, state.regs.rdx, state.regs.rcx]
            else:
                # Otherwise read four 4-byte slots above the return address
                # (x86 stack-argument layout).
                args = [
                    state.memory.load(state.regs.esp + (i + 1) * 4, 4)
                    for i in range(4)
                ]

            tainted_vars = state.globals.get('tainted_data', ())
            for i, arg in enumerate(args):
                for tainted_var in tainted_vars:
                    # An argument is tainted when its expression mentions the
                    # tainted symbol; this also works across bit widths.
                    if arg.variables & tainted_var.variables:
                        taint_flows.append({
                            'sink_function': sink_name,
                            'sink_address': hex(state.addr),
                            'argument_index': i,
                            'tainted_variable': str(tainted_var)
                        })
                        print(f"Taint flow detected: {tainted_var} -> {sink_name} arg {i}")

        # Step through execution and check every active state after each step.
        for _ in range(1000):  # Limit steps
            if not sm.active:
                break

            sm.step()

            for state in sm.active:
                check_taint_at_sinks(state)

        return taint_flows

    def constraint_solving_optimization(self, state, optimization_target):
        """Select a solver backend by target keyword and simplify constraints.

        Args:
            state: state to tune (modified in place and returned).
            optimization_target: free-form string; ``'crypto'`` or
                ``'arithmetic'`` (case-insensitive substring) selects a backend.

        Returns:
            The same ``state`` object.
        """
        target = optimization_target.lower()

        # NOTE(review): this assigns to the private ``_solver`` attribute;
        # confirm it is writable in your angr version and that discarding the
        # previous solver (and its constraints) is intended.
        if 'crypto' in target:
            state.solver._solver = claripy.SolverCacheless(backend=claripy.backends.z3)
        elif 'arithmetic' in target:
            state.solver._solver = claripy.SolverCacheless(backend=claripy.backends.vsa)

        state.solver.simplify()

        # Make sure a per-state cache slot exists for callers to fill.
        if 'constraint_cache' not in state.globals:
            state.globals['constraint_cache'] = {}

        return state

    def path_explosion_mitigation(self, sm, max_active_states=50):
        """Keep only the best-scored active states when there are too many.

        A state's score is ``1000 / (constraints + 1)`` plus its history
        depth, plus 500 when its address lies inside the main object.

        Args:
            sm: simulation manager; its ``active`` list is pruned in place.
            max_active_states: maximum number of active states to keep.

        Returns:
            The same ``sm`` object.
        """
        if len(sm.active) > max_active_states:
            scored_states = []

            for state in sm.active:
                score = 1000 / (len(state.solver.constraints) + 1)
                score += state.history.depth
                if self.is_interesting_address(state.addr):
                    score += 500
                scored_states.append((score, state))

            scored_states.sort(key=lambda x: x[0], reverse=True)

            # Slice assignment mutates the stash list itself, so pruning
            # takes effect even where ``sm.active = ...`` would merely create
            # a shadowing attribute.
            sm.active[:] = [state for _, state in scored_states[:max_active_states]]

            print(f"Pruned to {len(sm.active)} active states")

        return sm

    def is_interesting_address(self, addr):
        """Return True when ``addr`` lies within the main object's address range."""
        main_object = self.project.loader.main_object
        return main_object.min_addr <= addr <= main_object.max_addr

    def symbolic_memory_modeling(self, state, memory_model='flat'):
        """Add the state options associated with ``memory_model``.

        Args:
            state: state whose option set is extended in place.
            memory_model: ``'flat'``, ``'paged'`` or ``'concrete'``; any other
                value leaves the state untouched.

        Returns:
            The same ``state`` object.
        """
        # NOTE(review): option names are taken as-is from the original
        # listing; confirm each one exists in your angr version.
        if memory_model == 'flat':
            state.options.add(angr.options.ABSTRACT_MEMORY)

        elif memory_model == 'paged':
            state.options.add(angr.options.FAST_MEMORY)
            state.options.add(angr.options.ABSTRACT_MEMORY)

        elif memory_model == 'concrete':
            state.options.add(angr.options.CONCRETE_MEMORY)
            state.options.add(angr.options.SYMBOLIC_INITIAL_VALUES)

        return state

    def run_comprehensive_analysis(self, target_function, analysis_config):
        """Install hooks, step the program and summarise what was found.

        Args:
            target_function: key under which the result is stored in
                ``self.analysis_results``.
            analysis_config: dict with optional keys ``memory_model``,
                ``symbolic_inputs`` (list of ``{'name', 'address', 'size'}``),
                ``find_addresses``, ``avoid_addresses`` and ``max_steps``.

        Returns:
            Dict with ``found_states``, ``deadended_states``,
            ``errored_states``, ``total_steps`` and ``solutions``. Each
            solution maps a configured input name to concrete ``bytes``.
        """
        print("Starting comprehensive symbolic analysis...")

        self.setup_custom_hooks()

        initial_state = self.project.factory.entry_state()
        initial_state = self.symbolic_memory_modeling(
            initial_state,
            analysis_config.get('memory_model', 'flat')
        )

        # Keep the created bitvectors so they can be solved for later.
        symbolic_vars = {}
        for input_config in analysis_config.get('symbolic_inputs', []):
            symbolic_var = claripy.BVS(
                input_config['name'],
                input_config['size'] * 8
            )
            initial_state.memory.store(input_config['address'], symbolic_var)
            symbolic_vars[input_config['name']] = symbolic_var

        sm = self.project.factory.simulation_manager(initial_state)

        find_addrs = analysis_config.get('find_addresses', [])
        avoid_addrs = analysis_config.get('avoid_addresses', [])

        step_count = 0
        max_steps = analysis_config.get('max_steps', 1000)

        while sm.active and step_count < max_steps:
            sm.step()
            step_count += 1

            sm = self.path_explosion_mitigation(sm)

            # Move states that reached a target into the 'found' stash.
            if find_addrs:
                found_states = [s for s in sm.active if s.addr in find_addrs]
                if found_states:
                    sm.found.extend(found_states)
                    sm.active[:] = [s for s in sm.active if s.addr not in find_addrs]

            # States at avoided addresses are simply dropped.
            if avoid_addrs:
                sm.active[:] = [s for s in sm.active if s.addr not in avoid_addrs]

            if step_count % 100 == 0:
                print(f"Step {step_count}: {len(sm.active)} active, {len(sm.found)} found")

        results = {
            'found_states': len(sm.found),
            'deadended_states': len(sm.deadended),
            'errored_states': len(sm.errored),
            'total_steps': step_count,
            'solutions': []
        }

        # One solution dict per found state; empty when the state is unsat.
        for state in sm.found:
            solution = {}
            if state.solver.satisfiable():
                for name, symbolic_var in symbolic_vars.items():
                    solution[name] = state.solver.eval(symbolic_var, cast_to=bytes)
            results['solutions'].append(solution)

        self.analysis_results[target_function] = results
        return results

# Example usage
def advanced_symbolic_analysis_example():
    """Run AdvancedSymbolicAnalysis on ``/bin/ls`` with a sample configuration.

    Returns:
        ``(analyzer, results)`` where ``results`` is the value returned by
        ``run_comprehensive_analysis``.
    """
    binary_path = "/bin/ls"
    project = angr.Project(binary_path, auto_load_libs=False)
    analyzer = AdvancedSymbolicAnalysis(project)

    # One 100-byte symbolic buffer placed at a fixed address.
    user_input = {
        'name': 'user_input',
        'address': 0x7fff0000,
        'size': 100
    }

    analysis_config = {
        'memory_model': 'paged',
        'max_steps': 500,
        'symbolic_inputs': [user_input],
        'find_addresses': [],  # fill in target addresses
        'avoid_addresses': []  # fill in addresses to avoid
    }

    # Results are stored under the binary's entry point.
    results = analyzer.run_comprehensive_analysis(
        target_function=project.entry,
        analysis_config=analysis_config
    )
    print(f"Analysis completed: {results}")

    return analyzer, results

if __name__ == "__main__":
    # Script entry point: run the advanced example and keep its results
    # available as module-level names.
    analyzer, results = advanced_symbolic_analysis_example()

Análisis estático y descubrimiento de código

Análisis de funciones y código

# Static analysis and code discovery with angr
import angr
import networkx as nx
from collections import defaultdict

class StaticAnalyzer:
    """Static analysis helpers (CFG, complexity, call graph, report) for one project.

    Attributes:
        project: the angr project being analysed.
        cfg: CFG chosen by :meth:`generate_comprehensive_cfg`, or None.
        functions: mapping of function address -> function object.
        call_graph: ``networkx.DiGraph`` built by :meth:`build_call_graph`,
            or None.
        analysis_cache: scratch dict; not used by the methods of this class.
    """

    # Function-name substrings used by find_code_patterns, in check order.
    _NAME_KEYWORDS = (
        ('crypto_functions', ('crypt', 'hash', 'md5', 'sha', 'aes', 'des', 'rsa', 'cipher')),
        ('string_functions', ('str', 'mem', 'copy', 'cmp', 'len', 'cat', 'chr')),
        ('file_operations', ('file', 'open', 'read', 'write', 'close', 'fopen', 'fread')),
        ('network_operations', ('socket', 'connect', 'send', 'recv', 'bind', 'listen', 'accept')),
        ('system_calls', ('system', 'exec', 'fork', 'exit', 'kill', 'signal')),
        ('anti_analysis', ('debug', 'trace', 'ptrace', 'isdebuggerpresent', 'checkremotedebugger')),
    )

    # Mnemonics counted towards the obfuscation threshold / flagged as
    # anti-debugging by analyze_function_content.
    _OBFUSCATION_MNEMONICS = frozenset(('nop', 'xor', 'rol', 'ror'))
    _ANTI_DEBUG_MNEMONICS = frozenset(('rdtsc', 'cpuid', 'int'))
    _OBFUSCATION_THRESHOLD = 10

    # Operand type codes compared against ``operand.type``.
    # NOTE(review): presumably capstone's immediate/memory codes - confirm.
    _OPERAND_IMMEDIATE = 2
    _OPERAND_MEMORY = 3

    def __init__(self, project):
        """Store the project; every analysis product starts out empty."""
        self.project = project
        self.cfg = None
        self.functions = {}
        self.call_graph = None
        self.analysis_cache = {}

    def generate_comprehensive_cfg(self):
        """Build a fast and an emulated CFG and keep the preferred one.

        The emulated CFG is used when it recovered any function; otherwise
        the fast CFG is the fallback. ``self.cfg`` and ``self.functions`` are
        updated.

        Returns:
            The selected CFG.
        """
        print("Generating comprehensive CFG...")

        cfg_fast = self.project.analyses.CFGFast(
            normalize=True,
            resolve_indirect_jumps=True,
            force_complete_scan=False
        )

        cfg_accurate = self.project.analyses.CFGEmulated(
            context_sensitivity_level=1,
            keep_state=True,
            state_add_options=angr.sim_options.refs,
            state_remove_options=angr.sim_options.simplification
        )

        self.cfg = cfg_accurate if cfg_accurate.functions else cfg_fast
        self.functions = self.cfg.functions

        print(f"CFG generated: {len(self.cfg.nodes())} nodes, {len(self.functions)} functions")
        return self.cfg

    def analyze_function_complexity(self, function_addr):
        """Compute size and complexity metrics for one function.

        Args:
            function_addr: address key into ``self.functions``.

        Returns:
            Dict of metrics (name, address, size, block/instruction counts,
            cyclomatic complexity, call counts, loops, call depth,
            instruction distribution, complexity level), or None when the
            address is unknown.
        """
        if function_addr not in self.functions:
            return None

        func = self.functions[function_addr]

        # ``func.blocks`` may be a one-shot generator: materialise it once so
        # it can be counted (len() fails on a generator) and walked twice.
        blocks = list(func.blocks)

        # NOTE(review): ``callers``/``callees`` are read as sized collections
        # on the function object; confirm they exist in your angr version.
        complexity = {
            'name': func.name,
            'address': hex(func.addr),
            'size': func.size,
            'basic_blocks': len(blocks),
            'instructions': sum(block.instructions for block in blocks),
            'cyclomatic_complexity': 0,
            'call_sites': len(func.get_call_sites()),
            'callers': len(func.callers),
            'callees': len(func.callees),
            'loops': 0,
            'depth': 0
        }

        if func.graph:
            # V(G) = E - N + 2P with a single connected component (P = 1).
            edges = len(func.graph.edges())
            nodes = len(func.graph.nodes())
            complexity['cyclomatic_complexity'] = edges - nodes + 2

            # Best effort: loop counting must not abort the whole analysis.
            try:
                complexity['loops'] = len(list(nx.simple_cycles(func.graph)))
            except Exception:
                complexity['loops'] = 0

        complexity['depth'] = self.calculate_call_depth(func)

        instruction_types = defaultdict(int)
        for block in blocks:
            for insn in block.capstone.insns:
                instruction_types[insn.mnemonic] += 1
        complexity['instruction_distribution'] = dict(instruction_types)

        cyclomatic = complexity['cyclomatic_complexity']
        if cyclomatic > 20:
            complexity['complexity_level'] = 'very_high'
        elif cyclomatic > 10:
            complexity['complexity_level'] = 'high'
        elif cyclomatic > 5:
            complexity['complexity_level'] = 'medium'
        else:
            complexity['complexity_level'] = 'low'

        return complexity

    def calculate_call_depth(self, func, visited=None, depth=0):
        """Return the deepest call-chain length reachable from ``func``.

        Args:
            func: function object to start from.
            visited: addresses already on the current path (internal).
            depth: depth of ``func`` on the current path (internal).

        Returns:
            Maximum depth reached. Recursion stops on a function already on
            the path or once ``depth`` exceeds 20.
        """
        if visited is None:
            visited = set()

        if func.addr in visited or depth > 20:  # Prevent infinite recursion
            return depth

        visited.add(func.addr)
        max_depth = depth

        for callee in func.callees:
            if callee.addr in self.functions:
                callee_func = self.functions[callee.addr]
                # Each branch gets its own copy so sibling paths stay independent.
                callee_depth = self.calculate_call_depth(callee_func, visited.copy(), depth + 1)
                max_depth = max(max_depth, callee_depth)

        return max_depth

    def find_code_patterns(self):
        """Classify functions by name keywords and instruction content.

        Returns:
            Dict mapping each category to a list of ``(name, hex_address)``
            tuples. A summary of non-empty categories is also printed.
        """
        patterns = {
            'crypto_functions': [],
            'string_functions': [],
            'file_operations': [],
            'network_operations': [],
            'system_calls': [],
            'anti_analysis': [],
            'obfuscation': []
        }

        for addr, func in self.functions.items():
            func_name = func.name.lower()

            # A function may land in several categories.
            for category, keywords in self._NAME_KEYWORDS:
                if any(keyword in func_name for keyword in keywords):
                    patterns[category].append((func.name, hex(addr)))

            self.analyze_function_content(func, patterns)

        for category, functions in patterns.items():
            if functions:
                print(f"\n{category.replace('_', ' ').title()}: {len(functions)}")
                for func_name, addr in functions[:5]:  # Show first 5
                    print(f"  {func_name} at {addr}")

        return patterns

    def analyze_function_content(self, func, patterns):
        """Flag ``func`` for obfuscation / anti-debugging based on its mnemonics.

        The function is walked once. It is added to ``patterns['obfuscation']``
        when it holds more than ten nop/xor/rol/ror instructions, and to
        ``patterns['anti_analysis']`` when it contains rdtsc, cpuid or int.
        Existing entries are never duplicated.

        Args:
            func: function object providing ``blocks``, ``name`` and ``addr``.
            patterns: dict with ``'obfuscation'`` and ``'anti_analysis'``
                lists, updated in place.
        """
        mnemonics = [
            insn.mnemonic.lower()
            for block in func.blocks
            for insn in block.capstone.insns
        ]
        entry = (func.name, hex(func.addr))

        obfuscation_count = sum(1 for m in mnemonics if m in self._OBFUSCATION_MNEMONICS)
        if obfuscation_count > self._OBFUSCATION_THRESHOLD:
            if entry not in patterns['obfuscation']:
                patterns['obfuscation'].append(entry)

        if any(m in self._ANTI_DEBUG_MNEMONICS for m in mnemonics):
            if entry not in patterns['anti_analysis']:
                patterns['anti_analysis'].append(entry)

    def build_call_graph(self):
        """Build a directed call graph over the known functions.

        Generates the CFG first when none exists. Nodes are function
        addresses carrying ``name``, ``size`` and ``complexity`` (block
        count) attributes; an edge is added for each callee that is itself a
        known function.

        Returns:
            The ``networkx.DiGraph`` (also stored in ``self.call_graph``).
        """
        if self.cfg is None:
            self.generate_comprehensive_cfg()

        self.call_graph = nx.DiGraph()

        for addr, func in self.functions.items():
            self.call_graph.add_node(addr,
                                     name=func.name,
                                     size=func.size,
                                     # count by iteration: blocks may be a generator
                                     complexity=sum(1 for _ in func.blocks))

        for addr, func in self.functions.items():
            for callee in func.callees:
                if callee.addr in self.functions:
                    self.call_graph.add_edge(addr, callee.addr)

        print(f"Call graph built: {len(self.call_graph.nodes())} functions, {len(self.call_graph.edges())} calls")
        return self.call_graph

    def analyze_call_graph_properties(self):
        """Compute global metrics and centrality rankings for the call graph.

        Builds the call graph first when needed.

        Returns:
            Dict of graph metrics plus ``central_functions``, which maps each
            centrality measure to its top five ``(name, hex_address, score)``
            tuples.
        """
        if self.call_graph is None:
            self.build_call_graph()

        graph = self.call_graph
        node_count = len(graph.nodes())

        analysis = {
            'total_functions': node_count,
            'total_calls': len(graph.edges()),
            'strongly_connected_components': len(list(nx.strongly_connected_components(graph))),
            'weakly_connected_components': len(list(nx.weakly_connected_components(graph))),
            'average_degree': sum(dict(graph.degree()).values()) / node_count if graph.nodes() else 0,
            'density': nx.density(graph),
            'diameter': 0,
            'radius': 0
        }

        # Diameter/radius are only attempted on a weakly connected graph;
        # any failure leaves the zero defaults in place.
        try:
            if nx.is_weakly_connected(graph):
                undirected = graph.to_undirected()
                analysis['diameter'] = nx.diameter(undirected)
                analysis['radius'] = nx.radius(undirected)
        except Exception:
            pass

        centrality_measures = {
            'degree_centrality': nx.degree_centrality(graph),
            'betweenness_centrality': nx.betweenness_centrality(graph),
            'closeness_centrality': nx.closeness_centrality(graph),
            'pagerank': nx.pagerank(graph)
        }

        analysis['central_functions'] = {}
        for measure, values in centrality_measures.items():
            top_functions = sorted(values.items(), key=lambda x: x[1], reverse=True)[:5]
            analysis['central_functions'][measure] = [
                (self.functions[addr].name, hex(addr), score)
                for addr, score in top_functions
            ]

        return analysis

    def find_entry_points(self):
        """Collect candidate entry points of the binary.

        Returns:
            Dict with ``main_entry`` (project entry address) and lists for
            ``exported_functions``, ``constructor_functions``,
            ``exception_handlers`` and ``signal_handlers`` (the last is never
            filled here).
        """
        entry_points = {
            'main_entry': self.project.entry,
            'exported_functions': [],
            'constructor_functions': [],
            'exception_handlers': [],
            'signal_handlers': []
        }

        main_object = self.project.loader.main_object

        # Exported symbols that correspond to a recovered function.
        for symbol in main_object.symbols:
            if symbol.is_export and symbol.rebased_addr in self.functions:
                entry_points['exported_functions'].append({
                    'name': symbol.name,
                    'address': hex(symbol.rebased_addr),
                    'type': 'export'
                })

        # Name-based heuristic for initialisers/constructors.
        for addr, func in self.functions.items():
            lowered = func.name.lower()
            if 'init' in lowered or 'constructor' in lowered:
                entry_points['constructor_functions'].append({
                    'name': func.name,
                    'address': hex(addr),
                    'type': 'constructor'
                })

        # Exception handlers (simplified, best effort): only when the loader
        # object exposes a handoff table at all.
        try:
            if hasattr(main_object, 'exception_handoff_table'):
                for handler in main_object.exception_handoff_table:
                    if handler in self.functions:
                        entry_points['exception_handlers'].append({
                            'name': self.functions[handler].name,
                            'address': hex(handler),
                            'type': 'exception_handler'
                        })
        except Exception:
            pass

        return entry_points

    def analyze_data_references(self):
        """Scan instruction operands for strings, function pointers and constants.

        Returns:
            Dict with lists ``string_references``, ``numeric_constants``,
            ``function_pointers`` and ``global_variables`` (the last is never
            filled here).
        """
        data_analysis = {
            'string_references': [],
            'numeric_constants': [],
            'function_pointers': [],
            'global_variables': []
        }

        for addr, func in self.functions.items():
            for block in func.blocks:
                for insn in block.capstone.insns:
                    if not insn.operands:
                        continue

                    for operand in insn.operands:
                        if operand.type == self._OPERAND_MEMORY:
                            mem_addr = operand.value.mem.disp

                            # The displacement may not be a mapped address;
                            # a failed read just means "not a string".
                            try:
                                string_data = self.project.loader.memory.load(mem_addr, 100)
                            except Exception:
                                string_data = None

                            if string_data is not None and self.is_printable_string(string_data):
                                # Report only the text before the terminator.
                                text = string_data.split(b'\x00', 1)[0]
                                data_analysis['string_references'].append({
                                    'function': func.name,
                                    'address': hex(mem_addr),
                                    'string': text.decode('ascii', errors='ignore')[:50]
                                })

                            if mem_addr in self.functions:
                                data_analysis['function_pointers'].append({
                                    'function': func.name,
                                    'target': self.functions[mem_addr].name,
                                    'address': hex(mem_addr)
                                })

                        elif operand.type == self._OPERAND_IMMEDIATE:
                            value = operand.value.imm

                            if self.is_interesting_constant(value):
                                data_analysis['numeric_constants'].append({
                                    'function': func.name,
                                    'value': hex(value),
                                    'decimal': value,
                                    'instruction': f"{insn.mnemonic} {insn.op_str}"
                                })

        return data_analysis

    def is_printable_string(self, data):
        """Return True when ``data`` starts with a mostly printable C string.

        ``data`` must be at least four bytes long and contain a NUL byte; the
        bytes before the first NUL must be non-empty and more than 80%
        printable ASCII (32..126).
        """
        if len(data) < 4:
            return False

        null_pos = data.find(b'\x00')
        if null_pos == -1:
            return False

        string_data = data[:null_pos]
        if not string_data:
            return False

        printable_count = sum(1 for byte in string_data if 32 <= byte <= 126)
        return printable_count / len(string_data) > 0.8

    def is_interesting_constant(self, value):
        """Return True for well-known markers, large values and powers of two."""
        interesting_values = {
            0x41414141,  # 'AAAA'
            0x42424242,  # 'BBBB'
            0xdeadbeef,
            0xcafebabe,
            0xfeedface,
            0x12345678,
            0x87654321
        }

        if value in interesting_values:
            return True

        # Large constants that might be addresses or keys
        if value > 0x10000000:
            return True

        # Powers of 2 have exactly one bit set
        if value > 0 and (value & (value - 1)) == 0:
            return True

        return False

    def generate_analysis_report(self, output_file="static_analysis_report.txt"):
        """Run every analysis and write a plain-text report.

        Args:
            output_file: path of the report; written as UTF-8 so function
                names outside ASCII cannot break the write.

        Returns:
            ``output_file``.
        """
        print("Generating static analysis report...")

        if self.cfg is None:
            self.generate_comprehensive_cfg()

        patterns = self.find_code_patterns()
        call_graph_analysis = self.analyze_call_graph_properties()
        entry_points = self.find_entry_points()
        data_analysis = self.analyze_data_references()

        main_object = self.project.loader.main_object

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("STATIC ANALYSIS REPORT\n")
            f.write("=" * 50 + "\n\n")

            # Binary information
            f.write("BINARY INFORMATION\n")
            f.write("-" * 20 + "\n")
            f.write(f"Architecture: {self.project.arch.name}\n")
            f.write(f"Entry point: {hex(self.project.entry)}\n")
            f.write(f"Base address: {hex(main_object.min_addr)}\n")
            f.write(f"Binary size: {main_object.max_addr - main_object.min_addr} bytes\n")
            f.write(f"Functions discovered: {len(self.functions)}\n\n")

            # Call graph analysis
            f.write("CALL GRAPH ANALYSIS\n")
            f.write("-" * 20 + "\n")
            for key, value in call_graph_analysis.items():
                if key != 'central_functions':
                    f.write(f"{key}: {value}\n")

            f.write("\nCentral Functions:\n")
            for measure, functions in call_graph_analysis['central_functions'].items():
                f.write(f"  {measure}:\n")
                for name, addr, score in functions:
                    f.write(f"    {name} ({addr}): {score:.3f}\n")
            f.write("\n")

            # Code patterns
            f.write("CODE PATTERNS\n")
            f.write("-" * 20 + "\n")
            for category, functions in patterns.items():
                if functions:
                    f.write(f"{category}: {len(functions)} functions\n")
                    for name, addr in functions[:3]:  # Show first 3
                        f.write(f"  {name} at {addr}\n")
            f.write("\n")

            # Entry points
            f.write("ENTRY POINTS\n")
            f.write("-" * 20 + "\n")
            f.write(f"Main entry: {hex(entry_points['main_entry'])}\n")
            f.write(f"Exported functions: {len(entry_points['exported_functions'])}\n")
            f.write(f"Constructor functions: {len(entry_points['constructor_functions'])}\n")
            f.write("\n")

            # Data analysis
            f.write("DATA ANALYSIS\n")
            f.write("-" * 20 + "\n")
            f.write(f"String references: {len(data_analysis['string_references'])}\n")
            f.write(f"Numeric constants: {len(data_analysis['numeric_constants'])}\n")
            f.write(f"Function pointers: {len(data_analysis['function_pointers'])}\n")
            f.write("\n")

            # Top complex functions (cyclomatic complexity above 10)
            f.write("COMPLEX FUNCTIONS\n")
            f.write("-" * 20 + "\n")
            complex_functions = []
            for addr in self.functions:
                complexity = self.analyze_function_complexity(addr)
                if complexity and complexity['cyclomatic_complexity'] > 10:
                    complex_functions.append(complexity)

            complex_functions.sort(key=lambda x: x['cyclomatic_complexity'], reverse=True)

            for func in complex_functions[:10]:  # Top 10
                f.write(f"{func['name']} ({func['address']}): "
                        f"CC={func['cyclomatic_complexity']}, "
                        f"Size={func['size']}, "
                        f"Blocks={func['basic_blocks']}\n")

        print(f"Static analysis report saved to {output_file}")
        return output_file

# Example usage
def comprehensive_static_analysis(binary_path):
    """Run the StaticAnalyzer workflow on a single binary.

    Loads ``binary_path`` into an angr project without its shared libraries,
    builds the CFG through the analyzer, prints the complexity metrics of
    ``main`` when the main object has such a symbol, and writes the report.

    Returns:
        A ``(analyzer, report_file)`` tuple.
    """
    project = angr.Project(binary_path, auto_load_libs=False)
    analyzer = StaticAnalyzer(project)

    # The CFG object itself is not used below, so the return value is dropped.
    analyzer.generate_comprehensive_cfg()

    # Report on main() only when the binary actually has that symbol
    entry_symbol = project.loader.main_object.get_symbol('main')
    if entry_symbol:
        metrics = analyzer.analyze_function_complexity(entry_symbol.rebased_addr)
        print(f"Main function complexity: {metrics}")

    return analyzer, analyzer.generate_analysis_report()

# Script entry point: run the static-analysis example against /bin/ls and
# print the path of the report it produced.
if __name__ == "__main__":
    binary_path = "/bin/ls"
    analyzer, report = comprehensive_static_analysis(binary_path)
    print(f"Analysis completed. Report: {report}")

Automatización e integración

Pipelines de análisis automatizados

# Automated analysis pipelines with angr
import angr
import json
import time
import logging
import multiprocessing
from pathlib import Path
from datetime import datetime

class AnalysisPipeline:
    """Run a configurable series of angr analyses over one or more binaries.

    For every binary an ``angr.Project`` is created, each module listed in
    ``config["analysis_modules"]`` is run against it, and the combined result
    is stored in ``self.results`` (keyed by analysis id) and written to the
    configured output directory as JSON, text and/or HTML.
    """

    # Library routines whose presence is reported as a potential buffer
    # overflow. Names are compared after _base_function_name() normalisation.
    _OVERFLOW_PRONE_FUNCTIONS = frozenset({"strcpy", "gets", "sprintf", "vsprintf"})

    def __init__(self, config_file=None):
        """Load the configuration, set up logging and create the work queues.

        Args:
            config_file: Optional path to a JSON file whose values override
                the built-in defaults (see :meth:`load_config`).
        """
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.results = {}
        self.analysis_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()

    def load_config(self, config_file):
        """Return the default configuration merged with ``config_file``.

        Top-level sections that are dictionaries in both the defaults and the
        user file are merged key by key; every other user value replaces the
        default outright. A missing or falsy ``config_file`` yields the
        defaults unchanged.

        Raises:
            ValueError: If the file does not contain a JSON object.
        """

        default_config = {
            "analysis_modules": [
                "cfg_analysis",
                "function_analysis",
                "symbolic_execution",
                "vulnerability_detection",
                "static_analysis"
            ],
            "symbolic_execution": {
                "max_steps": 1000,
                "max_states": 50,
                "timeout": 300
            },
            "vulnerability_detection": {
                "buffer_overflow": True,
                "format_string": True,
                "integer_overflow": True,
                "use_after_free": True
            },
            "output": {
                "formats": ["json", "html", "txt"],
                "directory": "./analysis_results"
            },
            "performance": {
                "parallel_workers": 4,
                "memory_limit": "4GB",
                "timeout_per_binary": 1800
            }
        }

        if not (config_file and Path(config_file).exists()):
            return default_config

        with open(config_file, 'r', encoding='utf-8') as f:
            user_config = json.load(f)

        if not isinstance(user_config, dict):
            raise ValueError(
                f"Configuration file {config_file} must contain a JSON object"
            )

        for key, value in user_config.items():
            # Only merge dict into dict; a dict override for a non-dict
            # default (e.g. the module list) simply replaces it.
            if isinstance(value, dict) and isinstance(default_config.get(key), dict):
                default_config[key].update(value)
            else:
                default_config[key] = value

        return default_config

    def setup_logging(self):
        """Configure root logging (file + console) and quieten angr's loggers."""

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('analysis_pipeline.log'),
                logging.StreamHandler()
            ]
        )

        self.logger = logging.getLogger(__name__)

        # Reduce angr logging verbosity
        logging.getLogger('angr').setLevel(logging.WARNING)
        logging.getLogger('cle').setLevel(logging.WARNING)
        logging.getLogger('pyvex').setLevel(logging.WARNING)

    def analyze_binary(self, binary_path, analysis_id=None):
        """Analyze a single binary with every configured module.

        A failing module is recorded under ``errors`` and does not stop the
        remaining modules; a failure to load the binary marks the whole
        analysis as ``failed``. The result is always stored in
        ``self.results`` and handed to :meth:`save_results`.

        Args:
            binary_path: Path of the binary to load.
            analysis_id: Identifier used as results key and output file stem.
                Generated when omitted.

        Returns:
            The analysis result dictionary.
        """

        if analysis_id is None:
            # A timestamp alone repeats for binaries analysed within the same
            # second (the normal case in a batch), which made results and
            # output files overwrite each other. A random suffix keeps the
            # identifier unique, also across worker processes.
            import uuid
            analysis_id = f"analysis_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        self.logger.info(f"Starting analysis of {binary_path} (ID: {analysis_id})")

        start_time = time.time()
        analysis_result = {
            'binary_path': str(binary_path),
            'analysis_id': analysis_id,
            'start_time': datetime.now().isoformat(),
            'status': 'running',
            'modules': {},
            'errors': [],
            'warnings': []
        }

        try:
            # Load binary
            project = angr.Project(binary_path, auto_load_libs=False)
            main_object = project.loader.main_object
            analysis_result['binary_info'] = {
                'architecture': project.arch.name,
                'entry_point': hex(project.entry),
                'base_address': hex(main_object.min_addr),
                'size': main_object.max_addr - main_object.min_addr
            }

            # Run analysis modules
            for module in self.config['analysis_modules']:
                try:
                    self.logger.info(f"Running module: {module}")
                    module_result = self.run_analysis_module(project, module)
                    analysis_result['modules'][module] = module_result

                except Exception as e:
                    error_msg = f"Error in module {module}: {str(e)}"
                    self.logger.error(error_msg)
                    analysis_result['errors'].append(error_msg)

            analysis_result['status'] = 'completed'

        except Exception as e:
            error_msg = f"Failed to analyze {binary_path}: {str(e)}"
            self.logger.error(error_msg)
            analysis_result['status'] = 'failed'
            analysis_result['errors'].append(error_msg)

        finally:
            analysis_result['end_time'] = datetime.now().isoformat()
            analysis_result['duration'] = time.time() - start_time

            self.results[analysis_id] = analysis_result
            try:
                self.save_results(analysis_result)
            except (OSError, TypeError, ValueError) as e:
                # Failing to persist must not throw away the in-memory result.
                error_msg = f"Failed to save results for {analysis_id}: {e}"
                self.logger.error(error_msg)
                analysis_result['errors'].append(error_msg)

        return analysis_result

    def run_analysis_module(self, project, module_name):
        """Dispatch ``module_name`` to the matching ``run_*`` method.

        Raises:
            ValueError: If ``module_name`` is not a known module.
        """

        handlers = {
            "cfg_analysis": self.run_cfg_analysis,
            "function_analysis": self.run_function_analysis,
            "symbolic_execution": self.run_symbolic_execution,
            "vulnerability_detection": self.run_vulnerability_detection,
            "static_analysis": self.run_static_analysis,
        }

        handler = handlers.get(module_name)
        if handler is None:
            raise ValueError(f"Unknown analysis module: {module_name}")
        return handler(project)

    @staticmethod
    def _count_blocks(func):
        """Return the number of basic blocks of an angr function."""
        # Function.blocks is a generator and has no len(); the address set
        # gives the same count without lifting any block.
        return len(func.block_addrs_set)

    @staticmethod
    def _call_neighbours(callgraph, addr):
        """Return ``(callers, callees)`` address sets of ``addr`` in the call graph."""
        if addr not in callgraph:
            return set(), set()
        return set(callgraph.predecessors(addr)), set(callgraph.successors(addr))

    @staticmethod
    def _base_function_name(name):
        """Strip leading underscores and a ``_chk`` suffix from a symbol name."""
        base = (name or "").lstrip("_")
        if base.endswith("_chk"):
            base = base[:-len("_chk")]
        return base

    @staticmethod
    def _drain_queue(pending_queue):
        """Remove and return everything currently readable from a queue."""
        import queue

        drained = []
        while True:
            try:
                drained.append(pending_queue.get_nowait())
            except queue.Empty:
                return drained

    def run_cfg_analysis(self, project):
        """Build a fast CFG and summarise it.

        Returns:
            A dict with function/node/edge totals, the entry point and details
            of the ten largest functions by size.
        """

        cfg = project.analyses.CFGFast()
        # Nodes and edges live on the underlying networkx graph; caller and
        # callee relations live on the knowledge base call graph.
        graph = cfg.graph
        callgraph = cfg.functions.callgraph

        result = {
            'total_functions': len(cfg.functions),
            'total_nodes': len(graph.nodes()),
            'total_edges': len(graph.edges()),
            'entry_point': hex(project.entry),
            'functions': []
        }

        # Analyze top 10 largest functions
        sorted_functions = sorted(cfg.functions.items(),
                                  key=lambda item: item[1].size, reverse=True)

        for addr, func in sorted_functions[:10]:
            callers, callees = self._call_neighbours(callgraph, addr)
            result['functions'].append({
                'name': func.name,
                'address': hex(addr),
                'size': func.size,
                'blocks': self._count_blocks(func),
                'callers': len(callers),
                'callees': len(callees)
            })

        return result

    def run_function_analysis(self, project):
        """Classify every function by block count and flag notable ones.

        The basic-block count is used as the complexity measure. Functions
        larger than 1000 bytes, with more than 20 blocks, or that call
        themselves directly are listed under ``interesting_functions``.
        """

        cfg = project.analyses.CFGFast()
        callgraph = cfg.functions.callgraph

        result = {
            'total_functions': len(cfg.functions),
            'complexity_distribution': {'low': 0, 'medium': 0, 'high': 0, 'very_high': 0},
            'interesting_functions': {
                'large_functions': [],
                'complex_functions': [],
                'recursive_functions': []
            }
        }
        distribution = result['complexity_distribution']
        interesting = result['interesting_functions']

        for addr, func in cfg.functions.items():
            complexity = self._count_blocks(func)

            if complexity <= 5:
                distribution['low'] += 1
            elif complexity <= 15:
                distribution['medium'] += 1
            elif complexity <= 30:
                distribution['high'] += 1
            else:
                distribution['very_high'] += 1

            # Find interesting functions
            if func.size > 1000:
                interesting['large_functions'].append({
                    'name': func.name,
                    'address': hex(addr),
                    'size': func.size
                })

            if complexity > 20:
                interesting['complex_functions'].append({
                    'name': func.name,
                    'address': hex(addr),
                    'complexity': complexity
                })

            # Direct recursion shows up as a self-loop in the call graph
            if callgraph.has_edge(addr, addr):
                interesting['recursive_functions'].append({
                    'name': func.name,
                    'address': hex(addr)
                })

        return result

    def run_symbolic_execution(self, project):
        """Step a simulation manager from the entry state within fixed limits.

        Stops when no active state is left, after ``max_steps`` steps, or once
        ``timeout`` seconds have elapsed; after each step the active stash is
        cut back to ``max_states`` states. Exceptions are recorded under
        ``errors`` instead of being raised.
        """

        config = self.config['symbolic_execution']

        result = {
            'executed': False,
            'states_explored': 0,
            'paths_found': 0,
            'errors': [],
            'timeout': False
        }

        try:
            initial_state = project.factory.entry_state()
            sm = project.factory.simulation_manager(initial_state)

            start_time = time.time()
            step_count = 0

            while (sm.active and
                   step_count < config['max_steps'] and
                   time.time() - start_time < config['timeout']):

                sm.step()
                step_count += 1

                # Limit active states. Assigning a new list to sm.active is
                # not a documented way to replace a stash, so the surplus is
                # removed through the manager's own drop().
                if len(sm.active) > config['max_states']:
                    keep = {id(s) for s in sm.active[:config['max_states']]}
                    sm.drop(filter_func=lambda s, keep=keep: id(s) not in keep,
                            stash='active')

            result['executed'] = True
            result['states_explored'] = step_count
            # No find condition is used here, so a 'found' stash may not
            # exist; reading it as an attribute would raise and lose the count.
            result['paths_found'] = len(sm.deadended) + len(sm.stashes.get('found', []))

            if time.time() - start_time >= config['timeout']:
                result['timeout'] = True

        except Exception as e:
            result['errors'].append(str(e))

        return result

    def run_vulnerability_detection(self, project):
        """Flag functions whose names match known-dangerous library routines.

        Names are compared after stripping leading underscores and a ``_chk``
        suffix, so ``fgets`` is not reported as ``gets``. This is a
        presence check on function names only; call sites and arguments are
        not inspected.
        """

        config = self.config['vulnerability_detection']

        result = {
            'vulnerabilities_found': [],
            'checks_performed': []
        }

        cfg = project.analyses.CFGFast()
        check_overflow = config.get('buffer_overflow', True)
        check_format = config.get('format_string', True)

        # Buffer overflow detection
        if check_overflow:
            result['checks_performed'].append('buffer_overflow')

            for addr, func in cfg.functions.items():
                if self._base_function_name(func.name) in self._OVERFLOW_PRONE_FUNCTIONS:
                    result['vulnerabilities_found'].append({
                        'type': 'potential_buffer_overflow',
                        'function': func.name,
                        'address': hex(addr),
                        'description': f'Dangerous function {func.name} detected'
                    })

        # Format string detection
        if check_format:
            result['checks_performed'].append('format_string')

            for addr, func in cfg.functions.items():
                base = self._base_function_name(func.name)
                # printf family, minus the routines already covered above
                if base.endswith('printf') and base not in self._OVERFLOW_PRONE_FUNCTIONS:
                    result['vulnerabilities_found'].append({
                        'type': 'potential_format_string',
                        'function': func.name,
                        'address': hex(addr),
                        'description': f'Format string function {func.name} detected'
                    })

        return result

    def run_static_analysis(self, project):
        """Collect strings, imports and exports of the main object.

        Each extraction is best effort: a failure is logged and leaves the
        corresponding list empty rather than aborting the module.
        """

        result = {
            'strings': [],
            'imports': [],
            'exports': [],
            'sections': []
        }
        main_object = project.loader.main_object

        # Extract strings
        # NOTE(review): it is not clear that the loader backend exposes a
        # `strings` mapping - confirm; the failure is at least logged now.
        try:
            for addr, string in main_object.strings.items():
                if len(string) > 4 and string.isprintable():
                    result['strings'].append({
                        'address': hex(addr),
                        'value': string[:100]  # Truncate long strings
                    })
        except Exception as e:
            self.logger.warning(f"String extraction failed: {e}")

        # Extract imports: cle keeps them as a name -> relocation mapping
        try:
            for name, relocation in main_object.imports.items():
                address = getattr(relocation, 'rebased_addr', None)
                result['imports'].append({
                    'name': name,
                    'address': hex(address) if address else None
                })
        except Exception as e:
            self.logger.warning(f"Import extraction failed: {e}")

        # Extract exports: symbols of the main object flagged as exported
        try:
            for symbol in main_object.symbols:
                if symbol.is_export:
                    result['exports'].append({
                        'name': symbol.name,
                        'address': hex(symbol.rebased_addr)
                    })
        except Exception as e:
            self.logger.warning(f"Export extraction failed: {e}")

        return result

    def batch_analyze(self, binary_paths, parallel=True):
        """Analyze several binaries, in parallel when asked to and worthwhile.

        Returns:
            A list with one result dictionary per binary.
        """

        self.logger.info(f"Starting batch analysis of {len(binary_paths)} binaries")

        if parallel and len(binary_paths) > 1:
            return self.parallel_batch_analyze(binary_paths)
        else:
            return self.sequential_batch_analyze(binary_paths)

    def sequential_batch_analyze(self, binary_paths):
        """Analyze the binaries one after another in the current process."""

        results = []

        for i, binary_path in enumerate(binary_paths):
            self.logger.info(f"Analyzing binary {i+1}/{len(binary_paths)}: {binary_path}")

            try:
                result = self.analyze_binary(binary_path)
                results.append(result)
            except Exception as e:
                self.logger.error(f"Failed to analyze {binary_path}: {e}")
                results.append({
                    'binary_path': str(binary_path),
                    'status': 'failed',
                    'error': str(e)
                })

        return results

    def parallel_batch_analyze(self, binary_paths):
        """Analyze the binaries in worker processes.

        Results are returned in completion order. If every worker exits
        before all results have arrived (for example after a crash), the
        binaries without a result are reported as failed instead of blocking
        forever. Results produced by the workers are also copied into
        ``self.results`` of the calling process.
        """
        import queue
        from collections import Counter

        binary_paths = list(binary_paths)
        if not binary_paths:
            return []

        configured_workers = self.config['performance']['parallel_workers']
        num_workers = max(1, min(configured_workers, len(binary_paths)))

        # Create worker processes
        workers = []
        for _ in range(num_workers):
            worker = multiprocessing.Process(
                target=self.analysis_worker,
                args=(self.analysis_queue, self.result_queue)
            )
            workers.append(worker)
            worker.start()

        # Add tasks to queue
        for binary_path in binary_paths:
            self.analysis_queue.put(binary_path)

        # Add sentinel values to stop workers
        for _ in range(num_workers):
            self.analysis_queue.put(None)

        # Collect results, polling so that dead workers are noticed
        results = []
        while len(results) < len(binary_paths):
            try:
                results.append(self.result_queue.get(timeout=1.0))
            except queue.Empty:
                if any(worker.is_alive() for worker in workers):
                    continue
                # Pick up anything that arrived between the timeout and the
                # liveness check, then stop waiting.
                results.extend(self._drain_queue(self.result_queue))
                break

        # Wait for workers to finish
        for worker in workers:
            worker.join()

        if len(results) < len(binary_paths):
            # Leftover tasks and sentinels must not leak into the next batch
            self._drain_queue(self.analysis_queue)

            outstanding = Counter(str(path) for path in binary_paths)
            outstanding.subtract(Counter(r.get('binary_path') for r in results))
            for path, missing in outstanding.items():
                for _ in range(max(missing, 0)):
                    self.logger.error(f"No result received for {path}")
                    results.append({
                        'binary_path': path,
                        'status': 'failed',
                        'error': 'Worker process exited before reporting a result'
                    })

        # Workers only updated their own copy of self.results
        for result in results:
            if 'analysis_id' in result:
                self.results[result['analysis_id']] = result

        return results

    def analysis_worker(self, task_queue, result_queue):
        """Worker loop: analyze paths from ``task_queue`` until a ``None`` sentinel."""

        while True:
            binary_path = task_queue.get()

            if binary_path is None:  # Sentinel value
                break

            try:
                result = self.analyze_binary(binary_path)
                result_queue.put(result)
            except Exception as e:
                result_queue.put({
                    'binary_path': str(binary_path),
                    'status': 'failed',
                    'error': str(e)
                })

    def save_results(self, analysis_result):
        """Write the result in every format listed in ``config['output']['formats']``."""

        output_dir = Path(self.config['output']['directory'])
        # parents=True so that a nested output directory can be configured
        output_dir.mkdir(parents=True, exist_ok=True)

        analysis_id = analysis_result['analysis_id']
        formats = self.config['output']['formats']

        # Save JSON result
        if 'json' in formats:
            json_file = output_dir / f"{analysis_id}.json"
            with open(json_file, 'w', encoding='utf-8') as f:
                json.dump(analysis_result, f, indent=2)

        # Save text report
        if 'txt' in formats:
            txt_file = output_dir / f"{analysis_id}.txt"
            self.generate_text_report(analysis_result, txt_file)

        # Save HTML report
        if 'html' in formats:
            html_file = output_dir / f"{analysis_id}.html"
            self.generate_html_report(analysis_result, html_file)

    def generate_text_report(self, analysis_result, output_file):
        """Write a plain-text summary of ``analysis_result`` to ``output_file``."""

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("ANGR ANALYSIS REPORT\n")
            f.write("=" * 50 + "\n\n")

            # Basic information
            f.write(f"Binary: {analysis_result['binary_path']}\n")
            f.write(f"Analysis ID: {analysis_result['analysis_id']}\n")
            f.write(f"Status: {analysis_result['status']}\n")
            f.write(f"Duration: {analysis_result.get('duration', 0):.2f} seconds\n\n")

            # Binary information
            if 'binary_info' in analysis_result:
                info = analysis_result['binary_info']
                f.write("BINARY INFORMATION\n")
                f.write("-" * 20 + "\n")
                for key, value in info.items():
                    f.write(f"{key}: {value}\n")
                f.write("\n")

            # Module results
            for module, result in analysis_result.get('modules', {}).items():
                f.write(f"{module.upper().replace('_', ' ')}\n")
                f.write("-" * 20 + "\n")
                self.write_module_result(f, result)
                f.write("\n")

            # Errors and warnings
            if analysis_result.get('errors'):
                f.write("ERRORS\n")
                f.write("-" * 20 + "\n")
                for error in analysis_result['errors']:
                    f.write(f"- {error}\n")
                f.write("\n")

    def write_module_result(self, file_handle, result):
        """Write one module result; long containers are reduced to an item count."""

        if isinstance(result, dict):
            for key, value in result.items():
                if isinstance(value, (list, dict)) and len(str(value)) > 100:
                    file_handle.write(f"{key}: {type(value).__name__} with {len(value)} items\n")
                else:
                    file_handle.write(f"{key}: {value}\n")
        else:
            file_handle.write(f"Result: {result}\n")

    def generate_html_report(self, analysis_result, output_file):
        """Write an HTML report of ``analysis_result`` to ``output_file``.

        Every interpolated value is HTML-escaped, because paths, function
        names and strings originate from the analysed binary.
        """
        from html import escape

        report_id = escape(str(analysis_result['analysis_id']))
        binary = escape(str(analysis_result['binary_path']))
        status = escape(str(analysis_result['status']))

        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="utf-8">
            <title>angr Analysis Report - {report_id}</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .header {{ background-color: #f0f0f0; padding: 10px; border-radius: 5px; }}
                .section {{ margin: 20px 0; }}
                .error {{ color: red; }}
                .warning {{ color: orange; }}
                table {{ border-collapse: collapse; width: 100%; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>angr Analysis Report</h1>
                <p><strong>Binary:</strong> {binary}</p>
                <p><strong>Analysis ID:</strong> {report_id}</p>
                <p><strong>Status:</strong> {status}</p>
                <p><strong>Duration:</strong> {analysis_result.get('duration', 0):.2f} seconds</p>
            </div>
        """

        # Add module results
        sections = []
        for module, result in analysis_result.get('modules', {}).items():
            heading = escape(module.replace("_", " ").title())
            sections.append(
                f'<div class="section"><h2>{heading}</h2>'
                f'{self.format_result_html(result)}</div>'
            )
        html_content += "".join(sections)

        html_content += """
        </body>
        </html>
        """

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(html_content)

    def format_result_html(self, result):
        """Return ``result`` as an HTML table (dict) or paragraph, escaped."""
        from html import escape

        if isinstance(result, dict):
            rows = "".join(
                f"<tr><td><strong>{escape(str(key))}</strong></td>"
                f"<td>{escape(str(value))}</td></tr>"
                for key, value in result.items()
            )
            return f"<table>{rows}</table>"
        else:
            return f"<p>{escape(str(result))}</p>"

# Example usage and configuration
def create_analysis_pipeline():
    """Write an example configuration to disk and build a pipeline from it.

    The configuration enables four analysis modules, all three report formats
    and two parallel workers. It is saved as ``pipeline_config.json`` in the
    current working directory and then passed to ``AnalysisPipeline``.

    Returns:
        The configured ``AnalysisPipeline`` instance.
    """
    config_path = 'pipeline_config.json'

    enabled_modules = [
        "cfg_analysis",
        "function_analysis",
        "static_analysis",
        "vulnerability_detection"
    ]
    output_settings = {
        "formats": ["json", "txt", "html"],
        "directory": "./angr_analysis_results"
    }
    performance_settings = {
        "parallel_workers": 2,
        "timeout_per_binary": 300
    }
    config = {
        "analysis_modules": enabled_modules,
        "output": output_settings,
        "performance": performance_settings
    }

    # Persist the configuration so the pipeline can load it like a user file
    with open(config_path, 'w') as f:
        f.write(json.dumps(config, indent=2))

    return AnalysisPipeline(config_path)

def run_pipeline_example():
    """Demonstrate the pipeline on one binary and then on a small batch.

    Analyzes ``/bin/ls`` on its own, prints its status and duration, then
    runs a parallel batch over three system binaries.

    Returns:
        A ``(pipeline, batch_results)`` tuple.
    """
    pipeline = create_analysis_pipeline()

    # Single binary
    single_result = pipeline.analyze_binary("/bin/ls")
    status = single_result['status']
    duration = single_result.get('duration', 0)
    print(f"Analysis completed: {status}")
    print(f"Duration: {duration:.2f} seconds")

    # Batch of binaries, analysed in parallel
    targets = ["/bin/ls", "/bin/cat", "/bin/echo"]
    batch_results = pipeline.batch_analyze(targets, parallel=True)
    print(f"Batch analysis completed: {len(batch_results)} binaries")

    return pipeline, batch_results

# Script entry point: run the pipeline example (one single analysis followed
# by a parallel batch) and confirm completion.
if __name__ == "__main__":
    pipeline, results = run_pipeline_example()
    print("Pipeline example completed")

Mejores prácticas y consejos

Optimización del rendimiento

# angr performance optimization techniques
import angr
import angr.options as o

class PerformanceOptimizer:
    """Helpers that trade analysis precision for speed and memory in angr.

    Each ``optimize_*`` method applies one group of settings; the methods are
    also registered by name in ``self.optimization_strategies``.
    """

    def __init__(self):
        """Register the available optimization strategies by name."""
        self.optimization_strategies = {
            'memory': self.optimize_memory,
            'solver': self.optimize_solver,
            'state_management': self.optimize_state_management,
            'analysis': self.optimize_analysis
        }

    @staticmethod
    def _note_skipped(what, exc):
        """Log that an optional tuning step was skipped and why."""
        import logging

        logging.getLogger(__name__).warning("Skipping %s: %s", what, exc)

    def optimize_memory(self, project, state=None):
        """Add the memory-related state options to ``state``.

        Args:
            project: Unused; kept so all strategies share a signature.
            state: State to modify in place, or ``None`` to only obtain the
                option set.

        Returns:
            The set of memory options.
        """

        # Memory optimization options
        memory_opts = {
            o.ABSTRACT_MEMORY,
            o.FAST_MEMORY,
            o.APPROXIMATE_MEMORY_SIZES,
            o.APPROXIMATE_MEMORY_INDICES
        }

        if state is not None:
            state.options.update(memory_opts)

            # Configure memory backend.
            # NOTE(review): these attribute paths could not be confirmed
            # against the angr API. The step is best effort so that a missing
            # attribute does not abort state creation - verify and replace.
            try:
                state.memory.set_state_options(
                    write_strategies=[angr.storage.memory_mixins.DefaultMemory.write_strategy_store],
                    read_strategies=[angr.storage.memory_mixins.DefaultMemory.read_strategy_load]
                )
            except AttributeError as exc:
                self._note_skipped("memory strategy configuration", exc)

        return memory_opts

    def optimize_solver(self, project, state=None):
        """Add the solver-related state options to ``state`` and set a timeout.

        Args:
            project: Unused; kept so all strategies share a signature.
            state: State to modify in place, or ``None`` to only obtain the
                option set.

        Returns:
            The set of solver options.
        """

        # Solver optimization options
        solver_opts = {
            o.ABSTRACT_SOLVER,
            o.APPROXIMATE_FIRST,
            o.APPROXIMATE_GUARDS,
            o.APPROXIMATE_SATISFIABILITY
        }

        if state is not None:
            state.options.update(solver_opts)

            # Use faster solver backend.
            # NOTE(review): `angr.solvers` could not be confirmed to exist;
            # the swap is best effort so that it cannot abort state creation.
            try:
                state.solver._solver = angr.solvers.SolverCacheless(
                    backend=angr.solvers.backends.z3
                )
            except AttributeError as exc:
                self._note_skipped("solver backend replacement", exc)

            # Set solver timeout
            state.solver.timeout = 5000  # 5 seconds

        return solver_opts

    def optimize_state_management(self, simulation_manager):
        """Cap the active stash at 20 states and merge what remains.

        When more than 20 states are active, the 20 with the highest
        :meth:`calculate_state_score` are kept and the rest are dropped.

        Returns:
            The same ``simulation_manager``, for chaining.
        """

        # Limit number of active states
        max_active = 20
        active_states = simulation_manager.active
        if len(active_states) > max_active:
            # Keep most promising states (stable sort: ties keep stash order)
            ranked = sorted(active_states, key=self.calculate_state_score, reverse=True)
            keep = {id(state) for state in ranked[:max_active]}

            # Assigning a new list to `.active` is not a documented way to
            # replace a stash; drop() removes the surplus through the
            # manager's own API.
            simulation_manager.drop(
                filter_func=lambda state: id(state) not in keep,
                stash='active'
            )

        # Merge similar states
        simulation_manager.merge()

        return simulation_manager

    def calculate_state_score(self, state):
        """Return a priority score for ``state``; higher means keep.

        The score is ``1000 / (constraint count + 1)`` plus the history depth,
        plus 100 when the state's address lies inside the main object.
        """

        # Prefer states with fewer constraints
        score = 1000 / (len(state.solver.constraints) + 1)

        # Prefer states that have made more progress
        score += state.history.depth

        # Prefer states in main executable
        main_object = state.project.loader.main_object
        if main_object.min_addr <= state.addr <= main_object.max_addr:
            score += 100

        return score

    def optimize_analysis(self, project):
        """Return keyword arguments for a speed-oriented CFG analysis.

        ``project`` is unused; it is accepted so all strategies share a
        signature.
        """

        # CFG optimization
        cfg_opts = {
            'normalize': True,
            'resolve_indirect_jumps': False,  # Disable for speed
            'force_complete_scan': False,
            'show_progressbar': False
        }

        return cfg_opts

    def create_optimized_state(self, project, addr=None):
        """Create an entry state with the performance options applied.

        Args:
            project: The angr project to create the state from.
            addr: Start address; defaults to ``project.entry``.

        Returns:
            The new state, after :meth:`optimize_memory` and
            :meth:`optimize_solver` have been applied to it.
        """

        if addr is None:
            addr = project.entry

        # Performance-oriented options
        perf_options = {
            o.ABSTRACT_SOLVER,
            o.ABSTRACT_MEMORY,
            o.FAST_MEMORY,
            o.FAST_REGISTERS,
            o.APPROXIMATE_FIRST,
            o.APPROXIMATE_GUARDS,
            o.LAZY_SOLVES
        }

        # Create state with optimizations
        state = project.factory.entry_state(
            addr=addr,
            add_options=perf_options
        )

        # Additional optimizations
        self.optimize_memory(project, state)
        self.optimize_solver(project, state)

        return state

# Error handling and debugging
class AngrDebugger:
    """Debugging helpers for angr states: breakpoints, history and solver checks."""

    def __init__(self, project):
        """Remember the project and create an empty ``debug_info`` store."""
        self.project = project
        self.debug_info = {}

    def setup_debugging(self, state):
        """Enable action tracking on ``state`` and install the breakpoints.

        Breakpoints fire before memory reads, memory writes and calls and
        invoke the matching ``on_*`` handler of this object.

        Returns:
            The same ``state``, modified in place.
        """

        # Enable tracking options
        debug_options = {
            o.TRACK_MEMORY_ACTIONS,
            o.TRACK_REGISTER_ACTIONS,
            o.TRACK_JMP_ACTIONS,
            o.TRACK_CONSTRAINT_ACTIONS
        }

        state.options.update(debug_options)

        # Setup breakpoints
        state.inspect.b('mem_read', when=angr.BP_BEFORE, action=self.on_memory_read)
        state.inspect.b('mem_write', when=angr.BP_BEFORE, action=self.on_memory_write)
        state.inspect.b('call', when=angr.BP_BEFORE, action=self.on_function_call)

        return state

    def on_memory_read(self, state):
        """Report a memory read whose address is symbolic."""

        addr = state.inspect.mem_read_address
        size = state.inspect.mem_read_length

        if state.solver.symbolic(addr):
            print(f"Symbolic memory read at {addr} (size: {size})")

    def on_memory_write(self, state):
        """Report a memory write whose address or data is symbolic."""

        addr = state.inspect.mem_write_address
        data = state.inspect.mem_write_expr

        if state.solver.symbolic(addr) or state.solver.symbolic(data):
            print(f"Symbolic memory write at {addr}: {data}")

    def on_function_call(self, state):
        """Print the target of a call, concretizing it when necessary."""

        target = state.inspect.function_address

        # The breakpoint attribute may be a solver expression rather than an
        # int, and hex() only accepts integers.
        if isinstance(target, int):
            print(f"Function call to {hex(target)}")
        elif state.solver.symbolic(target):
            print(f"Function call to symbolic target {target}")
        else:
            print(f"Function call to {hex(state.solver.eval(target))}")

    def analyze_state_history(self, state):
        """Return one summary entry per executed step, oldest first.

        The walk follows ``state.history`` through its ``parent`` links and
        stops at the root node, which is not reported. Each entry holds the
        node's address, its depth, and the number of constraints and actions
        recorded for that step alone.
        """

        history = []
        # Walk history nodes, not states: `history.parent` is itself a
        # history node and has no `.history` attribute of its own.
        node = state.history

        while node is not None and node.parent is not None:
            address = node.addr
            history.append({
                'address': hex(address) if address is not None else None,
                'depth': node.depth,
                # Presumably only populated while the tracking options from
                # setup_debugging() are enabled - confirm.
                'constraints': len(list(node.recent_constraints)),
                'actions': len(list(node.recent_actions))
            })
            node = node.parent

        history.reverse()
        return history

    def debug_constraint_solving(self, state):
        """Print constraint statistics and diagnose unsatisfiable or slow states."""
        import time

        print(f"Constraint count: {len(state.solver.constraints)}")
        print(f"Variables: {list(state.solver.variables)}")

        # Time the first satisfiability check; a repeated check may be
        # answered from a cache and would hide slow solving.
        start_time = time.time()
        satisfiable = state.solver.satisfiable()
        solve_time = time.time() - start_time

        if not satisfiable:
            print("State is unsatisfiable!")

            # Find conflicting constraints
            # NOTE(review): each constraint is re-added to a branch that
            # already holds it, which cannot by itself flip the result; also
            # confirm that state.solver offers branch(). Left as written.
            for i, constraint in enumerate(state.solver.constraints):
                temp_solver = state.solver.branch()
                temp_solver.add(constraint)

                if not temp_solver.satisfiable():
                    print(f"Constraint {i} makes state unsatisfiable: {constraint}")

        # Check solver performance
        if solve_time > 1.0:
            print(f"Slow constraint solving: {solve_time:.2f} seconds")

# Best practices guide
def angr_best_practices():
    """Return the best-practice tips, grouped by category.

    Returns:
        A dict mapping each category name to its list of tips, in the order
        Performance, Memory Management, Symbolic Execution, Debugging,
        Analysis Design.
    """
    performance_tips = [
        "Use ABSTRACT_MEMORY and ABSTRACT_SOLVER options for better performance",
        "Limit the number of active states to prevent state explosion",
        "Use timeouts for constraint solving",
        "Disable unnecessary tracking options",
        "Use CFGFast instead of CFGEmulated when possible"
    ]
    memory_tips = [
        "Use FAST_MEMORY option for large binaries",
        "Avoid storing large amounts of data in state globals",
        "Clear unused states regularly",
        "Use memory-mapped files for large inputs"
    ]
    symbolic_execution_tips = [
        "Start with concrete inputs and gradually make them symbolic",
        "Use find/avoid addresses to guide exploration",
        "Implement custom exploration techniques for complex targets",
        "Use state merging to reduce path explosion"
    ]
    debugging_tips = [
        "Use state.inspect for debugging symbolic execution",
        "Enable tracking options only when needed",
        "Analyze constraint complexity regularly",
        "Use logging to track analysis progress"
    ]
    design_tips = [
        "Break complex analysis into smaller modules",
        "Use caching for expensive computations",
        "Implement timeouts for all analysis phases",
        "Validate results with multiple techniques"
    ]

    # Assembled from pairs so the category order is explicit
    return dict([
        ("Performance", performance_tips),
        ("Memory Management", memory_tips),
        ("Symbolic Execution", symbolic_execution_tips),
        ("Debugging", debugging_tips),
        ("Analysis Design", design_tips),
    ])

# Example of optimized analysis
def optimized_analysis_example(binary_path):
    """Step a binary symbolically for up to 100 steps with optimizations applied.

    Creates an optimized entry state, steps a simulation manager while it has
    active states, applies the state-management optimization after every step
    and prints progress every tenth step.

    Returns:
        The simulation manager after exploration.
    """
    step_limit = 100

    project = angr.Project(binary_path, auto_load_libs=False)
    optimizer = PerformanceOptimizer()
    initial_state = optimizer.create_optimized_state(project)

    # The debugger is created but left detached; enable the line below to
    # install its breakpoints on the initial state.
    debugger = AngrDebugger(project)
    # initial_state = debugger.setup_debugging(initial_state)

    sm = project.factory.simulation_manager(initial_state)

    steps_taken = 0
    while sm.active and steps_taken < step_limit:
        sm.step()
        steps_taken += 1

        # Trim and merge states after every step
        sm = optimizer.optimize_state_management(sm)

        report_progress = steps_taken % 10 == 0
        if report_progress:
            print(f"Step {steps_taken}: {len(sm.active)} active states")

    print(f"Analysis completed: {steps_taken} steps")
    print(f"Final states: {len(sm.active)} active, {len(sm.deadended)} deadended")

    return sm

# Script entry point: run the optimized exploration example against /bin/ls,
# then print every best-practice tip grouped by category.
if __name__ == "__main__":
    binary_path = "/bin/ls"
    sm = optimized_analysis_example(binary_path)

    # Print best practices
    practices = angr_best_practices()
    print("\nangr Best Practices:")
    for category, tips in practices.items():
        print(f"\n{category}:")
        for tip in tips:
            print(f"  - {tip}")

Recursos

Documentación y aprendizaje

Formación y tutoriales

Comunidad y apoyo

Herramientas e integración relacionadas