Saltar a contenido

Hoja de goma de motor unicornio

"Clase de la hoja" id="copy-btn" class="copy-btn" onclick="copyAllCommands()" Copiar todos los comandos id="pdf-btn" class="pdf-btn" onclick="generatePDF()" Generar PDF seleccionado/button ■/div titulada

Sinopsis

Unicorn Engine es un marco de emulador CPU de peso ligero, multiplataforma basado en QEMU. Proporciona una API limpia para emular instrucciones de CPU en varias arquitecturas, lo que lo hace ideal para el análisis de malware, ingeniería inversa, fuzzing, e investigación de seguridad.

Características clave: Soporte multiarquitectura (x86, ARM, MIPS, SPARC, M68K), diseño ligero, API limpia, enlaces de lenguaje amplios e integración con otras herramientas de seguridad.

Instalación y configuración

Instalación básica

# Install via pip (recommended)
pip install unicorn

# Install development version
pip install git+https://github.com/unicorn-engine/unicorn.git

# Install with additional bindings
pip install unicorn[all]

# System dependencies (Ubuntu/Debian)
sudo apt-get update
sudo apt-get install -y \
    build-essential \
    cmake \
    pkg-config \
    libglib2.0-dev

# System dependencies (macOS)
brew install cmake pkg-config glib

# Build from source
git clone https://github.com/unicorn-engine/unicorn.git
cd unicorn
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
make -j$(nproc)
sudo make install

# Python bindings from source
cd ../bindings/python
python setup.py install

# Verify installation
python -c "import unicorn; print(f'Unicorn version: {unicorn.version_bind()}')"

Arreglos de idiomas

# Python (primary)
pip install unicorn

# Java
# Download unicorn.jar from releases

# .NET/C#
# Install-Package UnicornEngine

# Go
go get github.com/unicorn-engine/unicorn/bindings/go/unicorn

# Rust
# Add to Cargo.toml: unicorn-engine = "2.0"

# Ruby
gem install unicorn-engine

# Haskell
cabal install unicorn

# Verify bindings
python -c "
import unicorn
from unicorn import *
from unicorn.x86_const import *
print('All bindings imported successfully')
"

Apoyo a la Arquitectura

# Supported architectures and modes
import unicorn
from unicorn import *

# Architecture constants
architectures = {
    'x86': {
        'arch': UC_ARCH_X86,
        'modes': {
            '16-bit': UC_MODE_16,
            '32-bit': UC_MODE_32, 
            '64-bit': UC_MODE_64
        }
    },
    'ARM': {
        'arch': UC_ARCH_ARM,
        'modes': {
            'ARM': UC_MODE_ARM,
            'Thumb': UC_MODE_THUMB,
            'Cortex-M': UC_MODE_MCLASS,
            'ARMv8': UC_MODE_V8
        }
    },
    'ARM64': {
        'arch': UC_ARCH_ARM64,
        'modes': {
            'ARM64': UC_MODE_ARM
        }
    },
    'MIPS': {
        'arch': UC_ARCH_MIPS,
        'modes': {
            '32-bit': UC_MODE_32,
            '64-bit': UC_MODE_64,
            'Big Endian': UC_MODE_BIG_ENDIAN,
            'Little Endian': UC_MODE_LITTLE_ENDIAN,
            'Micro MIPS': UC_MODE_MICRO
        }
    },
    'SPARC': {
        'arch': UC_ARCH_SPARC,
        'modes': {
            '32-bit': UC_MODE_32,
            '64-bit': UC_MODE_64,
            'V9': UC_MODE_V9
        }
    },
    'M68K': {
        'arch': UC_ARCH_M68K,
        'modes': {
            '68000': UC_MODE_BIG_ENDIAN
        }
    }
}

# Print supported architectures
for arch_name, arch_info in architectures.items():
    print(f"{arch_name}:")
    for mode_name, mode_const in arch_info['modes'].items():
        print(f"  {mode_name}: {mode_const}")

Emulación básica

Emulación simple x86

# Basic x86 emulation example
import unicorn
from unicorn import *
from unicorn.x86_const import *

def basic_x86_emulation():
    """Basic x86-64 emulation example"""

    # x86-64 machine code for: mov eax, 0x1234; ret
    X86_CODE = b"\x48\xc7\xc0\x34\x12\x00\x00\xc3"  # mov rax, 0x1234; ret

    # Memory addresses
    ADDRESS = 0x1000000

    print("Basic x86-64 emulation")

    try:
        # Initialize emulator in x86-64 mode
        mu = Uc(UC_ARCH_X86, UC_MODE_64)

        # Map memory for this emulation
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)  # 2MB

        # Write machine code to be emulated to memory
        mu.mem_write(ADDRESS, X86_CODE)

        # Initialize registers
        mu.reg_write(UC_X86_REG_RAX, 0x0)
        mu.reg_write(UC_X86_REG_RBX, 0x0)

        print(f"Initial RAX: 0x{mu.reg_read(UC_X86_REG_RAX):x}")

        # Emulate code
        mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE))

        # Read register values after emulation
        rax = mu.reg_read(UC_X86_REG_RAX)
        print(f"Final RAX: 0x{rax:x}")

        assert rax == 0x1234, f"Expected RAX=0x1234, got 0x{rax:x}"
        print("Emulation successful!")

    except UcError as e:
        print(f"ERROR: {e}")

def x86_with_stack():
    """x86 emulation with stack operations"""

    # Assembly: push 0x1234; pop eax; ret
    X86_CODE = b"\x68\x34\x12\x00\x00\x58\xc3"

    ADDRESS = 0x1000000
    STACK_ADDRESS = 0x2000000
    STACK_SIZE = 1024 * 1024  # 1MB stack

    print("x86 emulation with stack")

    try:
        mu = Uc(UC_ARCH_X86, UC_MODE_32)

        # Map code memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)

        # Map stack memory
        mu.mem_map(STACK_ADDRESS, STACK_SIZE)

        # Write code
        mu.mem_write(ADDRESS, X86_CODE)

        # Set up stack pointer
        mu.reg_write(UC_X86_REG_ESP, STACK_ADDRESS + STACK_SIZE - 4)

        print(f"Initial ESP: 0x{mu.reg_read(UC_X86_REG_ESP):x}")
        print(f"Initial EAX: 0x{mu.reg_read(UC_X86_REG_EAX):x}")

        # Emulate
        mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE))

        # Check results
        eax = mu.reg_read(UC_X86_REG_EAX)
        esp = mu.reg_read(UC_X86_REG_ESP)

        print(f"Final EAX: 0x{eax:x}")
        print(f"Final ESP: 0x{esp:x}")

        assert eax == 0x1234, f"Expected EAX=0x1234, got 0x{eax:x}"
        print("Stack emulation successful!")

    except UcError as e:
        print(f"ERROR: {e}")

# Run examples
if __name__ == "__main__":
    basic_x86_emulation()
    print()
    x86_with_stack()

ARM Emulation

# ARM emulation examples
import unicorn
from unicorn import *
from unicorn.arm_const import *

def basic_arm_emulation():
    """Basic ARM emulation example"""

    # ARM machine code for: mov r0, #0x37; mov r1, #0x3; add r2, r1, r0
    ARM_CODE = b"\x37\x00\xa0\xe3\x03\x10\xa0\xe3\x01\x20\x81\xe0"

    ADDRESS = 0x10000

    print("Basic ARM emulation")

    try:
        # Initialize emulator in ARM mode
        mu = Uc(UC_ARCH_ARM, UC_MODE_ARM)

        # Map memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)

        # Write machine code
        mu.mem_write(ADDRESS, ARM_CODE)

        # Initialize registers
        mu.reg_write(UC_ARM_REG_R0, 0x0)
        mu.reg_write(UC_ARM_REG_R1, 0x0)
        mu.reg_write(UC_ARM_REG_R2, 0x0)

        print(f"Initial R0: 0x{mu.reg_read(UC_ARM_REG_R0):x}")
        print(f"Initial R1: 0x{mu.reg_read(UC_ARM_REG_R1):x}")
        print(f"Initial R2: 0x{mu.reg_read(UC_ARM_REG_R2):x}")

        # Emulate
        mu.emu_start(ADDRESS, ADDRESS + len(ARM_CODE))

        # Read results
        r0 = mu.reg_read(UC_ARM_REG_R0)
        r1 = mu.reg_read(UC_ARM_REG_R1)
        r2 = mu.reg_read(UC_ARM_REG_R2)

        print(f"Final R0: 0x{r0:x}")
        print(f"Final R1: 0x{r1:x}")
        print(f"Final R2: 0x{r2:x}")

        assert r0 == 0x37 and r1 == 0x3 and r2 == 0x3a
        print("ARM emulation successful!")

    except UcError as e:
        print(f"ERROR: {e}")

