angr Cheat Sheet
"Clase de la hoja" id="copy-btn" class="copy-btn" onclick="copyAllCommands()" Copiar todos los comandos id="pdf-btn" class="pdf-btn" onclick="generatePDF()" Generar PDF seleccionado/button ■/div titulada
Overview
angr is a powerful Python-based binary analysis platform that provides symbolic execution, static analysis, and dynamic analysis capabilities. It is designed for automated vulnerability discovery, reverse engineering, and program analysis. angr can analyze binaries across multiple architectures and exposes a rich API for building custom analysis tools.
Key Strengths: symbolic execution, multi-architecture support, automated vulnerability detection, constraint solving, and an extensive Python API for custom analysis workflows.
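A minimal sketch of the core workflow, assuming a hypothetical crackme at ./crackme that prints "Good Job." when given the right stdin: load the binary, then let the simulation manager search for a state that produces that output.
import angr

# Load the target (hypothetical ./crackme) without shared libraries
proj = angr.Project("./crackme", auto_load_libs=False)

# Explore symbolically from the entry point until some state's
# stdout contains the success message
simgr = proj.factory.simulation_manager(proj.factory.entry_state())
simgr.explore(find=lambda s: b"Good Job." in s.posix.dumps(1))

if simgr.found:
    # Concretize stdin for the path that reached the message
    print("Winning input:", simgr.found[0].posix.dumps(0))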
Installation and Setup
Basic Installation
# Install angr via pip (recommended)
pip install angr
# Install with additional dependencies
pip install angr[all]
# Install development version
pip install git+https://github.com/angr/angr.git
# Install in virtual environment (recommended)
python -m venv angr-env
source angr-env/bin/activate # Linux/macOS
# angr-env\Scripts\activate # Windows
pip install angr
# Verify installation
python -c "import angr; print(angr.__version__)"
# Install additional tools
pip install angr-management # GUI for angr
pip install angr-doc # Documentation examples (if not on PyPI, clone github.com/angr/angr-doc)
pip install angr-utils # Utility functions
Dependencies and Requirements
# System dependencies (Ubuntu/Debian)
sudo apt-get update
sudo apt-get install -y \
python3-dev \
build-essential \
libffi-dev \
libssl-dev \
libtool \
pkg-config \
cmake \
libgmp-dev \
libmpfr-dev \
libmpc-dev
# System dependencies (CentOS/RHEL)
sudo yum groupinstall -y "Development Tools"
sudo yum install -y \
python3-devel \
libffi-devel \
openssl-devel \
libtool \
pkgconfig \
cmake \
gmp-devel \
mpfr-devel \
libmpc-devel
# macOS dependencies (with Homebrew)
brew install \
libffi \
openssl \
libtool \
pkg-config \
cmake \
gmp \
mpfr \
libmpc
# Install Z3 solver (recommended)
pip install z3-solver
# Install additional solvers
pip install pysmt
pysmt-install --z3 --cvc4 --boolector
# Verify solver installation
python -c "import z3; print('Z3 version:', z3.get_version_string())"
Configuration and Environment
# angr configuration and environment setup
import angr
import logging
import os
# Configure logging
logging.getLogger('angr').setLevel(logging.INFO)
logging.getLogger('cle').setLevel(logging.WARNING)
logging.getLogger('pyvex').setLevel(logging.WARNING)
# Optional environment variables for your own tooling (angr itself does
# not read these; logging is configured via the logging module above)
os.environ['ANGR_CACHE_DIR'] = '/tmp/angr_cache'
os.environ['ANGR_LOG_LEVEL'] = 'INFO'
# Zero-fill uninitialized memory/registers: these are *state* options,
# added via add_options when creating a state, not module-level flags
zero_fill_options = {
angr.options.ZERO_FILL_UNCONSTRAINED_MEMORY,
angr.options.ZERO_FILL_UNCONSTRAINED_REGISTERS
}
# Memory and performance settings
import angr.options as o
# Common option sets
common_options = {
o.ABSTRACT_SOLVER,
o.ABSTRACT_MEMORY,
o.APPROXIMATE_FIRST,
o.APPROXIMATE_GUARDS,
o.APPROXIMATE_SATISFIABILITY
}
# Debugging options
debug_options = {
o.TRACK_MEMORY_ACTIONS,
o.TRACK_REGISTER_ACTIONS,
o.TRACK_JMP_ACTIONS,
o.TRACK_CONSTRAINT_ACTIONS
}
# Performance options
performance_options = {
o.EFFICIENT_STATE_MERGING,
o.LAZY_SOLVES,
o.FAST_MEMORY,
o.FAST_REGISTERS
}
print("angr environment configured successfully")
Basic Binary Analysis
Loading and Analyzing Binaries
# Basic binary loading and analysis
import angr
import archinfo
def load_binary(binary_path, **kwargs):
"""Load binary with angr"""
# Basic loading
project = angr.Project(binary_path, auto_load_libs=False)
print(f"Binary: {binary_path}")
print(f"Architecture: {project.arch}")
print(f"Entry point: {hex(project.entry)}")
print(f"Base address: {hex(project.loader.main_object.min_addr)}")
print(f"Binary type: {project.loader.main_object.os}")
return project
# Load binary with custom options
def load_binary_advanced(binary_path):
"""Load binary with advanced options"""
# Custom loading options
project = angr.Project(
binary_path,
auto_load_libs=True, # Load shared libraries
use_sim_procedures=True, # Use SimProcedures for library functions
exclude_sim_procedures_list=['malloc', 'free'], # Exclude specific functions
force_load_libs=['libc.so.6'], # Force load specific libraries
main_opts={
'base_addr': 0x400000, # Custom base address
'backend': 'elf' # Force backend type
}
)
# Print loaded objects
print("Loaded objects:")
for obj in project.loader.all_objects:
print(f" {obj.binary}: {hex(obj.min_addr)}-{hex(obj.max_addr)}")
# Print symbols
print(f"\nSymbols: {len(project.loader.main_object.symbols)}")
for name, symbol in list(project.loader.main_object.symbols.items())[:10]:
print(f" {name}: {hex(symbol.rebased_addr)}")
return project
# Analyze binary structure
def analyze_binary_structure(project):
"""Analyze binary structure and sections"""
main_object = project.loader.main_object
print("Binary Structure Analysis:")
print(f" Entry point: {hex(project.entry)}")
print(f" Architecture: {project.arch.name}")
print(f" Word size: {project.arch.bits} bits")
print(f" Endianness: {project.arch.memory_endness}")
print(f" Address space: {hex(main_object.min_addr)} - {hex(main_object.max_addr)}")
# Analyze sections
if hasattr(main_object, 'sections'):
print(f"\nSections ({len(main_object.sections)}):")
for section in main_object.sections:
print(f" {section.name}: {hex(section.vaddr)} - {hex(section.vaddr + section.memsize)} "
f"({section.memsize} bytes, {section.flags})")
# Analyze segments
if hasattr(main_object, 'segments'):
print(f"\nSegments ({len(main_object.segments)}):")
for segment in main_object.segments:
print(f" {hex(segment.vaddr)} - {hex(segment.vaddr + segment.memsize)} "
f"({segment.memsize} bytes, flags: {segment.flags})")
# Find functions
cfg = project.analyses.CFGFast()
functions = cfg.functions
print(f"\nFunctions found: {len(functions)}")
for addr, func in list(functions.items())[:10]:
print(f" {func.name}: {hex(addr)} ({func.size} bytes)")
return cfg
# Example usage
if __name__ == "__main__":
# Load a binary
binary_path = "/bin/ls" # Example binary
project = load_binary_advanced(binary_path)
# Analyze structure
cfg = analyze_binary_structure(project)
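As a quick sanity check after loading (reusing the project from the example above), the entry basic block can be disassembled or lifted to VEX IR directly:
# Pretty-print the capstone disassembly of the entry block
block = project.factory.block(project.entry)
block.pp()

# Inspect the lifted VEX IR for the same block
print(block.vex)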
Control Flow Graph Analysis
# Control Flow Graph (CFG) analysis with angr
import angr
import networkx as nx
import matplotlib.pyplot as plt
class CFGAnalyzer:
def __init__(self, project):
self.project = project
self.cfg = None
self.functions = {}
def generate_cfg(self, normalize=True, resolve_indirect_jumps=True):
"""Generate Control Flow Graph"""
print("Generating CFG...")
# Generate CFG with options
self.cfg = self.project.analyses.CFGFast(
normalize=normalize,
resolve_indirect_jumps=resolve_indirect_jumps,
force_complete_scan=False,
show_progressbar=True
)
print(f"CFG generated: {len(self.cfg.nodes())} nodes, {len(self.cfg.edges())} edges")
print(f"Functions discovered: {len(self.cfg.functions)}")
return self.cfg
def analyze_function(self, function_addr):
"""Analyze specific function"""
if self.cfg is None:
self.generate_cfg()
if function_addr not in self.cfg.functions:
print(f"Function at {hex(function_addr)} not found")
return None
func = self.cfg.functions[function_addr]
analysis = {
'name': func.name,
'address': hex(func.addr),
'size': func.size,
'blocks': len(func.blocks),
'calling_convention': func.calling_convention,
'is_plt': func.is_plt,
'is_simprocedure': func.is_simprocedure,
'endpoints': [hex(ep.addr) for ep in func.endpoints],
'callsites': [hex(cs) for cs in func.get_call_sites()], # get_call_sites() yields addresses
'callers': [hex(caller.addr) for caller in func.callers],
'callees': [hex(callee.addr) for callee in func.callees]
}
print(f"Function Analysis: {func.name}")
for key, value in analysis.items():
if isinstance(value, list) and len(value) > 5:
print(f" {key}: {value[:5]}... ({len(value)} total)")
else:
print(f" {key}: {value}")
return analysis
def find_interesting_functions(self):
"""Find potentially interesting functions"""
if self.cfg is None:
self.generate_cfg()
interesting = {
'large_functions': [],
'complex_functions': [],
'leaf_functions': [],
'recursive_functions': [],
'plt_functions': []
}
for addr, func in self.cfg.functions.items():
# Large functions (>1000 bytes)
if func.size > 1000:
interesting['large_functions'].append((func.name, hex(addr), func.size))
# Complex functions (>20 basic blocks)
if len(func.blocks) > 20:
interesting['complex_functions'].append((func.name, hex(addr), len(func.blocks)))
# Leaf functions (no callees)
if not func.callees:
interesting['leaf_functions'].append((func.name, hex(addr)))
# Recursive functions
if addr in [callee.addr for callee in func.callees]:
interesting['recursive_functions'].append((func.name, hex(addr)))
# PLT functions
if func.is_plt:
interesting['plt_functions'].append((func.name, hex(addr)))
# Print results
for category, functions in interesting.items():
print(f"\n{category.replace('_', ' ').title()}: {len(functions)}")
for func_info in functions[:5]: # Show first 5
print(f" {func_info}")
return interesting
def analyze_call_graph(self):
"""Analyze call graph relationships"""
if self.cfg is None:
self.generate_cfg()
# Build call graph
call_graph = nx.DiGraph()
for addr, func in self.cfg.functions.items():
call_graph.add_node(addr, name=func.name, size=func.size)
for callee in func.callees:
call_graph.add_edge(addr, callee.addr)
# Analyze call graph properties
analysis = {
'total_functions': len(call_graph.nodes()),
'total_calls': len(call_graph.edges()),
'strongly_connected_components': len(list(nx.strongly_connected_components(call_graph))),
'weakly_connected_components': len(list(nx.weakly_connected_components(call_graph))),
'cycles': len(list(nx.simple_cycles(call_graph))),
'average_degree': sum(dict(call_graph.degree()).values()) / len(call_graph.nodes()) if call_graph.nodes() else 0
}
# Find central functions (high degree)
degree_centrality = nx.degree_centrality(call_graph)
central_functions = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)[:10]
print("Call Graph Analysis:")
for key, value in analysis.items():
print(f" {key}: {value}")
print("\nMost Central Functions:")
for addr, centrality in central_functions:
func_name = self.cfg.functions[addr].name
print(f" {func_name} ({hex(addr)}): {centrality:.3f}")
return call_graph, analysis
def visualize_function_cfg(self, function_addr, output_file="function_cfg.png"):
"""Visualize function CFG"""
if self.cfg is None:
self.generate_cfg()
if function_addr not in self.cfg.functions:
print(f"Function at {hex(function_addr)} not found")
return
func = self.cfg.functions[function_addr]
# Create graph for function
G = nx.DiGraph()
# Add nodes (basic blocks)
for block in func.blocks:
G.add_node(block.addr, label=f"{hex(block.addr)}\n{block.size} bytes")
# Add edges from the function's internal transition graph
for src, dst in func.graph.edges():
G.add_edge(src.addr, dst.addr)
# Create visualization
plt.figure(figsize=(12, 8))
pos = nx.spring_layout(G, k=1, iterations=50)
# Draw nodes
nx.draw_networkx_nodes(G, pos, node_color='lightblue',
node_size=1000, alpha=0.7)
# Draw edges
nx.draw_networkx_edges(G, pos, edge_color='gray',
arrows=True, arrowsize=20)
# Draw labels
labels = nx.get_node_attributes(G, 'label')
nx.draw_networkx_labels(G, pos, labels, font_size=8)
plt.title(f"CFG for function {func.name} ({hex(function_addr)})")
plt.axis('off')
plt.tight_layout()
plt.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"Function CFG saved to {output_file}")
# Example usage
def analyze_binary_cfg(binary_path):
"""Complete CFG analysis example"""
# Load binary
project = angr.Project(binary_path, auto_load_libs=False)
# Create CFG analyzer
analyzer = CFGAnalyzer(project)
# Generate and analyze CFG
cfg = analyzer.generate_cfg()
# Find interesting functions
interesting = analyzer.find_interesting_functions()
# Analyze call graph
call_graph, cg_analysis = analyzer.analyze_call_graph()
# Analyze main function if available
main_addr = project.loader.main_object.get_symbol('main')
if main_addr:
analyzer.analyze_function(main_addr.rebased_addr)
analyzer.visualize_function_cfg(main_addr.rebased_addr)
return analyzer
# Run analysis
if __name__ == "__main__":
binary_path = "/bin/ls"
analyzer = analyze_binary_cfg(binary_path)
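The function manager also supports lookup by name, which is often more convenient than scanning addresses (a sketch against the analyzer built above):
# Look up a function by name via the CFG's function manager
main_func = analyzer.cfg.functions.function(name='main')
if main_func is not None:
    print(f"main at {hex(main_func.addr)} with {len(list(main_func.blocks))} blocks")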
Symbolic Execution
Basic Symbolic Execution
# Symbolic execution with angr
import angr
import claripy
class SymbolicExecutor:
def __init__(self, project):
self.project = project
self.simulation_manager = None
self.initial_state = None
self.symbolic_inputs = [] # keep references to created BVS objects
def create_initial_state(self, addr=None, **kwargs):
"""Create initial state for symbolic execution"""
if addr is None:
addr = self.project.entry
# Create initial state
self.initial_state = self.project.factory.entry_state(
addr=addr,
add_options={
angr.options.ABSTRACT_SOLVER,
angr.options.ABSTRACT_MEMORY,
angr.options.APPROXIMATE_FIRST
},
**kwargs
)
print(f"Initial state created at {hex(addr)}")
return self.initial_state
def setup_symbolic_input(self, input_size=100, input_name="symbolic_input"):
"""Setup symbolic input for analysis"""
if self.initial_state is None:
self.create_initial_state()
# Create symbolic bitvector
symbolic_input = claripy.BVS(input_name, input_size * 8)
self.symbolic_inputs.append(symbolic_input) # keep a handle for eval() later
# Add constraints (printable characters)
for i in range(input_size):
byte = symbolic_input.get_byte(i)
self.initial_state.solver.add(byte >= 0x20) # Printable ASCII
self.initial_state.solver.add(byte <= 0x7e)
print(f"Symbolic input created: {input_name} ({input_size} bytes)")
return symbolic_input
def setup_symbolic_arguments(self, argc=2, argv_size=100):
"""Setup symbolic command line arguments"""
if self.initial_state is None:
self.create_initial_state()
# Create symbolic argv
argv = []
for i in range(argc):
arg = claripy.BVS(f"argv_{i}", argv_size * 8)
# Add null terminator constraint
for j in range(argv_size - 1):
byte = arg.get_byte(j)
self.initial_state.solver.add(byte >= 0x20)
self.initial_state.solver.add(byte <= 0x7e)
# Null terminate
self.initial_state.solver.add(arg.get_byte(argv_size - 1) == 0)
argv.append(arg)
# Set up argv in memory
argv_ptrs = []
base_addr = 0x7fff0000
for i, arg in enumerate(argv):
arg_addr = base_addr + i * argv_size
self.initial_state.memory.store(arg_addr, arg)
argv_ptrs.append(arg_addr)
# Set up argv pointer array
argv_array_addr = base_addr + len(argv) * argv_size
for i, ptr in enumerate(argv_ptrs):
self.initial_state.memory.store(
argv_array_addr + i * self.project.arch.bytes,
ptr,
size=self.project.arch.bytes
)
# Set argc and argv in registers/stack
if self.project.arch.name == 'AMD64':
self.initial_state.regs.rdi = argc
self.initial_state.regs.rsi = argv_array_addr
elif self.project.arch.name == 'X86':
# Push argv and argc onto stack
self.initial_state.stack_push(argv_array_addr)
self.initial_state.stack_push(argc)
print(f"Symbolic arguments setup: argc={argc}, argv at {hex(argv_array_addr)}")
return argv
def run_symbolic_execution(self, find_addrs=None, avoid_addrs=None, max_steps=1000):
"""Run symbolic execution"""
if self.initial_state is None:
self.create_initial_state()
# Create simulation manager
self.simulation_manager = self.project.factory.simulation_manager(self.initial_state)
print("Starting symbolic execution...")
# Run exploration
if find_addrs or avoid_addrs:
self.simulation_manager.explore(
find=find_addrs,
avoid=avoid_addrs,
step_func=self.step_callback,
num_find=10 # Limit number of solutions
)
else:
# Step-by-step execution
for step in range(max_steps):
if not self.simulation_manager.active:
break
self.simulation_manager.step()
if step % 100 == 0:
print(f"Step {step}: {len(self.simulation_manager.active)} active states")
# Check for interesting states
if self.simulation_manager.deadended:
print(f"Found {len(self.simulation_manager.deadended)} deadended states")
if self.simulation_manager.errored:
print(f"Found {len(self.simulation_manager.errored)} errored states")
print("Symbolic execution completed")
return self.simulation_manager
def step_callback(self, sm):
"""Callback function for each step"""
# Limit number of active states to prevent state explosion
if len(sm.active) > 50:
sm.active = sm.active[:50]
return sm
def analyze_found_states(self):
"""Analyze states that reached target"""
if not self.simulation_manager or not self.simulation_manager.found:
print("No found states to analyze")
return []
solutions = []
for i, state in enumerate(self.simulation_manager.found):
print(f"\n--- Found State {i+1} ---")
print(f"Address: {hex(state.addr)}")
# Concretize the stored BVS handles (state.solver.variables yields
# variable *names*, not ASTs, so evaluate the saved objects instead)
solution = {}
if state.solver.satisfiable():
for var in self.symbolic_inputs:
concrete_value = state.solver.eval(var, cast_to=bytes)
solution[var.args[0]] = concrete_value
print(f" {var.args[0]}: {concrete_value}")
solutions.append(solution)
return solutions
def find_path_to_address(self, target_addr, avoid_addrs=None):
"""Find execution path to specific address"""
if self.initial_state is None:
self.create_initial_state()
print(f"Finding path to {hex(target_addr)}")
# Run exploration
sm = self.project.factory.simulation_manager(self.initial_state)
sm.explore(find=target_addr, avoid=avoid_addrs)
if sm.found:
found_state = sm.found[0]
# Get execution trace
trace = []
state = found_state
while state.history.parent is not None:
trace.append({
'address': hex(state.addr),
'instruction': state.block().capstone.insns[0] if state.block().capstone.insns else None,
'constraints': len(state.solver.constraints)
})
state = state.history.parent
trace.reverse()
print(f"Path found with {len(trace)} steps:")
for i, step in enumerate(trace[:10]): # Show first 10 steps
print(f" {i}: {step['address']} - {step.get('instruction', 'N/A')}")
return trace, found_state
else:
print("No path found to target address")
return None, None
# Vulnerability discovery with symbolic execution
class VulnerabilityFinder:
def __init__(self, project):
self.project = project
self.vulnerabilities = []
def find_buffer_overflows(self, function_addr, buffer_size=100):
"""Find buffer overflow vulnerabilities"""
print(f"Searching for buffer overflows in function {hex(function_addr)}")
# Create initial state at function entry
initial_state = self.project.factory.call_state(function_addr)
# Create symbolic input buffer
symbolic_buffer = claripy.BVS("input_buffer", buffer_size * 8)
# Store symbolic buffer in memory
buffer_addr = 0x7fff0000
initial_state.memory.store(buffer_addr, symbolic_buffer)
# Set up function arguments to point to buffer
if self.project.arch.name == 'AMD64':
initial_state.regs.rdi = buffer_addr
initial_state.regs.rsi = buffer_size
# Run symbolic execution
sm = self.project.factory.simulation_manager(initial_state)
# Look for crashes or overwrites
crash_addrs = []
try:
sm.run(until=lambda sm: len(sm.active) == 0 or len(sm.errored) > 0)
# Check for errors that might indicate buffer overflow
for errored_state in sm.errored:
error = errored_state.error
if "Segmentation fault" in str(error) or "Invalid memory access" in str(error):
# Potential buffer overflow
vulnerability = {
'type': 'buffer_overflow',
'function': hex(function_addr),
'error_addr': hex(errored_state.state.addr),
'error': str(error),
'input_constraints': errored_state.state.solver.constraints
}
self.vulnerabilities.append(vulnerability)
print(f"Potential buffer overflow found at {hex(errored_state.state.addr)}")
except Exception as e:
print(f"Error during buffer overflow analysis: {e}")
return self.vulnerabilities
def find_format_string_bugs(self, printf_addr):
"""Find format string vulnerabilities"""
print(f"Searching for format string bugs at {hex(printf_addr)}")
# Create state at printf call
initial_state = self.project.factory.call_state(printf_addr)
# Create symbolic format string
format_string = claripy.BVS("format_string", 100 * 8)
# Add constraints for format string characters
for i in range(100):
byte = format_string.get_byte(i)
initial_state.solver.add(claripy.Or(
claripy.And(byte >= 0x20, byte <= 0x7e), # Printable
byte == 0 # Null terminator
))
# Set format string as first argument
format_addr = 0x7fff1000
initial_state.memory.store(format_addr, format_string)
if self.project.arch.name == 'AMD64':
initial_state.regs.rdi = format_addr
# Look for format string specifiers that could be dangerous
dangerous_patterns = [b'%n', b'%s', b'%x']
for pattern in dangerous_patterns:
# Check if format string can contain dangerous pattern
pattern_bv = claripy.BVV(pattern)
# This is a simplified check - in practice, you'd need more sophisticated analysis
if initial_state.solver.satisfiable(extra_constraints=[
format_string.get_bytes(0, len(pattern)) == pattern_bv
]):
vulnerability = {
'type': 'format_string',
'function': hex(printf_addr),
'pattern': pattern.decode(),
'description': f"Format string can contain {pattern.decode()}"
}
self.vulnerabilities.append(vulnerability)
print(f"Potential format string vulnerability: {pattern.decode()}")
return self.vulnerabilities
def find_integer_overflows(self, function_addr):
"""Find integer overflow vulnerabilities"""
print(f"Searching for integer overflows in function {hex(function_addr)}")
# Create initial state
initial_state = self.project.factory.call_state(function_addr)
# Create symbolic integers
int1 = claripy.BVS("int1", 32)
int2 = claripy.BVS("int2", 32)
# Set up function arguments
if self.project.arch.name == 'AMD64':
initial_state.regs.rdi = int1.zero_extend(32)
initial_state.regs.rsi = int2.zero_extend(32)
# Run symbolic execution
sm = self.project.factory.simulation_manager(initial_state)
try:
sm.run(until=lambda sm: len(sm.active) == 0)
# Check for integer overflow conditions
for state in sm.deadended:
# Look for arithmetic operations that could overflow
for action in state.history.actions:
if hasattr(action, 'op') and action.op in ['__add__', '__mul__', '__sub__']:
# Check if result could overflow
if state.solver.satisfiable(extra_constraints=[
action.result > 0xffffffff # 32-bit overflow
]):
vulnerability = {
'type': 'integer_overflow',
'function': hex(function_addr),
'operation': action.op,
'address': hex(state.addr)
}
self.vulnerabilities.append(vulnerability)
print(f"Potential integer overflow in {action.op} at {hex(state.addr)}")
except Exception as e:
print(f"Error during integer overflow analysis: {e}")
return self.vulnerabilities
# Example usage
def symbolic_execution_example(binary_path):
"""Complete symbolic execution example"""
# Load binary
project = angr.Project(binary_path, auto_load_libs=False)
# Create symbolic executor
executor = SymbolicExecutor(project)
# Setup symbolic input
executor.create_initial_state()
symbolic_input = executor.setup_symbolic_input(input_size=50)
# Find main function
cfg = project.analyses.CFGFast()
main_func = None
for addr, func in cfg.functions.items():
if func.name == 'main':
main_func = func
break
if main_func:
print(f"Found main function at {hex(main_func.addr)}")
# Find interesting addresses in main
interesting_addrs = []
for block in main_func.blocks:
# Look for calls to interesting functions
for insn in block.capstone.insns:
if insn.mnemonic == 'call':
interesting_addrs.append(insn.address)
if interesting_addrs:
# Run symbolic execution to reach interesting addresses
sm = executor.run_symbolic_execution(
find_addrs=interesting_addrs[:3], # First 3 interesting addresses
max_steps=500
)
# Analyze results
solutions = executor.analyze_found_states()
return executor, solutions
return executor, []
# Run example
if __name__ == "__main__":
binary_path = "/bin/ls"
executor, solutions = symbolic_execution_example(binary_path)
print(f"Found {len(solutions)} solutions")
Advanced Symbolic Execution Techniques
# Advanced symbolic execution techniques
import angr
import claripy
import itertools
class AdvancedSymbolicAnalysis:
def __init__(self, project):
self.project = project
self.custom_hooks = {}
self.analysis_results = {}
def setup_custom_hooks(self):
"""Setup custom hooks for library functions"""
# Hook malloc to track allocations
# NOTE: hook_symbol(name, hook) takes the hook as an argument rather
# than acting as a decorator; a SimProcedure subclass is more robust
# for fully replacing libc behavior, but a plain function works for
# lightweight tracking, registered after each definition below
def malloc_hook(state):
size = state.solver.eval(state.regs.rdi) # First argument
# Allocate memory
addr = state.heap.allocate(size)
# Track allocation
if 'allocations' not in state.globals:
state.globals['allocations'] = {}
state.globals['allocations'][addr] = {
'size': size,
'allocated_at': state.addr
}
state.regs.rax = addr # Return address
print(f"malloc({size}) = {hex(addr)}")
# Hook free to track deallocations
def free_hook(state):
addr = state.solver.eval(state.regs.rdi)
if 'allocations' in state.globals and addr in state.globals['allocations']:
del state.globals['allocations'][addr]
print(f"free({hex(addr)})")
# Mark memory as freed (for use-after-free detection)
if 'freed_memory' not in state.globals:
state.globals['freed_memory'] = set()
state.globals['freed_memory'].add(addr)
self.project.hook_symbol('free', free_hook)
# Hook strcpy to detect buffer overflows
def strcpy_hook(state):
dest = state.solver.eval(state.regs.rdi)
src = state.solver.eval(state.regs.rsi)
# Read source string
src_data = state.memory.load(src, 1000) # Max 1000 bytes
# Find null terminator
null_pos = None
for i in range(1000):
byte = src_data.get_byte(i)
if state.solver.eval(byte) == 0:
null_pos = i
break
if null_pos is not None:
# Copy string including null terminator
string_data = src_data.get_bytes(0, null_pos + 1)
state.memory.store(dest, string_data)
print(f"strcpy({hex(dest)}, {hex(src)}) - copied {null_pos + 1} bytes")
# Check for potential buffer overflow
if 'allocations' in state.globals and dest in state.globals['allocations']:
allocated_size = state.globals['allocations'][dest]['size']
if null_pos + 1 > allocated_size:
print(f"WARNING: Potential buffer overflow! Copied {null_pos + 1} bytes to {allocated_size} byte buffer")
state.regs.rax = dest # Return destination
self.project.hook_symbol('strcpy', strcpy_hook)
print("Custom hooks installed")
def concolic_execution(self, initial_inputs, target_function):
"""Perform concolic execution with concrete and symbolic inputs"""
results = []
for concrete_input in initial_inputs:
print(f"Concolic execution with input: {concrete_input}")
# Create initial state
state = self.project.factory.entry_state()
# Set up concrete input
input_addr = 0x7fff0000
state.memory.store(input_addr, concrete_input)
# Create symbolic version of input
symbolic_input = claripy.BVS("symbolic_input", len(concrete_input) * 8)
# Add constraint that symbolic input equals concrete input initially
state.solver.add(symbolic_input == claripy.BVV(concrete_input))
# Run execution
sm = self.project.factory.simulation_manager(state)
sm.explore(find=target_function)
if sm.found:
found_state = sm.found[0]
# Generate new inputs by negating path constraints
new_inputs = self.generate_new_inputs(found_state, symbolic_input)
results.extend(new_inputs)
return results
def generate_new_inputs(self, state, symbolic_var):
"""Generate new inputs by negating path constraints"""
new_inputs = []
constraints = state.solver.constraints
# Try negating each constraint to explore different paths
for i, constraint in enumerate(constraints):
# Create new constraint set with negated constraint
new_constraints = constraints[:i] + [claripy.Not(constraint)] + constraints[i+1:]
# Check if new constraint set is satisfiable
if state.solver.satisfiable(extra_constraints=new_constraints):
# Generate concrete input
new_input = state.solver.eval(symbolic_var, extra_constraints=new_constraints, cast_to=bytes)
new_inputs.append(new_input)
return new_inputs
def taint_analysis(self, taint_sources, sink_functions):
"""Perform taint analysis to track data flow"""
print("Starting taint analysis...")
# Create initial state with taint tracking
initial_state = self.project.factory.entry_state(
add_options={angr.options.TRACK_MEMORY_ACTIONS}
)
# Mark taint sources
for source_addr in taint_sources:
# Create tainted symbolic variable
tainted_data = claripy.BVS(f"tainted_{hex(source_addr)}", 64)
initial_state.memory.store(source_addr, tainted_data)
# Track taint
if 'tainted_data' not in initial_state.globals:
initial_state.globals['tainted_data'] = set()
initial_state.globals['tainted_data'].add(tainted_data)
# Run symbolic execution
sm = self.project.factory.simulation_manager(initial_state)
taint_flows = []
def check_taint_at_sinks(state):
"""Check if tainted data reaches sink functions"""
if state.addr in sink_functions:
sink_name = sink_functions[state.addr]
# Check function arguments for taint
if self.project.arch.name == 'AMD64':
args = [state.regs.rdi, state.regs.rsi, state.regs.rdx, state.regs.rcx]
else:
# Get arguments from stack for x86
args = []
for i in range(4):
arg = state.memory.load(state.regs.esp + (i + 1) * 4, 4)
args.append(arg)
for i, arg in enumerate(args):
if 'tainted_data' in state.globals:
for tainted_var in state.globals['tainted_data']:
if state.solver.satisfiable(extra_constraints=[arg == tainted_var]):
taint_flow = {
'sink_function': sink_name,
'sink_address': hex(state.addr),
'argument_index': i,
'tainted_variable': str(tainted_var)
}
taint_flows.append(taint_flow)
print(f"Taint flow detected: {tainted_var} -> {sink_name} arg {i}")
# Step through execution and check for taint at each step
for _ in range(1000): # Limit steps
if not sm.active:
break
sm.step()
for state in sm.active:
check_taint_at_sinks(state)
return taint_flows
def constraint_solving_optimization(self, state, optimization_target):
"""Optimize constraint solving for specific targets"""
# Use different solvers for different types of constraints
if 'crypto' in optimization_target.lower():
# Use specialized solver for cryptographic constraints
state.solver._solver = claripy.SolverCacheless(backend=claripy.backends.z3)
elif 'arithmetic' in optimization_target.lower():
# Use arithmetic-optimized solver
state.solver._solver = claripy.SolverCacheless(backend=claripy.backends.vsa)
# Add solver optimizations
state.solver.simplify()
# Cache frequently used constraints
if 'constraint_cache' not in state.globals:
state.globals['constraint_cache'] = {}
return state
def path_explosion_mitigation(self, sm, max_active_states=50):
"""Mitigate path explosion in symbolic execution"""
if len(sm.active) > max_active_states:
# Prioritize states based on different criteria
scored_states = []
for state in sm.active:
score = 0
# Prefer states with fewer constraints (simpler paths)
score += 1000 / (len(state.solver.constraints) + 1)
# Prefer states that have made more progress
score += state.history.depth
# Prefer states in interesting code regions
if self.is_interesting_address(state.addr):
score += 500
scored_states.append((score, state))
# Keep top-scored states
scored_states.sort(key=lambda x: x[0], reverse=True)
sm.active = [state for score, state in scored_states[:max_active_states]]
print(f"Pruned to {len(sm.active)} active states")
return sm
def is_interesting_address(self, addr):
"""Check if address is in interesting code region"""
# Check if address is in main executable (not libraries)
main_object = self.project.loader.main_object
return main_object.min_addr <= addr <= main_object.max_addr
def symbolic_memory_modeling(self, state, memory_model='flat'):
"""Configure symbolic memory modeling"""
if memory_model == 'flat':
# Simple flat memory model
state.options.add(angr.options.ABSTRACT_MEMORY)
elif memory_model == 'paged':
# Paged memory model for better performance
state.options.add(angr.options.FAST_MEMORY)
state.options.add(angr.options.ABSTRACT_MEMORY)
elif memory_model == 'concrete':
# Concrete memory with symbolic overlay (NOTE: option names vary
# across angr versions; check angr.options for what is available)
state.options.add(angr.options.CONCRETE_MEMORY)
state.options.add(angr.options.SYMBOLIC_INITIAL_VALUES)
return state
def run_comprehensive_analysis(self, target_function, analysis_config):
"""Run comprehensive symbolic analysis"""
print("Starting comprehensive symbolic analysis...")
# Setup hooks
self.setup_custom_hooks()
# Create initial state
initial_state = self.project.factory.entry_state()
# Configure memory model
initial_state = self.symbolic_memory_modeling(
initial_state,
analysis_config.get('memory_model', 'flat')
)
# Setup symbolic inputs (keep references so results can be concretized)
symbolic_vars = []
if 'symbolic_inputs' in analysis_config:
for input_config in analysis_config['symbolic_inputs']:
symbolic_var = claripy.BVS(
input_config['name'],
input_config['size'] * 8
)
initial_state.memory.store(input_config['address'], symbolic_var)
symbolic_vars.append(symbolic_var)
# Run symbolic execution
sm = self.project.factory.simulation_manager(initial_state)
# Configure exploration
find_addrs = analysis_config.get('find_addresses', [])
avoid_addrs = analysis_config.get('avoid_addresses', [])
step_count = 0
max_steps = analysis_config.get('max_steps', 1000)
while sm.active and step_count < max_steps:
sm.step()
step_count += 1
# Mitigate path explosion
sm = self.path_explosion_mitigation(sm)
# Move states that reached a target into the 'found' stash and
# drop states at avoided addresses
if find_addrs:
sm.move(from_stash='active', to_stash='found', filter_func=lambda s: s.addr in find_addrs)
if avoid_addrs:
sm.move(from_stash='active', to_stash='avoided', filter_func=lambda s: s.addr in avoid_addrs)
if step_count % 100 == 0:
print(f"Step {step_count}: {len(sm.active)} active, {len(sm.found)} found")
# Analyze results
results = {
'found_states': len(sm.found),
'deadended_states': len(sm.deadended),
'errored_states': len(sm.errored),
'total_steps': step_count,
'solutions': []
}
# Extract solutions from found states using the stored BVS handles
for state in sm.found:
solution = {}
if state.solver.satisfiable():
for var in symbolic_vars:
concrete_value = state.solver.eval(var, cast_to=bytes)
solution[var.args[0]] = concrete_value
results['solutions'].append(solution)
self.analysis_results[target_function] = results
return results
# Example usage
def advanced_symbolic_analysis_example():
"""Example of advanced symbolic analysis"""
binary_path = "/bin/ls"
project = angr.Project(binary_path, auto_load_libs=False)
analyzer = AdvancedSymbolicAnalysis(project)
# Configuration for analysis
analysis_config = {
'memory_model': 'paged',
'max_steps': 500,
'symbolic_inputs': [
{
'name': 'user_input',
'address': 0x7fff0000,
'size': 100
}
],
'find_addresses': [], # Add target addresses
'avoid_addresses': [] # Add addresses to avoid
}
# Run analysis
results = analyzer.run_comprehensive_analysis(
target_function=project.entry,
analysis_config=analysis_config
)
print(f"Analysis completed: {results}")
return analyzer, results
if __name__ == "__main__":
analyzer, results = advanced_symbolic_analysis_example()
Static Analysis and Code Discovery
Function and Code Analysis
# Static analysis and code discovery with angr
import angr
import networkx as nx
from collections import defaultdict
class StaticAnalyzer:
def __init__(self, project):
self.project = project
self.cfg = None
self.functions = {}
self.call_graph = None
self.analysis_cache = {}
def generate_comprehensive_cfg(self):
"""Generate comprehensive CFG with multiple techniques"""
print("Generating comprehensive CFG...")
# Generate fast CFG first
cfg_fast = self.project.analyses.CFGFast(
normalize=True,
resolve_indirect_jumps=True,
force_complete_scan=False
)
# Generate accurate CFG for better precision
cfg_accurate = self.project.analyses.CFGEmulated(
context_sensitivity_level=1,
keep_state=True,
state_add_options=angr.sim_options.refs,
state_remove_options=angr.sim_options.simplification
)
# Combine results
self.cfg = cfg_accurate if cfg_accurate.functions else cfg_fast
self.functions = self.cfg.functions
print(f"CFG generated: {len(self.cfg.nodes())} nodes, {len(self.functions)} functions")
return self.cfg
def analyze_function_complexity(self, function_addr):
"""Analyze function complexity metrics"""
if function_addr not in self.functions:
return None
func = self.functions[function_addr]
# Basic metrics
complexity = {
'name': func.name,
'address': hex(func.addr),
'size': func.size,
'basic_blocks': len(func.blocks),
'instructions': sum(block.instructions for block in func.blocks),
'cyclomatic_complexity': 0,
'call_sites': len(func.get_call_sites()),
'callers': len(func.callers),
'callees': len(func.callees),
'loops': 0,
'depth': 0
}
# Calculate cyclomatic complexity
# V(G) = E - N + 2P (where E=edges, N=nodes, P=connected components)
if func.graph:
edges = len(func.graph.edges())
nodes = len(func.graph.nodes())
complexity['cyclomatic_complexity'] = edges - nodes + 2
# Detect loops
if func.graph:
try:
cycles = list(nx.simple_cycles(func.graph))
complexity['loops'] = len(cycles)
except:
complexity['loops'] = 0
# Calculate call depth
complexity['depth'] = self.calculate_call_depth(func)
# Analyze instruction types
instruction_types = defaultdict(int)
for block in func.blocks:
for insn in block.capstone.insns:
instruction_types[insn.mnemonic] += 1
complexity['instruction_distribution'] = dict(instruction_types)
# Complexity classification
if complexity['cyclomatic_complexity'] > 20:
complexity['complexity_level'] = 'very_high'
elif complexity['cyclomatic_complexity'] > 10:
complexity['complexity_level'] = 'high'
elif complexity['cyclomatic_complexity'] > 5:
complexity['complexity_level'] = 'medium'
else:
complexity['complexity_level'] = 'low'
return complexity
def calculate_call_depth(self, func, visited=None, depth=0):
"""Calculate maximum call depth from function"""
if visited is None:
visited = set()
if func.addr in visited or depth > 20: # Prevent infinite recursion
return depth
visited.add(func.addr)
max_depth = depth
for callee in func.callees:
if callee.addr in self.functions:
callee_func = self.functions[callee.addr]
callee_depth = self.calculate_call_depth(callee_func, visited.copy(), depth + 1)
max_depth = max(max_depth, callee_depth)
return max_depth
def find_code_patterns(self):
"""Find interesting code patterns"""
patterns = {
'crypto_functions': [],
'string_functions': [],
'file_operations': [],
'network_operations': [],
'system_calls': [],
'anti_analysis': [],
'obfuscation': []
}
for addr, func in self.functions.items():
func_name = func.name.lower()
# Cryptographic functions
crypto_keywords = ['crypt', 'hash', 'md5', 'sha', 'aes', 'des', 'rsa', 'cipher']
if any(keyword in func_name for keyword in crypto_keywords):
patterns['crypto_functions'].append((func.name, hex(addr)))
# String manipulation functions
string_keywords = ['str', 'mem', 'copy', 'cmp', 'len', 'cat', 'chr']
if any(keyword in func_name for keyword in string_keywords):
patterns['string_functions'].append((func.name, hex(addr)))
# File operations
file_keywords = ['file', 'open', 'read', 'write', 'close', 'fopen', 'fread']
if any(keyword in func_name for keyword in file_keywords):
patterns['file_operations'].append((func.name, hex(addr)))
# Network operations
network_keywords = ['socket', 'connect', 'send', 'recv', 'bind', 'listen', 'accept']
if any(keyword in func_name for keyword in network_keywords):
patterns['network_operations'].append((func.name, hex(addr)))
# System calls
syscall_keywords = ['system', 'exec', 'fork', 'exit', 'kill', 'signal']
if any(keyword in func_name for keyword in syscall_keywords):
patterns['system_calls'].append((func.name, hex(addr)))
# Anti-analysis techniques
anti_keywords = ['debug', 'trace', 'ptrace', 'isdebuggerpresent', 'checkremotedebugger']
if any(keyword in func_name for keyword in anti_keywords):
patterns['anti_analysis'].append((func.name, hex(addr)))
# Analyze function content for patterns
self.analyze_function_content(func, patterns)
# Print results
for category, functions in patterns.items():
if functions:
print(f"\n{category.replace('_', ' ').title()}: {len(functions)}")
for func_name, addr in functions[:5]: # Show first 5
print(f" {func_name} at {addr}")
return patterns
def analyze_function_content(self, func, patterns):
"""Analyze function content for specific patterns"""
for block in func.blocks:
for insn in block.capstone.insns:
mnemonic = insn.mnemonic.lower()
# Look for obfuscation patterns
if mnemonic in ['nop', 'xor', 'rol', 'ror', 'shl', 'shr']:
# Check for excessive use of these instructions
obfuscation_count = sum(1 for b in func.blocks
for i in b.capstone.insns
if i.mnemonic.lower() in ['nop', 'xor', 'rol', 'ror'])
if obfuscation_count > 10: # Threshold for obfuscation
if (func.name, hex(func.addr)) not in patterns['obfuscation']:
patterns['obfuscation'].append((func.name, hex(func.addr)))
# Look for anti-debugging instructions
if mnemonic in ['rdtsc', 'cpuid', 'int']:
if (func.name, hex(func.addr)) not in patterns['anti_analysis']:
patterns['anti_analysis'].append((func.name, hex(func.addr)))
def build_call_graph(self):
"""Build comprehensive call graph"""
if self.cfg is None:
self.generate_comprehensive_cfg()
self.call_graph = nx.DiGraph()
# Add nodes (functions)
for addr, func in self.functions.items():
self.call_graph.add_node(addr,
name=func.name,
size=func.size,
complexity=len(func.blocks))
# Add edges (calls)
for addr, func in self.functions.items():
for callee in func.callees:
if callee.addr in self.functions:
self.call_graph.add_edge(addr, callee.addr)
print(f"Call graph built: {len(self.call_graph.nodes())} functions, {len(self.call_graph.edges())} calls")
return self.call_graph
def analyze_call_graph_properties(self):
"""Analyze call graph properties"""
if self.call_graph is None:
self.build_call_graph()
analysis = {
'total_functions': len(self.call_graph.nodes()),
'total_calls': len(self.call_graph.edges()),
'strongly_connected_components': len(list(nx.strongly_connected_components(self.call_graph))),
'weakly_connected_components': len(list(nx.weakly_connected_components(self.call_graph))),
'average_degree': sum(dict(self.call_graph.degree()).values()) / len(self.call_graph.nodes()) if self.call_graph.nodes() else 0,
'density': nx.density(self.call_graph),
'diameter': 0,
'radius': 0
}
# Calculate diameter and radius (for connected components)
try:
if nx.is_weakly_connected(self.call_graph):
undirected = self.call_graph.to_undirected()
analysis['diameter'] = nx.diameter(undirected)
analysis['radius'] = nx.radius(undirected)
except:
pass
# Find central functions
centrality_measures = {
'degree_centrality': nx.degree_centrality(self.call_graph),
'betweenness_centrality': nx.betweenness_centrality(self.call_graph),
'closeness_centrality': nx.closeness_centrality(self.call_graph),
'pagerank': nx.pagerank(self.call_graph)
}
# Get top functions for each centrality measure
analysis['central_functions'] = {}
for measure, values in centrality_measures.items():
top_functions = sorted(values.items(), key=lambda x: x[1], reverse=True)[:5]
analysis['central_functions'][measure] = [
(self.functions[addr].name, hex(addr), score)
for addr, score in top_functions
]
return analysis
def find_entry_points(self):
"""Find all possible entry points"""
entry_points = {
'main_entry': self.project.entry,
'exported_functions': [],
'constructor_functions': [],
'exception_handlers': [],
'signal_handlers': []
}
# Find exported functions
for symbol in self.project.loader.main_object.symbols:
if symbol.is_export and symbol.rebased_addr in self.functions:
entry_points['exported_functions'].append({
'name': symbol.name,
'address': hex(symbol.rebased_addr),
'type': 'export'
})
# Find constructor functions
for addr, func in self.functions.items():
if 'init' in func.name.lower() or 'constructor' in func.name.lower():
entry_points['constructor_functions'].append({
'name': func.name,
'address': hex(addr),
'type': 'constructor'
})
# Find exception handlers (simplified)
try:
if hasattr(self.project.loader.main_object, 'exception_handoff_table'):
for handler in self.project.loader.main_object.exception_handoff_table:
if handler in self.functions:
entry_points['exception_handlers'].append({
'name': self.functions[handler].name,
'address': hex(handler),
'type': 'exception_handler'
})
except:
pass
return entry_points
def analyze_data_references(self):
"""Analyze data references and constants"""
data_analysis = {
'string_references': [],
'numeric_constants': [],
'function_pointers': [],
'global_variables': []
}
# Analyze each function for data references
for addr, func in self.functions.items():
for block in func.blocks:
for insn in block.capstone.insns:
# Look for memory references
if insn.operands:
for operand in insn.operands:
if operand.type == 3: # Memory operand
mem_addr = operand.value.mem.disp
# Check if it's a string reference
try:
string_data = self.project.loader.memory.load(mem_addr, 100)
if self.is_printable_string(string_data):
data_analysis['string_references'].append({
'function': func.name,
'address': hex(mem_addr),
'string': string_data.decode('ascii', errors='ignore')[:50]
})
except:
pass
# Check if it's a function pointer
if mem_addr in self.functions:
data_analysis['function_pointers'].append({
'function': func.name,
'target': self.functions[mem_addr].name,
'address': hex(mem_addr)
})
elif operand.type == 2: # Immediate operand
value = operand.value.imm
# Collect interesting constants
if self.is_interesting_constant(value):
data_analysis['numeric_constants'].append({
'function': func.name,
'value': hex(value),
'decimal': value,
'instruction': f"{insn.mnemonic} {insn.op_str}"
})
return data_analysis
def is_printable_string(self, data):
"""Check if data represents a printable string"""
if len(data) < 4:
return False
# Check for null terminator
null_pos = data.find(b'\x00')
if null_pos == -1:
return False
string_data = data[:null_pos]
# Check if mostly printable ASCII
printable_count = sum(1 for byte in string_data if 32 <= byte <= 126)
return printable_count / len(string_data) > 0.8 if string_data else False
def is_interesting_constant(self, value):
"""Check if constant is potentially interesting"""
# Common interesting constants
interesting_values = {
0x41414141, # 'AAAA'
0x42424242, # 'BBBB'
0xdeadbeef,
0xcafebabe,
0xfeedface,
0x12345678,
0x87654321
}
if value in interesting_values:
return True
# Large constants that might be addresses or keys
if value > 0x10000000:
return True
# Powers of 2
if value > 0 and (value & (value - 1)) == 0:
return True
return False
def generate_analysis_report(self, output_file="static_analysis_report.txt"):
"""Generate comprehensive static analysis report"""
print("Generating static analysis report...")
# Run all analyses
if self.cfg is None:
self.generate_comprehensive_cfg()
patterns = self.find_code_patterns()
call_graph_analysis = self.analyze_call_graph_properties()
entry_points = self.find_entry_points()
data_analysis = self.analyze_data_references()
# Generate report
with open(output_file, 'w') as f:
f.write("STATIC ANALYSIS REPORT\n")
f.write("=" * 50 + "\n\n")
# Binary information
f.write("BINARY INFORMATION\n")
f.write("-" * 20 + "\n")
f.write(f"Architecture: {self.project.arch.name}\n")
f.write(f"Entry point: {hex(self.project.entry)}\n")
f.write(f"Base address: {hex(self.project.loader.main_object.min_addr)}\n")
f.write(f"Binary size: {self.project.loader.main_object.max_addr - self.project.loader.main_object.min_addr} bytes\n")
f.write(f"Functions discovered: {len(self.functions)}\n\n")
# Call graph analysis
f.write("CALL GRAPH ANALYSIS\n")
f.write("-" * 20 + "\n")
for key, value in call_graph_analysis.items():
if key != 'central_functions':
f.write(f"{key}: {value}\n")
f.write("\nCentral Functions:\n")
for measure, functions in call_graph_analysis['central_functions'].items():
f.write(f" {measure}:\n")
for name, addr, score in functions:
f.write(f" {name} ({addr}): {score:.3f}\n")
f.write("\n")
# Code patterns
f.write("CODE PATTERNS\n")
f.write("-" * 20 + "\n")
for category, functions in patterns.items():
if functions:
f.write(f"{category}: {len(functions)} functions\n")
for name, addr in functions[:3]: # Show first 3
f.write(f" {name} at {addr}\n")
f.write("\n")
# Entry points
f.write("ENTRY POINTS\n")
f.write("-" * 20 + "\n")
f.write(f"Main entry: {hex(entry_points['main_entry'])}\n")
f.write(f"Exported functions: {len(entry_points['exported_functions'])}\n")
f.write(f"Constructor functions: {len(entry_points['constructor_functions'])}\n")
f.write("\n")
# Data analysis
f.write("DATA ANALYSIS\n")
f.write("-" * 20 + "\n")
f.write(f"String references: {len(data_analysis['string_references'])}\n")
f.write(f"Numeric constants: {len(data_analysis['numeric_constants'])}\n")
f.write(f"Function pointers: {len(data_analysis['function_pointers'])}\n")
f.write("\n")
# Top complex functions
f.write("COMPLEX FUNCTIONS\n")
f.write("-" * 20 + "\n")
complex_functions = []
for addr, func in self.functions.items():
complexity = self.analyze_function_complexity(addr)
if complexity and complexity['cyclomatic_complexity'] > 10:
complex_functions.append(complexity)
complex_functions.sort(key=lambda x: x['cyclomatic_complexity'], reverse=True)
for func in complex_functions[:10]: # Top 10
f.write(f"{func['name']} ({func['address']}): "
f"CC={func['cyclomatic_complexity']}, "
f"Size={func['size']}, "
f"Blocks={func['basic_blocks']}\n")
print(f"Static analysis report saved to {output_file}")
return output_file
# Example usage
def comprehensive_static_analysis(binary_path):
"""Perform comprehensive static analysis"""
# Load binary
project = angr.Project(binary_path, auto_load_libs=False)
# Create analyzer
analyzer = StaticAnalyzer(project)
# Generate CFG
cfg = analyzer.generate_comprehensive_cfg()
# Analyze specific functions
main_symbol = project.loader.main_object.get_symbol('main')
if main_symbol:
main_complexity = analyzer.analyze_function_complexity(main_symbol.rebased_addr)
print(f"Main function complexity: {main_complexity}")
# Generate comprehensive report
report_file = analyzer.generate_analysis_report()
return analyzer, report_file
if __name__ == "__main__":
binary_path = "/bin/ls"
analyzer, report = comprehensive_static_analysis(binary_path)
print(f"Analysis completed. Report: {report}")
Automation and Integration
Automated Analysis Pipelines
# Automated analysis pipelines with angr
import angr
import json
import time
import logging
import multiprocessing
from pathlib import Path
from datetime import datetime
class AnalysisPipeline:
def __init__(self, config_file=None):
self.config = self.load_config(config_file)
self.setup_logging()
self.results = {}
self.analysis_queue = multiprocessing.Queue()
self.result_queue = multiprocessing.Queue()
def load_config(self, config_file):
"""Load pipeline configuration"""
default_config = {
"analysis_modules": [
"cfg_analysis",
"function_analysis",
"symbolic_execution",
"vulnerability_detection",
"static_analysis"
],
"symbolic_execution": {
"max_steps": 1000,
"max_states": 50,
"timeout": 300
},
"vulnerability_detection": {
"buffer_overflow": True,
"format_string": True,
"integer_overflow": True,
"use_after_free": True
},
"output": {
"formats": ["json", "html", "txt"],
"directory": "./analysis_results"
},
"performance": {
"parallel_workers": 4,
"memory_limit": "4GB",
"timeout_per_binary": 1800
}
}
if config_file and Path(config_file).exists():
with open(config_file, 'r') as f:
user_config = json.load(f)
# Merge configurations
for key, value in user_config.items():
if isinstance(value, dict) and key in default_config:
default_config[key].update(value)
else:
default_config[key] = value
return default_config
def setup_logging(self):
"""Setup logging for the pipeline"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('analysis_pipeline.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# Reduce angr logging verbosity
logging.getLogger('angr').setLevel(logging.WARNING)
logging.getLogger('cle').setLevel(logging.WARNING)
logging.getLogger('pyvex').setLevel(logging.WARNING)
def analyze_binary(self, binary_path, analysis_id=None):
"""Analyze single binary"""
if analysis_id is None:
analysis_id = f"analysis_{int(time.time())}"
self.logger.info(f"Starting analysis of {binary_path} (ID: {analysis_id})")
start_time = time.time()
analysis_result = {
'binary_path': str(binary_path),
'analysis_id': analysis_id,
'start_time': datetime.now().isoformat(),
'status': 'running',
'modules': {},
'errors': [],
'warnings': []
}
try:
# Load binary
project = angr.Project(binary_path, auto_load_libs=False)
analysis_result['binary_info'] = {
'architecture': project.arch.name,
'entry_point': hex(project.entry),
'base_address': hex(project.loader.main_object.min_addr),
'size': project.loader.main_object.max_addr - project.loader.main_object.min_addr
}
# Run analysis modules
for module in self.config['analysis_modules']:
try:
self.logger.info(f"Running module: {module}")
module_result = self.run_analysis_module(project, module)
analysis_result['modules'][module] = module_result
except Exception as e:
error_msg = f"Error in module {module}: {str(e)}"
self.logger.error(error_msg)
analysis_result['errors'].append(error_msg)
analysis_result['status'] = 'completed'
except Exception as e:
error_msg = f"Failed to analyze {binary_path}: {str(e)}"
self.logger.error(error_msg)
analysis_result['status'] = 'failed'
analysis_result['errors'].append(error_msg)
finally:
analysis_result['end_time'] = datetime.now().isoformat()
analysis_result['duration'] = time.time() - start_time
self.results[analysis_id] = analysis_result
self.save_results(analysis_result)
return analysis_result
def run_analysis_module(self, project, module_name):
"""Run specific analysis module"""
if module_name == "cfg_analysis":
return self.run_cfg_analysis(project)
elif module_name == "function_analysis":
return self.run_function_analysis(project)
elif module_name == "symbolic_execution":
return self.run_symbolic_execution(project)
elif module_name == "vulnerability_detection":
return self.run_vulnerability_detection(project)
elif module_name == "static_analysis":
return self.run_static_analysis(project)
else:
raise ValueError(f"Unknown analysis module: {module_name}")
def run_cfg_analysis(self, project):
"""Run CFG analysis module"""
cfg = project.analyses.CFGFast()
result = {
'total_functions': len(cfg.functions),
'total_nodes': len(cfg.graph.nodes()),
'total_edges': len(cfg.graph.edges()),
'entry_point': hex(project.entry),
'functions': []
}
# Analyze top 10 largest functions
sorted_functions = sorted(cfg.functions.items(),
key=lambda x: x[1].size, reverse=True)
for addr, func in sorted_functions[:10]:
func_info = {
'name': func.name,
'address': hex(addr),
'size': func.size,
'blocks': len(func.blocks),
'callers': len(func.callers),
'callees': len(func.callees)
}
result['functions'].append(func_info)
return result
def run_function_analysis(self, project):
"""Run function analysis module"""
cfg = project.analyses.CFGFast()
result = {
'total_functions': len(cfg.functions),
'complexity_distribution': {'low': 0, 'medium': 0, 'high': 0, 'very_high': 0},
'interesting_functions': {
'large_functions': [],
'complex_functions': [],
'recursive_functions': []
}
}
for addr, func in cfg.functions.items():
# Calculate complexity
complexity = len(func.blocks)
if complexity <= 5:
result['complexity_distribution']['low'] += 1
elif complexity <= 15:
result['complexity_distribution']['medium'] += 1
elif complexity <= 30:
result['complexity_distribution']['high'] += 1
else:
result['complexity_distribution']['very_high'] += 1
# Find interesting functions
if func.size > 1000:
result['interesting_functions']['large_functions'].append({
'name': func.name,
'address': hex(addr),
'size': func.size
})
if complexity > 20:
result['interesting_functions']['complex_functions'].append({
'name': func.name,
'address': hex(addr),
'complexity': complexity
})
# Check for recursion
if addr in [callee.addr for callee in func.callees]:
result['interesting_functions']['recursive_functions'].append({
'name': func.name,
'address': hex(addr)
})
return result
def run_symbolic_execution(self, project):
"""Run symbolic execution module"""
config = self.config['symbolic_execution']
result = {
'executed': False,
'states_explored': 0,
'paths_found': 0,
'errors': [],
'timeout': False
}
try:
# Create initial state
initial_state = project.factory.entry_state()
# Run symbolic execution with timeout
sm = project.factory.simulation_manager(initial_state)
start_time = time.time()
step_count = 0
while (sm.active and
step_count < config['max_steps'] and
time.time() - start_time < config['timeout']):
sm.step()
step_count += 1
# Limit active states
if len(sm.active) > config['max_states']:
sm.active = sm.active[:config['max_states']]
result['executed'] = True
result['states_explored'] = step_count
result['paths_found'] = len(sm.deadended) + len(sm.found)
if time.time() - start_time >= config['timeout']:
result['timeout'] = True
except Exception as e:
result['errors'].append(str(e))
return result
def run_vulnerability_detection(self, project):
"""Run vulnerability detection module"""
config = self.config['vulnerability_detection']
result = {
'vulnerabilities_found': [],
'checks_performed': []
}
cfg = project.analyses.CFGFast()
# Buffer overflow detection
if config.get('buffer_overflow', True):
result['checks_performed'].append('buffer_overflow')
for addr, func in cfg.functions.items():
if 'strcpy' in func.name or 'gets' in func.name or 'sprintf' in func.name:
result['vulnerabilities_found'].append({
'type': 'potential_buffer_overflow',
'function': func.name,
'address': hex(addr),
'description': f'Dangerous function {func.name} detected'
})
# Format string detection
if config.get('format_string', True):
result['checks_performed'].append('format_string')
for addr, func in cfg.functions.items():
if 'printf' in func.name and 'sprintf' not in func.name:
result['vulnerabilities_found'].append({
'type': 'potential_format_string',
'function': func.name,
'address': hex(addr),
'description': f'Format string function {func.name} detected'
})
return result
def run_static_analysis(self, project):
"""Run static analysis module"""
result = {
'strings': [],
'imports': [],
'exports': [],
'sections': []
}
# Extract printable ASCII strings by scanning readable sections
# (CLE objects do not expose a ready-made string list)
try:
import re
for section in project.loader.main_object.sections:
if section.memsize == 0 or not section.is_readable:
continue
data = project.loader.memory.load(section.vaddr, section.memsize)
for match in re.finditer(rb'[\x20-\x7e]{5,}', data):
result['strings'].append({
'address': hex(section.vaddr + match.start()),
'value': match.group().decode('ascii')[:100] # Truncate long strings
})
except Exception:
pass
# Extract imports (CLE maps import names to relocations)
try:
for name, reloc in project.loader.main_object.imports.items():
result['imports'].append({
'name': name,
'address': hex(reloc.rebased_addr) if reloc.rebased_addr else None
})
except Exception:
pass
# Extract exported symbols
try:
for symbol in project.loader.main_object.symbols:
if symbol.is_export:
result['exports'].append({
'name': symbol.name,
'address': hex(symbol.rebased_addr)
})
except Exception:
pass
return result
def batch_analyze(self, binary_paths, parallel=True):
"""Analyze multiple binaries"""
self.logger.info(f"Starting batch analysis of {len(binary_paths)} binaries")
if parallel and len(binary_paths) > 1:
return self.parallel_batch_analyze(binary_paths)
else:
return self.sequential_batch_analyze(binary_paths)
def sequential_batch_analyze(self, binary_paths):
"""Sequential batch analysis"""
results = []
for i, binary_path in enumerate(binary_paths):
self.logger.info(f"Analyzing binary {i+1}/{len(binary_paths)}: {binary_path}")
try:
result = self.analyze_binary(binary_path)
results.append(result)
except Exception as e:
self.logger.error(f"Failed to analyze {binary_path}: {e}")
results.append({
'binary_path': str(binary_path),
'status': 'failed',
'error': str(e)
})
return results
def parallel_batch_analyze(self, binary_paths):
"""Parallel batch analysis"""
num_workers = min(self.config['performance']['parallel_workers'], len(binary_paths))
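# Note: self.analysis_queue and self.result_queue are assumed to be
# multiprocessing.Queue instances created in the pipeline constructor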
# Create worker processes
workers = []
for i in range(num_workers):
worker = multiprocessing.Process(
target=self.analysis_worker,
args=(self.analysis_queue, self.result_queue)
)
workers.append(worker)
worker.start()
# Add tasks to queue
for binary_path in binary_paths:
self.analysis_queue.put(binary_path)
# Add sentinel values to stop workers
for _ in range(num_workers):
self.analysis_queue.put(None)
# Collect results
results = []
for _ in range(len(binary_paths)):
result = self.result_queue.get()
results.append(result)
# Wait for workers to finish
for worker in workers:
worker.join()
return results
def analysis_worker(self, task_queue, result_queue):
"""Worker process for parallel analysis"""
while True:
binary_path = task_queue.get()
if binary_path is None: # Sentinel value
break
try:
result = self.analyze_binary(binary_path)
result_queue.put(result)
except Exception as e:
result_queue.put({
'binary_path': str(binary_path),
'status': 'failed',
'error': str(e)
})
def save_results(self, analysis_result):
"""Save analysis results"""
output_dir = Path(self.config['output']['directory'])
output_dir.mkdir(exist_ok=True)
analysis_id = analysis_result['analysis_id']
# Save JSON result
if 'json' in self.config['output']['formats']:
json_file = output_dir / f"{analysis_id}.json"
with open(json_file, 'w') as f:
json.dump(analysis_result, f, indent=2)
# Save text report
if 'txt' in self.config['output']['formats']:
txt_file = output_dir / f"{analysis_id}.txt"
self.generate_text_report(analysis_result, txt_file)
# Save HTML report
if 'html' in self.config['output']['formats']:
html_file = output_dir / f"{analysis_id}.html"
self.generate_html_report(analysis_result, html_file)
def generate_text_report(self, analysis_result, output_file):
"""Generate text report"""
with open(output_file, 'w') as f:
f.write("ANGR ANALYSIS REPORT\n")
f.write("=" * 50 + "\n\n")
# Basic information
f.write(f"Binary: {analysis_result['binary_path']}\n")
f.write(f"Analysis ID: {analysis_result['analysis_id']}\n")
f.write(f"Status: {analysis_result['status']}\n")
f.write(f"Duration: {analysis_result.get('duration', 0):.2f} seconds\n\n")
# Binary information
if 'binary_info' in analysis_result:
info = analysis_result['binary_info']
f.write("BINARY INFORMATION\n")
f.write("-" * 20 + "\n")
for key, value in info.items():
f.write(f"{key}: {value}\n")
f.write("\n")
# Module results
for module, result in analysis_result.get('modules', {}).items():
f.write(f"{module.upper().replace('_', ' ')}\n")
f.write("-" * 20 + "\n")
self.write_module_result(f, result)
f.write("\n")
# Errors and warnings
if analysis_result.get('errors'):
f.write("ERRORS\n")
f.write("-" * 20 + "\n")
for error in analysis_result['errors']:
f.write(f"- {error}\n")
f.write("\n")
def write_module_result(self, file_handle, result):
"""Write module result to file"""
if isinstance(result, dict):
for key, value in result.items():
if isinstance(value, (list, dict)) and len(str(value)) > 100:
file_handle.write(f"{key}: {type(value).__name__} with {len(value)} items\n")
else:
file_handle.write(f"{key}: {value}\n")
else:
file_handle.write(f"Result: {result}\n")
def generate_html_report(self, analysis_result, output_file):
"""Generate HTML report"""
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>angr Analysis Report - {analysis_result['analysis_id']}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #f0f0f0; padding: 10px; border-radius: 5px; }}
.section {{ margin: 20px 0; }}
.error {{ color: red; }}
.warning {{ color: orange; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<div class="header">
<h1>angr Analysis Report</h1>
<p><strong>Binary:</strong> {analysis_result['binary_path']}</p>
<p><strong>Analysis ID:</strong> {analysis_result['analysis_id']}</p>
<p><strong>Status:</strong> {analysis_result['status']}</p>
<p><strong>Duration:</strong> {analysis_result.get('duration', 0):.2f} seconds</p>
</div>
"""
# Add module results
for module, result in analysis_result.get('modules', {}).items():
html_content += f'<div class="section"><h2>{module.replace("_", " ").title()}</h2>'
html_content += self.format_result_html(result)
html_content += '</div>'
html_content += """
</body>
</html>
"""
with open(output_file, 'w') as f:
f.write(html_content)
def format_result_html(self, result):
"""Format result as HTML"""
if isinstance(result, dict):
html = "<table>"
for key, value in result.items():
html += f"<tr><td><strong>{key}</strong></td><td>{value}</td></tr>"
html += "</table>"
return html
else:
return f"<p>{result}</p>"
# Example usage and configuration
def create_analysis_pipeline():
"""Create and configure analysis pipeline"""
# Create configuration
config = {
"analysis_modules": [
"cfg_analysis",
"function_analysis",
"static_analysis",
"vulnerability_detection"
],
"output": {
"formats": ["json", "txt", "html"],
"directory": "./angr_analysis_results"
},
"performance": {
"parallel_workers": 2,
"timeout_per_binary": 300
}
}
# Save configuration
with open('pipeline_config.json', 'w') as f:
json.dump(config, f, indent=2)
# Create pipeline
pipeline = AnalysisPipeline('pipeline_config.json')
return pipeline
def run_pipeline_example():
"""Example of running analysis pipeline"""
# Create pipeline
pipeline = create_analysis_pipeline()
# Analyze single binary
binary_path = "/bin/ls"
result = pipeline.analyze_binary(binary_path)
print(f"Analysis completed: {result['status']}")
print(f"Duration: {result.get('duration', 0):.2f} seconds")
# Batch analysis example
binary_paths = ["/bin/ls", "/bin/cat", "/bin/echo"]
batch_results = pipeline.batch_analyze(binary_paths, parallel=True)
print(f"Batch analysis completed: {len(batch_results)} binaries")
return pipeline, batch_results
if __name__ == "__main__":
pipeline, results = run_pipeline_example()
print("Pipeline example completed")
Best Practices and Tips
Performance Optimization
# angr performance optimization techniques
import angr
import angr.options as o
class PerformanceOptimizer:
def __init__(self):
self.optimization_strategies = {
'memory': self.optimize_memory,
'solver': self.optimize_solver,
'state_management': self.optimize_state_management,
'analysis': self.optimize_analysis
}
def optimize_memory(self, project, state=None):
"""Optimize memory usage"""
# Memory optimization options
memory_opts = {
o.ABSTRACT_MEMORY,
o.FAST_MEMORY,
o.APPROXIMATE_MEMORY_SIZES,
o.APPROXIMATE_MEMORY_INDICES
}
if state:
state.options.update(memory_opts)
# Bound symbolic address concretization; read_strategies holds the
# SimConcretizationStrategy instances angr tries in order
# (1024 is an arbitrary cap)
from angr.concretization_strategies import SimConcretizationStrategyRange
state.memory.read_strategies.insert(0, SimConcretizationStrategyRange(1024))
return memory_opts
def optimize_solver(self, project, state=None):
"""Optimize constraint solver"""
# Solver optimization options
solver_opts = {
o.ABSTRACT_SOLVER,
o.APPROXIMATE_FIRST,
o.APPROXIMATE_GUARDS,
o.APPROXIMATE_SATISFIABILITY
}
if state:
state.options.update(solver_opts)
# Cap individual solver queries; claripy timeouts are in milliseconds
state.solver._solver.timeout = 5000 # 5 seconds
return solver_opts
def optimize_state_management(self, simulation_manager):
"""Optimize state management"""
# Limit number of active states
max_active = 20
if len(simulation_manager.active) > max_active:
# Keep most promising states
scored_states = []
for state in simulation_manager.active:
score = self.calculate_state_score(state)
scored_states.append((score, state))
scored_states.sort(key=lambda x: x[0], reverse=True)
simulation_manager.stashes['active'] = [state for _, state in scored_states[:max_active]]
# Merge similar states
simulation_manager.merge()
return simulation_manager
def calculate_state_score(self, state):
"""Calculate state priority score"""
score = 0
# Prefer states with fewer constraints
score += 1000 / (len(state.solver.constraints) + 1)
# Prefer states that have made more progress
score += state.history.depth
# Prefer states in main executable
main_object = state.project.loader.main_object
if main_object.min_addr <= state.addr <= main_object.max_addr:
score += 100
return score
def optimize_analysis(self, project):
"""Optimize analysis settings"""
# CFG optimization
cfg_opts = {
'normalize': True,
'resolve_indirect_jumps': False, # Disable for speed
'force_complete_scan': False,
'show_progressbar': False
}
return cfg_opts
def create_optimized_state(self, project, addr=None):
"""Create optimized initial state"""
if addr is None:
addr = project.entry
# Performance-oriented options
perf_options = {
o.ABSTRACT_SOLVER,
o.ABSTRACT_MEMORY,
o.FAST_MEMORY,
o.FAST_REGISTERS,
o.APPROXIMATE_FIRST,
o.APPROXIMATE_GUARDS,
o.LAZY_SOLVES
}
# Create state with optimizations
state = project.factory.entry_state(
addr=addr,
add_options=perf_options
)
# Additional optimizations
self.optimize_memory(project, state)
self.optimize_solver(project, state)
return state
# Error handling and debugging
class AngrDebugger:
def __init__(self, project):
self.project = project
self.debug_info = {}
def setup_debugging(self, state):
"""Setup debugging for state"""
# Enable tracking options
debug_options = {
o.TRACK_MEMORY_ACTIONS,
o.TRACK_REGISTER_ACTIONS,
o.TRACK_JMP_ACTIONS,
o.TRACK_CONSTRAINT_ACTIONS
}
state.options.update(debug_options)
# Setup breakpoints
state.inspect.b('mem_read', when=angr.BP_BEFORE, action=self.on_memory_read)
state.inspect.b('mem_write', when=angr.BP_BEFORE, action=self.on_memory_write)
state.inspect.b('call', when=angr.BP_BEFORE, action=self.on_function_call)
return state
def on_memory_read(self, state):
"""Handle memory read events"""
addr = state.inspect.mem_read_address
size = state.inspect.mem_read_length
if state.solver.symbolic(addr):
print(f"Symbolic memory read at {addr} (size: {size})")
def on_memory_write(self, state):
"""Handle memory write events"""
addr = state.inspect.mem_write_address
data = state.inspect.mem_write_expr
if state.solver.symbolic(addr) or state.solver.symbolic(data):
print(f"Symbolic memory write at {addr}: {data}")
def on_function_call(self, state):
"""Handle function call events"""
target = state.inspect.function_address
print(f"Function call to {hex(target)}")
def analyze_state_history(self, state):
"""Analyze state execution history"""
# Walk the history chain itself: history.parent is a SimStateHistory
# node (not a state), so iterate over history nodes
history = []
node = state.history
while node.parent is not None:
history.append({
'address': hex(node.addr) if node.addr is not None else None,
'depth': node.depth,
'constraints': len(node.recent_constraints),
'actions': len(node.recent_actions)
})
node = node.parent
history.reverse()
return history
def debug_constraint_solving(self, state):
"""Debug constraint solving issues"""
print(f"Constraint count: {len(state.solver.constraints)}")
print(f"Variables: {list(state.solver.variables)}")
# Check satisfiability
if not state.solver.satisfiable():
print("State is unsatisfiable!")
# Find the first offending constraint by re-adding them one at a
# time to a fresh claripy solver
import claripy
temp_solver = claripy.Solver()
for i, constraint in enumerate(state.solver.constraints):
temp_solver.add(constraint)
if not temp_solver.satisfiable():
print(f"Constraint {i} makes state unsatisfiable: {constraint}")
break
# Check solver performance
import time
start_time = time.time()
state.solver.satisfiable()
solve_time = time.time() - start_time
if solve_time > 1.0:
print(f"Slow constraint solving: {solve_time:.2f} seconds")
# Best practices guide
def angr_best_practices():
"""Guide to angr best practices"""
practices = {
"Performance": [
"Use ABSTRACT_MEMORY and ABSTRACT_SOLVER options for better performance",
"Limit the number of active states to prevent state explosion",
"Use timeouts for constraint solving",
"Disable unnecessary tracking options",
"Use CFGFast instead of CFGEmulated when possible"
],
"Memory Management": [
"Use FAST_MEMORY option for large binaries",
"Avoid storing large amounts of data in state globals",
"Clear unused states regularly",
"Use memory-mapped files for large inputs"
],
"Symbolic Execution": [
"Start with concrete inputs and gradually make them symbolic",
"Use find/avoid addresses to guide exploration",
"Implement custom exploration techniques for complex targets",
"Use state merging to reduce path explosion"
],
"Debugging": [
"Use state.inspect for debugging symbolic execution",
"Enable tracking options only when needed",
"Analyze constraint complexity regularly",
"Use logging to track analysis progress"
],
"Analysis Design": [
"Break complex analysis into smaller modules",
"Use caching for expensive computations",
"Implement timeouts for all analysis phases",
"Validate results with multiple techniques"
]
}
return practices
# Example of optimized analysis
def optimized_analysis_example(binary_path):
"""Example of optimized angr analysis"""
# Load project
project = angr.Project(binary_path, auto_load_libs=False)
# Create optimizer
optimizer = PerformanceOptimizer()
# Create optimized state
initial_state = optimizer.create_optimized_state(project)
# Setup debugging if needed
debugger = AngrDebugger(project)
# initial_state = debugger.setup_debugging(initial_state)
# Run optimized symbolic execution
sm = project.factory.simulation_manager(initial_state)
# Exploration with optimization
step_count = 0
max_steps = 100
while sm.active and step_count < max_steps:
sm.step()
step_count += 1
# Apply optimizations
sm = optimizer.optimize_state_management(sm)
if step_count % 10 == 0:
print(f"Step {step_count}: {len(sm.active)} active states")
print(f"Analysis completed: {step_count} steps")
print(f"Final states: {len(sm.active)} active, {len(sm.deadended)} deadended")
return sm
if __name__ == "__main__":
binary_path = "/bin/ls"
sm = optimized_analysis_example(binary_path)
# Print best practices
practices = angr_best_practices()
print("\nangr Best Practices:")
for category, tips in practices.items():
print(f"\n{category}:")
for tip in tips:
print(f" - {tip}")
Resources
Documentation and Learning
- angr Documentation - Official general documentation
- angr API Reference - Complete API documentation
- angr Examples - Official example scripts
- angr Blog - Latest updates and tutorials
Training and Tutorials
- angr CTF - CTF challenges for learning angr
- Binary Analysis Course - Modern Binary Exploitation course
- Symbolic Execution Tutorial - Guide to symbolic execution
- angr Management - GUI for angr
Community and Support
- angr Slack - Community chat and support
- angr GitHub - Source code and issue tracker
- angr Mailing List - Development discussions
- Stack Overflow - Q&A with the angr tag
Related Tools and Integrations
- Ghidra angr Plugin - Ghidra integration
- IDA angr Plugin - IDA Pro integration
- angr-utils - Utility functions and helpers
- Manticore - Alternative symbolic execution engine