Ninja binaire Feuille de chaleur
Aperçu général
Binary Ninja est une plateforme moderne et légère d'ingénierie inverse qui fournit de puissantes capacités d'analyse grâce à son système de langage intermédiaire unique (IL). Développé par Vector 35, il offre une interface scriptable avec les API Python et C, ce qui le rend idéal pour l'analyse interactive et les flux de travail automatisés d'ingénierie inverse.
C'est-à-dire Key Strengths: représentation intermédiaire basée sur SSA, analyse en direct, API Python étendue, support multiplateforme et écosystème de plugin actif.
Installation et configuration
Installation de licence commerciale
# Download from official website
# Visit: https://binary.ninja/
# Choose Personal, Commercial, or Educational license
# Linux Installation:
wget https://cdn.binary.ninja/installers/BinaryNinja-personal.zip
unzip BinaryNinja-personal.zip
cd binaryninja
./binaryninja
# macOS Installation:
# Download .dmg file and drag to Applications folder
# Windows Installation:
# Download .exe installer and run with administrator privileges
Version Cloud/gratuite
# Binary Ninja Cloud (Free tier available)
# Access at: https://cloud.binary.ninja/
# Features: Limited analysis time, cloud-based processing
# Good for: Learning, small projects, evaluation
# Educational License:
# Free for students and educators
# Apply at: https://binary.ninja/educational/
```_
### Installation du plugin
```bash
# Plugin Manager (GUI):
# Tools -> Manage Plugins -> Browse Online
# Manual Plugin Installation:
# Linux/macOS: ~/.binaryninja/plugins/
# Windows: %APPDATA%\Binary Ninja\plugins\
# Popular Plugins:
# - Binja Toolkit: Enhanced UI and analysis features
# - Sourcery: Source code recovery and analysis
# - Binja Debugger: Integrated debugging capabilities
# - Kaitai Struct: Binary format parsing
```_
## Utilisation et navigation de base
### Aperçu de l'interface
```bash
# Main Components:
# 1. Disassembly View - Assembly code with analysis
# 2. Linear View - Raw disassembly without analysis
# 3. Graph View - Control flow graph visualization
# 4. Hex View - Raw binary data
# 5. Types View - Data type definitions
# 6. Strings View - String references
# 7. Log View - Analysis progress and messages
# View Switching:
# Tab key: Cycle through views
# Ctrl+1: Disassembly view
# Ctrl+2: Graph view
# Ctrl+3: Linear view
# Ctrl+4: Hex view
Commandes de navigation
# Basic Navigation:
# G: Go to address/function
# Space: Switch between graph and linear view
# Tab: Switch between disassembly views
# Esc: Go back in navigation history
# Ctrl+G: Go to address dialog
# Function Navigation:
# Ctrl+J: Jump to function
# Ctrl+Shift+J: Jump to function by name
# P: Go to previous function
# N: Go to next function
# Enter: Follow reference/call
# Cross-References:
# X: Show cross-references to current location
# Ctrl+X: Show cross-references from current location
# Shift+X: Show data cross-references
Contrôle d'analyse
# Analysis Management:
# Ctrl+A: Start/restart analysis
# Ctrl+Shift+A: Analysis options
# F5: Force function analysis
# U: Undefine function/data
# D: Define data at cursor
# Function Management:
# F: Create function at cursor
# Alt+F: Edit function properties
# Ctrl+F: Find text/bytes
# Y: Set function type
Système de langue intermédiaire (IL)
Aperçu des niveaux d'IL
# Binary Ninja's Multi-Level IL System:
# 1. Low Level IL (LLIL) - Architecture-specific
# 2. Medium Level IL (MLIL) - Architecture-independent
# 3. High Level IL (HLIL) - C-like representation
# 4. Static Single Assignment (SSA) forms for each level
# Accessing IL in Python API:
import binaryninja as bn
# Load binary
bv = bn.open_view("/path/to/binary")
# Get function
func = bv.get_function_at(0x401000)
# Access different IL levels
llil = func.llil
mlil = func.mlil
hlil = func.hlil
# SSA forms
llil_ssa = func.llil.ssa_form
mlil_ssa = func.mlil.ssa_form
hlil_ssa = func.hlil.ssa_form
Exemples d'analyses IL
# Analyze function control flow
def analyze_function_complexity(func):
"""Calculate cyclomatic complexity using MLIL"""
mlil = func.mlil
if not mlil:
return 0
# Count decision points
decision_points = 0
for block in mlil.basic_blocks:
for instr in block:
if instr.operation in [
bn.MediumLevelILOperation.MLIL_IF,
bn.MediumLevelILOperation.MLIL_WHILE,
bn.MediumLevelILOperation.MLIL_FOR,
bn.MediumLevelILOperation.MLIL_SWITCH
]:
decision_points += 1
# Cyclomatic complexity = edges - nodes + 2
edges = sum(len(block.outgoing_edges) for block in mlil.basic_blocks)
nodes = len(mlil.basic_blocks)
complexity = edges - nodes + 2
return complexity
# Find function calls in HLIL
def find_function_calls(func):
"""Extract all function calls from HLIL"""
calls = []
hlil = func.hlil
if not hlil:
return calls
for block in hlil.basic_blocks:
for instr in block:
if instr.operation == bn.HighLevelILOperation.HLIL_CALL:
dest = instr.dest
if hasattr(dest, 'constant'):
# Direct call
target_addr = dest.constant
target_func = func.view.get_function_at(target_addr)
if target_func:
calls.append({
'address': instr.address,
'target': target_func.name,
'target_address': target_addr
})
else:
# Indirect call
calls.append({
'address': instr.address,
'target': 'indirect',
'expression': str(dest)
})
return calls
# Data flow analysis using SSA
def trace_variable_usage(func, var_name):
"""Trace usage of a variable through SSA form"""
mlil_ssa = func.mlil.ssa_form
if not mlil_ssa:
return []
usage_points = []
for block in mlil_ssa.basic_blocks:
for instr in block:
# Check for variable definitions
if hasattr(instr, 'dest') and str(instr.dest).startswith(var_name):
usage_points.append({
'address': instr.address,
'type': 'definition',
'instruction': str(instr)
})
# Check for variable uses
if hasattr(instr, 'src') and str(instr.src).find(var_name) != -1:
usage_points.append({
'address': instr.address,
'type': 'use',
'instruction': str(instr)
})
return usage_points
API et scripts Python
API de base Utilisation
import binaryninja as bn
from binaryninja import log
# Open binary file
bv = bn.open_view("/path/to/binary")
if not bv:
log.log_error("Failed to open binary")
exit(1)
# Basic binary information
print(f"Architecture: {bv.arch.name}")
print(f"Platform: {bv.platform.name}")
print(f"Entry point: 0x{bv.entry_point:x}")
print(f"Start address: 0x{bv.start:x}")
print(f"End address: 0x{bv.end:x}")
# Get all functions
functions = bv.functions
print(f"Total functions: {len(functions)}")
# Iterate through functions
for func in functions:
print(f"Function: {func.name} at 0x{func.start:x}")
print(f" Size: {len(func)} bytes")
print(f" Basic blocks: {len(func.basic_blocks)}")
Scénarios d'analyse avancée
# Comprehensive binary analysis script
class BinaryAnalyzer:
def __init__(self, binary_path):
self.bv = bn.open_view(binary_path)
if not self.bv:
raise ValueError(f"Cannot open binary: {binary_path}")
# Wait for analysis to complete
self.bv.update_analysis_and_wait()
def analyze_strings(self):
"""Analyze string references and usage"""
strings_analysis = {
'total_strings': 0,
'referenced_strings': 0,
'unreferenced_strings': 0,
'string_details': []
}
for string in self.bv.strings:
string_info = {
'address': string.start,
'length': string.length,
'value': string.value,
'type': string.string_type.name,
'references': []
}
# Find references to this string
refs = self.bv.get_code_refs(string.start)
for ref in refs:
func = self.bv.get_function_at(ref.address)
string_info['references'].append({
'address': ref.address,
'function': func.name if func else 'unknown'
})
strings_analysis['string_details'].append(string_info)
strings_analysis['total_strings'] += 1
if string_info['references']:
strings_analysis['referenced_strings'] += 1
else:
strings_analysis['unreferenced_strings'] += 1
return strings_analysis
def find_crypto_constants(self):
"""Search for cryptographic constants"""
crypto_constants = {
'md5': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],
'sha1': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0],
'sha256': [0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a],
'aes_sbox': [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5],
'rc4_sbox': list(range(256))
}
found_constants = []
for crypto_name, constants in crypto_constants.items():
for constant in constants[:4]: # Check first few constants
# Search for 32-bit constant
results = self.bv.find_all_constant(constant, 4)
for addr in results:
found_constants.append({
'algorithm': crypto_name,
'constant': hex(constant),
'address': hex(addr),
'context': self.get_context_info(addr)
})
return found_constants
def analyze_imports(self):
"""Analyze imported functions and libraries"""
imports_analysis = {
'total_imports': 0,
'libraries': {},
'dangerous_functions': [],
'network_functions': [],
'crypto_functions': []
}
# Dangerous function patterns
dangerous_patterns = [
'strcpy', 'strcat', 'sprintf', 'gets', 'scanf',
'system', 'exec', 'eval', 'shell'
]
network_patterns = [
'socket', 'connect', 'bind', 'listen', 'accept',
'send', 'recv', 'WSA', 'inet_'
]
crypto_patterns = [
'crypt', 'hash', 'md5', 'sha', 'aes', 'des',
'rsa', 'encrypt', 'decrypt', 'cipher'
]
for symbol in self.bv.symbols:
if symbol.type == bn.SymbolType.ImportedFunctionSymbol:
func_name = symbol.name
imports_analysis['total_imports'] += 1
# Extract library name
if '@' in func_name:
lib_name = func_name.split('@')[1]
else:
lib_name = 'unknown'
if lib_name not in imports_analysis['libraries']:
imports_analysis['libraries'][lib_name] = []
imports_analysis['libraries'][lib_name].append(func_name)
# Categorize functions
func_lower = func_name.lower()
if any(pattern in func_lower for pattern in dangerous_patterns):
imports_analysis['dangerous_functions'].append(func_name)
if any(pattern in func_lower for pattern in network_patterns):
imports_analysis['network_functions'].append(func_name)
if any(pattern in func_lower for pattern in crypto_patterns):
imports_analysis['crypto_functions'].append(func_name)
return imports_analysis
def detect_packing(self):
"""Detect potential packing or obfuscation"""
packing_indicators = {
'high_entropy_sections': [],
'unusual_entry_point': False,
'few_imports': False,
'suspicious_sections': [],
'packed_probability': 0.0
}
# Check entropy of sections
for section in self.bv.sections:
section_data = self.bv.read(section.start, section.length)
if section_data:
entropy = self.calculate_entropy(section_data)
if entropy > 7.0: # High entropy threshold
packing_indicators['high_entropy_sections'].append({
'name': section.name,
'entropy': entropy,
'address': hex(section.start),
'size': section.length
})
# Check entry point location
entry_point = self.bv.entry_point
entry_section = self.bv.get_section_at(entry_point)
if entry_section and entry_section.name not in ['.text', 'CODE']:
packing_indicators['unusual_entry_point'] = True
# Check import count
import_count = len([s for s in self.bv.symbols
if s.type == bn.SymbolType.ImportedFunctionSymbol])
if import_count < 10:
packing_indicators['few_imports'] = True
# Calculate packing probability
score = 0
if packing_indicators['high_entropy_sections']:
score += 0.4
if packing_indicators['unusual_entry_point']:
score += 0.3
if packing_indicators['few_imports']:
score += 0.3
packing_indicators['packed_probability'] = score
return packing_indicators
def calculate_entropy(self, data):
"""Calculate Shannon entropy of data"""
import math
from collections import Counter
if not data:
return 0
# Count byte frequencies
byte_counts = Counter(data)
data_len = len(data)
# Calculate entropy
entropy = 0
for count in byte_counts.values():
probability = count / data_len
entropy -= probability * math.log2(probability)
return entropy
def get_context_info(self, address):
"""Get context information for an address"""
func = self.bv.get_function_at(address)
if func:
return f"Function: {func.name}"
section = self.bv.get_section_at(address)
if section:
return f"Section: {section.name}"
return "Unknown context"
def generate_report(self):
"""Generate comprehensive analysis report"""
report = {
'binary_info': {
'architecture': self.bv.arch.name,
'platform': self.bv.platform.name,
'entry_point': hex(self.bv.entry_point),
'file_size': self.bv.end - self.bv.start,
'function_count': len(self.bv.functions)
},
'strings_analysis': self.analyze_strings(),
'crypto_constants': self.find_crypto_constants(),
'imports_analysis': self.analyze_imports(),
'packing_detection': self.detect_packing()
}
return report
# Usage example
analyzer = BinaryAnalyzer("/path/to/binary")
report = analyzer.generate_report()
# Print summary
print("=== Binary Analysis Report ===")
print(f"Architecture: {report['binary_info']['architecture']}")
print(f"Functions: {report['binary_info']['function_count']}")
print(f"Strings: {report['strings_analysis']['total_strings']}")
print(f"Imports: {report['imports_analysis']['total_imports']}")
print(f"Packing probability: {report['packing_detection']['packed_probability']:.2f}")
Développement de plugins
# Basic Binary Ninja plugin structure
from binaryninja import *
import json
class CustomAnalysisPlugin:
def __init__(self):
self.name = "Custom Analysis Plugin"
self.description = "Performs custom binary analysis"
def analyze_function_calls(self, bv):
"""Analyze function call patterns"""
call_graph = {}
for func in bv.functions:
call_graph[func.name] = {
'address': hex(func.start),
'calls_to': [],
'called_by': []
}
# Find functions called by this function
for block in func.basic_blocks:
for instr in block:
if instr.operation == LowLevelILOperation.LLIL_CALL:
target = instr.dest
if hasattr(target, 'constant'):
target_func = bv.get_function_at(target.constant)
if target_func:
call_graph[func.name]['calls_to'].append(target_func.name)
# Add reverse reference
if target_func.name not in call_graph:
call_graph[target_func.name] = {
'address': hex(target_func.start),
'calls_to': [],
'called_by': []
}
call_graph[target_func.name]['called_by'].append(func.name)
return call_graph
def find_vulnerabilities(self, bv):
"""Search for potential vulnerabilities"""
vulnerabilities = []
# Dangerous function calls
dangerous_functions = {
'strcpy': 'Buffer overflow risk',
'strcat': 'Buffer overflow risk',
'sprintf': 'Format string vulnerability',
'gets': 'Buffer overflow risk',
'system': 'Command injection risk'
}
for func in bv.functions:
for block in func.basic_blocks:
for instr in block:
if instr.operation == LowLevelILOperation.LLIL_CALL:
target = instr.dest
if hasattr(target, 'constant'):
target_func = bv.get_function_at(target.constant)
if target_func and target_func.name in dangerous_functions:
vulnerabilities.append({
'type': 'dangerous_function',
'function': target_func.name,
'risk': dangerous_functions[target_func.name],
'location': hex(instr.address),
'caller': func.name
})
return vulnerabilities
# Plugin registration
def register_plugin():
plugin = CustomAnalysisPlugin()
def run_analysis(bv):
# Run custom analysis
call_graph = plugin.analyze_function_calls(bv)
vulnerabilities = plugin.find_vulnerabilities(bv)
# Display results
log.log_info(f"Found {len(call_graph)} functions in call graph")
log.log_info(f"Found {len(vulnerabilities)} potential vulnerabilities")
# Save results to file
results = {
'call_graph': call_graph,
'vulnerabilities': vulnerabilities
}
with open('analysis_results.json', 'w') as f:
json.dump(results, f, indent=2)
log.log_info("Analysis results saved to analysis_results.json")
# Register menu item
PluginCommand.register(
"Custom Analysis\\Run Analysis",
"Run custom binary analysis",
run_analysis
)
# Call registration function
register_plugin()
Techniques d'analyse avancées
Analyse du débit de contrôle
# Advanced control flow analysis
def analyze_control_flow(func):
"""Comprehensive control flow analysis"""
analysis = {
'basic_blocks': len(func.basic_blocks),
'edges': 0,
'loops': [],
'unreachable_blocks': [],
'complexity_metrics': {}
}
# Count edges
for block in func.basic_blocks:
analysis['edges'] += len(block.outgoing_edges)
# Detect loops using dominance analysis
dominators = func.dominators
for block in func.basic_blocks:
for edge in block.outgoing_edges:
target = edge.target
# Back edge indicates loop
if target in dominators[block]:
analysis['loops'].append({
'header': hex(target.start),
'back_edge': hex(block.start)
})
# Find unreachable blocks
reachable = set()
def dfs_reachable(block):
if block in reachable:
return
reachable.add(block)
for edge in block.outgoing_edges:
dfs_reachable(edge.target)
# Start from entry block
if func.basic_blocks:
dfs_reachable(func.basic_blocks[0])
for block in func.basic_blocks:
if block not in reachable:
analysis['unreachable_blocks'].append(hex(block.start))
# Calculate complexity metrics
nodes = analysis['basic_blocks']
edges = analysis['edges']
analysis['complexity_metrics'] = {
'cyclomatic_complexity': edges - nodes + 2,
'essential_complexity': len(analysis['loops']),
'npath_complexity': calculate_npath_complexity(func)
}
return analysis
def calculate_npath_complexity(func):
"""Calculate nPath complexity"""
complexity = 1
for block in func.basic_blocks:
# Count decision points
if len(block.outgoing_edges) > 1:
complexity *= len(block.outgoing_edges)
# Account for loops
for edge in block.outgoing_edges:
if edge.target.start <= block.start: # Back edge
complexity *= 2
return complexity
Analyse du flux de données
# Advanced data flow analysis
class DataFlowAnalyzer:
def __init__(self, func):
self.func = func
self.mlil = func.mlil
self.ssa = func.mlil.ssa_form if func.mlil else None
def analyze_variable_definitions(self):
"""Track variable definitions and uses"""
if not self.ssa:
return {}
definitions = {}
uses = {}
for block in self.ssa.basic_blocks:
for instr in block:
# Track definitions
if hasattr(instr, 'dest') and instr.dest:
var_name = str(instr.dest)
if var_name not in definitions:
definitions[var_name] = []
definitions[var_name].append({
'address': instr.address,
'instruction': str(instr),
'block': block.index
})
# Track uses
for operand in instr.operands:
if hasattr(operand, 'src') and operand.src:
var_name = str(operand.src)
if var_name not in uses:
uses[var_name] = []
uses[var_name].append({
'address': instr.address,
'instruction': str(instr),
'block': block.index
})
return {'definitions': definitions, 'uses': uses}
def find_uninitialized_variables(self):
"""Find potentially uninitialized variables"""
analysis = self.analyze_variable_definitions()
definitions = analysis['definitions']
uses = analysis['uses']
uninitialized = []
for var_name, use_list in uses.items():
if var_name not in definitions:
# Variable used but never defined in this function
uninitialized.append({
'variable': var_name,
'first_use': use_list[0],
'all_uses': use_list
})
return uninitialized
def trace_taint_propagation(self, source_vars):
"""Trace taint propagation from source variables"""
if not self.ssa:
return []
tainted = set(source_vars)
taint_flow = []
for block in self.ssa.basic_blocks:
for instr in block:
# Check if instruction uses tainted data
uses_tainted = False
for operand in instr.operands:
if hasattr(operand, 'src') and str(operand.src) in tainted:
uses_tainted = True
break
if uses_tainted:
# Mark destination as tainted
if hasattr(instr, 'dest') and instr.dest:
dest_var = str(instr.dest)
tainted.add(dest_var)
taint_flow.append({
'address': instr.address,
'instruction': str(instr),
'tainted_dest': dest_var,
'operation': instr.operation.name
})
return taint_flow
# Usage example
analyzer = DataFlowAnalyzer(func)
var_analysis = analyzer.analyze_variable_definitions()
uninitialized = analyzer.find_uninitialized_variables()
taint_flow = analyzer.trace_taint_propagation(['user_input', 'argv'])
Analyse cryptographique
# Cryptographic algorithm detection
class CryptoAnalyzer:
def __init__(self, bv):
self.bv = bv
self.crypto_signatures = self.load_crypto_signatures()
def load_crypto_signatures(self):
"""Load cryptographic algorithm signatures"""
return {
'aes': {
'sbox': [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5],
'rcon': [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80],
'constants': [0x63636363, 0x7c7c7c7c]
},
'des': {
'initial_permutation': [58, 50, 42, 34, 26, 18, 10, 2],
'sboxes': [
[14, 4, 13, 1, 2, 15, 11, 8, 3, 10, 6, 12, 5, 9, 0, 7],
[0, 15, 7, 4, 14, 2, 13, 1, 10, 6, 12, 11, 9, 5, 3, 8]
]
},
'md5': {
'constants': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],
'round_constants': [0xd76aa478, 0xe8c7b756, 0x242070db, 0xc1bdceee]
},
'sha1': {
'constants': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0],
'round_constants': [0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xca62c1d6]
},
'rsa': {
'common_exponents': [3, 17, 65537],
'key_sizes': [1024, 2048, 3072, 4096]
}
}
def detect_crypto_algorithms(self):
"""Detect cryptographic algorithms in binary"""
detected = []
for algo_name, signatures in self.crypto_signatures.items():
matches = self.search_algorithm_signatures(algo_name, signatures)
if matches:
detected.extend(matches)
return detected
def search_algorithm_signatures(self, algo_name, signatures):
"""Search for specific algorithm signatures"""
matches = []
# Search for constants
if 'constants' in signatures:
for constant in signatures['constants']:
addresses = self.bv.find_all_constant(constant, 4)
for addr in addresses:
matches.append({
'algorithm': algo_name,
'type': 'constant',
'value': hex(constant),
'address': hex(addr),
'confidence': 0.7
})
# Search for S-boxes
if 'sbox' in signatures:
sbox_matches = self.search_sbox_pattern(signatures['sbox'])
for match in sbox_matches:
matches.append({
'algorithm': algo_name,
'type': 'sbox',
'address': hex(match),
'confidence': 0.9
})
# Search for permutation tables
if 'initial_permutation' in signatures:
perm_matches = self.search_permutation_table(signatures['initial_permutation'])
for match in perm_matches:
matches.append({
'algorithm': algo_name,
'type': 'permutation_table',
'address': hex(match),
'confidence': 0.8
})
return matches
def search_sbox_pattern(self, sbox):
"""Search for S-box patterns in binary"""
matches = []
sbox_bytes = bytes(sbox)
# Search for exact S-box pattern
current_addr = self.bv.start
while current_addr < self.bv.end:
data = self.bv.read(current_addr, len(sbox_bytes))
if data == sbox_bytes:
matches.append(current_addr)
current_addr += len(sbox_bytes)
else:
current_addr += 1
return matches
def search_permutation_table(self, perm_table):
"""Search for permutation table patterns"""
matches = []
# Convert to bytes and search
if all(x < 256 for x in perm_table):
perm_bytes = bytes(perm_table)
current_addr = self.bv.start
while current_addr < self.bv.end:
data = self.bv.read(current_addr, len(perm_bytes))
if data == perm_bytes:
matches.append(current_addr)
current_addr += len(perm_bytes)
else:
current_addr += 1
return matches
def analyze_crypto_functions(self):
"""Analyze functions that might implement crypto"""
crypto_functions = []
for func in self.bv.functions:
score = self.calculate_crypto_score(func)
if score > 0.5:
crypto_functions.append({
'function': func.name,
'address': hex(func.start),
'score': score,
'indicators': self.get_crypto_indicators(func)
})
return crypto_functions
def calculate_crypto_score(self, func):
"""Calculate likelihood that function implements crypto"""
score = 0.0
# Check for bit manipulation operations
bit_ops = 0
arithmetic_ops = 0
if func.mlil:
for block in func.mlil.basic_blocks:
for instr in block:
op = instr.operation
# Bit operations
if op in [bn.MediumLevelILOperation.MLIL_XOR,
bn.MediumLevelILOperation.MLIL_AND,
bn.MediumLevelILOperation.MLIL_OR,
bn.MediumLevelILOperation.MLIL_LSL,
bn.MediumLevelILOperation.MLIL_LSR,
bn.MediumLevelILOperation.MLIL_ROR,
bn.MediumLevelILOperation.MLIL_ROL]:
bit_ops += 1
# Arithmetic operations
elif op in [bn.MediumLevelILOperation.MLIL_ADD,
bn.MediumLevelILOperation.MLIL_SUB,
bn.MediumLevelILOperation.MLIL_MUL]:
arithmetic_ops += 1
# High ratio of bit operations suggests crypto
total_ops = bit_ops + arithmetic_ops
if total_ops > 0:
bit_ratio = bit_ops / total_ops
if bit_ratio > 0.3:
score += 0.4
# Check for loops (common in crypto)
if len(func.basic_blocks) > 5:
score += 0.2
# Check for constants
constants_found = 0
for algo_name, signatures in self.crypto_signatures.items():
if 'constants' in signatures:
for constant in signatures['constants']:
if self.function_contains_constant(func, constant):
constants_found += 1
if constants_found > 0:
score += 0.4
return min(score, 1.0)
def function_contains_constant(self, func, constant):
"""Check if function contains specific constant"""
for addr in range(func.start, func.start + len(func)):
try:
value = self.bv.read_int(addr, 4, False) # Little endian
if value == constant:
return True
value = self.bv.read_int(addr, 4, True) # Big endian
if value == constant:
return True
except:
continue
return False
def get_crypto_indicators(self, func):
"""Get specific crypto indicators for function"""
indicators = []
# Check for specific patterns
if self.has_substitution_pattern(func):
indicators.append('substitution_operations')
if self.has_permutation_pattern(func):
indicators.append('permutation_operations')
if self.has_key_schedule_pattern(func):
indicators.append('key_schedule_operations')
return indicators
def has_substitution_pattern(self, func):
"""Check for substitution box patterns"""
# Look for array indexing patterns common in S-boxes
return False # Simplified for example
def has_permutation_pattern(self, func):
"""Check for permutation patterns"""
# Look for bit manipulation patterns
return False # Simplified for example
def has_key_schedule_pattern(self, func):
"""Check for key schedule patterns"""
# Look for iterative key expansion patterns
return False # Simplified for example
# Usage example
crypto_analyzer = CryptoAnalyzer(bv)
detected_algos = crypto_analyzer.detect_crypto_algorithms()
crypto_functions = crypto_analyzer.analyze_crypto_functions()
print(f"Detected {len(detected_algos)} crypto algorithm signatures")
print(f"Found {len(crypto_functions)} potential crypto functions")
Débogue et analyse dynamique
Intégration du débogueur
# Binary Ninja debugger integration
import binaryninja as bn
from binaryninja.debugger import DebuggerController
class DebuggerHelper:
def __init__(self, bv):
self.bv = bv
self.debugger = None
self.breakpoints = {}
self.watchpoints = {}
def start_debugging(self, target_path, args=None):
"""Start debugging session"""
try:
# Create debugger controller
self.debugger = DebuggerController(self.bv)
# Set target
self.debugger.set_target(target_path, args or [])
# Launch process
self.debugger.launch()
log.log_info(f"Started debugging: {target_path}")
return True
except Exception as e:
log.log_error(f"Failed to start debugger: {e}")
return False
def set_breakpoint(self, address, condition=None):
"""Set breakpoint at address"""
if not self.debugger:
log.log_error("Debugger not started")
return False
try:
bp_id = self.debugger.add_breakpoint(address)
self.breakpoints[address] = {
'id': bp_id,
'condition': condition,
'hit_count': 0
}
log.log_info(f"Breakpoint set at 0x{address:x}")
return True
except Exception as e:
log.log_error(f"Failed to set breakpoint: {e}")
return False
def set_watchpoint(self, address, size, access_type='rw'):
"""Set memory watchpoint"""
if not self.debugger:
log.log_error("Debugger not started")
return False
try:
wp_id = self.debugger.add_watchpoint(address, size, access_type)
self.watchpoints[address] = {
'id': wp_id,
'size': size,
'access_type': access_type,
'hit_count': 0
}
log.log_info(f"Watchpoint set at 0x{address:x} ({size} bytes, {access_type})")
return True
except Exception as e:
log.log_error(f"Failed to set watchpoint: {e}")
return False
def trace_execution(self, start_addr, end_addr, max_instructions=10000):
"""Trace execution between two addresses"""
if not self.debugger:
log.log_error("Debugger not started")
return []
trace = []
instruction_count = 0
# Set breakpoint at start
self.set_breakpoint(start_addr)
# Continue until start address
self.debugger.continue_execution()
# Single step and record
while instruction_count < max_instructions:
current_addr = self.debugger.get_current_address()
if current_addr == end_addr:
break
# Record instruction
instr_text = self.bv.get_disassembly(current_addr)
registers = self.debugger.get_registers()
trace.append({
'address': current_addr,
'instruction': instr_text,
'registers': registers.copy(),
'step': instruction_count
})
# Single step
self.debugger.step_into()
instruction_count += 1
return trace
def analyze_memory_access(self, address, size=0x1000):
"""Analyze memory access patterns"""
if not self.debugger:
log.log_error("Debugger not started")
return None
# Set watchpoint on memory region
self.set_watchpoint(address, size, 'rw')
access_log = []
# Continue execution and log accesses
while True:
try:
self.debugger.continue_execution()
# Check if watchpoint hit
if self.debugger.is_watchpoint_hit():
hit_addr = self.debugger.get_watchpoint_address()
access_type = self.debugger.get_access_type()
current_pc = self.debugger.get_current_address()
access_log.append({
'pc': current_pc,
'memory_address': hit_addr,
'access_type': access_type,
'timestamp': self.debugger.get_timestamp()
})
# Continue after hit
self.debugger.continue_execution()
except KeyboardInterrupt:
break
except Exception as e:
log.log_error(f"Error during memory analysis: {e}")
break
return access_log
# Usage example
debugger_helper = DebuggerHelper(bv)
# Start debugging
if debugger_helper.start_debugging("/path/to/target"):
# Set breakpoints
debugger_helper.set_breakpoint(0x401000)
debugger_helper.set_breakpoint(0x401500)
# Trace execution
trace = debugger_helper.trace_execution(0x401000, 0x401500)
# Analyze memory access
memory_access = debugger_helper.analyze_memory_access(0x402000, 0x1000)
Analyse de la couverture du code
# Code coverage analysis with Binary Ninja
class CoverageAnalyzer:
def __init__(self, bv):
self.bv = bv
self.coverage_data = {}
self.basic_block_hits = {}
self.function_coverage = {}
def initialize_coverage(self):
"""Initialize coverage tracking for all functions"""
for func in self.bv.functions:
self.function_coverage[func.start] = {
'name': func.name,
'total_blocks': len(func.basic_blocks),
'hit_blocks': set(),
'coverage_percentage': 0.0
}
for block in func.basic_blocks:
self.basic_block_hits[block.start] = {
'function': func.name,
'hit_count': 0,
'first_hit': None,
'last_hit': None
}
def record_execution(self, address):
"""Record execution of an address"""
# Find which basic block this address belongs to
func = self.bv.get_function_at(address)
if not func:
return
for block in func.basic_blocks:
if block.start <= address < block.end:
# Record block hit
if block.start in self.basic_block_hits:
self.basic_block_hits[block.start]['hit_count'] += 1
if not self.basic_block_hits[block.start]['first_hit']:
self.basic_block_hits[block.start]['first_hit'] = address
self.basic_block_hits[block.start]['last_hit'] = address
# Update function coverage
if func.start in self.function_coverage:
self.function_coverage[func.start]['hit_blocks'].add(block.start)
hit_count = len(self.function_coverage[func.start]['hit_blocks'])
total_count = self.function_coverage[func.start]['total_blocks']
self.function_coverage[func.start]['coverage_percentage'] = \
(hit_count / total_count) * 100.0
break
def import_coverage_data(self, coverage_file):
"""Import coverage data from external tool (e.g., DynamoRIO, Intel PIN)"""
try:
with open(coverage_file, 'r') as f:
for line in f:
line = line.strip()
if line.startswith('0x'):
address = int(line, 16)
self.record_execution(address)
log.log_info(f"Imported coverage data from {coverage_file}")
except Exception as e:
log.log_error(f"Failed to import coverage data: {e}")
def generate_coverage_report(self):
"""Generate comprehensive coverage report"""
report = {
'summary': {
'total_functions': len(self.function_coverage),
'covered_functions': 0,
'total_basic_blocks': len(self.basic_block_hits),
'covered_basic_blocks': 0,
'overall_coverage': 0.0
},
'function_coverage': [],
'uncovered_functions': [],
'hot_spots': []
}
covered_blocks = 0
for func_addr, coverage in self.function_coverage.items():
if coverage['coverage_percentage'] > 0:
report['summary']['covered_functions'] += 1
report['function_coverage'].append({
'name': coverage['name'],
'address': hex(func_addr),
'coverage': coverage['coverage_percentage'],
'hit_blocks': len(coverage['hit_blocks']),
'total_blocks': coverage['total_blocks']
})
else:
report['uncovered_functions'].append({
'name': coverage['name'],
'address': hex(func_addr)
})
covered_blocks += len(coverage['hit_blocks'])
# Calculate overall coverage
if report['summary']['total_basic_blocks'] > 0:
report['summary']['overall_coverage'] = \
(covered_blocks / report['summary']['total_basic_blocks']) * 100.0
report['summary']['covered_basic_blocks'] = covered_blocks
# Find hot spots (frequently executed blocks)
hot_spots = []
for block_addr, hit_data in self.basic_block_hits.items():
if hit_data['hit_count'] > 100: # Threshold for hot spot
hot_spots.append({
'address': hex(block_addr),
'function': hit_data['function'],
'hit_count': hit_data['hit_count']
})
# Sort by hit count
hot_spots.sort(key=lambda x: x['hit_count'], reverse=True)
report['hot_spots'] = hot_spots[:20] # Top 20 hot spots
return report
def visualize_coverage(self, func_name):
"""Create coverage visualization for a function"""
func = None
for f in self.bv.functions:
if f.name == func_name:
func = f
break
if not func:
log.log_error(f"Function {func_name} not found")
return
# Create coverage map
coverage_map = {}
for block in func.basic_blocks:
is_covered = block.start in self.basic_block_hits and \
self.basic_block_hits[block.start]['hit_count'] > 0
hit_count = self.basic_block_hits.get(block.start, {}).get('hit_count', 0)
coverage_map[block.start] = {
'covered': is_covered,
'hit_count': hit_count,
'start': block.start,
'end': block.end,
'size': len(block)
}
# Apply coverage highlighting in Binary Ninja
for block_addr, coverage in coverage_map.items():
if coverage['covered']:
# Highlight covered blocks in green
self.bv.set_comment_at(block_addr, f"Coverage: {coverage['hit_count']} hits")
else:
# Highlight uncovered blocks in red
self.bv.set_comment_at(block_addr, "UNCOVERED")
log.log_info(f"Applied coverage visualization for {func_name}")
# Usage example
coverage_analyzer = CoverageAnalyzer(bv)
coverage_analyzer.initialize_coverage()
# Import coverage data from external tool
coverage_analyzer.import_coverage_data("coverage_trace.txt")
# Generate report
report = coverage_analyzer.generate_coverage_report()
print(f"Overall coverage: {report['summary']['overall_coverage']:.2f}%")
print(f"Covered functions: {report['summary']['covered_functions']}/{report['summary']['total_functions']}")
# Visualize coverage for specific function
coverage_analyzer.visualize_coverage("main")
Intégration et automatisation
Intégration CI/CD
# Binary Ninja in CI/CD pipelines
# GitHub Actions workflow example
name: Binary Analysis
on: [push, pull_request]
jobs:
binary-analysis:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Binary Ninja
run: |
# Download and install Binary Ninja headless
wget ${{ secrets.BINJA_DOWNLOAD_URL }}
unzip binaryninja-headless.zip
export PATH=$PATH:$(pwd)/binaryninja
- name: Run Binary Analysis
run: |
python3 analyze_binary.py --binary target_binary --output analysis_report.json
- name: Upload Analysis Results
uses: actions/upload-artifact@v2
with:
name: analysis-results
path: analysis_report.json
# Jenkins pipeline example
pipeline {
agent any
stages {
stage('Binary Analysis') {
steps {
script {
// Run Binary Ninja analysis
sh '''
python3 -c "
import binaryninja as bn
import json
bv = bn.open_view('${BINARY_PATH}')
bv.update_analysis_and_wait()
# Perform analysis
results = {
'functions': len(bv.functions),
'strings': len(bv.strings),
'imports': len([s for s in bv.symbols if s.type == bn.SymbolType.ImportedFunctionSymbol])
}
with open('analysis_results.json', 'w') as f:
json.dump(results, f)
"
'''
}
}
}
stage('Security Analysis') {
steps {
script {
// Run security-focused analysis
sh 'python3 security_analysis.py --input analysis_results.json'
}
}
}
}
post {
always {
archiveArtifacts artifacts: '*.json', fingerprint: true
}
}
}
Traitement par lots
# Batch processing multiple binaries
import os
import json
import multiprocessing
from pathlib import Path
import binaryninja as bn
class BatchAnalyzer:
def __init__(self, output_dir="batch_results"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def analyze_single_binary(self, binary_path):
"""Analyze a single binary file"""
try:
# Open binary
bv = bn.open_view(str(binary_path))
if not bv:
return {'error': f'Failed to open {binary_path}'}
# Wait for analysis
bv.update_analysis_and_wait()
# Collect analysis results
results = {
'file_path': str(binary_path),
'file_size': binary_path.stat().st_size,
'architecture': bv.arch.name,
'platform': bv.platform.name,
'entry_point': hex(bv.entry_point),
'functions': {
'total': len(bv.functions),
'named': len([f for f in bv.functions if not f.name.startswith('sub_')]),
'library': len([f for f in bv.functions if f.symbol and f.symbol.type == bn.SymbolType.LibraryFunctionSymbol])
},
'strings': {
'total': len(bv.strings),
'ascii': len([s for s in bv.strings if s.string_type == bn.StringType.AsciiString]),
'unicode': len([s for s in bv.strings if s.string_type == bn.StringType.Utf16String])
},
'imports': len([s for s in bv.symbols if s.type == bn.SymbolType.ImportedFunctionSymbol]),
'exports': len([s for s in bv.symbols if s.type == bn.SymbolType.FunctionSymbol and s.binding == bn.SymbolBinding.GlobalBinding]),
'sections': [
{
'name': section.name,
'start': hex(section.start),
'length': section.length,
'semantics': section.semantics.name
}
for section in bv.sections
]
}
# Perform additional analysis
results['security_analysis'] = self.perform_security_analysis(bv)
results['complexity_analysis'] = self.perform_complexity_analysis(bv)
# Close binary view
bv.file.close()
return results
except Exception as e:
return {'error': f'Analysis failed for {binary_path}: {str(e)}'}
def perform_security_analysis(self, bv):
"""Perform security-focused analysis"""
security_results = {
'dangerous_functions': [],
'crypto_indicators': [],
'packing_indicators': {},
'stack_strings': []
}
# Check for dangerous functions
dangerous_functions = [
'strcpy', 'strcat', 'sprintf', 'gets', 'scanf',
'system', 'exec', 'eval', 'shell'
]
for func in bv.functions:
if func.name.lower() in [df.lower() for df in dangerous_functions]:
security_results['dangerous_functions'].append({
'name': func.name,
'address': hex(func.start)
})
# Check for crypto constants
crypto_constants = [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476] # MD5/SHA1
for constant in crypto_constants:
addresses = bv.find_all_constant(constant, 4)
if addresses:
security_results['crypto_indicators'].append({
'constant': hex(constant),
'addresses': [hex(addr) for addr in addresses]
})
return security_results
def perform_complexity_analysis(self, bv):
"""Perform complexity analysis"""
complexity_results = {
'total_complexity': 0,
'average_complexity': 0.0,
'complex_functions': []
}
complexities = []
for func in bv.functions:
# Calculate cyclomatic complexity
edges = sum(len(block.outgoing_edges) for block in func.basic_blocks)
nodes = len(func.basic_blocks)
complexity = edges - nodes + 2 if nodes > 0 else 0
complexities.append(complexity)
if complexity > 10: # High complexity threshold
complexity_results['complex_functions'].append({
'name': func.name,
'address': hex(func.start),
'complexity': complexity
})
if complexities:
complexity_results['total_complexity'] = sum(complexities)
complexity_results['average_complexity'] = sum(complexities) / len(complexities)
return complexity_results
def analyze_directory(self, directory_path, file_patterns=None, max_workers=4):
"""Analyze all binaries in a directory"""
if file_patterns is None:
file_patterns = ['*.exe', '*.dll', '*.so', '*.dylib', '*']
# Find all binary files
binary_files = []
directory = Path(directory_path)
for pattern in file_patterns:
binary_files.extend(directory.glob(pattern))
# Filter out non-files and duplicates
binary_files = list(set([f for f in binary_files if f.is_file()]))
print(f"Found {len(binary_files)} files to analyze")
# Analyze files in parallel
with multiprocessing.Pool(max_workers) as pool:
results = pool.map(self.analyze_single_binary, binary_files)
# Save results
batch_results = {
'summary': {
'total_files': len(binary_files),
'successful_analyses': len([r for r in results if 'error' not in r]),
'failed_analyses': len([r for r in results if 'error' in r])
},
'results': results
}
output_file = self.output_dir / 'batch_analysis_results.json'
with open(output_file, 'w') as f:
json.dump(batch_results, f, indent=2)
print(f"Batch analysis complete. Results saved to {output_file}")
return batch_results
def generate_summary_report(self, results):
"""Generate summary report from batch results"""
if isinstance(results, (str, Path)):
# Load results from file
with open(results, 'r') as f:
results = json.load(f)
successful_results = [r for r in results['results'] if 'error' not in r]
# Architecture distribution
arch_dist = {}
for result in successful_results:
arch = result.get('architecture', 'unknown')
arch_dist[arch] = arch_dist.get(arch, 0) + 1
# Platform distribution
platform_dist = {}
for result in successful_results:
platform = result.get('platform', 'unknown')
platform_dist[platform] = platform_dist.get(platform, 0) + 1
# Security summary
total_dangerous_functions = sum(
len(result.get('security_analysis', {}).get('dangerous_functions', []))
for result in successful_results
)
files_with_crypto = len([
result for result in successful_results
if result.get('security_analysis', {}).get('crypto_indicators')
])
# Complexity summary
avg_complexity = sum(
result.get('complexity_analysis', {}).get('average_complexity', 0)
for result in successful_results
) / len(successful_results) if successful_results else 0
summary = {
'analysis_summary': results['summary'],
'architecture_distribution': arch_dist,
'platform_distribution': platform_dist,
'security_summary': {
'total_dangerous_functions': total_dangerous_functions,
'files_with_crypto_indicators': files_with_crypto,
'percentage_with_crypto': (files_with_crypto / len(successful_results)) * 100 if successful_results else 0
},
'complexity_summary': {
'average_complexity': avg_complexity,
'high_complexity_files': len([
result for result in successful_results
if result.get('complexity_analysis', {}).get('average_complexity', 0) > 10
])
}
}
# Save summary report
summary_file = self.output_dir / 'summary_report.json'
with open(summary_file, 'w') as f:
json.dump(summary, f, indent=2)
print(f"Summary report saved to {summary_file}")
return summary
# Usage example
batch_analyzer = BatchAnalyzer("analysis_results")
# Analyze all files in a directory
results = batch_analyzer.analyze_directory(
"/path/to/binaries",
file_patterns=['*.exe', '*.dll'],
max_workers=8
)
# Generate summary report
summary = batch_analyzer.generate_summary_report(results)
print("Batch Analysis Summary:")
print(f"- Total files: {summary['analysis_summary']['total_files']}")
print(f"- Successful analyses: {summary['analysis_summary']['successful_analyses']}")
print(f"- Average complexity: {summary['complexity_summary']['average_complexity']:.2f}")
print(f"- Files with crypto indicators: {summary['security_summary']['files_with_crypto_indicators']}")
Pratiques exemplaires et conseils
Optimisation des performances
# Performance optimization techniques
class PerformanceOptimizer:
def __init__(self, bv):
self.bv = bv
def optimize_analysis_settings(self):
"""Optimize Binary Ninja analysis settings for performance"""
# Get analysis settings
settings = self.bv.analysis_settings
# Disable expensive analysis for large binaries
if self.bv.end - self.bv.start > 50 * 1024 * 1024: # 50MB threshold
settings.set_bool('analysis.linearSweep.autorun', False)
settings.set_bool('analysis.signatureMatcher.autorun', False)
settings.set_int('analysis.limits.maxFunctionSize', 100000)
# Optimize for specific architectures
if self.bv.arch.name in ['x86', 'x86_64']:
settings.set_bool('analysis.x86.disassembly.simplifyFPUInstructions', True)
settings.set_bool('analysis.x86.disassembly.simplifySSEInstructions', True)
# Set reasonable limits
settings.set_int('analysis.limits.maxFunctionAnalysisTime', 300) # 5 minutes
settings.set_int('analysis.limits.maxBasicBlockAnalysisTime', 60) # 1 minute
log.log_info("Analysis settings optimized for performance")
def use_parallel_analysis(self, function_list=None):
"""Use parallel processing for function analysis"""
import concurrent.futures
functions = function_list or self.bv.functions
def analyze_function(func):
"""Analyze a single function"""
try:
# Force function analysis
func.reanalyze()
return {
'name': func.name,
'address': func.start,
'status': 'success'
}
except Exception as e:
return {
'name': func.name,
'address': func.start,
'status': 'error',
'error': str(e)
}
# Analyze functions in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(analyze_function, functions))
successful = len([r for r in results if r['status'] == 'success'])
failed = len([r for r in results if r['status'] == 'error'])
log.log_info(f"Parallel analysis complete: {successful} successful, {failed} failed")
return results
def cache_analysis_results(self, cache_file="analysis_cache.json"):
"""Cache analysis results for faster subsequent loads"""
cache_data = {
'binary_hash': self.calculate_binary_hash(),
'analysis_timestamp': time.time(),
'functions': [],
'strings': [],
'imports': []
}
# Cache function information
for func in self.bv.functions:
cache_data['functions'].append({
'name': func.name,
'start': func.start,
'end': func.start + len(func),
'basic_blocks': len(func.basic_blocks),
'complexity': self.calculate_function_complexity(func)
})
# Cache strings
for string in self.bv.strings:
cache_data['strings'].append({
'address': string.start,
'value': string.value,
'type': string.string_type.name
})
# Cache imports
for symbol in self.bv.symbols:
if symbol.type == bn.SymbolType.ImportedFunctionSymbol:
cache_data['imports'].append({
'name': symbol.name,
'address': symbol.address
})
# Save cache
with open(cache_file, 'w') as f:
json.dump(cache_data, f, indent=2)
log.log_info(f"Analysis results cached to {cache_file}")
def calculate_binary_hash(self):
"""Calculate hash of binary for cache validation"""
import hashlib
hasher = hashlib.sha256()
# Read binary in chunks
chunk_size = 64 * 1024
current_addr = self.bv.start
while current_addr < self.bv.end:
chunk_size = min(chunk_size, self.bv.end - current_addr)
data = self.bv.read(current_addr, chunk_size)
if data:
hasher.update(data)
current_addr += chunk_size
return hasher.hexdigest()
# Memory management for large binaries
class MemoryManager:
def __init__(self, bv):
self.bv = bv
self.cached_data = {}
self.cache_limit = 100 # Maximum cached items
def get_function_data(self, func_addr):
"""Get function data with caching"""
if func_addr in self.cached_data:
return self.cached_data[func_addr]
func = self.bv.get_function_at(func_addr)
if not func:
return None
# Extract function data
func_data = {
'name': func.name,
'basic_blocks': len(func.basic_blocks),
'instructions': [],
'calls': []
}
# Get instructions (limit to avoid memory issues)
for block in func.basic_blocks[:10]: # Limit blocks
for instr in block[:50]: # Limit instructions per block
func_data['instructions'].append({
'address': instr.address,
'text': str(instr)
})
# Cache data
if len(self.cached_data) >= self.cache_limit:
# Remove oldest entry
oldest_key = next(iter(self.cached_data))
del self.cached_data[oldest_key]
self.cached_data[func_addr] = func_data
return func_data
def clear_cache(self):
"""Clear cached data to free memory"""
self.cached_data.clear()
log.log_info("Memory cache cleared")
Gestion des erreurs et débogage
# Robust error handling for Binary Ninja scripts
import logging
import traceback
from functools import wraps
def safe_analysis(func):
"""Decorator for safe analysis functions"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
log.log_error(f"Error in {func.__name__}: {str(e)}")
log.log_error(f"Traceback: {traceback.format_exc()}")
return None
return wrapper
class RobustAnalyzer:
def __init__(self, bv):
self.bv = bv
self.errors = []
self.warnings = []
@safe_analysis
def analyze_with_validation(self):
"""Perform analysis with extensive validation"""
# Validate binary view
if not self.validate_binary_view():
return False
# Validate architecture support
if not self.validate_architecture():
return False
# Perform analysis with error tracking
try:
self.bv.update_analysis_and_wait()
# Validate analysis results
if not self.validate_analysis_results():
self.warnings.append("Analysis results may be incomplete")
return True
except Exception as e:
self.errors.append(f"Analysis failed: {str(e)}")
return False
def validate_binary_view(self):
"""Validate binary view is properly loaded"""
if not self.bv:
self.errors.append("Binary view is None")
return False
if self.bv.start >= self.bv.end:
self.errors.append("Invalid binary address range")
return False
if not self.bv.arch:
self.errors.append("No architecture detected")
return False
return True
def validate_architecture(self):
"""Validate architecture is supported"""
supported_archs = ['x86', 'x86_64', 'arm', 'aarch64', 'mips']
if self.bv.arch.name not in supported_archs:
self.warnings.append(f"Architecture {self.bv.arch.name} may have limited support")
return True
def validate_analysis_results(self):
"""Validate analysis produced reasonable results"""
if len(self.bv.functions) == 0:
self.warnings.append("No functions detected")
return False
if len(self.bv.strings) == 0:
self.warnings.append("No strings detected")
# Check for reasonable function count
binary_size = self.bv.end - self.bv.start
function_density = len(self.bv.functions) / (binary_size / 1024) # Functions per KB
if function_density < 0.1:
self.warnings.append("Low function density - binary may be packed")
elif function_density > 10:
self.warnings.append("High function density - may indicate analysis errors")
return True
def get_error_report(self):
"""Get comprehensive error report"""
return {
'errors': self.errors,
'warnings': self.warnings,
'error_count': len(self.errors),
'warning_count': len(self.warnings)
}
# Usage example
analyzer = RobustAnalyzer(bv)
success = analyzer.analyze_with_validation()
if not success:
error_report = analyzer.get_error_report()
print(f"Analysis failed with {error_report['error_count']} errors")
for error in error_report['errors']:
print(f" Error: {error}")
Ressources
Documentation et apprentissage
- [Documentation de la Benary Ninja] (LINK_16) - Documentation officielle
- [Binary Ninja API Référence] (LINK_16) - Documentation complète de l'API
- Bibliothèque Benary Ninja - Dernières mises à jour et tutoriels
- [Binary Ninja Community] (LINK_16) - Forum et discussions de l'utilisateur
Développement de plugins
- [Résistoire de lecture] (LINK_16) - Dépôt officiel de plugins
- [Guide de développement de la puce] (LINK_16) - Documentation sur le développement du plugin
- Exemples d'API - Exemples d'API officiels
- Modèle Plugin - Modèle de développement de plugin
Formation et certification
- [Formation du Ninja du Bénin] (LINK_16) - Cours de formation officiels
- [Tutoriels d'ingénierie inversés] (LINK_16) - Déobfuscation binaire avancée
- [Défis du FCT] (LINK_16) - Défis pratiques
- [Ressources universitaires] (LINK_16) - Matériel éducatif