def arm_thumb_emulation():
    """ARM Thumb mode emulation"""

    # Thumb machine code for: mov r0, #0x37; add r0, #0x3
    THUMB_CODE = b"\x37\x20\x03\x30"

    ADDRESS = 0x10000

    print("ARM Thumb emulation")

    try:
        # Initialize emulator in Thumb mode
        mu = Uc(UC_ARCH_ARM, UC_MODE_THUMB)

        # Map memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)

        # Write machine code
        mu.mem_write(ADDRESS, THUMB_CODE)

        # Initialize registers
        mu.reg_write(UC_ARM_REG_R0, 0x0)

        print(f"Initial R0: 0x{mu.reg_read(UC_ARM_REG_R0):x}")

        # Emulate (address must be odd for Thumb mode)
        mu.emu_start(ADDRESS | 1, ADDRESS + len(THUMB_CODE))

        # Read result
        r0 = mu.reg_read(UC_ARM_REG_R0)
        print(f"Final R0: 0x{r0:x}")

        assert r0 == 0x3a  # 0x37 + 0x3
        print("Thumb emulation successful!")

    except UcError as e:
        print(f"ERROR: {e}")

def arm64_emulation():
    """ARM64 (AArch64) emulation"""

    # ARM64 machine code for: mov x0, #0x37; mov x1, #0x3; add x2, x1, x0
    ARM64_CODE = b"\xe0\x06\x80\xd2\xe1\x00\x80\xd2\x22\x00\x01\x8b"

    ADDRESS = 0x10000

    print("ARM64 emulation")

    try:
        # Initialize emulator in ARM64 mode
        mu = Uc(UC_ARCH_ARM64, UC_MODE_ARM)

        # Map memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)

        # Write machine code
        mu.mem_write(ADDRESS, ARM64_CODE)

        # Initialize registers
        mu.reg_write(UC_ARM64_REG_X0, 0x0)
        mu.reg_write(UC_ARM64_REG_X1, 0x0)
        mu.reg_write(UC_ARM64_REG_X2, 0x0)

        print(f"Initial X0: 0x{mu.reg_read(UC_ARM64_REG_X0):x}")
        print(f"Initial X1: 0x{mu.reg_read(UC_ARM64_REG_X1):x}")
        print(f"Initial X2: 0x{mu.reg_read(UC_ARM64_REG_X2):x}")

        # Emulate
        mu.emu_start(ADDRESS, ADDRESS + len(ARM64_CODE))

        # Read results
        x0 = mu.reg_read(UC_ARM64_REG_X0)
        x1 = mu.reg_read(UC_ARM64_REG_X1)
        x2 = mu.reg_read(UC_ARM64_REG_X2)

        print(f"Final X0: 0x{x0:x}")
        print(f"Final X1: 0x{x1:x}")
        print(f"Final X2: 0x{x2:x}")

        assert x0 == 0x37 and x1 == 0x3 and x2 == 0x3a
        print("ARM64 emulation successful!")

    except UcError as e:
        print(f"ERROR: {e}")

# Run ARM examples
if __name__ == "__main__":
    basic_arm_emulation()
    print()
    arm_thumb_emulation()
    print()
    arm64_emulation()

Gestión de memoria

Cartografía y operaciones de memoria

# Memory management in Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *

class MemoryManager:
    def __init__(self, arch, mode):
        self.mu = Uc(arch, mode)
        self.mapped_regions = []

    def map_memory(self, address, size, perms=UC_PROT_ALL):
        """Map memory region with specified permissions"""

        try:
            self.mu.mem_map(address, size, perms)
            self.mapped_regions.append({
                'address': address,
                'size': size,
                'perms': perms
            })
            print(f"Mapped memory: 0x{address:x} - 0x{address + size:x} (size: {size}, perms: {perms})")

        except UcError as e:
            print(f"Failed to map memory at 0x{address:x}: {e}")

    def unmap_memory(self, address, size):
        """Unmap memory region"""

        try:
            self.mu.mem_unmap(address, size)
            # Remove from tracked regions
            self.mapped_regions = [r for r in self.mapped_regions 
                                 if not (r['address'] == address and r['size'] == size)]
            print(f"Unmapped memory: 0x{address:x} - 0x{address + size:x}")

        except UcError as e:
            print(f"Failed to unmap memory at 0x{address:x}: {e}")

    def protect_memory(self, address, size, perms):
        """Change memory protection"""

        try:
            self.mu.mem_protect(address, size, perms)
            print(f"Protected memory: 0x{address:x} - 0x{address + size:x} (perms: {perms})")

        except UcError as e:
            print(f"Failed to protect memory at 0x{address:x}: {e}")

    def read_memory(self, address, size):
        """Read memory content"""

        try:
            data = self.mu.mem_read(address, size)
            return data

        except UcError as e:
            print(f"Failed to read memory at 0x{address:x}: {e}")
            return None

    def write_memory(self, address, data):
        """Write data to memory"""

        try:
            self.mu.mem_write(address, data)
            print(f"Wrote {len(data)} bytes to 0x{address:x}")

        except UcError as e:
            print(f"Failed to write memory at 0x{address:x}: {e}")

    def dump_memory(self, address, size, format='hex'):
        """Dump memory content in various formats"""

        data = self.read_memory(address, size)
        if data is None:
            return

        print(f"Memory dump at 0x{address:x} ({size} bytes):")

        if format == 'hex':
            for i in range(0, len(data), 16):
                chunk = data[i:i+16]
                hex_str = ' '.join(f'{b:02x}' for b in chunk)
                ascii_str = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
| print(f"0x{address + i:08x}: {hex_str:<48} | {ascii_str} | ") |

        elif format == 'ascii':
            ascii_str = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in data)
            print(ascii_str)

        elif format == 'raw':
            print(data)

    def get_memory_regions(self):
        """Get list of mapped memory regions"""

        try:
            regions = self.mu.mem_regions()
            print("Mapped memory regions:")
            for region in regions:
                start, end, perms = region
                size = end - start + 1
                print(f"  0x{start:x} - 0x{end:x} (size: {size}, perms: {perms})")
            return regions

        except UcError as e:
            print(f"Failed to get memory regions: {e}")
            return []

def memory_management_example():
    """Comprehensive memory management example"""

    print("Memory Management Example")
    print("=" * 40)

    # Create memory manager
    mm = MemoryManager(UC_ARCH_X86, UC_MODE_64)

    # Map different memory regions
    CODE_BASE = 0x400000
    DATA_BASE = 0x600000
    STACK_BASE = 0x7fff0000
    HEAP_BASE = 0x800000

    # Map code section (read + execute)
    mm.map_memory(CODE_BASE, 0x1000, UC_PROT_READ | UC_PROT_EXEC)

    # Map data section (read + write)
    mm.map_memory(DATA_BASE, 0x1000, UC_PROT_READ | UC_PROT_WRITE)

    # Map stack (read + write)
    mm.map_memory(STACK_BASE, 0x1000, UC_PROT_READ | UC_PROT_WRITE)

    # Map heap (read + write)
    mm.map_memory(HEAP_BASE, 0x1000, UC_PROT_READ | UC_PROT_WRITE)

    # Write some data
    code_data = b"\x48\xc7\xc0\x37\x13\x00\x00\xc3"  # mov rax, 0x1337; ret
    mm.write_memory(CODE_BASE, code_data)

    data_content = b"Hello, Unicorn Engine!"
    mm.write_memory(DATA_BASE, data_content)

    stack_data = b"\x00" * 100 + b"STACK_CANARY"
    mm.write_memory(STACK_BASE + 0x900, stack_data)

    # Read and dump memory
    print("\nCode section:")
    mm.dump_memory(CODE_BASE, len(code_data))

    print("\nData section:")
    mm.dump_memory(DATA_BASE, len(data_content), format='ascii')

    print("\nStack canary:")
    mm.dump_memory(STACK_BASE + 0x900 + 100, 12, format='ascii')

    # Get memory regions
    print("\nMemory layout:")
    mm.get_memory_regions()

    # Test memory protection
    print("\nTesting memory protection:")
    try:
        # Try to write to code section (should fail)
        mm.write_memory(CODE_BASE, b"\x90\x90\x90\x90")
    except:
        print("Write to code section blocked (expected)")

    # Change protection to allow writing
    mm.protect_memory(CODE_BASE, 0x1000, UC_PROT_ALL)
    mm.write_memory(CODE_BASE, b"\x90\x90\x90\x90")  # Should succeed now

    return mm

