# Binary Ninja Cheat Sheet


## Overview

Binary Ninja is a modern, lightweight reverse engineering platform that provides powerful analysis capabilities through its unique Intermediate Language (IL) system. Developed by Vector 35, it offers a scriptable interface through its Python and C++ APIs, making it well suited to both interactive analysis and automated reverse engineering workflows.

Key strengths: SSA-based intermediate representations, live analysis, an extensive Python API, cross-platform support, and an active plugin ecosystem.
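
The Python API works both inside the UI and headless (headless use depends on your license tier). A minimal sketch of a headless session, with the binary path as a placeholder, looks like this:

```python
# Minimal headless session: open a binary, wait for analysis, list functions
import binaryninja as bn

bv = bn.open_view("/path/to/binary")  # placeholder path
bv.update_analysis_and_wait()         # block until auto-analysis completes

for func in bv.functions[:10]:        # first ten functions
    print(f"{func.start:#x} {func.name}")
```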

## Installation and Setup

### Commercial License Installation

```bash
# Download from official website
# Visit: https://binary.ninja/
# Choose Personal, Commercial, or Educational license

# Linux Installation:
wget https://cdn.binary.ninja/installers/BinaryNinja-personal.zip
unzip BinaryNinja-personal.zip
cd binaryninja
./binaryninja

# macOS Installation:
# Download .dmg file and drag to Applications folder

# Windows Installation:
# Download .exe installer and run with administrator privileges
```
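
After installing, a quick import check confirms the Python API is reachable (the module ships with the product; on some setups you may first need to run the bundled `scripts/install_api.py`):

```python
# Quick sanity check that the Binary Ninja Python API is importable
import binaryninja
print(binaryninja.core_version())
```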

### Cloud/Free Version

```bash
# Binary Ninja Cloud (Free tier available)
# Access at: https://cloud.binary.ninja/
# Features: Limited analysis time, cloud-based processing
# Good for: Learning, small projects, evaluation

# Educational License:
# Free for students and educators
# Apply at: https://binary.ninja/educational/
```

### Plugin Installation
```bash
# Plugin Manager (GUI):
# Tools -> Manage Plugins -> Browse Online

# Manual Plugin Installation:
# Linux/macOS: ~/.binaryninja/plugins/
# Windows: %APPDATA%\Binary Ninja\plugins\

# Popular Plugins:
# - Binja Toolkit: Enhanced UI and analysis features
# - Sourcery: Source code recovery and analysis
# - Binja Debugger: Integrated debugging capabilities
# - Kaitai Struct: Binary format parsing
```

## Basic Usage and Navigation

### Interface Overview
```bash
# Main Components:
# 1. Disassembly View - Assembly code with analysis
# 2. Linear View - Raw disassembly without analysis
# 3. Graph View - Control flow graph visualization
# 4. Hex View - Raw binary data
# 5. Types View - Data type definitions
# 6. Strings View - String references
# 7. Log View - Analysis progress and messages

# View Switching:
# Tab key: Cycle through views
# Ctrl+1: Disassembly view
# Ctrl+2: Graph view
# Ctrl+3: Linear view
# Ctrl+4: Hex view
```

### Navigation Commands

```bash
# Basic Navigation:
# G: Go to address/function
# Space: Switch between graph and linear view
# Tab: Switch between disassembly views
# Esc: Go back in navigation history
# Ctrl+G: Go to address dialog

# Function Navigation:
# Ctrl+J: Jump to function
# Ctrl+Shift+J: Jump to function by name
# P: Go to previous function
# N: Go to next function
# Enter: Follow reference/call

# Cross-References:
# X: Show cross-references to current location
# Ctrl+X: Show cross-references from current location
# Shift+X: Show data cross-references
```

### Analysis Control

```bash
# Analysis Management:
# Ctrl+A: Start/restart analysis
# Ctrl+Shift+A: Analysis options
# F5: Force function analysis
# U: Undefine function/data
# D: Define data at cursor

# Function Management:
# F: Create function at cursor
# Alt+F: Edit function properties
# Ctrl+F: Find text/bytes
# Y: Set function type
```

## Intermediate Language (IL) System

### IL Levels Overview

```python
# Binary Ninja's Multi-Level IL System:

# 1. Low Level IL (LLIL) - Architecture-specific
# 2. Medium Level IL (MLIL) - Architecture-independent
# 3. High Level IL (HLIL) - C-like representation
# 4. Static Single Assignment (SSA) forms for each level

# Accessing IL in Python API:
import binaryninja as bn

# Load binary
bv = bn.open_view("/path/to/binary")

# Get function
func = bv.get_function_at(0x401000)

# Access different IL levels
llil = func.llil
mlil = func.mlil
hlil = func.hlil

# SSA forms
llil_ssa = func.llil.ssa_form
mlil_ssa = func.mlil.ssa_form
hlil_ssa = func.hlil.ssa_form
```
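
To get a feel for the abstraction levels, you can print the same function at each level. A short sketch, reusing `func` from the block above:

```python
# Print the first basic block of `func` at each IL level
for name, il in (("LLIL", func.llil), ("MLIL", func.mlil), ("HLIL", func.hlil)):
    if not il:
        continue
    print(f"--- {name} ---")
    for instr in il.basic_blocks[0]:
        print(f"{instr.address:#x}  {instr}")
```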

### IL Analysis Examples

```python
import binaryninja as bn

# Analyze function control flow
def analyze_function_complexity(func):
    """Calculate cyclomatic complexity from the MLIL control flow graph"""

    mlil = func.mlil
    if not mlil:
        return 0

    # Cyclomatic complexity = edges - nodes + 2
    edges = sum(len(block.outgoing_edges) for block in mlil.basic_blocks)
    nodes = len(mlil.basic_blocks)
    return edges - nodes + 2

# Find function calls in HLIL
def find_function_calls(func):
    """Extract all function calls from HLIL"""

    calls = []
    hlil = func.hlil

    if not hlil:
        return calls

    for block in hlil.basic_blocks:
        for instr in block:
            if instr.operation == bn.HighLevelILOperation.HLIL_CALL:
                dest = instr.dest
                if hasattr(dest, 'constant'):
                    # Direct call
                    target_addr = dest.constant
                    target_func = func.view.get_function_at(target_addr)
                    if target_func:
                        calls.append({
                            'address': instr.address,
                            'target': target_func.name,
                            'target_address': target_addr
                        })
                else:
                    # Indirect call
                    calls.append({
                        'address': instr.address,
                        'target': 'indirect',
                        'expression': str(dest)
                    })

    return calls

# Data flow analysis using SSA
def trace_variable_usage(func, var_name):
    """Trace usage of a variable through SSA form"""

    mlil_ssa = func.mlil.ssa_form if func.mlil else None
    if not mlil_ssa:
        return []

    usage_points = []

    for block in mlil_ssa.basic_blocks:
        for instr in block:
            # Check for variable definitions
            if hasattr(instr, 'dest') and str(instr.dest).startswith(var_name):
                usage_points.append({
                    'address': instr.address,
                    'type': 'definition',
                    'instruction': str(instr)
                })

            # Check for variable uses
            if hasattr(instr, 'src') and str(instr.src).find(var_name) != -1:
                usage_points.append({
                    'address': instr.address,
                    'type': 'use',
                    'instruction': str(instr)
                })

    return usage_points
```
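
A usage sketch tying these helpers together, reusing `bv` from earlier:

```python
# Flag complex functions and summarize their outgoing calls
for func in bv.functions:
    complexity = analyze_function_complexity(func)
    calls = find_function_calls(func)
    if complexity > 10:
        print(f"{func.name}: complexity={complexity}, calls={len(calls)}")
```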

## Python API and Scripting

### Basic API Usage

```python
import binaryninja as bn
from binaryninja import log

# Open binary file
bv = bn.open_view("/path/to/binary")
if not bv:
    log.log_error("Failed to open binary")
    exit(1)

# Basic binary information
print(f"Architecture: {bv.arch.name}")
print(f"Platform: {bv.platform.name}")
print(f"Entry point: 0x{bv.entry_point:x}")
print(f"Start address: 0x{bv.start:x}")
print(f"End address: 0x{bv.end:x}")

# Get all functions
functions = bv.functions
print(f"Total functions: {len(functions)}")

# Iterate through functions
for func in functions:
    print(f"Function: {func.name} at 0x{func.start:x}")
    print(f"  Size: {len(func)} bytes")
    print(f"  Basic blocks: {len(func.basic_blocks)}")

### Advanced Analysis Scripts

```python
# Comprehensive binary analysis script
import binaryninja as bn
class BinaryAnalyzer:
    def __init__(self, binary_path):
        self.bv = bn.open_view(binary_path)
        if not self.bv:
            raise ValueError(f"Cannot open binary: {binary_path}")

        # Wait for analysis to complete
        self.bv.update_analysis_and_wait()

    def analyze_strings(self):
        """Analyze string references and usage"""

        strings_analysis = {
            'total_strings': 0,
            'referenced_strings': 0,
            'unreferenced_strings': 0,
            'string_details': []
        }

        for string in self.bv.strings:
            string_info = {
                'address': string.start,
                'length': string.length,
                'value': string.value,
                'type': string.string_type.name,
                'references': []
            }

            # Find references to this string
            refs = self.bv.get_code_refs(string.start)
            for ref in refs:
                func = self.bv.get_function_at(ref.address)
                string_info['references'].append({
                    'address': ref.address,
                    'function': func.name if func else 'unknown'
                })

            strings_analysis['string_details'].append(string_info)
            strings_analysis['total_strings'] += 1

            if string_info['references']:
                strings_analysis['referenced_strings'] += 1
            else:
                strings_analysis['unreferenced_strings'] += 1

        return strings_analysis

    def find_crypto_constants(self):
        """Search for cryptographic constants"""

        crypto_constants = {
            'md5': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],
            'sha1': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0],
            'sha256': [0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a],
            'aes_sbox': [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5],
            'rc4_sbox': list(range(256))
        }

        found_constants = []

        for crypto_name, constants in crypto_constants.items():
            for constant in constants[:4]:  # Check first few constants
                # Search the whole view for the 32-bit constant
                results = self.bv.find_all_constant(self.bv.start, self.bv.end, constant)
                for addr in results:
                    found_constants.append({
                        'algorithm': crypto_name,
                        'constant': hex(constant),
                        'address': hex(addr),
                        'context': self.get_context_info(addr)
                    })

        return found_constants

    def analyze_imports(self):
        """Analyze imported functions and libraries"""

        imports_analysis = {
            'total_imports': 0,
            'libraries': {},
            'dangerous_functions': [],
            'network_functions': [],
            'crypto_functions': []
        }

        # Dangerous function patterns
        dangerous_patterns = [
            'strcpy', 'strcat', 'sprintf', 'gets', 'scanf',
            'system', 'exec', 'eval', 'shell'
        ]

        network_patterns = [
            'socket', 'connect', 'bind', 'listen', 'accept',
            'send', 'recv', 'WSA', 'inet_'
        ]

        crypto_patterns = [
            'crypt', 'hash', 'md5', 'sha', 'aes', 'des',
            'rsa', 'encrypt', 'decrypt', 'cipher'
        ]

        for symbol in self.bv.get_symbols():
            if symbol.type == bn.SymbolType.ImportedFunctionSymbol:
                func_name = symbol.name
                imports_analysis['total_imports'] += 1

                # Extract library name
                if '@' in func_name:
                    lib_name = func_name.split('@')[1]
                else:
                    lib_name = 'unknown'

                if lib_name not in imports_analysis['libraries']:
                    imports_analysis['libraries'][lib_name] = []

                imports_analysis['libraries'][lib_name].append(func_name)

                # Categorize functions
                func_lower = func_name.lower()

                if any(pattern in func_lower for pattern in dangerous_patterns):
                    imports_analysis['dangerous_functions'].append(func_name)

                if any(pattern in func_lower for pattern in network_patterns):
                    imports_analysis['network_functions'].append(func_name)

                if any(pattern in func_lower for pattern in crypto_patterns):
                    imports_analysis['crypto_functions'].append(func_name)

        return imports_analysis

    def detect_packing(self):
        """Detect potential packing or obfuscation"""

        packing_indicators = {
            'high_entropy_sections': [],
            'unusual_entry_point': False,
            'few_imports': False,
            'suspicious_sections': [],
            'packed_probability': 0.0
        }

        # Check entropy of sections
        for section in self.bv.sections:
            section_data = self.bv.read(section.start, section.length)
            if section_data:
                entropy = self.calculate_entropy(section_data)
                if entropy > 7.0:  # High entropy threshold
                    packing_indicators['high_entropy_sections'].append({
                        'name': section.name,
                        'entropy': entropy,
                        'address': hex(section.start),
                        'size': section.length
                    })

        # Check entry point location
        entry_point = self.bv.entry_point
        entry_section = self.bv.get_section_at(entry_point)
        if entry_section and entry_section.name not in ['.text', 'CODE']:
            packing_indicators['unusual_entry_point'] = True

        # Check import count
        import_count = len([s for s in self.bv.get_symbols()
                           if s.type == bn.SymbolType.ImportedFunctionSymbol])
        if import_count < 10:
            packing_indicators['few_imports'] = True

        # Calculate packing probability
        score = 0
        if packing_indicators['high_entropy_sections']:
            score += 0.4
        if packing_indicators['unusual_entry_point']:
            score += 0.3
        if packing_indicators['few_imports']:
            score += 0.3

        packing_indicators['packed_probability'] = score

        return packing_indicators

    def calculate_entropy(self, data):
        """Calculate Shannon entropy of data"""
        import math
        from collections import Counter

        if not data:
            return 0

        # Count byte frequencies
        byte_counts = Counter(data)
        data_len = len(data)

        # Calculate entropy
        entropy = 0
        for count in byte_counts.values():
            probability = count / data_len
            entropy -= probability * math.log2(probability)

        return entropy

    def get_context_info(self, address):
        """Get context information for an address"""

        func = self.bv.get_function_at(address)
        if func:
            return f"Function: {func.name}"

        section = self.bv.get_section_at(address)
        if section:
            return f"Section: {section.name}"

        return "Unknown context"

    def generate_report(self):
        """Generate comprehensive analysis report"""

        report = {
            'binary_info': {
                'architecture': self.bv.arch.name,
                'platform': self.bv.platform.name,
                'entry_point': hex(self.bv.entry_point),
                'file_size': self.bv.end - self.bv.start,
                'function_count': len(self.bv.functions)
            },
            'strings_analysis': self.analyze_strings(),
            'crypto_constants': self.find_crypto_constants(),
            'imports_analysis': self.analyze_imports(),
            'packing_detection': self.detect_packing()
        }

        return report

# Usage example
analyzer = BinaryAnalyzer("/path/to/binary")
report = analyzer.generate_report()

# Print summary
print("=== Binary Analysis Report ===")
print(f"Architecture: {report['binary_info']['architecture']}")
print(f"Functions: {report['binary_info']['function_count']}")
print(f"Strings: {report['strings_analysis']['total_strings']}")
print(f"Imports: {report['imports_analysis']['total_imports']}")
print(f"Packing probability: {report['packing_detection']['packed_probability']:.2f}")

### Plugin Development

```python
# Basic Binary Ninja plugin structure
from binaryninja import *
import json

class CustomAnalysisPlugin:
    def __init__(self):
        self.name = "Custom Analysis Plugin"
        self.description = "Performs custom binary analysis"

    def analyze_function_calls(self, bv):
        """Analyze function call patterns"""

        call_graph = {}

        for func in bv.functions:
            call_graph[func.name] = {
                'address': hex(func.start),
                'calls_to': [],
                'called_by': []
            }

            # Find functions called by this function (walk the LLIL)
            for block in (func.llil.basic_blocks if func.llil else []):
                for instr in block:
                    if instr.operation == LowLevelILOperation.LLIL_CALL:
                        target = instr.dest
                        if hasattr(target, 'constant'):
                            target_func = bv.get_function_at(target.constant)
                            if target_func:
                                call_graph[func.name]['calls_to'].append(target_func.name)

                                # Add reverse reference
                                if target_func.name not in call_graph:
                                    call_graph[target_func.name] = {
                                        'address': hex(target_func.start),
                                        'calls_to': [],
                                        'called_by': []
                                    }
                                call_graph[target_func.name]['called_by'].append(func.name)

        return call_graph

    def find_vulnerabilities(self, bv):
        """Search for potential vulnerabilities"""

        vulnerabilities = []

        # Dangerous function calls
        dangerous_functions = {
            'strcpy': 'Buffer overflow risk',
            'strcat': 'Buffer overflow risk',
            'sprintf': 'Format string vulnerability',
            'gets': 'Buffer overflow risk',
            'system': 'Command injection risk'
        }

        for func in bv.functions:
            for block in (func.llil.basic_blocks if func.llil else []):
                for instr in block:
                    if instr.operation == LowLevelILOperation.LLIL_CALL:
                        target = instr.dest
                        if hasattr(target, 'constant'):
                            target_func = bv.get_function_at(target.constant)
                            if target_func and target_func.name in dangerous_functions:
                                vulnerabilities.append({
                                    'type': 'dangerous_function',
                                    'function': target_func.name,
                                    'risk': dangerous_functions[target_func.name],
                                    'location': hex(instr.address),
                                    'caller': func.name
                                })

        return vulnerabilities

# Plugin registration
def register_plugin():
    plugin = CustomAnalysisPlugin()

    def run_analysis(bv):
        # Run custom analysis
        call_graph = plugin.analyze_function_calls(bv)
        vulnerabilities = plugin.find_vulnerabilities(bv)

        # Display results
        log.log_info(f"Found {len(call_graph)} functions in call graph")
        log.log_info(f"Found {len(vulnerabilities)} potential vulnerabilities")

        # Save results to file
        results = {
            'call_graph': call_graph,
            'vulnerabilities': vulnerabilities
        }

        with open('analysis_results.json', 'w') as f:
            json.dump(results, f, indent=2)

        log.log_info("Analysis results saved to analysis_results.json")

    # Register menu item
    PluginCommand.register(
        "Custom Analysis\\Run Analysis",
        "Run custom binary analysis",
        run_analysis
    )

# Call registration function
register_plugin()
```

## Advanced Analysis Techniques

### Control Flow Analysis

```python
# Advanced control flow analysis
def analyze_control_flow(func):
    """Comprehensive control flow analysis"""

    analysis = {
        'basic_blocks': len(func.basic_blocks),
        'edges': 0,
        'loops': [],
        'unreachable_blocks': [],
        'complexity_metrics': {}
    }

    # Count edges
    for block in func.basic_blocks:
        analysis['edges'] += len(block.outgoing_edges)

    # Detect loops via dominance: an edge whose target dominates
    # its source block is a back edge
    for block in func.basic_blocks:
        for edge in block.outgoing_edges:
            target = edge.target
            if target in block.dominators:
                analysis['loops'].append({
                    'header': hex(target.start),
                    'back_edge': hex(block.start)
                })

    # Find unreachable blocks
    reachable = set()

    def dfs_reachable(block):
        if block in reachable:
            return
        reachable.add(block)
        for edge in block.outgoing_edges:
            dfs_reachable(edge.target)

    # Start from entry block
    if func.basic_blocks:
        dfs_reachable(func.basic_blocks[0])

    for block in func.basic_blocks:
        if block not in reachable:
            analysis['unreachable_blocks'].append(hex(block.start))

    # Calculate complexity metrics
    nodes = analysis['basic_blocks']
    edges = analysis['edges']

    analysis['complexity_metrics'] = {
        'cyclomatic_complexity': edges - nodes + 2,
        'essential_complexity': len(analysis['loops']),
        'npath_complexity': calculate_npath_complexity(func)
    }

    return analysis

def calculate_npath_complexity(func):
    """Calculate nPath complexity"""

    complexity = 1

    for block in func.basic_blocks:
        # Count decision points
        if len(block.outgoing_edges) > 1:
            complexity *= len(block.outgoing_edges)

        # Account for loops
        for edge in block.outgoing_edges:
            if edge.target.start <= block.start:  # Back edge
                complexity *= 2

    return complexity
```
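
A usage sketch for the control flow helpers, assuming `bv` is an open BinaryView:

```python
# Report control flow metrics for the entry function
func = bv.get_function_at(bv.entry_point)
if func:
    cf = analyze_control_flow(func)
    print(f"Blocks: {cf['basic_blocks']}, edges: {cf['edges']}")
    print(f"Loops: {len(cf['loops'])}, unreachable: {len(cf['unreachable_blocks'])}")
    print(f"Cyclomatic complexity: {cf['complexity_metrics']['cyclomatic_complexity']}")
```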

### Data Flow Analysis

```python
# Advanced data flow analysis
class DataFlowAnalyzer:
    def __init__(self, func):
        self.func = func
        self.mlil = func.mlil
        self.ssa = func.mlil.ssa_form if func.mlil else None

    def analyze_variable_definitions(self):
        """Track variable definitions and uses"""

        if not self.ssa:
            return {}

        definitions = {}
        uses = {}

        for block in self.ssa.basic_blocks:
            for instr in block:
                # Track definitions
                if hasattr(instr, 'dest') and instr.dest:
                    var_name = str(instr.dest)
                    if var_name not in definitions:
                        definitions[var_name] = []

                    definitions[var_name].append({
                        'address': instr.address,
                        'instruction': str(instr),
                        'block': block.index
                    })

                # Track uses
                for operand in instr.operands:
                    if hasattr(operand, 'src') and operand.src:
                        var_name = str(operand.src)
                        if var_name not in uses:
                            uses[var_name] = []

                        uses[var_name].append({
                            'address': instr.address,
                            'instruction': str(instr),
                            'block': block.index
                        })

        return {'definitions': definitions, 'uses': uses}

    def find_uninitialized_variables(self):
        """Find potentially uninitialized variables"""

        analysis = self.analyze_variable_definitions()
        definitions = analysis['definitions']
        uses = analysis['uses']

        uninitialized = []

        for var_name, use_list in uses.items():
            if var_name not in definitions:
                # Variable used but never defined in this function
                uninitialized.append({
                    'variable': var_name,
                    'first_use': use_list[0],
                    'all_uses': use_list
                })

        return uninitialized

    def trace_taint_propagation(self, source_vars):
        """Trace taint propagation from source variables"""

        if not self.ssa:
            return []

        tainted = set(source_vars)
        taint_flow = []

        for block in self.ssa.basic_blocks:
            for instr in block:
                # Check if instruction uses tainted data
                uses_tainted = False
                for operand in instr.operands:
                    if hasattr(operand, 'src') and str(operand.src) in tainted:
                        uses_tainted = True
                        break

                if uses_tainted:
                    # Mark destination as tainted
                    if hasattr(instr, 'dest') and instr.dest:
                        dest_var = str(instr.dest)
                        tainted.add(dest_var)

                        taint_flow.append({
                            'address': instr.address,
                            'instruction': str(instr),
                            'tainted_dest': dest_var,
                            'operation': instr.operation.name
                        })

        return taint_flow

# Usage example
analyzer = DataFlowAnalyzer(func)
var_analysis = analyzer.analyze_variable_definitions()
uninitialized = analyzer.find_uninitialized_variables()
taint_flow = analyzer.trace_taint_propagation(['user_input', 'argv'])
```

### Cryptographic Analysis

```python
# Cryptographic algorithm detection
import binaryninja as bn
class CryptoAnalyzer:
    def __init__(self, bv):
        self.bv = bv
        self.crypto_signatures = self.load_crypto_signatures()

    def load_crypto_signatures(self):
        """Load cryptographic algorithm signatures"""

        return {
            'aes': {
                'sbox': [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5],
                'rcon': [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80],
                'constants': [0x63636363, 0x7c7c7c7c]
            },
            'des': {
                'initial_permutation': [58, 50, 42, 34, 26, 18, 10, 2],
                'sboxes': [
                    [14, 4, 13, 1, 2, 15, 11, 8, 3, 10, 6, 12, 5, 9, 0, 7],
                    [0, 15, 7, 4, 14, 2, 13, 1, 10, 6, 12, 11, 9, 5, 3, 8]
                ]
            },
            'md5': {
                'constants': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],
                'round_constants': [0xd76aa478, 0xe8c7b756, 0x242070db, 0xc1bdceee]
            },
            'sha1': {
                'constants': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0],
                'round_constants': [0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xca62c1d6]
            },
            'rsa': {
                'common_exponents': [3, 17, 65537],
                'key_sizes': [1024, 2048, 3072, 4096]
            }
        }

    def detect_crypto_algorithms(self):
        """Detect cryptographic algorithms in binary"""

        detected = []

        for algo_name, signatures in self.crypto_signatures.items():
            matches = self.search_algorithm_signatures(algo_name, signatures)
            if matches:
                detected.extend(matches)

        return detected

    def search_algorithm_signatures(self, algo_name, signatures):
        """Search for specific algorithm signatures"""

        matches = []

        # Search for constants
        if 'constants' in signatures:
            for constant in signatures['constants']:
                addresses = self.bv.find_all_constant(self.bv.start, self.bv.end, constant)
                for addr in addresses:
                    matches.append({
                        'algorithm': algo_name,
                        'type': 'constant',
                        'value': hex(constant),
                        'address': hex(addr),
                        'confidence': 0.7
                    })

        # Search for S-boxes
        if 'sbox' in signatures:
            sbox_matches = self.search_sbox_pattern(signatures['sbox'])
            for match in sbox_matches:
                matches.append({
                    'algorithm': algo_name,
                    'type': 'sbox',
                    'address': hex(match),
                    'confidence': 0.9
                })

        # Search for permutation tables
        if 'initial_permutation' in signatures:
            perm_matches = self.search_permutation_table(signatures['initial_permutation'])
            for match in perm_matches:
                matches.append({
                    'algorithm': algo_name,
                    'type': 'permutation_table',
                    'address': hex(match),
                    'confidence': 0.8
                })

        return matches

    def search_sbox_pattern(self, sbox):
        """Search for S-box patterns in binary"""

        matches = []
        sbox_bytes = bytes(sbox)

        # Search for exact S-box pattern
        current_addr = self.bv.start
        while current_addr < self.bv.end:
            data = self.bv.read(current_addr, len(sbox_bytes))
            if data == sbox_bytes:
                matches.append(current_addr)
                current_addr += len(sbox_bytes)
            else:
                current_addr += 1

        return matches

    def search_permutation_table(self, perm_table):
        """Search for permutation table patterns"""

        matches = []

        # Convert to bytes and search
        if all(x < 256 for x in perm_table):
            perm_bytes = bytes(perm_table)
            current_addr = self.bv.start

            while current_addr < self.bv.end:
                data = self.bv.read(current_addr, len(perm_bytes))
                if data == perm_bytes:
                    matches.append(current_addr)
                    current_addr += len(perm_bytes)
                else:
                    current_addr += 1

        return matches

    def analyze_crypto_functions(self):
        """Analyze functions that might implement crypto"""

        crypto_functions = []

        for func in self.bv.functions:
            score = self.calculate_crypto_score(func)
            if score > 0.5:
                crypto_functions.append({
                    'function': func.name,
                    'address': hex(func.start),
                    'score': score,
                    'indicators': self.get_crypto_indicators(func)
                })

        return crypto_functions

    def calculate_crypto_score(self, func):
        """Calculate likelihood that function implements crypto"""

        score = 0.0

        # Check for bit manipulation operations
        bit_ops = 0
        arithmetic_ops = 0

        if func.mlil:
            for block in func.mlil.basic_blocks:
                for instr in block:
                    op = instr.operation

                    # Bit operations
                    if op in [bn.MediumLevelILOperation.MLIL_XOR,
                             bn.MediumLevelILOperation.MLIL_AND,
                             bn.MediumLevelILOperation.MLIL_OR,
                             bn.MediumLevelILOperation.MLIL_LSL,
                             bn.MediumLevelILOperation.MLIL_LSR,
                             bn.MediumLevelILOperation.MLIL_ROR,
                             bn.MediumLevelILOperation.MLIL_ROL]:
                        bit_ops += 1

                    # Arithmetic operations
                    elif op in [bn.MediumLevelILOperation.MLIL_ADD,
                               bn.MediumLevelILOperation.MLIL_SUB,
                               bn.MediumLevelILOperation.MLIL_MUL]:
                        arithmetic_ops += 1

        # High ratio of bit operations suggests crypto
        total_ops = bit_ops + arithmetic_ops
        if total_ops > 0:
            bit_ratio = bit_ops / total_ops
            if bit_ratio > 0.3:
                score += 0.4

        # Check for loops (common in crypto)
        if len(func.basic_blocks) > 5:
            score += 0.2

        # Check for constants
        constants_found = 0
        for algo_name, signatures in self.crypto_signatures.items():
            if 'constants' in signatures:
                for constant in signatures['constants']:
                    if self.function_contains_constant(func, constant):
                        constants_found += 1

        if constants_found > 0:
            score += 0.4

        return min(score, 1.0)

    def function_contains_constant(self, func, constant):
        """Check if function contains specific constant"""

        for addr in range(func.start, func.start + func.total_bytes):
            try:
                # Read an unsigned 32-bit value in both byte orders
                if self.bv.read_int(addr, 4, False, bn.Endianness.LittleEndian) == constant:
                    return True
                if self.bv.read_int(addr, 4, False, bn.Endianness.BigEndian) == constant:
                    return True
            except Exception:
                continue

        return False

    def get_crypto_indicators(self, func):
        """Get specific crypto indicators for function"""

        indicators = []

        # Check for specific patterns
        if self.has_substitution_pattern(func):
            indicators.append('substitution_operations')

        if self.has_permutation_pattern(func):
            indicators.append('permutation_operations')

        if self.has_key_schedule_pattern(func):
            indicators.append('key_schedule_operations')

        return indicators

    def has_substitution_pattern(self, func):
        """Check for substitution box patterns"""
        # Look for array indexing patterns common in S-boxes
        return False  # Simplified for example

    def has_permutation_pattern(self, func):
        """Check for permutation patterns"""
        # Look for bit manipulation patterns
        return False  # Simplified for example

    def has_key_schedule_pattern(self, func):
        """Check for key schedule patterns"""
        # Look for iterative key expansion patterns
        return False  # Simplified for example

# Usage example
crypto_analyzer = CryptoAnalyzer(bv)
detected_algos = crypto_analyzer.detect_crypto_algorithms()
crypto_functions = crypto_analyzer.analyze_crypto_functions()

print(f"Detected {len(detected_algos)} crypto algorithm signatures")
print(f"Found {len(crypto_functions)} potential crypto functions")

## Debugging and Dynamic Analysis

### Debugger Integration

```python
# Binary Ninja debugger integration
# NOTE: the debugger API has changed between releases; the controller
# method names used below are illustrative and may need adjusting.
import binaryninja as bn
from binaryninja import log
from binaryninja.debugger import DebuggerController

class DebuggerHelper:
    def __init__(self, bv):
        self.bv = bv
        self.debugger = None
        self.breakpoints = {}
        self.watchpoints = {}

    def start_debugging(self, target_path, args=None):
        """Start debugging session"""

        try:
            # Create debugger controller
            self.debugger = DebuggerController(self.bv)

            # Set target
            self.debugger.set_target(target_path, args or [])

            # Launch process
            self.debugger.launch()

            log.log_info(f"Started debugging: {target_path}")
            return True

        except Exception as e:
            log.log_error(f"Failed to start debugger: {e}")
            return False

    def set_breakpoint(self, address, condition=None):
        """Set breakpoint at address"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return False

        try:
            bp_id = self.debugger.add_breakpoint(address)

            self.breakpoints[address] = {
                'id': bp_id,
                'condition': condition,
                'hit_count': 0
            }

            log.log_info(f"Breakpoint set at 0x{address:x}")
            return True

        except Exception as e:
            log.log_error(f"Failed to set breakpoint: {e}")
            return False

    def set_watchpoint(self, address, size, access_type='rw'):
        """Set memory watchpoint"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return False

        try:
            wp_id = self.debugger.add_watchpoint(address, size, access_type)

            self.watchpoints[address] = {
                'id': wp_id,
                'size': size,
                'access_type': access_type,
                'hit_count': 0
            }

            log.log_info(f"Watchpoint set at 0x{address:x} ({size} bytes, {access_type})")
            return True

        except Exception as e:
            log.log_error(f"Failed to set watchpoint: {e}")
            return False

    def trace_execution(self, start_addr, end_addr, max_instructions=10000):
        """Trace execution between two addresses"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return []

        trace = []
        instruction_count = 0

        # Set breakpoint at start
        self.set_breakpoint(start_addr)

        # Continue until start address
        self.debugger.continue_execution()

        # Single step and record
        while instruction_count < max_instructions:
            current_addr = self.debugger.get_current_address()

            if current_addr == end_addr:
                break

            # Record instruction
            instr_text = self.bv.get_disassembly(current_addr)
            registers = self.debugger.get_registers()

            trace.append({
                'address': current_addr,
                'instruction': instr_text,
                'registers': registers.copy(),
                'step': instruction_count
            })

            # Single step
            self.debugger.step_into()
            instruction_count += 1

        return trace

    def analyze_memory_access(self, address, size=0x1000):
        """Analyze memory access patterns"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return None

        # Set watchpoint on memory region
        self.set_watchpoint(address, size, 'rw')

        access_log = []

        # Continue execution and log accesses
        while True:
            try:
                self.debugger.continue_execution()

                # Check if watchpoint hit
                if self.debugger.is_watchpoint_hit():
                    hit_addr = self.debugger.get_watchpoint_address()
                    access_type = self.debugger.get_access_type()
                    current_pc = self.debugger.get_current_address()

                    access_log.append({
                        'pc': current_pc,
                        'memory_address': hit_addr,
                        'access_type': access_type,
                        'timestamp': self.debugger.get_timestamp()
                    })

                    # Continue after hit
                    self.debugger.continue_execution()

            except KeyboardInterrupt:
                break
            except Exception as e:
                log.log_error(f"Error during memory analysis: {e}")
                break

        return access_log

# Usage example
debugger_helper = DebuggerHelper(bv)

# Start debugging
if debugger_helper.start_debugging("/path/to/target"):
    # Set breakpoints
    debugger_helper.set_breakpoint(0x401000)
    debugger_helper.set_breakpoint(0x401500)

    # Trace execution
    trace = debugger_helper.trace_execution(0x401000, 0x401500)

    # Analyze memory access
    memory_access = debugger_helper.analyze_memory_access(0x402000, 0x1000)
```

### Code Coverage Analysis

```python
# Code coverage analysis with Binary Ninja
from binaryninja import log
class CoverageAnalyzer:
    def __init__(self, bv):
        self.bv = bv
        self.coverage_data = {}
        self.basic_block_hits = {}
        self.function_coverage = {}

    def initialize_coverage(self):
        """Initialize coverage tracking for all functions"""

        for func in self.bv.functions:
            self.function_coverage[func.start] = {
                'name': func.name,
                'total_blocks': len(func.basic_blocks),
                'hit_blocks': set(),
                'coverage_percentage': 0.0
            }

            for block in func.basic_blocks:
                self.basic_block_hits[block.start] = {
                    'function': func.name,
                    'hit_count': 0,
                    'first_hit': None,
                    'last_hit': None
                }

    def record_execution(self, address):
        """Record execution of an address"""

        # Find which basic block this address belongs to
        func = self.bv.get_function_at(address)
        if not func:
            return

        for block in func.basic_blocks:
            if block.start <= address < block.end:
                # Record block hit
                if block.start in self.basic_block_hits:
                    self.basic_block_hits[block.start]['hit_count'] += 1

                    if not self.basic_block_hits[block.start]['first_hit']:
                        self.basic_block_hits[block.start]['first_hit'] = address

                    self.basic_block_hits[block.start]['last_hit'] = address

                    # Update function coverage
                    if func.start in self.function_coverage:
                        self.function_coverage[func.start]['hit_blocks'].add(block.start)

                        hit_count = len(self.function_coverage[func.start]['hit_blocks'])
                        total_count = self.function_coverage[func.start]['total_blocks']

                        self.function_coverage[func.start]['coverage_percentage'] = \
                            (hit_count / total_count) * 100.0

                break

    def import_coverage_data(self, coverage_file):
        """Import coverage data from external tool (e.g., DynamoRIO, Intel PIN)"""

        try:
            with open(coverage_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line.startswith('0x'):
                        address = int(line, 16)
                        self.record_execution(address)

            log.log_info(f"Imported coverage data from {coverage_file}")

        except Exception as e:
            log.log_error(f"Failed to import coverage data: {e}")

    def generate_coverage_report(self):
        """Generate comprehensive coverage report"""

        report = {
            'summary': {
                'total_functions': len(self.function_coverage),
                'covered_functions': 0,
                'total_basic_blocks': len(self.basic_block_hits),
                'covered_basic_blocks': 0,
                'overall_coverage': 0.0
            },
            'function_coverage': [],
            'uncovered_functions': [],
            'hot_spots': []
        }

        covered_blocks = 0

        for func_addr, coverage in self.function_coverage.items():
            if coverage['coverage_percentage'] > 0:
                report['summary']['covered_functions'] += 1
                report['function_coverage'].append({
                    'name': coverage['name'],
                    'address': hex(func_addr),
                    'coverage': coverage['coverage_percentage'],
                    'hit_blocks': len(coverage['hit_blocks']),
                    'total_blocks': coverage['total_blocks']
                })
            else:
                report['uncovered_functions'].append({
                    'name': coverage['name'],
                    'address': hex(func_addr)
                })

            covered_blocks += len(coverage['hit_blocks'])

        # Calculate overall coverage
        if report['summary']['total_basic_blocks'] > 0:
            report['summary']['overall_coverage'] = \
                (covered_blocks / report['summary']['total_basic_blocks']) * 100.0

        report['summary']['covered_basic_blocks'] = covered_blocks

        # Find hot spots (frequently executed blocks)
        hot_spots = []
        for block_addr, hit_data in self.basic_block_hits.items():
            if hit_data['hit_count'] > 100:  # Threshold for hot spot
                hot_spots.append({
                    'address': hex(block_addr),
                    'function': hit_data['function'],
                    'hit_count': hit_data['hit_count']
                })

        # Sort by hit count
        hot_spots.sort(key=lambda x: x['hit_count'], reverse=True)
        report['hot_spots'] = hot_spots[:20]  # Top 20 hot spots

        return report

    def visualize_coverage(self, func_name):
        """Create coverage visualization for a function"""

        func = None
        for f in self.bv.functions:
            if f.name == func_name:
                func = f
                break

        if not func:
            log.log_error(f"Function {func_name} not found")
            return

        # Create coverage map
        coverage_map = {}

        for block in func.basic_blocks:
            is_covered = block.start in self.basic_block_hits and \
                        self.basic_block_hits[block.start]['hit_count'] > 0

            hit_count = self.basic_block_hits.get(block.start, {}).get('hit_count', 0)

            coverage_map[block.start] = {
                'covered': is_covered,
                'hit_count': hit_count,
                'start': block.start,
                'end': block.end,
                'size': len(block)
            }

        # Annotate coverage in Binary Ninja via comments
        # (BasicBlock.set_user_highlight could be used for color instead)
        for block_addr, coverage in coverage_map.items():
            if coverage['covered']:
                self.bv.set_comment_at(block_addr, f"Coverage: {coverage['hit_count']} hits")
            else:
                self.bv.set_comment_at(block_addr, "UNCOVERED")

        log.log_info(f"Applied coverage visualization for {func_name}")

# Usage example
coverage_analyzer = CoverageAnalyzer(bv)
coverage_analyzer.initialize_coverage()

# Import coverage data from external tool
coverage_analyzer.import_coverage_data("coverage_trace.txt")

# Generate report
report = coverage_analyzer.generate_coverage_report()
print(f"Overall coverage: {report['summary']['overall_coverage']:.2f}%")
print(f"Covered functions: {report['summary']['covered_functions']}/{report['summary']['total_functions']}")

# Visualize coverage for specific function
coverage_analyzer.visualize_coverage("main")
```

## Integration and Automation

### CI/CD Integration

```yaml
# Binary Ninja in CI/CD pipelines

# GitHub Actions workflow example
name: Binary Analysis
on: [push, pull_request]

jobs:
  binary-analysis:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2

    - name: Setup Binary Ninja
      run: |
        # Download and install Binary Ninja headless
        wget ${{ secrets.BINJA_DOWNLOAD_URL }}
        unzip binaryninja-headless.zip
        export PATH=$PATH:$(pwd)/binaryninja

    - name: Run Binary Analysis
      run: |
        python3 analyze_binary.py --binary target_binary --output analysis_report.json

    - name: Upload Analysis Results
      uses: actions/upload-artifact@v2
      with:
        name: analysis-results
        path: analysis_report.json
```

```groovy
// Jenkins pipeline example
pipeline {
    agent any

    stages {
        stage('Binary Analysis') {
            steps {
                script {
                    // Run Binary Ninja analysis
                    sh '''
                        python3 -c "
                        import binaryninja as bn
                        import json

                        bv = bn.open_view('${BINARY_PATH}')
                        bv.update_analysis_and_wait()

                        # Perform analysis
                        results = {
                            'functions': len(bv.functions),
                            'strings': len(bv.strings),
                            'imports': len([s for s in bv.symbols if s.type == bn.SymbolType.ImportedFunctionSymbol])
                        }

                        with open('analysis_results.json', 'w') as f:
                            json.dump(results, f)
                        "
                    '''
                }
            }
        }

        stage('Security Analysis') {
            steps {
                script {
                    // Run security-focused analysis
                    sh 'python3 security_analysis.py --input analysis_results.json'
                }
            }
        }
    }

    post {
        always {
            archiveArtifacts artifacts: '*.json', fingerprint: true
        }
    }
}
```
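
The GitHub Actions job above invokes an `analyze_binary.py` that is not shown; a minimal sketch of such a script (the CLI flags are assumptions) could look like this:

```python
# analyze_binary.py -- minimal headless analysis script (hypothetical)
import argparse
import json
import binaryninja as bn

parser = argparse.ArgumentParser()
parser.add_argument('--binary', required=True)
parser.add_argument('--output', required=True)
args = parser.parse_args()

bv = bn.open_view(args.binary)  # requires a headless-capable license
bv.update_analysis_and_wait()

report = {
    'architecture': bv.arch.name,
    'entry_point': hex(bv.entry_point),
    'functions': len(bv.functions),
    'strings': len(bv.strings),
}

with open(args.output, 'w') as f:
    json.dump(report, f, indent=2)
```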

### Batch Processing

```python
# Batch processing multiple binaries
import os
import json
import multiprocessing
from pathlib import Path
import binaryninja as bn

class BatchAnalyzer:
    def __init__(self, output_dir="batch_results"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def analyze_single_binary(self, binary_path):
        """Analyze a single binary file"""

        try:
            # Open binary
            bv = bn.open_view(str(binary_path))
            if not bv:
                return {'error': f'Failed to open {binary_path}'}

            # Wait for analysis
            bv.update_analysis_and_wait()

            # Collect analysis results
            results = {
                'file_path': str(binary_path),
                'file_size': binary_path.stat().st_size,
                'architecture': bv.arch.name,
                'platform': bv.platform.name,
                'entry_point': hex(bv.entry_point),
                'functions': {
                    'total': len(bv.functions),
                    'named': len([f for f in bv.functions if not f.name.startswith('sub_')]),
                    'library': len([f for f in bv.functions if f.symbol and f.symbol.type == bn.SymbolType.LibraryFunctionSymbol])
                },
                'strings': {
                    'total': len(bv.strings),
                    'ascii': len([s for s in bv.strings if s.string_type == bn.StringType.AsciiString]),
                    'unicode': len([s for s in bv.strings if s.string_type == bn.StringType.Utf16String])
                },
                'imports': len([s for s in bv.get_symbols() if s.type == bn.SymbolType.ImportedFunctionSymbol]),
                'exports': len([s for s in bv.get_symbols() if s.type == bn.SymbolType.FunctionSymbol and s.binding == bn.SymbolBinding.GlobalBinding]),
                'sections': [
                    {
                        'name': section.name,
                        'start': hex(section.start),
                        'length': section.length,
                        'semantics': section.semantics.name
                    }
                    for section in bv.sections
                ]
            }

            # Perform additional analysis
            results['security_analysis'] = self.perform_security_analysis(bv)
            results['complexity_analysis'] = self.perform_complexity_analysis(bv)

            # Close binary view
            bv.file.close()

            return results

        except Exception as e:
            return {'error': f'Analysis failed for {binary_path}: {str(e)}'}

    def perform_security_analysis(self, bv):
        """Perform security-focused analysis"""

        security_results = {
            'dangerous_functions': [],
            'crypto_indicators': [],
            'packing_indicators': {},
            'stack_strings': []
        }

        # Check for dangerous functions
        dangerous_functions = [
            'strcpy', 'strcat', 'sprintf', 'gets', 'scanf',
            'system', 'exec', 'eval', 'shell'
        ]

        for func in bv.functions:
            if func.name.lower() in [df.lower() for df in dangerous_functions]:
                security_results['dangerous_functions'].append({
                    'name': func.name,
                    'address': hex(func.start)
                })

        # Check for crypto constants
        crypto_constants = [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476]  # MD5/SHA1

        for constant in crypto_constants:
            addresses = list(bv.find_all_constant(bv.start, bv.end, constant))
            if addresses:
                security_results['crypto_indicators'].append({
                    'constant': hex(constant),
                    'addresses': [hex(addr) for addr in addresses]
                })

        return security_results

    def perform_complexity_analysis(self, bv):
        """Perform complexity analysis"""

        complexity_results = {
            'total_complexity': 0,
            'average_complexity': 0.0,
            'complex_functions': []
        }

        complexities = []

        for func in bv.functions:
            # Calculate cyclomatic complexity
            edges = sum(len(block.outgoing_edges) for block in func.basic_blocks)
            nodes = len(func.basic_blocks)
            complexity = edges - nodes + 2 if nodes > 0 else 0

            complexities.append(complexity)

            if complexity > 10:  # High complexity threshold
                complexity_results['complex_functions'].append({
                    'name': func.name,
                    'address': hex(func.start),
                    'complexity': complexity
                })

        if complexities:
            complexity_results['total_complexity'] = sum(complexities)
            complexity_results['average_complexity'] = sum(complexities) / len(complexities)

        return complexity_results

    def analyze_directory(self, directory_path, file_patterns=None, max_workers=4):
        """Analyze all binaries in a directory"""

        if file_patterns is None:
            file_patterns = ['*.exe', '*.dll', '*.so', '*.dylib', '*']

        # Find all binary files
        binary_files = []
        directory = Path(directory_path)

        for pattern in file_patterns:
            binary_files.extend(directory.glob(pattern))

        # Filter out non-files and duplicates
        binary_files = list(set([f for f in binary_files if f.is_file()]))

        print(f"Found {len(binary_files)} files to analyze")

        # Analyze files in parallel
        with multiprocessing.Pool(max_workers) as pool:
            results = pool.map(self.analyze_single_binary, binary_files)

        # Save results
        batch_results = {
            'summary': {
                'total_files': len(binary_files),
                'successful_analyses': len([r for r in results if 'error' not in r]),
                'failed_analyses': len([r for r in results if 'error' in r])
            },
            'results': results
        }

        output_file = self.output_dir / 'batch_analysis_results.json'
        with open(output_file, 'w') as f:
            json.dump(batch_results, f, indent=2)

        print(f"Batch analysis complete. Results saved to {output_file}")
        return batch_results

    def generate_summary_report(self, results):
        """Generate summary report from batch results"""

        if isinstance(results, (str, Path)):
            # Load results from file
            with open(results, 'r') as f:
                results = json.load(f)

        successful_results = [r for r in results['results'] if 'error' not in r]

        # Architecture distribution
        arch_dist = {}
        for result in successful_results:
            arch = result.get('architecture', 'unknown')
            arch_dist[arch] = arch_dist.get(arch, 0) + 1

        # Platform distribution
        platform_dist = {}
        for result in successful_results:
            platform = result.get('platform', 'unknown')
            platform_dist[platform] = platform_dist.get(platform, 0) + 1

        # Security summary
        total_dangerous_functions = sum(
            len(result.get('security_analysis', {}).get('dangerous_functions', []))
            for result in successful_results
        )

        files_with_crypto = len([
            result for result in successful_results
            if result.get('security_analysis', {}).get('crypto_indicators')
        ])

        # Complexity summary
        avg_complexity = sum(
            result.get('complexity_analysis', {}).get('average_complexity', 0)
            for result in successful_results
        ) / len(successful_results) if successful_results else 0

        summary = {
            'analysis_summary': results['summary'],
            'architecture_distribution': arch_dist,
            'platform_distribution': platform_dist,
            'security_summary': {
                'total_dangerous_functions': total_dangerous_functions,
                'files_with_crypto_indicators': files_with_crypto,
                'percentage_with_crypto': (files_with_crypto / len(successful_results)) * 100 if successful_results else 0
            },
            'complexity_summary': {
                'average_complexity': avg_complexity,
                'high_complexity_files': len([
                    result for result in successful_results
                    if result.get('complexity_analysis', {}).get('average_complexity', 0) > 10
                ])
            }
        }

        # Save summary report
        summary_file = self.output_dir / 'summary_report.json'
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"Summary report saved to {summary_file}")
        return summary

# Usage example
batch_analyzer = BatchAnalyzer("analysis_results")

# Analyze all files in a directory
results = batch_analyzer.analyze_directory(
    "/path/to/binaries",
    file_patterns=['*.exe', '*.dll'],
    max_workers=8
)

# Generate summary report
summary = batch_analyzer.generate_summary_report(results)

print("Batch Analysis Summary:")
print(f"- Total files: {summary['analysis_summary']['total_files']}")
print(f"- Successful analyses: {summary['analysis_summary']['successful_analyses']}")
print(f"- Average complexity: {summary['complexity_summary']['average_complexity']:.2f}")
print(f"- Files with crypto indicators: {summary['security_summary']['files_with_crypto_indicators']}")

Best Practices and Tips

Performance Optimization

# Performance optimization techniques
import json
import time

import binaryninja as bn
from binaryninja import log

class PerformanceOptimizer:
    def __init__(self, bv):
        self.bv = bv

    def optimize_analysis_settings(self):
        """Optimize Binary Ninja analysis settings for performance"""

        # Analysis settings live on the global Settings object;
        # passing view= scopes a value to this binary only
        settings = bn.Settings()

        # Disable expensive analysis for large binaries
        if self.bv.end - self.bv.start > 50 * 1024 * 1024:  # 50MB threshold
            settings.set_bool('analysis.linearSweep.autorun', False, view=self.bv)
            settings.set_bool('analysis.signatureMatcher.autorun', False, view=self.bv)
            settings.set_integer('analysis.limits.maxFunctionSize', 100000, view=self.bv)

        # Optimize for specific architectures (setting names vary by Binary Ninja version)
        if self.bv.arch.name in ['x86', 'x86_64']:
            settings.set_bool('analysis.x86.disassembly.simplifyFPUInstructions', True, view=self.bv)
            settings.set_bool('analysis.x86.disassembly.simplifySSEInstructions', True, view=self.bv)

        # Set reasonable limits
        settings.set_integer('analysis.limits.maxFunctionAnalysisTime', 300, view=self.bv)  # 5 minutes
        settings.set_integer('analysis.limits.maxBasicBlockAnalysisTime', 60, view=self.bv)  # 1 minute

        log.log_info("Analysis settings optimized for performance")

    def use_parallel_analysis(self, function_list=None):
        """Use parallel processing for function analysis"""

        import concurrent.futures

        functions = function_list or self.bv.functions

        def analyze_function(func):
            """Analyze a single function"""
            try:
                # Force function analysis
                func.reanalyze()
                return {
                    'name': func.name,
                    'address': func.start,
                    'status': 'success'
                }
            except Exception as e:
                return {
                    'name': func.name,
                    'address': func.start,
                    'status': 'error',
                    'error': str(e)
                }

        # Analyze functions in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(analyze_function, functions))

        successful = len([r for r in results if r['status'] == 'success'])
        failed = len([r for r in results if r['status'] == 'error'])

        log.log_info(f"Parallel analysis complete: {successful} successful, {failed} failed")
        return results

    def cache_analysis_results(self, cache_file="analysis_cache.json"):
        """Cache analysis results for faster subsequent loads"""

        cache_data = {
            'binary_hash': self.calculate_binary_hash(),
            'analysis_timestamp': time.time(),
            'functions': [],
            'strings': [],
            'imports': []
        }

        # Cache function information; cyclomatic complexity (E - N + 2) is
        # computed inline since Function objects have no len() and this
        # class defines no separate helper
        for func in self.bv.functions:
            edges = sum(len(bb.outgoing_edges) for bb in func.basic_blocks)
            nodes = len(func.basic_blocks)
            cache_data['functions'].append({
                'name': func.name,
                'start': func.start,
                'end': func.highest_address,
                'basic_blocks': nodes,
                'complexity': edges - nodes + 2 if nodes > 0 else 0
            })

        # Cache strings
        for string in self.bv.strings:
            cache_data['strings'].append({
                'address': string.start,
                'value': string.value,
                'type': string.string_type.name
            })

        # Cache imports (query by type; iterating bv.symbols yields
        # symbol names, not Symbol objects)
        for symbol in self.bv.get_symbols_of_type(bn.SymbolType.ImportedFunctionSymbol):
            cache_data['imports'].append({
                'name': symbol.name,
                'address': symbol.address
            })

        # Save cache
        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)

        log.log_info(f"Analysis results cached to {cache_file}")

    def calculate_binary_hash(self):
        """Calculate hash of binary for cache validation"""
        import hashlib

        hasher = hashlib.sha256()

        # Read binary in fixed-size chunks, shrinking only the final read
        chunk_size = 64 * 1024
        current_addr = self.bv.start

        while current_addr < self.bv.end:
            read_size = min(chunk_size, self.bv.end - current_addr)
            data = self.bv.read(current_addr, read_size)
            if data:
                hasher.update(data)
            current_addr += read_size

        return hasher.hexdigest()

# Memory management for large binaries
class MemoryManager:
    def __init__(self, bv):
        self.bv = bv
        self.cached_data = {}
        self.cache_limit = 100  # Maximum cached items

    def get_function_data(self, func_addr):
        """Get function data with caching"""

        if func_addr in self.cached_data:
            return self.cached_data[func_addr]

        func = self.bv.get_function_at(func_addr)
        if not func:
            return None

        # Extract function data
        func_data = {
            'name': func.name,
            'basic_blocks': len(func.basic_blocks),
            'instructions': [],
            'calls': []
        }

        # Get disassembly lines (limits avoid memory issues on huge functions;
        # iterating a BasicBlock directly yields token tuples, not instructions)
        for block in func.basic_blocks[:10]:  # Limit blocks
            for line in block.get_disassembly_text()[:50]:  # Limit lines per block
                func_data['instructions'].append({
                    'address': line.address,
                    'text': str(line)
                })

        # Cache data
        if len(self.cached_data) >= self.cache_limit:
            # Remove oldest entry
            oldest_key = next(iter(self.cached_data))
            del self.cached_data[oldest_key]

        self.cached_data[func_addr] = func_data
        return func_data

    def clear_cache(self):
        """Clear cached data to free memory"""
        self.cached_data.clear()
        log.log_info("Memory cache cleared")

Error Handling and Debugging

# Robust error handling for Binary Ninja scripts
import traceback
from functools import wraps

from binaryninja import log

def safe_analysis(func):
    """Decorator for safe analysis functions"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            log.log_error(f"Error in {func.__name__}: {str(e)}")
            log.log_error(f"Traceback: {traceback.format_exc()}")
            return None

    return wrapper

class RobustAnalyzer:
    def __init__(self, bv):
        self.bv = bv
        self.errors = []
        self.warnings = []

    @safe_analysis
    def analyze_with_validation(self):
        """Perform analysis with extensive validation"""

        # Validate binary view
        if not self.validate_binary_view():
            return False

        # Validate architecture support
        if not self.validate_architecture():
            return False

        # Perform analysis with error tracking
        try:
            self.bv.update_analysis_and_wait()

            # Validate analysis results
            if not self.validate_analysis_results():
                self.warnings.append("Analysis results may be incomplete")

            return True

        except Exception as e:
            self.errors.append(f"Analysis failed: {str(e)}")
            return False

    def validate_binary_view(self):
        """Validate binary view is properly loaded"""

        if not self.bv:
            self.errors.append("Binary view is None")
            return False

        if self.bv.start >= self.bv.end:
            self.errors.append("Invalid binary address range")
            return False

        if not self.bv.arch:
            self.errors.append("No architecture detected")
            return False

        return True

    def validate_architecture(self):
        """Validate architecture is supported"""

        supported_archs = ['x86', 'x86_64', 'arm', 'aarch64', 'mips']

        if self.bv.arch.name not in supported_archs:
            self.warnings.append(f"Architecture {self.bv.arch.name} may have limited support")

        return True

    def validate_analysis_results(self):
        """Validate analysis produced reasonable results"""

        if len(self.bv.functions) == 0:
            self.warnings.append("No functions detected")
            return False

        if len(self.bv.strings) == 0:
            self.warnings.append("No strings detected")

        # Check for reasonable function count
        binary_size = self.bv.end - self.bv.start
        function_density = len(self.bv.functions) / (binary_size / 1024)  # Functions per KB

        if function_density < 0.1:
            self.warnings.append("Low function density - binary may be packed")
        elif function_density > 10:
            self.warnings.append("High function density - may indicate analysis errors")

        return True

    def get_error_report(self):
        """Get comprehensive error report"""

        return {
            'errors': self.errors,
            'warnings': self.warnings,
            'error_count': len(self.errors),
            'warning_count': len(self.warnings)
        }

# Usage example ('bv' is the current BinaryView, e.g. in the scripting console)
analyzer = RobustAnalyzer(bv)
success = analyzer.analyze_with_validation()

if not success:
    error_report = analyzer.get_error_report()
    print(f"Analysis failed with {error_report['error_count']} errors")
    for error in error_report['errors']:
        print(f"  Error: {error}")

Resources

Documentation and Learning

  • [Binary Ninja Documentation](LINK_16) - Official documentation
  • [Binary Ninja API Reference](LINK_16) - Complete API documentation
  • Binary Ninja Blog - Latest updates and tutorials
  • [Binary Ninja Community](LINK_16) - User forum and discussions

Plugin Development

  • [Plugin Repository](LINK_16) - Official plugin repository
  • [Plugin Development Guide](LINK_16) - Plugin development documentation
  • API Examples - Official API examples
  • Plugin Template - Plugin development template

Training and Certification

  • [Binary Ninja Training](LINK_16) - Official training courses
  • [Reverse Engineering Tutorials](LINK_16) - Advanced binary deobfuscation
  • [CTF Challenges](LINK_16) - Hands-on practice challenges
  • [Academic Resources](LINK_16) - Educational material

Related Tools and Integration

  • Ghidra - NSA's reverse engineering framework
  • IDA Pro - Industry-standard disassembler
  • Radare2 - Open-source reverse engineering framework
  • [Cutter](LINK_16) - GUI for radare2