Binary Ninja Cheat Sheet

Overview

Binary Ninja is a modern, lightweight reverse engineering platform that provides powerful analysis capabilities through its unique intermediate language (IL) system. Developed by Vector 35, it offers a scriptable interface with Python and C APIs, making it ideal for both interactive analysis and automated reverse engineering workflows.

💡 Key Strengths: SSA-based intermediate representation, live analysis, extensive Python API, cross-platform support, and active plugin ecosystem.

Installation and Setup

Commercial License Installation

# Download from official website
# Visit: https://binary.ninja/
# Choose Personal, Commercial, or Educational license

# Linux Installation:
wget https://cdn.binary.ninja/installers/BinaryNinja-personal.zip
unzip BinaryNinja-personal.zip
cd binaryninja
./binaryninja

# macOS Installation:
# Download .dmg file and drag to Applications folder

# Windows Installation:
# Download .exe installer and run with administrator privileges
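
After installing, the bundled API installer registers the Python module for headless scripting; a minimal smoke test (the install path below is an assumption, adjust to yours):

# One-time API registration, run once per install:
#   python3 /opt/binaryninja/scripts/install_api.py
import binaryninja as bn

# Confirm the headless core loads and print its version
print(bn.core_version())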

Cloud/Free Version

# Binary Ninja Cloud (Free tier available)
# Access at: https://cloud.binary.ninja/
# Features: Limited analysis time, cloud-based processing
# Good for: Learning, small projects, evaluation

# Educational License:
# Free for students and educators
# Apply at: https://binary.ninja/educational/

Plugin Installation

# Plugin Manager (GUI):
# Tools -> Manage Plugins -> Browse Online

# Manual Plugin Installation:
# Linux/macOS: ~/.binaryninja/plugins/
# Windows: %APPDATA%\Binary Ninja\plugins\

# Popular Plugins:
# - Binja Toolkit: Enhanced UI and analysis features
# - Sourcery: Source code recovery and analysis
# - Binja Debugger: Integrated debugging capabilities
# - Kaitai Struct: Binary format parsing

Basic Usage and Navigation

Interface Overview

# Main Components:
# 1. Disassembly View - Assembly code with analysis
# 2. Linear View - Raw disassembly without analysis
# 3. Graph View - Control flow graph visualization
# 4. Hex View - Raw binary data
# 5. Types View - Data type definitions
# 6. Strings View - String references
# 7. Log View - Analysis progress and messages

# View Switching:
# Tab key: Cycle through views
# Ctrl+1: Disassembly view
# Ctrl+2: Graph view
# Ctrl+3: Linear view
# Ctrl+4: Hex view

# Basic Navigation:
# G: Go to address/function
# Space: Switch between graph and linear view
# Tab: Switch between disassembly views
# Esc: Go back in navigation history
# Ctrl+G: Go to address dialog

# Function Navigation:
# Ctrl+J: Jump to function
# Ctrl+Shift+J: Jump to function by name
# P: Go to previous function
# N: Go to next function
# Enter: Follow reference/call

# Cross-References:
# X: Show cross-references to current location
# Ctrl+X: Show cross-references from current location
# Shift+X: Show data cross-references

Analysis Control

# Analysis Management:
# Ctrl+A: Start/restart analysis
# Ctrl+Shift+A: Analysis options
# F5: Force function analysis
# U: Undefine function/data
# D: Define data at cursor

# Function Management:
# F: Create function at cursor
# Alt+F: Edit function properties
# Ctrl+F: Find text/bytes
# Y: Set function type
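
Most of these UI actions have scriptable equivalents; a minimal sketch using documented BinaryView APIs (addresses and names are placeholders):

import binaryninja as bn

bv = bn.open_view("/path/to/binary")

# "F: Create function" -> define a function at an address
bv.create_user_function(0x401000)

# "D: Define data" -> mark a 4-byte integer at an address
bv.define_user_data_var(0x402000, bn.Type.int(4))

# "U: Undefine" -> remove the data definition again
bv.undefine_user_data_var(0x402000)

# Rename the function we just created
func = bv.get_function_at(0x401000)
if func:
    func.name = "init_config"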

Intermediate Language (IL) System

IL Levels Overview

# Binary Ninja's Multi-Level IL System:

# 1. Low Level IL (LLIL) - Architecture-specific
# 2. Medium Level IL (MLIL) - Architecture-independent
# 3. High Level IL (HLIL) - C-like representation
# 4. Static Single Assignment (SSA) forms for each level

# Accessing IL in Python API:
import binaryninja as bn

# Load binary
bv = bn.open_view("/path/to/binary")

# Get function
func = bv.get_function_at(0x401000)

# Access different IL levels
llil = func.llil
mlil = func.mlil
hlil = func.hlil

# SSA forms
llil_ssa = func.llil.ssa_form
mlil_ssa = func.mlil.ssa_form
hlil_ssa = func.hlil.ssa_form
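
To see how the levels differ, the same function can be printed at each one; a small sketch reusing the objects above:

# Render each IL level of the function (str() pretty-prints IL instructions)
for level_name, il_func in [("LLIL", llil), ("MLIL", mlil), ("HLIL", hlil)]:
    print(f"=== {level_name} ===")
    for block in il_func:
        for instr in block:
            print(f"  0x{instr.address:x}: {instr}")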

IL Analysis Examples

# Analyze function control flow
def analyze_function_complexity(func):
    """Calculate cyclomatic complexity using MLIL"""

    mlil = func.mlil
    if not mlil:
        return 0

    # MLIL has no structured while/for/switch operations (those appear only
    # in HLIL), so derive complexity from the CFG shape directly:
    # cyclomatic complexity = edges - nodes + 2 for a connected CFG
    edges = sum(len(block.outgoing_edges) for block in mlil.basic_blocks)
    nodes = len(mlil.basic_blocks)
    complexity = edges - nodes + 2

    return complexity

# Find function calls in HLIL
def find_function_calls(func):
    """Extract all function calls from HLIL"""

    calls = []
    hlil = func.hlil

    if not hlil:
        return calls

    # Note: this visits top-level instructions only; calls nested inside
    # larger expressions require walking instr.operands recursively
    for block in hlil.basic_blocks:
        for instr in block:
            if instr.operation == bn.HighLevelILOperation.HLIL_CALL:
                dest = instr.dest
                if hasattr(dest, 'constant'):
                    # Direct call
                    target_addr = dest.constant
                    target_func = func.view.get_function_at(target_addr)
                    if target_func:
                        calls.append({
                            'address': instr.address,
                            'target': target_func.name,
                            'target_address': target_addr
                        })
                else:
                    # Indirect call
                    calls.append({
                        'address': instr.address,
                        'target': 'indirect',
                        'expression': str(dest)
                    })

    return calls

# Data flow analysis using SSA
def trace_variable_usage(func, var_name):
    """Trace usage of a variable through SSA form"""

    mlil = func.mlil
    if not mlil:
        return []
    mlil_ssa = mlil.ssa_form

    usage_points = []

    for block in mlil_ssa.basic_blocks:
        for instr in block:
            # Check for variable definitions
            if hasattr(instr, 'dest') and str(instr.dest).startswith(var_name):
                usage_points.append({
                    'address': instr.address,
                    'type': 'definition',
                    'instruction': str(instr)
                })

            # Check for variable uses
            if hasattr(instr, 'src') and str(instr.src).find(var_name) != -1:
                usage_points.append({
                    'address': instr.address,
                    'type': 'use',
                    'instruction': str(instr)
                })

    return usage_points
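
A minimal driver tying the three helpers together (binary path, threshold, and variable name are placeholders):

bv = bn.open_view("/path/to/binary")
bv.update_analysis_and_wait()

for func in bv.functions:
    complexity = analyze_function_complexity(func)
    if complexity > 10:
        print(f"{func.name}: cyclomatic complexity {complexity}")
        for call in find_function_calls(func):
            print(f"  calls {call['target']} at 0x{call['address']:x}")
        # 'var_4' is a placeholder variable name
        print(f"  var_4 touched at {len(trace_variable_usage(func, 'var_4'))} points")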

Python API and Scripting

Basic API Usage

import binaryninja as bn
from binaryninja import log

# Open binary file
bv = bn.open_view("/path/to/binary")
if not bv:
    log.log_error("Failed to open binary")
    exit(1)

# Basic binary information
print(f"Architecture: {bv.arch.name}")
print(f"Platform: {bv.platform.name}")
print(f"Entry point: 0x{bv.entry_point:x}")
print(f"Start address: 0x{bv.start:x}")
print(f"End address: 0x{bv.end:x}")

# Get all functions
functions = bv.functions
print(f"Total functions: {len(functions)}")

# Iterate through functions
for func in functions:
    print(f"Function: {func.name} at 0x{func.start:x}")
    print(f"  Size: {len(func)} bytes")
    print(f"  Basic blocks: {len(func.basic_blocks)}")

Advanced Analysis Scripts

# Comprehensive binary analysis script
class BinaryAnalyzer:
    def __init__(self, binary_path):
        self.bv = bn.open_view(binary_path)
        if not self.bv:
            raise ValueError(f"Cannot open binary: {binary_path}")

        # Wait for analysis to complete
        self.bv.update_analysis_and_wait()

    def analyze_strings(self):
        """Analyze string references and usage"""

        strings_analysis = {
            'total_strings': 0,
            'referenced_strings': 0,
            'unreferenced_strings': 0,
            'string_details': []
        }

        for string in self.bv.strings:
            string_info = {
                'address': string.start,
                'length': string.length,
                'value': string.value,
                'type': string.type.name,
                'references': []
            }

            # Find references to this string
            refs = self.bv.get_code_refs(string.start)
            for ref in refs:
                # ReferenceSource carries its containing function directly
                func = ref.function
                string_info['references'].append({
                    'address': ref.address,
                    'function': func.name if func else 'unknown'
                })

            strings_analysis['string_details'].append(string_info)
            strings_analysis['total_strings'] += 1

            if string_info['references']:
                strings_analysis['referenced_strings'] += 1
            else:
                strings_analysis['unreferenced_strings'] += 1

        return strings_analysis

    def find_crypto_constants(self):
        """Search for cryptographic constants"""

        crypto_constants = {
            'md5': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],
            'sha1': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0],
            'sha256': [0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a],
            'aes_sbox': [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5],
            'rc4_sbox': list(range(256))
        }

        found_constants = []

        for crypto_name, constants in crypto_constants.items():
            for constant in constants[:4]:  # Check first few constants
                # Search for the 32-bit little-endian constant as raw bytes
                # (there is no find_all_constant API; loop with find_next_data)
                pattern = constant.to_bytes(4, 'little')
                results = []
                addr = self.bv.find_next_data(self.bv.start, pattern)
                while addr is not None:
                    results.append(addr)
                    addr = self.bv.find_next_data(addr + 1, pattern)
                for addr in results:
                    found_constants.append({
                        'algorithm': crypto_name,
                        'constant': hex(constant),
                        'address': hex(addr),
                        'context': self.get_context_info(addr)
                    })

        return found_constants

    def analyze_imports(self):
        """Analyze imported functions and libraries"""

        imports_analysis = {
            'total_imports': 0,
            'libraries': {},
            'dangerous_functions': [],
            'network_functions': [],
            'crypto_functions': []
        }

        # Dangerous function patterns
        dangerous_patterns = [
            'strcpy', 'strcat', 'sprintf', 'gets', 'scanf',
            'system', 'exec', 'eval', 'shell'
        ]

        network_patterns = [
            'socket', 'connect', 'bind', 'listen', 'accept',
            'send', 'recv', 'WSA', 'inet_'
        ]

        crypto_patterns = [
            'crypt', 'hash', 'md5', 'sha', 'aes', 'des',
            'rsa', 'encrypt', 'decrypt', 'cipher'
        ]

        for symbol in self.bv.get_symbols():
            if symbol.type == bn.SymbolType.ImportedFunctionSymbol:
                func_name = symbol.name
                imports_analysis['total_imports'] += 1

                # Extract library name
                if '@' in func_name:
                    lib_name = func_name.split('@')[1]
                else:
                    lib_name = 'unknown'

                if lib_name not in imports_analysis['libraries']:
                    imports_analysis['libraries'][lib_name] = []

                imports_analysis['libraries'][lib_name].append(func_name)

                # Categorize functions
                func_lower = func_name.lower()

                if any(pattern in func_lower for pattern in dangerous_patterns):
                    imports_analysis['dangerous_functions'].append(func_name)

                if any(pattern in func_lower for pattern in network_patterns):
                    imports_analysis['network_functions'].append(func_name)

                if any(pattern in func_lower for pattern in crypto_patterns):
                    imports_analysis['crypto_functions'].append(func_name)

        return imports_analysis

    def detect_packing(self):
        """Detect potential packing or obfuscation"""

        packing_indicators = {
            'high_entropy_sections': [],
            'unusual_entry_point': False,
            'few_imports': False,
            'suspicious_sections': [],
            'packed_probability': 0.0
        }

        # Check entropy of sections
        for section in self.bv.sections.values():
            section_data = self.bv.read(section.start, section.length)
            if section_data:
                entropy = self.calculate_entropy(section_data)
                if entropy > 7.0:  # High entropy threshold
                    packing_indicators['high_entropy_sections'].append({
                        'name': section.name,
                        'entropy': entropy,
                        'address': hex(section.start),
                        'size': section.length
                    })

        # Check entry point location
        entry_point = self.bv.entry_point
        entry_sections = self.bv.get_sections_at(entry_point)
        entry_section = entry_sections[0] if entry_sections else None
        if entry_section and entry_section.name not in ['.text', 'CODE']:
            packing_indicators['unusual_entry_point'] = True

        # Check import count
        import_count = len([s for s in self.bv.get_symbols()
                            if s.type == bn.SymbolType.ImportedFunctionSymbol])
        if import_count < 10:
            packing_indicators['few_imports'] = True

        # Calculate packing probability
        score = 0
        if packing_indicators['high_entropy_sections']:
            score += 0.4
        if packing_indicators['unusual_entry_point']:
            score += 0.3
        if packing_indicators['few_imports']:
            score += 0.3

        packing_indicators['packed_probability'] = score

        return packing_indicators

    def calculate_entropy(self, data):
        """Calculate Shannon entropy of data"""
        import math
        from collections import Counter

        if not data:
            return 0

        # Count byte frequencies
        byte_counts = Counter(data)
        data_len = len(data)

        # Calculate entropy
        entropy = 0
        for count in byte_counts.values():
            probability = count / data_len
            entropy -= probability * math.log2(probability)

        return entropy

    def get_context_info(self, address):
        """Get context information for an address"""

        func = self.bv.get_function_at(address)
        if func:
            return f"Function: {func.name}"

        sections = self.bv.get_sections_at(address)
        if sections:
            return f"Section: {sections[0].name}"

        return "Unknown context"

    def generate_report(self):
        """Generate comprehensive analysis report"""

        report = {
            'binary_info': {
                'architecture': self.bv.arch.name,
                'platform': self.bv.platform.name,
                'entry_point': hex(self.bv.entry_point),
                'file_size': self.bv.end - self.bv.start,  # loaded image span, not on-disk size
                'function_count': len(self.bv.functions)
            },
            'strings_analysis': self.analyze_strings(),
            'crypto_constants': self.find_crypto_constants(),
            'imports_analysis': self.analyze_imports(),
            'packing_detection': self.detect_packing()
        }

        return report

# Usage example
analyzer = BinaryAnalyzer("/path/to/binary")
report = analyzer.generate_report()

# Print summary
print("=== Binary Analysis Report ===")
print(f"Architecture: {report['binary_info']['architecture']}")
print(f"Functions: {report['binary_info']['function_count']}")
print(f"Strings: {report['strings_analysis']['total_strings']}")
print(f"Imports: {report['imports_analysis']['total_imports']}")
print(f"Packing probability: {report['packing_detection']['packed_probability']:.2f}")

Plugin Development

# Basic Binary Ninja plugin structure
from binaryninja import *
from binaryninja import log
import json

class CustomAnalysisPlugin:
    def __init__(self):
        self.name = "Custom Analysis Plugin"
        self.description = "Performs custom binary analysis"

    def analyze_function_calls(self, bv):
        """Analyze function call patterns"""

        call_graph = {}

        for func in bv.functions:
            call_graph[func.name] = {
                'address': hex(func.start),
                'calls_to': [],
                'called_by': []
            }

            # Find functions called by this function. Walk LLIL: iterating
            # raw basic blocks yields disassembly text lines, not IL.
            llil = func.llil
            if llil is None:
                continue
            for block in llil:
                for instr in block:
                    if instr.operation == LowLevelILOperation.LLIL_CALL:
                        target = instr.dest
                        if hasattr(target, 'constant'):
                            target_func = bv.get_function_at(target.constant)
                            if target_func:
                                call_graph[func.name]['calls_to'].append(target_func.name)

                                # Add reverse reference
                                if target_func.name not in call_graph:
                                    call_graph[target_func.name] = {
                                        'address': hex(target_func.start),
                                        'calls_to': [],
                                        'called_by': []
                                    }
                                call_graph[target_func.name]['called_by'].append(func.name)

        return call_graph

    def find_vulnerabilities(self, bv):
        """Search for potential vulnerabilities"""

        vulnerabilities = []

        # Dangerous function calls
        dangerous_functions = {
            'strcpy': 'Buffer overflow risk',
            'strcat': 'Buffer overflow risk',
            'sprintf': 'Format string vulnerability',
            'gets': 'Buffer overflow risk',
            'system': 'Command injection risk'
        }

        for func in bv.functions:
            # Walk LLIL (raw basic blocks yield text lines, not IL)
            llil = func.llil
            if llil is None:
                continue
            for block in llil:
                for instr in block:
                    if instr.operation == LowLevelILOperation.LLIL_CALL:
                        target = instr.dest
                        if hasattr(target, 'constant'):
                            target_func = bv.get_function_at(target.constant)
                            if target_func and target_func.name in dangerous_functions:
                                vulnerabilities.append({
                                    'type': 'dangerous_function',
                                    'function': target_func.name,
                                    'risk': dangerous_functions[target_func.name],
                                    'location': hex(instr.address),
                                    'caller': func.name
                                })

        return vulnerabilities

# Plugin registration
def register_plugin():
    plugin = CustomAnalysisPlugin()

    def run_analysis(bv):
        # Run custom analysis
        call_graph = plugin.analyze_function_calls(bv)
        vulnerabilities = plugin.find_vulnerabilities(bv)

        # Display results
        log.log_info(f"Found {len(call_graph)} functions in call graph")
        log.log_info(f"Found {len(vulnerabilities)} potential vulnerabilities")

        # Save results to file
        results = {
            'call_graph': call_graph,
            'vulnerabilities': vulnerabilities
        }

        with open('analysis_results.json', 'w') as f:
            json.dump(results, f, indent=2)

        log.log_info("Analysis results saved to analysis_results.json")

    # Register menu item
    PluginCommand.register(
        "Custom Analysis\\Run Analysis",
        "Run custom binary analysis",
        run_analysis
    )

# Call registration function
register_plugin()
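
For long-running work, Binary Ninja's BackgroundTaskThread keeps the UI responsive; a minimal sketch reusing the plugin class above:

from binaryninja.plugin import BackgroundTaskThread

class AnalysisTask(BackgroundTaskThread):
    """Run the custom analysis off the UI thread."""

    def __init__(self, bv):
        super().__init__("Running custom analysis...", can_cancel=True)
        self.bv = bv

    def run(self):
        plugin = CustomAnalysisPlugin()
        call_graph = plugin.analyze_function_calls(self.bv)
        self.progress = f"Analyzed {len(call_graph)} functions"

def run_analysis_background(bv):
    AnalysisTask(bv).start()

PluginCommand.register(
    "Custom Analysis\\Run Analysis (Background)",
    "Run custom analysis as a background task",
    run_analysis_background
)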

Advanced Analysis Techniques

Control Flow Analysis

# Advanced control flow analysis
def analyze_control_flow(func):
    """Comprehensive control flow analysis"""

    analysis = {
        'basic_blocks': len(func.basic_blocks),
        'edges': 0,
        'loops': [],
        'unreachable_blocks': [],
        'complexity_metrics': {}
    }

    # Count edges
    for block in func.basic_blocks:
        analysis['edges'] += len(block.outgoing_edges)

    # Detect loops using dominance analysis: an edge whose target dominates
    # its source block is a back edge (dominators are exposed per basic block)
    for block in func.basic_blocks:
        for edge in block.outgoing_edges:
            target = edge.target
            if target in block.dominators:
                analysis['loops'].append({
                    'header': hex(target.start),
                    'back_edge': hex(block.start)
                })

    # Find unreachable blocks
    reachable = set()

    # Iterative DFS from the entry block (avoids recursion limits on very
    # large functions)
    if func.basic_blocks:
        stack = [func.basic_blocks[0]]
        while stack:
            block = stack.pop()
            if block in reachable:
                continue
            reachable.add(block)
            stack.extend(edge.target for edge in block.outgoing_edges)

    for block in func.basic_blocks:
        if block not in reachable:
            analysis['unreachable_blocks'].append(hex(block.start))

    # Calculate complexity metrics
    nodes = analysis['basic_blocks']
    edges = analysis['edges']

    analysis['complexity_metrics'] = {
        'cyclomatic_complexity': edges - nodes + 2,
        'essential_complexity': len(analysis['loops']),
        'npath_complexity': calculate_npath_complexity(func)
    }

    return analysis

def calculate_npath_complexity(func):
    """Calculate nPath complexity"""

    complexity = 1

    for block in func.basic_blocks:
        # Count decision points
        if len(block.outgoing_edges) > 1:
            complexity *= len(block.outgoing_edges)

        # Account for loops (address-order heuristic for back edges,
        # not true dominance)
        for edge in block.outgoing_edges:
            if edge.target.start <= block.start:
                complexity *= 2

    return complexity
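
A small driver for the two functions above (assumes an open, analyzed BinaryView `bv`):

for func in bv.functions:
    cf = analyze_control_flow(func)
    metrics = cf['complexity_metrics']
    if cf['unreachable_blocks'] or metrics['cyclomatic_complexity'] > 15:
        print(f"{func.name}: {metrics}")
        print(f"  loops: {len(cf['loops'])}, unreachable: {cf['unreachable_blocks']}")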

Data Flow Analysis

# Advanced data flow analysis
class DataFlowAnalyzer:
    def __init__(self, func):
        self.func = func
        self.mlil = func.mlil
        self.ssa = func.mlil.ssa_form if func.mlil else None

    def analyze_variable_definitions(self):
        """Track variable definitions and uses"""

        if not self.ssa:
            return {}

        definitions = {}
        uses = {}

        for block in self.ssa.basic_blocks:
            for instr in block:
                # Track definitions
                if hasattr(instr, 'dest') and instr.dest:
                    var_name = str(instr.dest)
                    if var_name not in definitions:
                        definitions[var_name] = []

                    definitions[var_name].append({
                        'address': instr.address,
                        'instruction': str(instr),
                        'block': block.index
                    })

                # Track uses
                for operand in instr.operands:
                    if hasattr(operand, 'src') and operand.src:
                        var_name = str(operand.src)
                        if var_name not in uses:
                            uses[var_name] = []

                        uses[var_name].append({
                            'address': instr.address,
                            'instruction': str(instr),
                            'block': block.index
                        })

        return {'definitions': definitions, 'uses': uses}

    def find_uninitialized_variables(self):
        """Find potentially uninitialized variables"""

        analysis = self.analyze_variable_definitions()
        definitions = analysis['definitions']
        uses = analysis['uses']

        uninitialized = []

        for var_name, use_list in uses.items():
            if var_name not in definitions:
                # Variable used but never defined in this function
                uninitialized.append({
                    'variable': var_name,
                    'first_use': use_list[0],
                    'all_uses': use_list
                })

        return uninitialized

    def trace_taint_propagation(self, source_vars):
        """Trace taint propagation from source variables"""

        if not self.ssa:
            return []

        tainted = set(source_vars)
        taint_flow = []

        for block in self.ssa.basic_blocks:
            for instr in block:
                # Check if instruction uses tainted data
                uses_tainted = False
                for operand in instr.operands:
                    if hasattr(operand, 'src') and str(operand.src) in tainted:
                        uses_tainted = True
                        break

                if uses_tainted:
                    # Mark destination as tainted
                    if hasattr(instr, 'dest') and instr.dest:
                        dest_var = str(instr.dest)
                        tainted.add(dest_var)

                        taint_flow.append({
                            'address': instr.address,
                            'instruction': str(instr),
                            'tainted_dest': dest_var,
                            'operation': instr.operation.name
                        })

        return taint_flow

# Usage example (assumes `func` is a binaryninja Function)
analyzer = DataFlowAnalyzer(func)
var_analysis = analyzer.analyze_variable_definitions()
uninitialized = analyzer.find_uninitialized_variables()
taint_flow = analyzer.trace_taint_propagation(['user_input', 'argv'])

Cryptographic Analysis

# Cryptographic algorithm detection
class CryptoAnalyzer:
    def __init__(self, bv):
        self.bv = bv
        self.crypto_signatures = self.load_crypto_signatures()

    def load_crypto_signatures(self):
        """Load cryptographic algorithm signatures"""

        return {
            'aes': {
                'sbox': [0x63, 0x7c, 0x77, 0x7b, 0xf2, 0x6b, 0x6f, 0xc5],
                'rcon': [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80],
                'constants': [0x63636363, 0x7c7c7c7c]
            },
            'des': {
                'initial_permutation': [58, 50, 42, 34, 26, 18, 10, 2],
                'sboxes': [
                    [14, 4, 13, 1, 2, 15, 11, 8, 3, 10, 6, 12, 5, 9, 0, 7],
                    [0, 15, 7, 4, 14, 2, 13, 1, 10, 6, 12, 11, 9, 5, 3, 8]
                ]
            },
            'md5': {
                'constants': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476],
                'round_constants': [0xd76aa478, 0xe8c7b756, 0x242070db, 0xc1bdceee]
            },
            'sha1': {
                'constants': [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0],
                'round_constants': [0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xca62c1d6]
            },
            'rsa': {
                'common_exponents': [3, 17, 65537],
                'key_sizes': [1024, 2048, 3072, 4096]
            }
        }

    def detect_crypto_algorithms(self):
        """Detect cryptographic algorithms in binary"""

        detected = []

        for algo_name, signatures in self.crypto_signatures.items():
            matches = self.search_algorithm_signatures(algo_name, signatures)
            if matches:
                detected.extend(matches)

        return detected

    def search_algorithm_signatures(self, algo_name, signatures):
        """Search for specific algorithm signatures"""

        matches = []

        # Search for constants
        if 'constants' in signatures:
            for constant in signatures['constants']:
                # Raw byte scan for the little-endian constant
                # (there is no find_all_constant API; loop with find_next_data)
                pattern = constant.to_bytes(4, 'little')
                addresses = []
                addr = self.bv.find_next_data(self.bv.start, pattern)
                while addr is not None:
                    addresses.append(addr)
                    addr = self.bv.find_next_data(addr + 1, pattern)
                for addr in addresses:
                    matches.append({
                        'algorithm': algo_name,
                        'type': 'constant',
                        'value': hex(constant),
                        'address': hex(addr),
                        'confidence': 0.7
                    })

        # Search for S-boxes
        if 'sbox' in signatures:
            sbox_matches = self.search_sbox_pattern(signatures['sbox'])
            for match in sbox_matches:
                matches.append({
                    'algorithm': algo_name,
                    'type': 'sbox',
                    'address': hex(match),
                    'confidence': 0.9
                })

        # Search for permutation tables
        if 'initial_permutation' in signatures:
            perm_matches = self.search_permutation_table(signatures['initial_permutation'])
            for match in perm_matches:
                matches.append({
                    'algorithm': algo_name,
                    'type': 'permutation_table',
                    'address': hex(match),
                    'confidence': 0.8
                })

        return matches

    def search_sbox_pattern(self, sbox):
        """Search for S-box patterns in binary"""

        matches = []
        sbox_bytes = bytes(sbox)

        # Search for the exact S-box byte pattern; find_next_data is much
        # faster than a byte-at-a-time read loop
        addr = self.bv.find_next_data(self.bv.start, sbox_bytes)
        while addr is not None:
            matches.append(addr)
            addr = self.bv.find_next_data(addr + len(sbox_bytes), sbox_bytes)

        return matches

    def search_permutation_table(self, perm_table):
        """Search for permutation table patterns"""

        matches = []

        # Convert to bytes and search (entries must each fit in one byte)
        if all(x < 256 for x in perm_table):
            perm_bytes = bytes(perm_table)

            addr = self.bv.find_next_data(self.bv.start, perm_bytes)
            while addr is not None:
                matches.append(addr)
                addr = self.bv.find_next_data(addr + len(perm_bytes), perm_bytes)

        return matches

    def analyze_crypto_functions(self):
        """Analyze functions that might implement crypto"""

        crypto_functions = []

        for func in self.bv.functions:
            score = self.calculate_crypto_score(func)
            if score > 0.5:
                crypto_functions.append({
                    'function': func.name,
                    'address': hex(func.start),
                    'score': score,
                    'indicators': self.get_crypto_indicators(func)
                })

        return crypto_functions

    def calculate_crypto_score(self, func):
        """Calculate likelihood that function implements crypto"""

        score = 0.0

        # Check for bit manipulation operations
        bit_ops = 0
        arithmetic_ops = 0

        if func.mlil:
            for block in func.mlil.basic_blocks:
                for instr in block:
                    op = instr.operation

                    # Bit operations
                    if op in [bn.MediumLevelILOperation.MLIL_XOR,
                             bn.MediumLevelILOperation.MLIL_AND,
                             bn.MediumLevelILOperation.MLIL_OR,
                             bn.MediumLevelILOperation.MLIL_LSL,
                             bn.MediumLevelILOperation.MLIL_LSR,
                             bn.MediumLevelILOperation.MLIL_ROR,
                             bn.MediumLevelILOperation.MLIL_ROL]:
                        bit_ops += 1

                    # Arithmetic operations
                    elif op in [bn.MediumLevelILOperation.MLIL_ADD,
                               bn.MediumLevelILOperation.MLIL_SUB,
                               bn.MediumLevelILOperation.MLIL_MUL]:
                        arithmetic_ops += 1

        # High ratio of bit operations suggests crypto
        total_ops = bit_ops + arithmetic_ops
        if total_ops > 0:
            bit_ratio = bit_ops / total_ops
            if bit_ratio > 0.3:
                score += 0.4

        # Check for loops (common in crypto)
        if len(func.basic_blocks) > 5:
            score += 0.2

        # Check for constants
        constants_found = 0
        for algo_name, signatures in self.crypto_signatures.items():
            if 'constants' in signatures:
                for constant in signatures['constants']:
                    if self.function_contains_constant(func, constant):
                        constants_found += 1

        if constants_found > 0:
            score += 0.4

        return min(score, 1.0)

    def function_contains_constant(self, func, constant):
        """Check if function contains specific constant"""

        # Scan the function's address range (assumes a contiguous layout).
        # read_int's third argument is signedness, so pass endianness explicitly.
        for addr in range(func.start, func.start + func.total_bytes):
            try:
                value = self.bv.read_int(addr, 4, False, bn.Endianness.LittleEndian)
                if value == constant:
                    return True
                value = self.bv.read_int(addr, 4, False, bn.Endianness.BigEndian)
                if value == constant:
                    return True
            except Exception:
                continue

        return False

    def get_crypto_indicators(self, func):
        """Get specific crypto indicators for function"""

        indicators = []

        # Check for specific patterns
        if self.has_substitution_pattern(func):
            indicators.append('substitution_operations')

        if self.has_permutation_pattern(func):
            indicators.append('permutation_operations')

        if self.has_key_schedule_pattern(func):
            indicators.append('key_schedule_operations')

        return indicators

    def has_substitution_pattern(self, func):
        """Check for substitution box patterns"""
        # Look for array indexing patterns common in S-boxes
        return False  # Simplified for example

    def has_permutation_pattern(self, func):
        """Check for permutation patterns"""
        # Look for bit manipulation patterns
        return False  # Simplified for example

    def has_key_schedule_pattern(self, func):
        """Check for key schedule patterns"""
        # Look for iterative key expansion patterns
        return False  # Simplified for example

# Usage example
crypto_analyzer = CryptoAnalyzer(bv)
detected_algos = crypto_analyzer.detect_crypto_algorithms()
crypto_functions = crypto_analyzer.analyze_crypto_functions()

print(f"Detected {len(detected_algos)} crypto algorithm signatures")
print(f"Found {len(crypto_functions)} potential crypto functions")

Debugging and Dynamic Analysis

Debugger Integration

# Binary Ninja debugger integration (requires the debugger plugin; a few
# wrapper calls below are illustrative placeholders, so check the
# DebuggerController documentation for the exact API in your version)
import binaryninja as bn
from binaryninja import log
from binaryninja.debugger import DebuggerController

class DebuggerHelper:
    def __init__(self, bv):
        self.bv = bv
        self.debugger = None
        self.breakpoints = {}
        self.watchpoints = {}

    def start_debugging(self, target_path, args=None):
        """Start debugging session"""

        try:
            # Create debugger controller
            self.debugger = DebuggerController(self.bv)

            # Pass command-line arguments; the executable itself comes from
            # the BinaryView (there is no set_target call)
            if args:
                self.debugger.cmd_line = ' '.join(args)

            # Launch process
            self.debugger.launch()

            log.log_info(f"Started debugging: {target_path}")
            return True

        except Exception as e:
            log.log_error(f"Failed to start debugger: {e}")
            return False

    def set_breakpoint(self, address, condition=None):
        """Set breakpoint at address"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return False

        try:
            self.debugger.add_breakpoint(address)

            # Conditions are only tracked locally here; the debugger does
            # not evaluate breakpoint conditions for us
            self.breakpoints[address] = {
                'condition': condition,
                'hit_count': 0
            }

            log.log_info(f"Breakpoint set at 0x{address:x}")
            return True

        except Exception as e:
            log.log_error(f"Failed to set breakpoint: {e}")
            return False

    def set_watchpoint(self, address, size, access_type='rw'):
        """Set memory watchpoint"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return False

        try:
            # Illustrative only: the Binary Ninja debugger does not expose a
            # watchpoint API at the time of writing; adapt to your backend
            wp_id = self.debugger.add_watchpoint(address, size, access_type)

            self.watchpoints[address] = {
                'id': wp_id,
                'size': size,
                'access_type': access_type,
                'hit_count': 0
            }

            log.log_info(f"Watchpoint set at 0x{address:x} ({size} bytes, {access_type})")
            return True

        except Exception as e:
            log.log_error(f"Failed to set watchpoint: {e}")
            return False

    def trace_execution(self, start_addr, end_addr, max_instructions=10000):
        """Trace execution between two addresses"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return []

        trace = []
        instruction_count = 0

        # Set breakpoint at start
        self.set_breakpoint(start_addr)

        # Continue until the start breakpoint is hit (go() resumes execution)
        self.debugger.go()

        # Single step and record
        while instruction_count < max_instructions:
            current_addr = self.debugger.ip

            if current_addr == end_addr:
                break

            # Record instruction and a register snapshot (DebuggerController.regs
            # yields DebugRegister objects with name/value)
            instr_text = self.bv.get_disassembly(current_addr)
            registers = {r.name: r.value for r in self.debugger.regs}

            trace.append({
                'address': current_addr,
                'instruction': instr_text,
                'registers': registers,
                'step': instruction_count
            })

            # Single step
            self.debugger.step_into()
            instruction_count += 1

        return trace

    def analyze_memory_access(self, address, size=0x1000):
        """Analyze memory access patterns"""

        if not self.debugger:
            log.log_error("Debugger not started")
            return None

        # Set watchpoint on memory region
        self.set_watchpoint(address, size, 'rw')

        access_log = []

        # Continue execution and log accesses. The watchpoint queries below
        # are illustrative placeholders (see the note in set_watchpoint);
        # go() is the documented resume call.
        while True:
            try:
                self.debugger.go()

                # Check if watchpoint hit (illustrative API)
                if self.debugger.is_watchpoint_hit():
                    hit_addr = self.debugger.get_watchpoint_address()
                    access_type = self.debugger.get_access_type()
                    current_pc = self.debugger.ip

                    access_log.append({
                        'pc': current_pc,
                        'memory_address': hit_addr,
                        'access_type': access_type
                    })

                    # Continue after hit
                    self.debugger.go()

            except KeyboardInterrupt:
                break
            except Exception as e:
                log.log_error(f"Error during memory analysis: {e}")
                break

        return access_log

# Usage example
debugger_helper = DebuggerHelper(bv)

# Start debugging
if debugger_helper.start_debugging("/path/to/target"):
    # Set breakpoints
    debugger_helper.set_breakpoint(0x401000)
    debugger_helper.set_breakpoint(0x401500)

    # Trace execution
    trace = debugger_helper.trace_execution(0x401000, 0x401500)

    # Analyze memory access
    memory_access = debugger_helper.analyze_memory_access(0x402000, 0x1000)

Code Coverage Analysis

# Code coverage analysis with Binary Ninja
from binaryninja import log

class CoverageAnalyzer:
    def __init__(self, bv):
        self.bv = bv
        self.coverage_data = {}
        self.basic_block_hits = {}
        self.function_coverage = {}

    def initialize_coverage(self):
        """Initialize coverage tracking for all functions"""

        for func in self.bv.functions:
            self.function_coverage[func.start] = {
                'name': func.name,
                'total_blocks': len(func.basic_blocks),
                'hit_blocks': set(),
                'coverage_percentage': 0.0
            }

            for block in func.basic_blocks:
                self.basic_block_hits[block.start] = {
                    'function': func.name,
                    'hit_count': 0,
                    'first_hit': None,
                    'last_hit': None
                }

    def record_execution(self, address):
        """Record execution of an address"""

        # Find which basic block this address belongs to. An address may fall
        # anywhere inside a function, so use get_functions_containing rather
        # than get_function_at (which only matches function starts).
        funcs = self.bv.get_functions_containing(address)
        if not funcs:
            return
        func = funcs[0]

        for block in func.basic_blocks:
            if block.start <= address < block.end:
                # Record block hit
                if block.start in self.basic_block_hits:
                    self.basic_block_hits[block.start]['hit_count'] += 1

                    if not self.basic_block_hits[block.start]['first_hit']:
                        self.basic_block_hits[block.start]['first_hit'] = address

                    self.basic_block_hits[block.start]['last_hit'] = address

                    # Update function coverage
                    if func.start in self.function_coverage:
                        self.function_coverage[func.start]['hit_blocks'].add(block.start)

                        hit_count = len(self.function_coverage[func.start]['hit_blocks'])
                        total_count = self.function_coverage[func.start]['total_blocks']

                        self.function_coverage[func.start]['coverage_percentage'] = \
                            (hit_count / total_count) * 100.0

                break

    def import_coverage_data(self, coverage_file):
        """Import coverage data from external tool (e.g., DynamoRIO, Intel PIN)"""

        try:
            with open(coverage_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line.startswith('0x'):
                        address = int(line, 16)
                        self.record_execution(address)

            log.log_info(f"Imported coverage data from {coverage_file}")

        except Exception as e:
            log.log_error(f"Failed to import coverage data: {e}")

    def generate_coverage_report(self):
        """Generate comprehensive coverage report"""

        report = {
            'summary': {
                'total_functions': len(self.function_coverage),
                'covered_functions': 0,
                'total_basic_blocks': len(self.basic_block_hits),
                'covered_basic_blocks': 0,
                'overall_coverage': 0.0
            },
            'function_coverage': [],
            'uncovered_functions': [],
            'hot_spots': []
        }

        covered_blocks = 0

        for func_addr, coverage in self.function_coverage.items():
            if coverage['coverage_percentage'] > 0:
                report['summary']['covered_functions'] += 1
                report['function_coverage'].append({
                    'name': coverage['name'],
                    'address': hex(func_addr),
                    'coverage': coverage['coverage_percentage'],
                    'hit_blocks': len(coverage['hit_blocks']),
                    'total_blocks': coverage['total_blocks']
                })
            else:
                report['uncovered_functions'].append({
                    'name': coverage['name'],
                    'address': hex(func_addr)
                })

            covered_blocks += len(coverage['hit_blocks'])

        # Calculate overall coverage
        if report['summary']['total_basic_blocks'] > 0:
            report['summary']['overall_coverage'] = \
                (covered_blocks / report['summary']['total_basic_blocks']) * 100.0

        report['summary']['covered_basic_blocks'] = covered_blocks

        # Find hot spots (frequently executed blocks)
        hot_spots = []
        for block_addr, hit_data in self.basic_block_hits.items():
            if hit_data['hit_count'] > 100:  # Threshold for hot spot
                hot_spots.append({
                    'address': hex(block_addr),
                    'function': hit_data['function'],
                    'hit_count': hit_data['hit_count']
                })

        # Sort by hit count
        hot_spots.sort(key=lambda x: x['hit_count'], reverse=True)
        report['hot_spots'] = hot_spots[:20]  # Top 20 hot spots

        return report

    def visualize_coverage(self, func_name):
        """Create coverage visualization for a function"""

        func = None
        for f in self.bv.functions:
            if f.name == func_name:
                func = f
                break

        if not func:
            log.log_error(f"Function {func_name} not found")
            return

        # Create coverage map
        coverage_map = {}

        for block in func.basic_blocks:
            is_covered = block.start in self.basic_block_hits and \
                        self.basic_block_hits[block.start]['hit_count'] > 0

            hit_count = self.basic_block_hits.get(block.start, {}).get('hit_count', 0)

            coverage_map[block.start] = {
                'covered': is_covered,
                'hit_count': hit_count,
                'start': block.start,
                'end': block.end,
                'size': len(block)
            }

        # Annotate blocks with coverage comments (for color highlighting,
        # BasicBlock.set_user_highlight with a HighlightStandardColor also works)
        for block_addr, coverage in coverage_map.items():
            if coverage['covered']:
                self.bv.set_comment_at(block_addr, f"Coverage: {coverage['hit_count']} hits")
            else:
                self.bv.set_comment_at(block_addr, "UNCOVERED")

        log.log_info(f"Applied coverage visualization for {func_name}")

# Usage example
coverage_analyzer = CoverageAnalyzer(bv)
coverage_analyzer.initialize_coverage()

# Import coverage data from external tool
coverage_analyzer.import_coverage_data("coverage_trace.txt")

# Generate report
report = coverage_analyzer.generate_coverage_report()
print(f"Overall coverage: {report['summary']['overall_coverage']:.2f}%")
print(f"Covered functions: {report['summary']['covered_functions']}/{report['summary']['total_functions']}")

# Visualize coverage for specific function
coverage_analyzer.visualize_coverage("main")

Integration and Automation

CI/CD Integration

# Binary Ninja in CI/CD pipelines

# GitHub Actions workflow example
name: Binary Analysis
on: [push, pull_request]

jobs:
  binary-analysis:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2

    - name: Setup Binary Ninja
      run: |
        # Download and install Binary Ninja headless
        wget ${{ secrets.BINJA_DOWNLOAD_URL }}
        unzip binaryninja-headless.zip
        # 'export' does not persist across steps; use GITHUB_PATH instead
        echo "$(pwd)/binaryninja" >> "$GITHUB_PATH"
        # Register the Python API with the runner's interpreter
        python3 binaryninja/scripts/install_api.py

    - name: Run Binary Analysis
      run: |
        python3 analyze_binary.py --binary target_binary --output analysis_report.json

    - name: Upload Analysis Results
      uses: actions/upload-artifact@v2
      with:
        name: analysis-results
        path: analysis_report.json
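
The workflow above invokes an analyze_binary.py driver that is not shown; a minimal headless sketch of what such a script might look like (flag names follow the workflow's invocation):

#!/usr/bin/env python3
"""Hypothetical analyze_binary.py used by the workflow above."""
import argparse
import json
import binaryninja as bn

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--binary', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args()

    bv = bn.open_view(args.binary)
    bv.update_analysis_and_wait()

    report = {
        'architecture': bv.arch.name,
        'functions': len(bv.functions),
        'strings': len(bv.strings),
    }

    with open(args.output, 'w') as f:
        json.dump(report, f, indent=2)

if __name__ == '__main__':
    main()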

# Jenkins pipeline example
pipeline {
    agent any

    stages {
        stage('Binary Analysis') {
            steps {
                script {
                    // Run Binary Ninja analysis
                    sh '''
                        python3 << EOF
import binaryninja as bn
import json

bv = bn.open_view("${BINARY_PATH}")
bv.update_analysis_and_wait()

# Perform analysis
results = {
    'functions': len(bv.functions),
    'strings': len(bv.strings),
    'imports': len([s for s in bv.get_symbols()
                    if s.type == bn.SymbolType.ImportedFunctionSymbol])
}

with open('analysis_results.json', 'w') as f:
    json.dump(results, f)
EOF
                    '''
                }
            }
        }

        stage('Security Analysis') {
            steps {
                script {
                    // Run security-focused analysis
                    sh 'python3 security_analysis.py --input analysis_results.json'
                }
            }
        }
    }

    post {
        always {
            archiveArtifacts artifacts: '*.json', fingerprint: true
        }
    }
}

Batch Processing

# Batch processing multiple binaries
import os
import json
import multiprocessing
from pathlib import Path
import binaryninja as bn

class BatchAnalyzer:
    def __init__(self, output_dir="batch_results"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def analyze_single_binary(self, binary_path):
        """Analyze a single binary file"""

        try:
            # Open binary
            bv = bn.open_view(str(binary_path))
            if not bv:
                return {'error': f'Failed to open {binary_path}'}

            # Wait for analysis
            bv.update_analysis_and_wait()

            # Collect analysis results
            results = {
                'file_path': str(binary_path),
                'file_size': binary_path.stat().st_size,
                'architecture': bv.arch.name,
                'platform': bv.platform.name,
                'entry_point': hex(bv.entry_point),
                'functions': {
                    'total': len(bv.functions),
                    'named': len([f for f in bv.functions if not f.name.startswith('sub_')]),
                    'library': len([f for f in bv.functions if f.symbol and f.symbol.type == bn.SymbolType.LibraryFunctionSymbol])
                },
                'strings': {
                    'total': len(bv.strings),
                    'ascii': len([s for s in bv.strings if s.type == bn.StringType.AsciiString]),
                    'unicode': len([s for s in bv.strings if s.type == bn.StringType.Utf16String])
                },
                'imports': len([s for s in bv.get_symbols() if s.type == bn.SymbolType.ImportedFunctionSymbol]),
                'exports': len([s for s in bv.get_symbols() if s.type == bn.SymbolType.FunctionSymbol and s.binding == bn.SymbolBinding.GlobalBinding]),
                'sections': [
                    {
                        'name': section.name,
                        'start': hex(section.start),
                        'length': section.length,
                        'semantics': section.semantics.name
                    }
                    for section in bv.sections.values()
                ]
            }

            # Perform additional analysis
            results['security_analysis'] = self.perform_security_analysis(bv)
            results['complexity_analysis'] = self.perform_complexity_analysis(bv)

            # Close binary view
            bv.file.close()

            return results

        except Exception as e:
            return {'error': f'Analysis failed for {binary_path}: {str(e)}'}

    def perform_security_analysis(self, bv):
        """Perform security-focused analysis"""

        security_results = {
            'dangerous_functions': [],
            'crypto_indicators': [],
            'packing_indicators': {},
            'stack_strings': []
        }

        # Check for dangerous functions
        dangerous_functions = [
            'strcpy', 'strcat', 'sprintf', 'gets', 'scanf',
            'system', 'exec', 'eval', 'shell'
        ]

        for func in bv.functions:
            if func.name.lower() in [df.lower() for df in dangerous_functions]:
                security_results['dangerous_functions'].append({
                    'name': func.name,
                    'address': hex(func.start)
                })

        # Check for crypto constants
        crypto_constants = [0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476]  # MD5/SHA1

        for constant in crypto_constants:
            # Raw little-endian byte scan (there is no find_all_constant API)
            pattern = constant.to_bytes(4, 'little')
            addresses = []
            addr = bv.find_next_data(bv.start, pattern)
            while addr is not None:
                addresses.append(addr)
                addr = bv.find_next_data(addr + 1, pattern)
            if addresses:
                security_results['crypto_indicators'].append({
                    'constant': hex(constant),
                    'addresses': [hex(addr) for addr in addresses]
                })

        return security_results

    def perform_complexity_analysis(self, bv):
        """Perform complexity analysis"""

        complexity_results = {
            'total_complexity': 0,
            'average_complexity': 0.0,
            'complex_functions': []
        }

        complexities = []

        for func in bv.functions:
            # Calculate cyclomatic complexity
            edges = sum(len(block.outgoing_edges) for block in func.basic_blocks)
            nodes = len(func.basic_blocks)
            complexity = edges - nodes + 2 if nodes > 0 else 0

            complexities.append(complexity)

            if complexity > 10:  # High complexity threshold
                complexity_results['complex_functions'].append({
                    'name': func.name,
                    'address': hex(func.start),
                    'complexity': complexity
                })

        if complexities:
            complexity_results['total_complexity'] = sum(complexities)
            complexity_results['average_complexity'] = sum(complexities) / len(complexities)

        return complexity_results

    def analyze_directory(self, directory_path, file_patterns=None, max_workers=4):
        """Analyze all binaries in a directory"""

        if file_patterns is None:
            # A bare '*' would match every file (including non-binaries), so
            # default to common executable and library extensions only
            file_patterns = ['*.exe', '*.dll', '*.so', '*.dylib']

        # Find all binary files
        binary_files = []
        directory = Path(directory_path)

        for pattern in file_patterns:
            binary_files.extend(directory.glob(pattern))

        # Filter out non-files and duplicates
        binary_files = list(set([f for f in binary_files if f.is_file()]))

        print(f"Found {len(binary_files)} files to analyze")

        # Analyze files in parallel (each worker process needs its own
        # Binary Ninja core; headless license limits may apply)
        with multiprocessing.Pool(max_workers) as pool:
            results = pool.map(self.analyze_single_binary, binary_files)

        # Save results
        batch_results = {
            'summary': {
                'total_files': len(binary_files),
                'successful_analyses': len([r for r in results if 'error' not in r]),
                'failed_analyses': len([r for r in results if 'error' in r])
            },
            'results': results
        }

        output_file = self.output_dir / 'batch_analysis_results.json'
        with open(output_file, 'w') as f:
            json.dump(batch_results, f, indent=2)

        print(f"Batch analysis complete. Results saved to {output_file}")
        return batch_results

    def generate_summary_report(self, results):
        """Generate summary report from batch results"""

        if isinstance(results, (str, Path)):
            # Load results from file
            with open(results, 'r') as f:
                results = json.load(f)

        successful_results = [r for r in results['results'] if 'error' not in r]

        # Architecture distribution
        arch_dist = {}
        for result in successful_results:
            arch = result.get('architecture', 'unknown')
            arch_dist[arch] = arch_dist.get(arch, 0) + 1

        # Platform distribution
        platform_dist = {}
        for result in successful_results:
            platform = result.get('platform', 'unknown')
            platform_dist[platform] = platform_dist.get(platform, 0) + 1

        # Security summary
        total_dangerous_functions = sum(
            len(result.get('security_analysis', {}).get('dangerous_functions', []))
            for result in successful_results
        )

        files_with_crypto = len([
            result for result in successful_results
            if result.get('security_analysis', {}).get('crypto_indicators')
        ])

        # Complexity summary
        avg_complexity = sum(
            result.get('complexity_analysis', {}).get('average_complexity', 0)
            for result in successful_results
        ) / len(successful_results) if successful_results else 0

        summary = {
            'analysis_summary': results['summary'],
            'architecture_distribution': arch_dist,
            'platform_distribution': platform_dist,
            'security_summary': {
                'total_dangerous_functions': total_dangerous_functions,
                'files_with_crypto_indicators': files_with_crypto,
                'percentage_with_crypto': (files_with_crypto / len(successful_results)) * 100 if successful_results else 0
            },
            'complexity_summary': {
                'average_complexity': avg_complexity,
                'high_complexity_files': len([
                    result for result in successful_results
                    if result.get('complexity_analysis', {}).get('average_complexity', 0) > 10
                ])
            }
        }

        # Save summary report
        summary_file = self.output_dir / 'summary_report.json'
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"Summary report saved to {summary_file}")
        return summary

# Usage example
batch_analyzer = BatchAnalyzer("analysis_results")

# Analyze all files in a directory
results = batch_analyzer.analyze_directory(
    "/path/to/binaries",
    file_patterns=['*.exe', '*.dll'],
    max_workers=8
)

# Generate summary report
summary = batch_analyzer.generate_summary_report(results)

print("Batch Analysis Summary:")
print(f"- Total files: {summary['analysis_summary']['total_files']}")
print(f"- Successful analyses: {summary['analysis_summary']['successful_analyses']}")
print(f"- Average complexity: {summary['complexity_summary']['average_complexity']:.2f}")
print(f"- Files with crypto indicators: {summary['security_summary']['files_with_crypto_indicators']}")

Best Practices and Tips

Performance Optimization

# Performance optimization techniques
import json
import time

import binaryninja as bn

class PerformanceOptimizer:
    def __init__(self, bv):
        self.bv = bv

    def optimize_analysis_settings(self):
        """Optimize Binary Ninja analysis settings for performance"""

        # Analysis settings live in the global Settings object; passing
        # `view` scopes a value to this binary only
        settings = bn.Settings()

        # Disable expensive analysis passes for large binaries
        if self.bv.end - self.bv.start > 50 * 1024 * 1024:  # 50MB threshold
            settings.set_bool('analysis.linearSweep.autorun', False, view=self.bv)
            settings.set_bool('analysis.signatureMatcher.autorun', False, view=self.bv)
            settings.set_integer('analysis.limits.maxFunctionSize', 100000, view=self.bv)

        # Per-architecture disassembly options (e.g. x86 syntax) live under
        # the 'arch.' settings namespace

        # Cap per-function analysis time (exact setting keys and units vary
        # between Binary Ninja versions; consult the Settings UI for the
        # keys your release supports)
        settings.set_integer('analysis.limits.maxFunctionAnalysisTime', 300, view=self.bv)

        bn.log_info("Analysis settings optimized for performance")

    def use_parallel_analysis(self, function_list=None):
        """Queue analysis for multiple functions from a thread pool"""

        import concurrent.futures

        functions = list(function_list) if function_list else list(self.bv.functions)

        def analyze_function(func):
            """Request (re)analysis of a single function"""
            try:
                # reanalyze() queues the function; Binary Ninja's own worker
                # threads perform the actual analysis
                func.reanalyze()
                return {
                    'name': func.name,
                    'address': func.start,
                    'status': 'success'
                }
            except Exception as e:
                return {
                    'name': func.name,
                    'address': func.start,
                    'status': 'error',
                    'error': str(e)
                }

        # Dispatch from a small thread pool; keep the worker count modest
        # since the core already parallelizes analysis internally
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(analyze_function, functions))

        successful = len([r for r in results if r['status'] == 'success'])
        failed = len([r for r in results if r['status'] == 'error'])

        bn.log_info(f"Parallel analysis complete: {successful} successful, {failed} failed")
        return results

    def cache_analysis_results(self, cache_file="analysis_cache.json"):
        """Cache analysis results for faster subsequent loads"""

        cache_data = {
            'binary_hash': self.calculate_binary_hash(),
            'analysis_timestamp': time.time(),
            'functions': [],
            'strings': [],
            'imports': []
        }

        # Cache function information
        for func in self.bv.functions:
            # Cyclomatic complexity: edges - nodes + 2
            edges = sum(len(block.outgoing_edges) for block in func.basic_blocks)
            complexity = edges - len(func.basic_blocks) + 2

            cache_data['functions'].append({
                'name': func.name,
                'start': func.start,
                'total_bytes': func.total_bytes,
                'basic_blocks': len(func.basic_blocks),
                'complexity': complexity
            })

        # Cache strings
        for string in self.bv.strings:
            cache_data['strings'].append({
                'address': string.start,
                'value': string.value,
                'type': string.string_type.name
            })

        # Cache imported function symbols
        for symbol in self.bv.get_symbols_of_type(bn.SymbolType.ImportedFunctionSymbol):
            cache_data['imports'].append({
                'name': symbol.name,
                'address': symbol.address
            })

        # Save cache
        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)

        bn.log_info(f"Analysis results cached to {cache_file}")

    def calculate_binary_hash(self):
        """Calculate hash of binary for cache validation"""
        import hashlib

        hasher = hashlib.sha256()

        # Read the mapped address range in fixed-size chunks; use a separate
        # read size so the chunk size is not permanently shrunk at the end
        chunk_size = 64 * 1024
        current_addr = self.bv.start

        while current_addr < self.bv.end:
            read_size = min(chunk_size, self.bv.end - current_addr)
            data = self.bv.read(current_addr, read_size)
            if data:
                hasher.update(data)
            current_addr += read_size

        return hasher.hexdigest()
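
# Usage sketch: tune settings, then cache results (assumes `bv` is an
# open BinaryView as in the earlier examples)
optimizer = PerformanceOptimizer(bv)
optimizer.optimize_analysis_settings()
optimizer.cache_analysis_results("analysis_cache.json")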

# Memory management for large binaries
class MemoryManager:
    def __init__(self, bv):
        self.bv = bv
        self.cached_data = {}
        self.cache_limit = 100  # Maximum cached items

    def get_function_data(self, func_addr):
        """Get function data with caching"""

        if func_addr in self.cached_data:
            return self.cached_data[func_addr]

        func = self.bv.get_function_at(func_addr)
        if not func:
            return None

        # Extract function data
        func_data = {
            'name': func.name,
            'basic_blocks': len(func.basic_blocks),
            'instructions': [],
            'calls': []
        }

        # Get disassembly text lines (limits avoid excessive memory use)
        for block in func.basic_blocks[:10]:  # Limit blocks
            for line in block.get_disassembly_text()[:50]:  # Limit lines per block
                func_data['instructions'].append({
                    'address': line.address,
                    'text': str(line)
                })

        # Cache data
        if len(self.cached_data) >= self.cache_limit:
            # Remove oldest entry
            oldest_key = next(iter(self.cached_data))
            del self.cached_data[oldest_key]

        self.cached_data[func_addr] = func_data
        return func_data

    def clear_cache(self):
        """Clear cached data to free memory"""
        self.cached_data.clear()
        bn.log_info("Memory cache cleared")

Error Handling and Debugging

# Robust error handling for Binary Ninja scripts
import traceback
from functools import wraps

import binaryninja as bn

def safe_analysis(func):
    """Decorator for safe analysis functions"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            log.log_error(f"Error in {func.__name__}: {str(e)}")
            log.log_error(f"Traceback: {traceback.format_exc()}")
            return None

    return wrapper
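
# The decorator can wrap any analysis helper; a minimal sketch:
@safe_analysis
def count_functions(bv):
    """Return the function count, or None if an exception was logged"""
    return len(bv.functions)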

class RobustAnalyzer:
    def __init__(self, bv):
        self.bv = bv
        self.errors = []
        self.warnings = []

    @safe_analysis
    def analyze_with_validation(self):
        """Perform analysis with extensive validation"""

        # Validate binary view
        if not self.validate_binary_view():
            return False

        # Validate architecture support
        if not self.validate_architecture():
            return False

        # Perform analysis with error tracking
        try:
            self.bv.update_analysis_and_wait()

            # Validate analysis results
            if not self.validate_analysis_results():
                self.warnings.append("Analysis results may be incomplete")

            return True

        except Exception as e:
            self.errors.append(f"Analysis failed: {str(e)}")
            return False

    def validate_binary_view(self):
        """Validate binary view is properly loaded"""

        if not self.bv:
            self.errors.append("Binary view is None")
            return False

        if self.bv.start >= self.bv.end:
            self.errors.append("Invalid binary address range")
            return False

        if not self.bv.arch:
            self.errors.append("No architecture detected")
            return False

        return True

    def validate_architecture(self):
        """Validate architecture is supported"""

        # Binary Ninja architecture names (note: 'armv7' rather than 'arm')
        supported_archs = ['x86', 'x86_64', 'armv7', 'thumb2', 'aarch64', 'mips32', 'mipsel32']

        if self.bv.arch.name not in supported_archs:
            self.warnings.append(f"Architecture {self.bv.arch.name} may have limited support")

        return True

    def validate_analysis_results(self):
        """Validate analysis produced reasonable results"""

        if len(self.bv.functions) == 0:
            self.warnings.append("No functions detected")
            return False

        if len(self.bv.strings) == 0:
            self.warnings.append("No strings detected")

        # Check for reasonable function count
        binary_size = self.bv.end - self.bv.start
        function_density = len(self.bv.functions) / (binary_size / 1024)  # Functions per KB

        if function_density < 0.1:
            self.warnings.append("Low function density - binary may be packed")
        elif function_density > 10:
            self.warnings.append("High function density - may indicate analysis errors")

        return True

    def get_error_report(self):
        """Get comprehensive error report"""

        return {
            'errors': self.errors,
            'warnings': self.warnings,
            'error_count': len(self.errors),
            'warning_count': len(self.warnings)
        }

# Usage example
analyzer = RobustAnalyzer(bv)
success = analyzer.analyze_with_validation()

if not success:
    error_report = analyzer.get_error_report()
    print(f"Analysis failed with {error_report['error_count']} errors")
    for error in error_report['errors']:
        print(f"  Error: {error}")

Resources

Documentation and Learning
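
  • Official user documentation - https://docs.binary.ninja/
  • Python API reference - https://api.binary.ninja/
  • Vector 35 blog - https://binary.ninja/blog/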

Plugin Development
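
  • binaryninja-api repository (C++/Python API and examples) - https://github.com/Vector35/binaryninja-api
  • Community plugin collection - https://github.com/Vector35/community-plugins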

Training and Certification

Related Tools

  • Ghidra - NSA's reverse engineering framework
  • IDA Pro - Industry standard disassembler
  • Radare2 - Open-source reverse engineering framework
  • Cutter - GUI for radare2