def advanced_memory_operations():
    """Advanced memory operations and techniques"""

    print("Advanced Memory Operations")
    print("=" * 40)

    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Map memory with different alignments
    BASE_ADDR = 0x10000000

    # Map large memory region
    mu.mem_map(BASE_ADDR, 0x100000)  # 1MB

    # Create memory layout similar to real process
    segments = {
        'text': {'offset': 0x0000, 'size': 0x10000, 'perms': UC_PROT_READ | UC_PROT_EXEC},
        'rodata': {'offset': 0x10000, 'size': 0x5000, 'perms': UC_PROT_READ},
        'data': {'offset': 0x15000, 'size': 0x5000, 'perms': UC_PROT_READ | UC_PROT_WRITE},
        'bss': {'offset': 0x1a000, 'size': 0x5000, 'perms': UC_PROT_READ | UC_PROT_WRITE},
        'heap': {'offset': 0x20000, 'size': 0x80000, 'perms': UC_PROT_READ | UC_PROT_WRITE}
    }

    # Set up segments with different protections
    for name, seg in segments.items():
        addr = BASE_ADDR + seg['offset']
        mu.mem_protect(addr, seg['size'], seg['perms'])
        print(f"Segment {name}: 0x{addr:x} - 0x{addr + seg['size']:x}")

    # Write segment-specific data
    # Text segment - executable code
    text_addr = BASE_ADDR + segments['text']['offset']
    code = b"\x48\x31\xc0\x48\xff\xc0\xc3"  # xor rax, rax; inc rax; ret
    mu.mem_write(text_addr, code)

    # Read-only data segment
    rodata_addr = BASE_ADDR + segments['rodata']['offset']
    rodata = b"This is read-only data\x00"
    mu.mem_write(rodata_addr, rodata)

    # Data segment - initialized global variables
    data_addr = BASE_ADDR + segments['data']['offset']
    global_vars = b"\x37\x13\x00\x00\x00\x00\x00\x00"  # global_var = 0x1337
    mu.mem_write(data_addr, global_vars)

    # BSS segment - uninitialized data (zero-filled)
    bss_addr = BASE_ADDR + segments['bss']['offset']
    mu.mem_write(bss_addr, b"\x00" * 100)

    # Simulate heap allocation
    heap_addr = BASE_ADDR + segments['heap']['offset']
    heap_data = b"Heap allocated data\x00"
    mu.mem_write(heap_addr, heap_data)

    # Test memory access patterns
    print("\nTesting memory access:")

    # Read from each segment
    for name, seg in segments.items():
        addr = BASE_ADDR + seg['offset']
        try:
            data = mu.mem_read(addr, 16)
            print(f"{name} segment: {data[:16].hex()}")
        except UcError as e:
            print(f"Failed to read {name} segment: {e}")

    # Memory search functionality
    def find_pattern(mu, start_addr, size, pattern):
        """Search for byte pattern in memory"""

        try:
            data = mu.mem_read(start_addr, size)
            offset = data.find(pattern)
            if offset != -1:
                return start_addr + offset
            return None
        except UcError:
            return None

    # Search for patterns
    pattern = b"read-only"
    found_addr = find_pattern(mu, BASE_ADDR, 0x100000, pattern)
    if found_addr:
        print(f"Found pattern at: 0x{found_addr:x}")

    return mu

# Run memory management examples
if __name__ == "__main__":
    mm = memory_management_example()
    print("\n" + "=" * 60 + "\n")
    mu = advanced_memory_operations()

Ganchos y retrocesos

Instrucciones y ganchos de memoria

# Hooks and callbacks in Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *

class UnicornHooks:
    def __init__(self, arch, mode):
        self.mu = Uc(arch, mode)
        self.instruction_count = 0
        self.memory_accesses = []
        self.function_calls = []
        self.hooks = []

    def setup_instruction_hook(self):
        """Set up instruction execution hook"""

        def hook_code(uc, address, size, user_data):
            """Callback for instruction execution"""

            self.instruction_count += 1

            # Read instruction bytes
            try:
                instruction_bytes = uc.mem_read(address, size)
                print(f"Instruction #{self.instruction_count}: 0x{address:x} - {instruction_bytes.hex()}")

                # Disassemble if capstone is available
                try:
                    import capstone
                    if uc.arch == UC_ARCH_X86:
                        if uc.mode == UC_MODE_64:
                            md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
                        else:
                            md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32)

                        for insn in md.disasm(instruction_bytes, address):
                            print(f"  {insn.mnemonic} {insn.op_str}")
                            break
                except ImportError:
                    pass

            except UcError as e:
                print(f"Failed to read instruction at 0x{address:x}: {e}")

        # Add hook
        hook_id = self.mu.hook_add(UC_HOOK_CODE, hook_code)
        self.hooks.append(hook_id)
        return hook_id

    def setup_memory_hooks(self):
        """Set up memory access hooks"""

        def hook_mem_read(uc, access, address, size, value, user_data):
            """Callback for memory read"""

            self.memory_accesses.append({
                'type': 'read',
                'address': address,
                'size': size,
                'value': value
            })
            print(f"Memory READ: 0x{address:x} (size: {size}, value: 0x{value:x})")

        def hook_mem_write(uc, access, address, size, value, user_data):
            """Callback for memory write"""

            self.memory_accesses.append({
                'type': 'write',
                'address': address,
                'size': size,
                'value': value
            })
            print(f"Memory WRITE: 0x{address:x} (size: {size}, value: 0x{value:x})")

        def hook_mem_invalid(uc, access, address, size, value, user_data):
            """Callback for invalid memory access"""

            print(f"Invalid memory access: 0x{address:x} (access: {access})")

            # Try to handle invalid access
            if access == UC_MEM_WRITE_UNMAPPED or access == UC_MEM_READ_UNMAPPED:
                print(f"Mapping memory at 0x{address:x}")
                # Map memory page
                page_size = 0x1000
                page_start = address & ~(page_size - 1)
                uc.mem_map(page_start, page_size)
                return True  # Continue execution

            return False  # Stop execution

        # Add hooks
        read_hook = self.mu.hook_add(UC_HOOK_MEM_READ, hook_mem_read)
        write_hook = self.mu.hook_add(UC_HOOK_MEM_WRITE, hook_mem_write)
        invalid_hook = self.mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, hook_mem_invalid)

        self.hooks.extend([read_hook, write_hook, invalid_hook])
        return read_hook, write_hook, invalid_hook

    def setup_interrupt_hook(self):
        """Set up interrupt hook"""

        def hook_interrupt(uc, intno, user_data):
            """Callback for interrupts"""

            print(f"Interrupt: 0x{intno:x}")

            # Handle specific interrupts
            if intno == 0x80:  # Linux system call
                # Get system call number from EAX
                syscall_num = uc.reg_read(UC_X86_REG_EAX)
                print(f"  Linux syscall: {syscall_num}")

                # Handle specific syscalls
                if syscall_num == 1:  # sys_exit
                    exit_code = uc.reg_read(UC_X86_REG_EBX)
                    print(f"  Exit with code: {exit_code}")
                    uc.emu_stop()

                elif syscall_num == 4:  # sys_write
                    fd = uc.reg_read(UC_X86_REG_EBX)
                    buf_addr = uc.reg_read(UC_X86_REG_ECX)
                    count = uc.reg_read(UC_X86_REG_EDX)

                    try:
                        data = uc.mem_read(buf_addr, count)
                        print(f"  Write to fd {fd}: {data.decode('utf-8', errors='ignore')}")
                        # Set return value (number of bytes written)
                        uc.reg_write(UC_X86_REG_EAX, count)
                    except:
                        uc.reg_write(UC_X86_REG_EAX, -1)  # Error

        # Add hook
        hook_id = self.mu.hook_add(UC_HOOK_INTR, hook_interrupt)
        self.hooks.append(hook_id)
        return hook_id

    def setup_block_hook(self):
        """Set up basic block hook"""

        def hook_block(uc, address, size, user_data):
            """Callback for basic block execution"""

            print(f"Basic block: 0x{address:x} - 0x{address + size:x}")

        # Add hook
        hook_id = self.mu.hook_add(UC_HOOK_BLOCK, hook_block)
        self.hooks.append(hook_id)
        return hook_id

    def setup_conditional_hooks(self):
        """Set up conditional hooks for specific addresses"""

        def hook_specific_address(uc, address, size, user_data):
            """Hook for specific address"""

            target_addr = user_data['target_address']
            if address == target_addr:
                print(f"Reached target address: 0x{address:x}")

                # Perform specific action
                action = user_data.get('action')
                if action == 'dump_registers':
                    self.dump_registers()
                elif action == 'modify_register':
                    reg = user_data.get('register', UC_X86_REG_EAX)
                    value = user_data.get('value', 0)
                    uc.reg_write(reg, value)
                    print(f"Modified register to: 0x{value:x}")

        return hook_specific_address

    def dump_registers(self):
        """Dump current register state"""

        if self.mu.arch == UC_ARCH_X86:
            if self.mu.mode == UC_MODE_64:
                registers = [
                    ('RAX', UC_X86_REG_RAX), ('RBX', UC_X86_REG_RBX),
                    ('RCX', UC_X86_REG_RCX), ('RDX', UC_X86_REG_RDX),
                    ('RSI', UC_X86_REG_RSI), ('RDI', UC_X86_REG_RDI),
                    ('RSP', UC_X86_REG_RSP), ('RBP', UC_X86_REG_RBP),
                    ('RIP', UC_X86_REG_RIP)
                ]
            else:
                registers = [
                    ('EAX', UC_X86_REG_EAX), ('EBX', UC_X86_REG_EBX),
                    ('ECX', UC_X86_REG_ECX), ('EDX', UC_X86_REG_EDX),
                    ('ESI', UC_X86_REG_ESI), ('EDI', UC_X86_REG_EDI),
                    ('ESP', UC_X86_REG_ESP), ('EBP', UC_X86_REG_EBP),
                    ('EIP', UC_X86_REG_EIP)
                ]

            print("Register dump:")
            for name, reg_id in registers:
                value = self.mu.reg_read(reg_id)
                print(f"  {name}: 0x{value:x}")

    def remove_all_hooks(self):
        """Remove all registered hooks"""

        for hook_id in self.hooks:
            try:
                self.mu.hook_del(hook_id)
            except UcError:
                pass

        self.hooks.clear()
        print("All hooks removed")

def hooks_example():
    """Comprehensive hooks example"""

    print("Unicorn Engine Hooks Example")
    print("=" * 40)

    # Create hooks manager
    hooks = UnicornHooks(UC_ARCH_X86, UC_MODE_64)

    # Set up various hooks
    hooks.setup_instruction_hook()
    hooks.setup_memory_hooks()
    hooks.setup_block_hook()

    # Map memory and write code
    ADDRESS = 0x1000000
    hooks.mu.mem_map(ADDRESS, 2 * 1024 * 1024)

    # x86-64 code: mov rax, 0x1234; mov [rax], rbx; mov rcx, [rax]; ret
    X86_CODE = b"\x48\xc7\xc0\x34\x12\x00\x00\x48\x89\x18\x48\x8b\x08\xc3"

    hooks.mu.mem_write(ADDRESS, X86_CODE)

    # Initialize registers
    hooks.mu.reg_write(UC_X86_REG_RBX, 0xdeadbeef)

    print("Starting emulation with hooks...")

    try:
        # Emulate with hooks
        hooks.mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE))

        print(f"\nEmulation completed!")
        print(f"Instructions executed: {hooks.instruction_count}")
        print(f"Memory accesses: {len(hooks.memory_accesses)}")

        # Dump final state
        hooks.dump_registers()

    except UcError as e:
        print(f"Emulation error: {e}")

    finally:
        hooks.remove_all_hooks()

    return hooks

def advanced_hooks_example():
    """Advanced hooks with conditional logic"""

    print("Advanced Hooks Example")
    print("=" * 40)

    mu = Uc(UC_ARCH_X86, UC_MODE_32)

    # Map memory
    ADDRESS = 0x1000000
    mu.mem_map(ADDRESS, 2 * 1024 * 1024)

    # Code with loop: mov eax, 0; loop: inc eax; cmp eax, 10; jl loop; ret
    X86_CODE = b"\xb8\x00\x00\x00\x00\x40\x83\xf8\x0a\x7c\xfb\xc3"

    mu.mem_write(ADDRESS, X86_CODE)

    # Hook to count loop iterations
    loop_count = {'count': 0}

    def hook_loop_counter(uc, address, size, user_data):
        """Count loop iterations"""

        # Check if we're at the loop start (inc eax instruction)
        if address == ADDRESS + 5:  # Address of 'inc eax'
            loop_count['count'] += 1
            eax = uc.reg_read(UC_X86_REG_EAX)
            print(f"Loop iteration {loop_count['count']}: EAX = {eax}")

            # Limit iterations to prevent infinite loop
            if loop_count['count'] > 20:
                print("Loop limit reached, stopping emulation")
                uc.emu_stop()

    # Hook to detect function calls
    def hook_function_calls(uc, address, size, user_data):
        """Detect and log function calls"""

        try:
            # Read instruction
            insn_bytes = uc.mem_read(address, size)

            # Check for call instruction (0xe8 for relative call)
            if insn_bytes[0] == 0xe8:
                # Calculate call target
                if len(insn_bytes) >= 5:
                    offset = int.from_bytes(insn_bytes[1:5], 'little', signed=True)
                    target = address + size + offset
                    print(f"Function call from 0x{address:x} to 0x{target:x}")
        except:
            pass

    # Add hooks
    loop_hook = mu.hook_add(UC_HOOK_CODE, hook_loop_counter)
    call_hook = mu.hook_add(UC_HOOK_CODE, hook_function_calls)

    print("Starting emulation with advanced hooks...")

    try:
        mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE))

        print(f"Emulation completed!")
        print(f"Total loop iterations: {loop_count['count']}")

        # Check final register state
        eax = mu.reg_read(UC_X86_REG_EAX)
        print(f"Final EAX value: {eax}")

    except UcError as e:
        print(f"Emulation error: {e}")

    finally:
        mu.hook_del(loop_hook)
        mu.hook_del(call_hook)

    return mu

# Run hooks examples
if __name__ == "__main__":
    hooks = hooks_example()
    print("\n" + "=" * 60 + "\n")
    mu = advanced_hooks_example()

Técnicas avanzadas de emulación

Emulación de varios hilos

# Advanced emulation techniques with Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *
import threading
import time
import queue

class UnicornEmulator:
    def __init__(self, arch, mode):
        self.arch = arch
        self.mode = mode
        self.emulation_results = {}
        self.shared_memory = {}

    def create_emulator_instance(self, instance_id):
        """Create a new emulator instance"""

        mu = Uc(self.arch, self.mode)

        # Set up basic memory layout
        CODE_BASE = 0x400000
        DATA_BASE = 0x600000
        STACK_BASE = 0x7fff0000

        mu.mem_map(CODE_BASE, 0x10000)    # Code section
        mu.mem_map(DATA_BASE, 0x10000)    # Data section
        mu.mem_map(STACK_BASE, 0x10000)   # Stack section

        # Set up stack pointer
        if self.arch == UC_ARCH_X86:
            if self.mode == UC_MODE_64:
                mu.reg_write(UC_X86_REG_RSP, STACK_BASE + 0x8000)
            else:
                mu.reg_write(UC_X86_REG_ESP, STACK_BASE + 0x8000)

        return mu, CODE_BASE, DATA_BASE, STACK_BASE

    def emulate_code_snippet(self, instance_id, code, initial_state=None):
        """Emulate a code snippet in separate thread"""

        def emulation_thread():
            try:
                mu, code_base, data_base, stack_base = self.create_emulator_instance(instance_id)

                # Write code to memory
                mu.mem_write(code_base, code)

                # Set initial register state if provided
                if initial_state:
                    for reg, value in initial_state.items():
                        mu.reg_write(reg, value)

                # Set up hooks for this instance
                instruction_count = {'count': 0}

                def hook_code(uc, address, size, user_data):
                    instruction_count['count'] += 1
                    if instruction_count['count'] > 10000:  # Prevent infinite loops
                        uc.emu_stop()

                hook_id = mu.hook_add(UC_HOOK_CODE, hook_code)

                # Start emulation
                start_time = time.time()
                mu.emu_start(code_base, code_base + len(code))
                end_time = time.time()

                # Collect results
                result = {
                    'instance_id': instance_id,
                    'success': True,
                    'execution_time': end_time - start_time,
                    'instruction_count': instruction_count['count'],
                    'final_registers': {},
                    'memory_dumps': {}
                }

                # Read final register state
                if self.arch == UC_ARCH_X86:
                    if self.mode == UC_MODE_64:
                        registers = [UC_X86_REG_RAX, UC_X86_REG_RBX, UC_X86_REG_RCX, UC_X86_REG_RDX]
                    else:
                        registers = [UC_X86_REG_EAX, UC_X86_REG_EBX, UC_X86_REG_ECX, UC_X86_REG_EDX]

                    for reg in registers:
                        result['final_registers'][reg] = mu.reg_read(reg)

                # Store result
                self.emulation_results[instance_id] = result

                # Clean up
                mu.hook_del(hook_id)

            except UcError as e:
                self.emulation_results[instance_id] = {
                    'instance_id': instance_id,
                    'success': False,
                    'error': str(e)
                }

        # Start emulation in separate thread
        thread = threading.Thread(target=emulation_thread)
        thread.start()
        return thread

    def parallel_emulation(self, code_variants, max_workers=4):
        """Run multiple code variants in parallel"""

        print(f"Starting parallel emulation with {len(code_variants)} variants")

        threads = []

        # Start emulation threads
        for i, (code, initial_state) in enumerate(code_variants):
            if len(threads) >= max_workers:
                # Wait for a thread to complete
                threads[0].join()
                threads.pop(0)

            thread = self.emulate_code_snippet(f"variant_{i}", code, initial_state)
            threads.append(thread)

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

        print(f"Parallel emulation completed. Results: {len(self.emulation_results)}")
        return self.emulation_results

    def compare_emulation_results(self):
        """Compare results from different emulation instances"""

        if len(self.emulation_results) < 2:
            print("Need at least 2 results to compare")
            return

        print("Emulation Results Comparison:")
        print("=" * 50)

        for instance_id, result in self.emulation_results.items():
            if result['success']:
                print(f"\nInstance: {instance_id}")
                print(f"  Execution time: {result['execution_time']:.6f}s")
                print(f"  Instructions: {result['instruction_count']}")
                print(f"  Final registers: {result['final_registers']}")
            else:
                print(f"\nInstance: {instance_id} - FAILED")
                print(f"  Error: {result['error']}")

def fuzzing_with_unicorn():
    """Use Unicorn for fuzzing and input generation"""

    print("Fuzzing with Unicorn Engine")
    print("=" * 40)

    emulator = UnicornEmulator(UC_ARCH_X86, UC_MODE_64)

    # Target function to fuzz (simple buffer operation)
    # mov rdi, input_buffer; mov rsi, input_size; call process_buffer
    target_code = b"\x48\xc7\xc7\x00\x20\x60\x00\x48\xc7\xc6\x10\x00\x00\x00\xe8\x10\x00\x00\x00\xc3"

    # Process buffer function (simplified)
    # Check if input_size > 16, if so, trigger vulnerability
    process_buffer_code = b"\x48\x83\xfe\x10\x7f\x05\x48\x31\xc0\xc3\xcc"  # cmp rsi, 16; jg vuln; xor rax,rax; ret; int3

    # Generate fuzzing inputs
    fuzzing_inputs = []

    for size in [8, 16, 24, 32, 64, 128]:
        for pattern in [0x41, 0x00, 0xff, 0x90]:
            input_data = bytes([pattern] * size)

            # Create code variant with this input
            code_variant = target_code + process_buffer_code

            initial_state = {
                UC_X86_REG_RDI: 0x602000,  # Input buffer address
                UC_X86_REG_RSI: size       # Input size
            }

            fuzzing_inputs.append((code_variant, initial_state))

    # Run fuzzing
    results = emulator.parallel_emulation(fuzzing_inputs, max_workers=8)

    # Analyze results for crashes/vulnerabilities
    vulnerabilities = []

    for instance_id, result in results.items():
        if not result['success'] and 'int3' in result.get('error', '').lower():
            vulnerabilities.append(instance_id)

    print(f"\nFuzzing completed. Found {len(vulnerabilities)} potential vulnerabilities:")
    for vuln in vulnerabilities:
        print(f"  {vuln}")

    return emulator, vulnerabilities

def code_coverage_analysis():
    """Implement code coverage analysis with Unicorn"""

    print("Code Coverage Analysis")
    print("=" * 40)

    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Map memory
    CODE_BASE = 0x400000
    mu.mem_map(CODE_BASE, 0x10000)

    # Complex code with multiple paths
    # Function with conditional branches
    complex_code = b"""
    \x48\x83\xec\x10        # sub rsp, 16
    \x48\x89\x7c\x24\x08    # mov [rsp+8], rdi
    \x48\x83\xff\x0a        # cmp rdi, 10
    \x7c\x0a                # jl less_than_10
    \x48\xc7\xc0\x01\x00\x00\x00  # mov rax, 1
    \xeb\x08                # jmp end
    # less_than_10:
    \x48\xc7\xc0\x00\x00\x00\x00  # mov rax, 0
    # end:
    \x48\x83\xc4\x10        # add rsp, 16
    \xc3                    # ret
    """

    # Remove whitespace and comments for actual bytes
    actual_code = b"\x48\x83\xec\x10\x48\x89\x7c\x24\x08\x48\x83\xff\x0a\x7c\x0a\x48\xc7\xc0\x01\x00\x00\x00\xeb\x08\x48\xc7\xc0\x00\x00\x00\x00\x48\x83\xc4\x10\xc3"

    mu.mem_write(CODE_BASE, actual_code)

    # Coverage tracking
    coverage_data = {
        'executed_addresses': set(),
        'basic_blocks': set(),
        'branch_coverage': {}
    }

    def hook_coverage(uc, address, size, user_data):
        """Track code coverage"""

        coverage_data['executed_addresses'].add(address)

        # Track basic blocks (simplified)
        block_start = address
        coverage_data['basic_blocks'].add(block_start)

        # Track branches
        try:
            insn_bytes = uc.mem_read(address, size)
            if size >= 2 and insn_bytes[0] == 0x7c:  # jl instruction
                # This is a conditional branch
                branch_addr = address
                taken = False  # We'd need to check if branch was taken
                coverage_data['branch_coverage'][branch_addr] = taken
        except:
            pass

    hook_id = mu.hook_add(UC_HOOK_CODE, hook_coverage)

    # Test different inputs to achieve different coverage
    test_inputs = [5, 15, 0, 10, -1, 100]

    for test_input in test_inputs:
        print(f"Testing input: {test_input}")

        # Reset coverage for this run
        run_coverage = set()

        def hook_run_coverage(uc, address, size, user_data):
            run_coverage.add(address)

        run_hook = mu.hook_add(UC_HOOK_CODE, hook_run_coverage)

        # Set input and run
        mu.reg_write(UC_X86_REG_RDI, test_input)
        mu.reg_write(UC_X86_REG_RSP, 0x7fff8000)

        try:
            mu.emu_start(CODE_BASE, CODE_BASE + len(actual_code))
            result = mu.reg_read(UC_X86_REG_RAX)
            print(f"  Result: {result}, Coverage: {len(run_coverage)} addresses")
        except UcError as e:
            print(f"  Error: {e}")

        mu.hook_del(run_hook)

    # Print overall coverage statistics
    print(f"\nOverall Coverage Statistics:")
    print(f"  Total addresses executed: {len(coverage_data['executed_addresses'])}")
    print(f"  Basic blocks covered: {len(coverage_data['basic_blocks'])}")
    print(f"  Branches tracked: {len(coverage_data['branch_coverage'])}")

    mu.hook_del(hook_id)
    return coverage_data

def performance_profiling():
    """Performance profiling with Unicorn"""

    print("Performance Profiling")
    print("=" * 40)

    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Map memory
    CODE_BASE = 0x400000
    mu.mem_map(CODE_BASE, 0x10000)

    # Code with loop for profiling
    # Simple loop that counts from 0 to N
    loop_code = b"\x48\x31\xc0\x48\x83\xf8\x64\x7d\x04\x48\xff\xc0\xeb\xf6\xc3"  # xor rax,rax; loop: cmp rax,100; jge end; inc rax; jmp loop; end: ret

    mu.mem_write(CODE_BASE, loop_code)

    # Profiling data
    profile_data = {
        'instruction_counts': {},
        'execution_times': {},
        'hotspots': []
    }

    def hook_profiler(uc, address, size, user_data):
        """Profiling hook"""

        # Count instructions at each address
        if address not in profile_data['instruction_counts']:
            profile_data['instruction_counts'][address] = 0
        profile_data['instruction_counts'][address] += 1

        # Track execution time (simplified)
        current_time = time.time()
        if address not in profile_data['execution_times']:
            profile_data['execution_times'][address] = []
        profile_data['execution_times'][address].append(current_time)

    hook_id = mu.hook_add(UC_HOOK_CODE, hook_profiler)

    # Run profiling
    start_time = time.time()

    try:
        mu.emu_start(CODE_BASE, CODE_BASE + len(loop_code))
        end_time = time.time()

        print(f"Execution completed in {end_time - start_time:.6f} seconds")

        # Analyze profiling data
        print("\nInstruction Execution Counts:")
        sorted_counts = sorted(profile_data['instruction_counts'].items(), 
                             key=lambda x: x[1], reverse=True)

        for addr, count in sorted_counts[:5]:  # Top 5 hotspots
            print(f"  0x{addr:x}: {count} executions")
            profile_data['hotspots'].append((addr, count))

        total_instructions = sum(profile_data['instruction_counts'].values())
        print(f"\nTotal instructions executed: {total_instructions}")

    except UcError as e:
        print(f"Profiling error: {e}")

    mu.hook_del(hook_id)
    return profile_data

# Run advanced emulation examples
if __name__ == "__main__":
    # Multi-threaded emulation
    emulator, vulns = fuzzing_with_unicorn()
    emulator.compare_emulation_results()

    print("\n" + "=" * 60 + "\n")

    # Code coverage analysis
    coverage = code_coverage_analysis()

    print("\n" + "=" * 60 + "\n")

    # Performance profiling
    profile = performance_profiling()

Integración con otras herramientas

Qiling Framework Integration

# Qiling Framework integration with Unicorn Engine
# Qiling provides higher-level emulation with OS API support

try:
    from qiling import Qiling
    from qiling.const import QL_VERBOSE
    QILING_AVAILABLE = True
except ImportError:
    QILING_AVAILABLE = False
    print("Qiling not available. Install with: pip install qiling")

import unicorn
from unicorn import *
from unicorn.x86_const import *
import os
import tempfile

class UnicornQilingBridge:
    """Bridge between Unicorn Engine and Qiling Framework"""

    def __init__(self):
        self.unicorn_instances = {}
        self.qiling_instances = {}

    def create_qiling_emulator(self, binary_path, rootfs_path, arch="x8664", ostype="linux"):
        """Create Qiling emulator instance"""

        if not QILING_AVAILABLE:
            raise ImportError("Qiling framework not available")

        try:
            ql = Qiling([binary_path], rootfs_path, verbose=QL_VERBOSE.DEFAULT)

            # Set up hooks for integration
            self.setup_qiling_hooks(ql)

            return ql

        except Exception as e:
            print(f"Failed to create Qiling emulator: {e}")
            return None

    def setup_qiling_hooks(self, ql):
        """Set up hooks for Qiling emulator"""

        def hook_syscall(ql, syscall_num, *args):
            """Hook system calls"""
            print(f"Syscall: {syscall_num} with args: {args}")

        def hook_api_call(ql, api_name, *args):
            """Hook API calls"""
            print(f"API call: {api_name} with args: {args}")

        # Add hooks
        ql.hook_syscall(hook_syscall)

    def unicorn_to_qiling_state(self, mu, ql):
        """Transfer state from Unicorn to Qiling"""

        # Transfer register state
        if mu.arch == UC_ARCH_X86:
            if mu.mode == UC_MODE_64:
                registers = [
                    (UC_X86_REG_RAX, 'rax'), (UC_X86_REG_RBX, 'rbx'),
                    (UC_X86_REG_RCX, 'rcx'), (UC_X86_REG_RDX, 'rdx'),
                    (UC_X86_REG_RSI, 'rsi'), (UC_X86_REG_RDI, 'rdi'),
                    (UC_X86_REG_RSP, 'rsp'), (UC_X86_REG_RBP, 'rbp')
                ]

                for uc_reg, ql_reg in registers:
                    value = mu.reg_read(uc_reg)
                    setattr(ql.reg, ql_reg, value)

        # Transfer memory state (simplified)
        try:
            regions = mu.mem_regions()
            for start, end, perms in regions:
                size = end - start + 1
                data = mu.mem_read(start, size)
                ql.mem.write(start, data)
        except:
            pass

    def qiling_to_unicorn_state(self, ql, mu):
        """Transfer state from Qiling to Unicorn"""

        # Transfer register state
        if mu.arch == UC_ARCH_X86:
            if mu.mode == UC_MODE_64:
                registers = [
                    ('rax', UC_X86_REG_RAX), ('rbx', UC_X86_REG_RBX),
                    ('rcx', UC_X86_REG_RCX), ('rdx', UC_X86_REG_RDX),
                    ('rsi', UC_X86_REG_RSI), ('rdi', UC_X86_REG_RDI),
                    ('rsp', UC_X86_REG_RSP), ('rbp', UC_X86_REG_RBP)
                ]

                for ql_reg, uc_reg in registers:
                    value = getattr(ql.reg, ql_reg)
                    mu.reg_write(uc_reg, value)

def create_test_binary():
    """Create a simple test binary for emulation"""

    # Simple C program that we'll compile
    c_code = """
    #include <stdio.h>
    #include <stdlib.h>

    int main(int argc, char *argv[]) {
        printf("Hello from test binary!\\n");

        if (argc > 1) {
            int value = atoi(argv[1]);
            printf("Argument value: %d\\n", value);

            if (value > 100) {
                printf("Large value detected!\\n");
                return 1;
            }
        }

        return 0;
    }
    """

    # Create temporary files
    with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
        f.write(c_code)
        c_file = f.name

    binary_file = c_file.replace('.c', '')

    # Compile the program
    compile_cmd = f"gcc -o {binary_file} {c_file}"
    result = os.system(compile_cmd)

    if result == 0:
        print(f"Test binary created: {binary_file}")
        return binary_file
    else:
        print("Failed to compile test binary")
        return None

def unicorn_qiling_comparison():
    """Compare Unicorn and Qiling emulation approaches"""

    print("Unicorn vs Qiling Comparison")
    print("=" * 40)

    # Create test binary
    binary_path = create_test_binary()
    if not binary_path:
        return

    try:
        # Unicorn emulation (low-level)
        print("\n1. Unicorn Engine Emulation:")
        unicorn_emulation_example(binary_path)

        # Qiling emulation (high-level)
        if QILING_AVAILABLE:
            print("\n2. Qiling Framework Emulation:")
            qiling_emulation_example(binary_path)
        else:
            print("\n2. Qiling Framework: Not available")

    finally:
        # Clean up
        try:
            os.unlink(binary_path)
            os.unlink(binary_path.replace('', '.c'))
        except:
            pass

def unicorn_emulation_example(binary_path):
    """Low-level emulation with Unicorn"""

    # Read binary file
    with open(binary_path, 'rb') as f:
        binary_data = f.read()

    # Simple emulation (this is very simplified)
    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Map memory for binary
    BASE_ADDR = 0x400000
    mu.mem_map(BASE_ADDR, len(binary_data) + 0x1000)

    # Write binary to memory
    mu.mem_write(BASE_ADDR, binary_data)

    print(f"  Loaded {len(binary_data)} bytes at 0x{BASE_ADDR:x}")
    print("  Note: This is a simplified example - real ELF loading is complex")

def qiling_emulation_example(binary_path):
    """High-level emulation with Qiling"""

    if not QILING_AVAILABLE:
        return

    # Qiling requires a rootfs - create minimal one
    rootfs_path = "/tmp/qiling_rootfs"
    os.makedirs(rootfs_path, exist_ok=True)

    # Create minimal lib directory
    lib_dir = os.path.join(rootfs_path, "lib")
    os.makedirs(lib_dir, exist_ok=True)

    try:
        # Create Qiling instance
        ql = Qiling([binary_path], rootfs_path, verbose=QL_VERBOSE.DEFAULT)

        # Set up hooks
        def hook_printf(ql):
            """Hook printf function"""
            # Get format string from first argument
            format_addr = ql.reg.rdi
            format_str = ql.mem.string(format_addr)
            print(f"  printf: {format_str}")

        # Hook printf
        ql.set_api("printf", hook_printf)

        # Run emulation
        print("  Starting Qiling emulation...")
        ql.run()

        print(f"  Emulation completed with exit code: {ql.exit_code}")

    except Exception as e:
        print(f"  Qiling emulation failed: {e}")

def advanced_integration_example():
    """Advanced integration between different emulation approaches"""

    print("Advanced Integration Example")
    print("=" * 40)

    # Create bridge
    bridge = UnicornQilingBridge()

    # Example: Use Unicorn for low-level analysis, then switch to Qiling for high-level
    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Map memory and set up basic environment
    CODE_BASE = 0x400000
    mu.mem_map(CODE_BASE, 0x10000)

    # Simple code that makes a system call
    # mov rax, 1 (sys_write); mov rdi, 1 (stdout); mov rsi, msg; mov rdx, len; syscall
    syscall_code = b"\x48\xc7\xc0\x01\x00\x00\x00\x48\xc7\xc7\x01\x00\x00\x00\x48\xc7\xc6\x00\x50\x40\x00\x48\xc7\xc2\x0d\x00\x00\x00\x0f\x05"

    mu.mem_write(CODE_BASE, syscall_code)

    # Write message to memory
    message = b"Hello, World!\n"
    mu.mem_write(0x405000, message)

    # Hook system calls in Unicorn
    def hook_syscall(uc, user_data):
        """Handle system calls in Unicorn"""

        syscall_num = uc.reg_read(UC_X86_REG_RAX)

        if syscall_num == 1:  # sys_write
            fd = uc.reg_read(UC_X86_REG_RDI)
            buf_addr = uc.reg_read(UC_X86_REG_RSI)
            count = uc.reg_read(UC_X86_REG_RDX)

            try:
                data = uc.mem_read(buf_addr, count)
                print(f"  Unicorn syscall write: {data.decode()}")
                uc.reg_write(UC_X86_REG_RAX, count)  # Return bytes written
            except:
                uc.reg_write(UC_X86_REG_RAX, -1)  # Error

    # Add interrupt hook for syscalls
    mu.hook_add(UC_HOOK_INTR, hook_syscall)

    print("Running low-level analysis with Unicorn...")

    try:
        mu.emu_start(CODE_BASE, CODE_BASE + len(syscall_code))
        print("  Unicorn emulation completed")

        # At this point, we could transfer state to Qiling for higher-level analysis
        if QILING_AVAILABLE:
            print("  (State could be transferred to Qiling for high-level analysis)")

    except UcError as e:
        print(f"  Unicorn emulation error: {e}")

# Integration with other reverse engineering tools
def unicorn_with_capstone():
    """Integration with Capstone disassembler"""

    try:
        import capstone
        CAPSTONE_AVAILABLE = True
    except ImportError:
        CAPSTONE_AVAILABLE = False
        print("Capstone not available. Install with: pip install capstone")
        return

    print("Unicorn + Capstone Integration")
    print("=" * 40)

    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Map memory
    CODE_BASE = 0x400000
    mu.mem_map(CODE_BASE, 0x10000)

    # Complex code to disassemble and emulate
    complex_code = b"\x48\x89\xe5\x48\x83\xec\x20\x48\x89\x7d\xe8\x48\x89\x75\xe0\x48\x8b\x45\xe8\x48\x8b\x55\xe0\x48\x01\xd0\x48\x89\x45\xf8\x48\x8b\x45\xf8\x48\x83\xc4\x20\x5d\xc3"

    mu.mem_write(CODE_BASE, complex_code)

    # Set up Capstone disassembler
    md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
    md.detail = True

    # Disassemble and emulate with detailed analysis
    def hook_instruction_analysis(uc, address, size, user_data):
        """Detailed instruction analysis with Capstone"""

        # Read instruction bytes
        insn_bytes = uc.mem_read(address, size)

        # Disassemble
        for insn in md.disasm(insn_bytes, address):
            print(f"0x{insn.address:x}: {insn.mnemonic} {insn.op_str}")

            # Analyze operands
            if insn.operands:
                for op in insn.operands:
                    if op.type == capstone.x86.X86_OP_REG:
                        reg_name = insn.reg_name(op.reg)
                        reg_value = uc.reg_read(getattr(capstone.x86, f'X86_REG_{reg_name.upper()}', 0))
                        print(f"    {reg_name}: 0x{reg_value:x}")
                    elif op.type == capstone.x86.X86_OP_MEM:
                        print(f"    Memory operand: base={op.mem.base}, index={op.mem.index}, disp=0x{op.mem.disp:x}")
            break

    # Add hook
    hook_id = mu.hook_add(UC_HOOK_CODE, hook_instruction_analysis)

    # Set up registers
    mu.reg_write(UC_X86_REG_RDI, 0x1234)
    mu.reg_write(UC_X86_REG_RSI, 0x5678)
    mu.reg_write(UC_X86_REG_RSP, 0x7fff8000)

    print("Starting emulation with Capstone analysis...")

    try:
        mu.emu_start(CODE_BASE, CODE_BASE + len(complex_code))

        # Check result
        result = mu.reg_read(UC_X86_REG_RAX)
        print(f"\nEmulation result: 0x{result:x}")

    except UcError as e:
        print(f"Emulation error: {e}")

    mu.hook_del(hook_id)

# Run integration examples
if __name__ == "__main__":
    # Unicorn-Qiling comparison
    unicorn_qiling_comparison()

    print("\n" + "=" * 60 + "\n")

    # Advanced integration
    advanced_integration_example()

    print("\n" + "=" * 60 + "\n")

    # Capstone integration
    unicorn_with_capstone()

Mejores prácticas y optimización

Optimización del rendimiento

# Performance optimization techniques for Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *
import time
import psutil
import gc

class UnicornOptimizer:
    def __init__(self):
        self.performance_metrics = {}
        self.optimization_strategies = {}

    def benchmark_emulation(self, arch, mode, code, iterations=1000):
        """Benchmark emulation performance"""

        print(f"Benchmarking {iterations} iterations...")

        # Baseline measurement
        baseline_times = []

        for i in range(iterations):
            mu = Uc(arch, mode)
            mu.mem_map(0x1000000, 2 * 1024 * 1024)
            mu.mem_write(0x1000000, code)

            start_time = time.perf_counter()
            try:
                mu.emu_start(0x1000000, 0x1000000 + len(code))
            except UcError:
                pass
            end_time = time.perf_counter()

            baseline_times.append(end_time - start_time)

            # Clean up
            del mu
            if i % 100 == 0:
                gc.collect()

        avg_time = sum(baseline_times) / len(baseline_times)
        min_time = min(baseline_times)
        max_time = max(baseline_times)

        print(f"  Average time: {avg_time:.6f}s")
        print(f"  Min time: {min_time:.6f}s")
        print(f"  Max time: {max_time:.6f}s")

        return {
            'average': avg_time,
            'min': min_time,
            'max': max_time,
            'times': baseline_times
        }

    def optimize_memory_usage(self, mu):
        """Optimize memory usage for emulator"""

        # Get current memory usage
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        print(f"Initial memory usage: {initial_memory:.2f} MB")

        # Optimization strategies
        optimizations = {
            'minimal_mapping': self.use_minimal_memory_mapping,
            'lazy_allocation': self.implement_lazy_allocation,
            'memory_pooling': self.implement_memory_pooling,
            'garbage_collection': self.optimize_garbage_collection
        }

        for name, optimization in optimizations.items():
            print(f"Applying {name}...")
            optimization(mu)

            current_memory = process.memory_info().rss / 1024 / 1024
            print(f"  Memory after {name}: {current_memory:.2f} MB")

    def use_minimal_memory_mapping(self, mu):
        """Use minimal memory mapping strategy"""

        # Only map memory that's actually needed
        # Avoid large contiguous mappings when possible

        # Example: Instead of mapping 1GB, map smaller chunks as needed
        pass

    def implement_lazy_allocation(self, mu):
        """Implement lazy memory allocation"""

        # Map memory only when accessed
        def hook_mem_unmapped(uc, access, address, size, value, user_data):
            """Lazy allocation hook"""

            # Map page on demand
            page_size = 0x1000
            page_start = address & ~(page_size - 1)

            try:
                uc.mem_map(page_start, page_size)
                return True
            except UcError:
                return False

        # Add lazy allocation hook
        mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, hook_mem_unmapped)

    def implement_memory_pooling(self, mu):
        """Implement memory pooling for better performance"""

        # Pre-allocate memory pools for common sizes
        self.memory_pools = {
            'small': [],   # 4KB pages
            'medium': [],  # 64KB chunks  
            'large': []    # 1MB chunks
        }

        # Initialize pools
        for _ in range(10):
            self.memory_pools['small'].append(bytearray(4096))
            self.memory_pools['medium'].append(bytearray(65536))
            self.memory_pools['large'].append(bytearray(1048576))

    def optimize_garbage_collection(self, mu):
        """Optimize garbage collection"""

        # Force garbage collection
        gc.collect()

        # Adjust GC thresholds for better performance
        gc.set_threshold(700, 10, 10)

    def optimize_hook_performance(self, mu):
        """Optimize hook performance"""

        # Minimize hook overhead
        hook_count = {'count': 0}

        def lightweight_hook(uc, address, size, user_data):
            """Lightweight hook with minimal overhead"""

            hook_count['count'] += 1

            # Only perform expensive operations occasionally
            if hook_count['count'] % 1000 == 0:
                # Expensive operation here
                pass

        # Use specific hooks instead of generic ones
        mu.hook_add(UC_HOOK_CODE, lightweight_hook)

        return hook_count

    def batch_emulation_optimization(self, code_samples):
        """Optimize batch emulation of multiple code samples"""

        print("Optimizing batch emulation...")

        # Strategy 1: Reuse emulator instances
        mu = Uc(UC_ARCH_X86, UC_MODE_64)
        mu.mem_map(0x1000000, 2 * 1024 * 1024)

        results = []

        for i, code in enumerate(code_samples):
            # Reset state instead of creating new instance
            self.reset_emulator_state(mu)

            # Write new code
            mu.mem_write(0x1000000, code)

            try:
                start_time = time.perf_counter()
                mu.emu_start(0x1000000, 0x1000000 + len(code))
                end_time = time.perf_counter()

                results.append({
                    'sample': i,
                    'success': True,
                    'time': end_time - start_time
                })

            except UcError as e:
                results.append({
                    'sample': i,
                    'success': False,
                    'error': str(e)
                })

        return results

    def reset_emulator_state(self, mu):
        """Reset emulator state for reuse"""

        # Reset registers
        if mu.arch == UC_ARCH_X86:
            if mu.mode == UC_MODE_64:
                registers = [
                    UC_X86_REG_RAX, UC_X86_REG_RBX, UC_X86_REG_RCX, UC_X86_REG_RDX,
                    UC_X86_REG_RSI, UC_X86_REG_RDI, UC_X86_REG_RSP, UC_X86_REG_RBP,
                    UC_X86_REG_R8, UC_X86_REG_R9, UC_X86_REG_R10, UC_X86_REG_R11,
                    UC_X86_REG_R12, UC_X86_REG_R13, UC_X86_REG_R14, UC_X86_REG_R15
                ]
            else:
                registers = [
                    UC_X86_REG_EAX, UC_X86_REG_EBX, UC_X86_REG_ECX, UC_X86_REG_EDX,
                    UC_X86_REG_ESI, UC_X86_REG_EDI, UC_X86_REG_ESP, UC_X86_REG_EBP
                ]

            for reg in registers:
                mu.reg_write(reg, 0)

        # Clear memory (write zeros to mapped regions)
        try:
            regions = mu.mem_regions()
            for start, end, perms in regions:
                size = end - start + 1
                mu.mem_write(start, b'\x00' * size)
        except UcError:
            pass

    def profile_emulation(self, mu, code):
        """Profile emulation performance"""

        profiling_data = {
            'instruction_count': 0,
            'memory_accesses': 0,
            'hook_overhead': 0,
            'execution_time': 0
        }

        def profiling_hook(uc, address, size, user_data):
            """Profiling hook"""
            profiling_data['instruction_count'] += 1

        def memory_hook(uc, access, address, size, value, user_data):
            """Memory access profiling"""
            profiling_data['memory_accesses'] += 1

        # Add profiling hooks
        code_hook = mu.hook_add(UC_HOOK_CODE, profiling_hook)
        mem_hook = mu.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, memory_hook)

        # Run emulation
        start_time = time.perf_counter()

        try:
            mu.emu_start(0x1000000, 0x1000000 + len(code))
        except UcError:
            pass

        end_time = time.perf_counter()
        profiling_data['execution_time'] = end_time - start_time

        # Clean up hooks
        mu.hook_del(code_hook)
        mu.hook_del(mem_hook)

        # Calculate metrics
        if profiling_data['execution_time'] > 0:
            profiling_data['instructions_per_second'] = profiling_data['instruction_count'] / profiling_data['execution_time']
            profiling_data['memory_accesses_per_second'] = profiling_data['memory_accesses'] / profiling_data['execution_time']

        return profiling_data

def performance_comparison():
    """Compare performance of different optimization strategies"""

    print("Performance Comparison")
    print("=" * 40)

    optimizer = UnicornOptimizer()

    # Test code samples
    test_codes = [
        b"\x48\xc7\xc0\x37\x13\x00\x00\xc3",  # mov rax, 0x1337; ret
        b"\x48\x31\xc0\x48\xff\xc0\xc3",      # xor rax, rax; inc rax; ret
        b"\x48\x89\xe5\x48\x83\xc4\x08\x5d\xc3",  # mov rbp, rsp; add rsp, 8; pop rbp; ret
    ]

    # Benchmark each code sample
    for i, code in enumerate(test_codes):
        print(f"\nBenchmarking code sample {i+1}:")
        results = optimizer.benchmark_emulation(UC_ARCH_X86, UC_MODE_64, code, iterations=100)

        # Profile the emulation
        mu = Uc(UC_ARCH_X86, UC_MODE_64)
        mu.mem_map(0x1000000, 2 * 1024 * 1024)
        mu.mem_write(0x1000000, code)

        profile = optimizer.profile_emulation(mu, code)
        print(f"  Instructions: {profile['instruction_count']}")
        print(f"  Memory accesses: {profile['memory_accesses']}")
        print(f"  Instructions/sec: {profile.get('instructions_per_second', 0):.0f}")

    # Test batch optimization
    print(f"\nTesting batch optimization:")
    batch_results = optimizer.batch_emulation_optimization(test_codes * 10)

    successful = sum(1 for r in batch_results if r['success'])
    total_time = sum(r.get('time', 0) for r in batch_results if r['success'])

    print(f"  Successful emulations: {successful}/{len(batch_results)}")
    print(f"  Total time: {total_time:.6f}s")
    print(f"  Average time per emulation: {total_time/successful:.6f}s")

def memory_optimization_example():
    """Example of memory optimization techniques"""

    print("Memory Optimization Example")
    print("=" * 40)

    optimizer = UnicornOptimizer()

    # Create emulator
    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # Apply optimizations
    optimizer.optimize_memory_usage(mu)

    # Test with large memory operations
    LARGE_SIZE = 10 * 1024 * 1024  # 10MB

    try:
        # Map large memory region
        mu.mem_map(0x10000000, LARGE_SIZE)

        # Write pattern to memory
        pattern = b"ABCD" * 1024  # 4KB pattern
        for offset in range(0, LARGE_SIZE, len(pattern)):
            if offset + len(pattern) <= LARGE_SIZE:
                mu.mem_write(0x10000000 + offset, pattern)

        print(f"Successfully allocated and wrote to {LARGE_SIZE} bytes")

        # Read back and verify
        read_data = mu.mem_read(0x10000000, len(pattern))
        if read_data == pattern:
            print("Memory verification successful")

    except UcError as e:
        print(f"Memory operation failed: {e}")

    # Get final memory usage
    process = psutil.Process()
    final_memory = process.memory_info().rss / 1024 / 1024
    print(f"Final memory usage: {final_memory:.2f} MB")

def best_practices_guide():
    """Guide to Unicorn Engine best practices"""

    practices = {
        "Performance": [
            "Reuse emulator instances when possible",
            "Use minimal memory mappings",
            "Implement lazy memory allocation",
            "Optimize hook functions for minimal overhead",
            "Use specific hooks instead of generic ones",
            "Batch similar operations together",
            "Profile your emulation to identify bottlenecks"
        ],

        "Memory Management": [
            "Map only necessary memory regions",
            "Use appropriate memory permissions",
            "Implement memory pooling for frequent allocations",
            "Clean up unused memory mappings",
            "Monitor memory usage in long-running emulations",
            "Use memory-mapped files for large data sets"
        ],

        "Error Handling": [
            "Always handle UcError exceptions",
            "Implement proper cleanup in finally blocks",
            "Use hooks to handle invalid memory accesses",
            "Validate input data before emulation",
            "Set reasonable timeouts for emulation",
            "Log errors with sufficient context"
        ],

        "Security": [
            "Validate all input data",
            "Use appropriate memory permissions",
            "Implement timeouts to prevent infinite loops",
            "Sanitize hook callback data",
            "Be cautious with self-modifying code",
            "Isolate emulation environments"
        ],

        "Debugging": [
            "Use instruction hooks for tracing",
            "Implement register and memory dumping",
            "Log emulation state at key points",
            "Use conditional hooks for specific addresses",
            "Implement step-by-step debugging",
            "Validate emulation results"
        ]
    }

    print("Unicorn Engine Best Practices")
    print("=" * 40)

    for category, tips in practices.items():
        print(f"\n{category}:")
        for tip in tips:
            print(f"  • {tip}")

    return practices

# Run optimization examples
if __name__ == "__main__":
    # Performance comparison
    performance_comparison()

    print("\n" + "=" * 60 + "\n")

    # Memory optimization
    memory_optimization_example()

    print("\n" + "=" * 60 + "\n")

    # Best practices guide
    best_practices_guide()

Recursos y documentación

Recursos oficiales

Recursos didácticos

Herramientas e integración relacionadas

  • Marco de Qiling - Marco de emulación de alto nivel construido sobre Unicornio
  • Capstone Engine - Marco de desmontaje (pair bien con Unicornio)
  • Keystone Engine - Marco de la Asamblea (complemento a Unicornio)
  • Radare2 - Marco de ingeniería inversa con integración Unicornio

Comunidad y Apoyo

Temas avanzados