# Aide-mémoire Unicorn Engine
## Aperçu général
Unicorn Engine est un framework d'émulation de CPU multi-architecture, léger et multiplateforme, basé sur QEMU. Il fournit une API claire pour émuler des instructions CPU sur différentes architectures, ce qui le rend idéal pour l'analyse de logiciels malveillants, la rétro-ingénierie, le fuzzing et la recherche en sécurité.
**Caractéristiques principales :** prise en charge multi-architecture (x86, ARM, MIPS, SPARC, M68K), conception légère, API claire, bindings pour de nombreux langages et intégration avec d'autres outils de sécurité.
## Installation et configuration
### Installation de base
# Install the released package from PyPI (recommended)
pip install unicorn
# Install the development version straight from the upstream Git repository
pip install git+https://github.com/unicorn-engine/unicorn.git
# Install with additional bindings
# NOTE(review): confirm that the package really defines an "all" extra; some
# shells (e.g. zsh) also require the bracketed spec to be quoted.
pip install unicorn[all]
# System dependencies (Ubuntu/Debian): compiler toolchain, CMake, pkg-config, GLib headers
sudo apt-get update
sudo apt-get install -y \
build-essential \
cmake \
pkg-config \
libglib2.0-dev
# System dependencies (macOS, via Homebrew)
brew install cmake pkg-config glib
# Build the native library from source in an out-of-tree "build" directory
git clone https://github.com/unicorn-engine/unicorn.git
cd unicorn
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
# NOTE(review): nproc is presumably unavailable on stock macOS; verify an
# alternative (e.g. sysctl -n hw.ncpu) for that platform.
make -j$(nproc)
sudo make install
# Python bindings from source (run from the build directory created above)
# NOTE(review): invoking setup.py directly is discouraged by current
# setuptools; consider "pip install ." once confirmed to work for this tree.
cd ../bindings/python
python setup.py install
# Verify installation by printing the version reported by the Python binding
python -c "import unicorn; print(f'Unicorn version: {unicorn.version_bind()}')"
### Bindings de langage
# Python (primary binding)
pip install unicorn
# Java
# Download unicorn.jar from releases
# .NET/C#
# Install-Package UnicornEngine
# Go
go get github.com/unicorn-engine/unicorn/bindings/go/unicorn
# Rust
# Add to Cargo.toml: unicorn-engine = "2.0"
# Ruby
gem install unicorn-engine
# Haskell
# NOTE(review): confirm the package name published for the Haskell binding.
cabal install unicorn
# Verify the Python binding only: import the package, its public names and
# the x86 constants, then print a confirmation message. The other language
# bindings are not exercised by this check.
python -c "
import unicorn
from unicorn import *
from unicorn.x86_const import *
print('All bindings imported successfully')
"
```
### Architectures prises en charge
```python
# Supported architectures and modes
import unicorn
from unicorn import *
# Lookup table of the architectures shown in this guide. Every entry pairs
# the Unicorn architecture constant ('arch') with the named mode constants
# ('modes') that can be combined with it.
architectures = {
    'x86': {
        'arch': UC_ARCH_X86,
        'modes': {'16-bit': UC_MODE_16, '32-bit': UC_MODE_32, '64-bit': UC_MODE_64},
    },
    'ARM': {
        'arch': UC_ARCH_ARM,
        'modes': {
            'ARM': UC_MODE_ARM,
            'Thumb': UC_MODE_THUMB,
            'Cortex-M': UC_MODE_MCLASS,
            'ARMv8': UC_MODE_V8,
        },
    },
    'ARM64': {
        'arch': UC_ARCH_ARM64,
        'modes': {'ARM64': UC_MODE_ARM},
    },
    'MIPS': {
        'arch': UC_ARCH_MIPS,
        'modes': {
            '32-bit': UC_MODE_32,
            '64-bit': UC_MODE_64,
            'Big Endian': UC_MODE_BIG_ENDIAN,
            'Little Endian': UC_MODE_LITTLE_ENDIAN,
            'Micro MIPS': UC_MODE_MICRO,
        },
    },
    'SPARC': {
        'arch': UC_ARCH_SPARC,
        'modes': {'32-bit': UC_MODE_32, '64-bit': UC_MODE_64, 'V9': UC_MODE_V9},
    },
    'M68K': {
        'arch': UC_ARCH_M68K,
        'modes': {'68000': UC_MODE_BIG_ENDIAN},
    },
}

# Walk the table in insertion order: one header line per architecture,
# followed by one line per mode name and its numeric constant.
for arch_name in architectures:
    print(f"{arch_name}:")
    mode_table = architectures[arch_name]['modes']
    for mode_name in mode_table:
        mode_const = mode_table[mode_name]
        print(f" {mode_name}: {mode_const}")
```
## Émulation de base
### Émulation x86 simple
```python
# Basic x86 emulation example
import unicorn
from unicorn import *
from unicorn.x86_const import *
def basic_x86_emulation():
    """Emulate a single x86-64 ``mov`` and check the value left in RAX.

    Maps 2 MB at a fixed address, writes the machine code there, runs it
    and prints RAX before and after. Unicorn errors are caught and printed
    rather than propagated.

    Raises:
        AssertionError: if RAX does not hold 0x1234 after emulation.
    """
    # x86-64 machine code for: mov rax, 0x1234; ret
    X86_CODE = b"\x48\xc7\xc0\x34\x12\x00\x00\xc3"
    # Size of the trailing `ret`. It must not be emulated: no stack is
    # mapped here (RSP is 0), so executing it would pop a return address
    # from unmapped memory and abort the run before the check below.
    RET_SIZE = 1
    # Memory addresses
    ADDRESS = 0x1000000
    print("Basic x86-64 emulation")
    try:
        # Initialize emulator in x86-64 mode
        mu = Uc(UC_ARCH_X86, UC_MODE_64)
        # Map memory for this emulation (2MB)
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)
        # Write machine code to be emulated to memory
        mu.mem_write(ADDRESS, X86_CODE)
        # Initialize registers
        mu.reg_write(UC_X86_REG_RAX, 0x0)
        mu.reg_write(UC_X86_REG_RBX, 0x0)
        print(f"Initial RAX: 0x{mu.reg_read(UC_X86_REG_RAX):x}")
        # Emulation stops when the program counter reaches the end address,
        # so ending at the `ret` runs only the `mov`.
        mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE) - RET_SIZE)
        # Read register values after emulation
        rax = mu.reg_read(UC_X86_REG_RAX)
        print(f"Final RAX: 0x{rax:x}")
        assert rax == 0x1234, f"Expected RAX=0x1234, got 0x{rax:x}"
        print("Emulation successful!")
    except UcError as e:
        print(f"ERROR: {e}")
def x86_with_stack():
    """Emulate a 32-bit push/pop pair on a dedicated stack mapping.

    Maps separate code and stack regions, points ESP near the top of the
    stack, runs ``push 0x1234; pop eax`` and prints EAX/ESP before and
    after. Unicorn errors are caught and printed rather than propagated.

    Raises:
        AssertionError: if EAX does not hold 0x1234 after emulation.
    """
    # Assembly: push 0x1234; pop eax; ret
    X86_CODE = b"\x68\x34\x12\x00\x00\x58\xc3"
    # Size of the trailing `ret`. It must not be emulated: the stack holds
    # no return address, so it would pop 0 and jump to unmapped address 0.
    RET_SIZE = 1
    ADDRESS = 0x1000000
    STACK_ADDRESS = 0x2000000
    STACK_SIZE = 1024 * 1024  # 1MB stack
    print("x86 emulation with stack")
    try:
        mu = Uc(UC_ARCH_X86, UC_MODE_32)
        # Map code memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)
        # Map stack memory
        mu.mem_map(STACK_ADDRESS, STACK_SIZE)
        # Write code
        mu.mem_write(ADDRESS, X86_CODE)
        # Set up stack pointer just below the top of the stack mapping
        mu.reg_write(UC_X86_REG_ESP, STACK_ADDRESS + STACK_SIZE - 4)
        print(f"Initial ESP: 0x{mu.reg_read(UC_X86_REG_ESP):x}")
        print(f"Initial EAX: 0x{mu.reg_read(UC_X86_REG_EAX):x}")
        # Emulate up to, but not including, the final `ret`
        mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE) - RET_SIZE)
        # Check results
        eax = mu.reg_read(UC_X86_REG_EAX)
        esp = mu.reg_read(UC_X86_REG_ESP)
        print(f"Final EAX: 0x{eax:x}")
        print(f"Final ESP: 0x{esp:x}")
        assert eax == 0x1234, f"Expected EAX=0x1234, got 0x{eax:x}"
        print("Stack emulation successful!")
    except UcError as e:
        print(f"ERROR: {e}")
# Script entry point: run both x86 demos, separated by one blank line.
if __name__ == "__main__":
    for position, demo in enumerate((basic_x86_emulation, x86_with_stack)):
        if position:
            print()
        demo()
### Émulation ARM
# ARM emulation examples
import unicorn
from unicorn import *
from unicorn.arm_const import *
def basic_arm_emulation():
    """Emulate three ARM (A32) instructions and verify R0, R1 and R2.

    Maps 2 MB at a fixed address, writes the machine code, zeroes the
    three registers, runs the snippet and prints the registers before and
    after. Unicorn errors are caught and printed rather than propagated.

    Raises:
        AssertionError: if the registers do not hold 0x37, 0x3 and 0x3a.
    """
    # ARM machine code for: mov r0, #0x37; mov r1, #0x3; add r2, r1, r0
    # The last word is 0xe0812000 (Rm = r0). The previous bytes encoded
    # 0xe0812001, i.e. `add r2, r1, r1`, which leaves 6 in r2.
    ARM_CODE = b"\x37\x00\xa0\xe3\x03\x10\xa0\xe3\x00\x20\x81\xe0"
    ADDRESS = 0x10000
    print("Basic ARM emulation")
    try:
        # Initialize emulator in ARM mode
        mu = Uc(UC_ARCH_ARM, UC_MODE_ARM)
        # Map memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)
        # Write machine code
        mu.mem_write(ADDRESS, ARM_CODE)
        # Initialize registers
        mu.reg_write(UC_ARM_REG_R0, 0x0)
        mu.reg_write(UC_ARM_REG_R1, 0x0)
        mu.reg_write(UC_ARM_REG_R2, 0x0)
        print(f"Initial R0: 0x{mu.reg_read(UC_ARM_REG_R0):x}")
        print(f"Initial R1: 0x{mu.reg_read(UC_ARM_REG_R1):x}")
        print(f"Initial R2: 0x{mu.reg_read(UC_ARM_REG_R2):x}")
        # Emulate until the program counter reaches the end of the snippet
        mu.emu_start(ADDRESS, ADDRESS + len(ARM_CODE))
        # Read results
        r0 = mu.reg_read(UC_ARM_REG_R0)
        r1 = mu.reg_read(UC_ARM_REG_R1)
        r2 = mu.reg_read(UC_ARM_REG_R2)
        print(f"Final R0: 0x{r0:x}")
        print(f"Final R1: 0x{r1:x}")
        print(f"Final R2: 0x{r2:x}")
        assert r0 == 0x37 and r1 == 0x3 and r2 == 0x3a
        print("ARM emulation successful!")
    except UcError as e:
        print(f"ERROR: {e}")
def arm_thumb_emulation():
    """Run a two-instruction Thumb snippet and verify the value left in R0.

    The snippet is written to a freshly mapped 2 MB region, R0 is zeroed,
    and the code is started at an odd entry address. Unicorn errors are
    caught and printed rather than propagated.

    Raises:
        AssertionError: if R0 does not hold 0x3a after emulation.
    """
    print("ARM Thumb emulation")
    # Thumb encoding of: mov r0, #0x37; add r0, #0x3
    thumb_bytes = b"\x37\x20\x03\x30"
    load_address = 0x10000
    region_size = 2 * 1024 * 1024
    try:
        # Emulator instance created directly in Thumb mode
        mu = Uc(UC_ARCH_ARM, UC_MODE_THUMB)
        mu.mem_map(load_address, region_size)
        mu.mem_write(load_address, thumb_bytes)
        mu.reg_write(UC_ARM_REG_R0, 0x0)
        print(f"Initial R0: 0x{mu.reg_read(UC_ARM_REG_R0):x}")
        # The start address is made odd, as Thumb execution requires here
        entry_point = load_address | 1
        stop_address = load_address + len(thumb_bytes)
        mu.emu_start(entry_point, stop_address)
        r0 = mu.reg_read(UC_ARM_REG_R0)
        print(f"Final R0: 0x{r0:x}")
        assert r0 == 0x3a  # 0x37 + 0x3
        print("Thumb emulation successful!")
    except UcError as e:
        print(f"ERROR: {e}")
def arm64_emulation():
    """Emulate three AArch64 instructions and verify X0, X1 and X2.

    Maps 2 MB at a fixed address, writes the machine code, zeroes the
    three registers, runs the snippet and prints the registers before and
    after. Unicorn errors are caught and printed rather than propagated.

    Raises:
        AssertionError: if the registers do not hold 0x37, 0x3 and 0x3a.
    """
    # The AArch64 register ids live in their own constants module, which
    # the surrounding snippet does not import (it only pulls in arm_const).
    from unicorn.arm64_const import (
        UC_ARM64_REG_X0,
        UC_ARM64_REG_X1,
        UC_ARM64_REG_X2,
    )

    # ARM64 machine code for: mov x0, #0x37; mov x1, #0x3; add x2, x1, x0
    #   0xd28006e0  movz x0, #0x37
    #   0xd2800061  movz x1, #0x3   (previous bytes encoded #0x7)
    #   0x8b000022  add  x2, x1, x0 (previous bytes encoded add x2, x1, x1)
    ARM64_CODE = b"\xe0\x06\x80\xd2\x61\x00\x80\xd2\x22\x00\x00\x8b"
    ADDRESS = 0x10000
    print("ARM64 emulation")
    try:
        # Initialize emulator in ARM64 mode
        mu = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
        # Map memory
        mu.mem_map(ADDRESS, 2 * 1024 * 1024)
        # Write machine code
        mu.mem_write(ADDRESS, ARM64_CODE)
        # Initialize registers
        mu.reg_write(UC_ARM64_REG_X0, 0x0)
        mu.reg_write(UC_ARM64_REG_X1, 0x0)
        mu.reg_write(UC_ARM64_REG_X2, 0x0)
        print(f"Initial X0: 0x{mu.reg_read(UC_ARM64_REG_X0):x}")
        print(f"Initial X1: 0x{mu.reg_read(UC_ARM64_REG_X1):x}")
        print(f"Initial X2: 0x{mu.reg_read(UC_ARM64_REG_X2):x}")
        # Emulate until the program counter reaches the end of the snippet
        mu.emu_start(ADDRESS, ADDRESS + len(ARM64_CODE))
        # Read results
        x0 = mu.reg_read(UC_ARM64_REG_X0)
        x1 = mu.reg_read(UC_ARM64_REG_X1)
        x2 = mu.reg_read(UC_ARM64_REG_X2)
        print(f"Final X0: 0x{x0:x}")
        print(f"Final X1: 0x{x1:x}")
        print(f"Final X2: 0x{x2:x}")
        assert x0 == 0x37 and x1 == 0x3 and x2 == 0x3a
        print("ARM64 emulation successful!")
    except UcError as e:
        print(f"ERROR: {e}")
# Script entry point: run the three ARM demos with a blank line between them.
if __name__ == "__main__":
    arm_demos = (basic_arm_emulation, arm_thumb_emulation, arm64_emulation)
    for position, demo in enumerate(arm_demos):
        if position:
            print()
        demo()
## Gestion de la mémoire
### Mappage mémoire et opérations
# Memory management in Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *
class MemoryManager:
    """Wrap a Unicorn instance and log every memory operation performed on it.

    All methods catch ``UcError`` and print a diagnostic instead of
    raising, so they are suited to interactive experiments rather than
    code that needs to react to failures.
    """

    def __init__(self, arch, mode):
        """Create the underlying emulator for the given architecture/mode."""
        # Underlying emulator instance.
        self.mu = Uc(arch, mode)
        # Regions requested through map_memory(); each entry is a dict with
        # the keys 'address', 'size' and 'perms'.
        self.mapped_regions = []

    def map_memory(self, address, size, perms=UC_PROT_ALL):
        """Map memory region with specified permissions and record it."""
        try:
            self.mu.mem_map(address, size, perms)
        except UcError as e:
            print(f"Failed to map memory at 0x{address:x}: {e}")
            return
        self.mapped_regions.append({
            'address': address,
            'size': size,
            'perms': perms,
        })
        print(f"Mapped memory: 0x{address:x} - 0x{address + size:x} (size: {size}, perms: {perms})")

    def unmap_memory(self, address, size):
        """Unmap memory region and drop the exactly matching tracked entry."""
        try:
            self.mu.mem_unmap(address, size)
        except UcError as e:
            print(f"Failed to unmap memory at 0x{address:x}: {e}")
            return
        # Only an entry with the same start and size is forgotten; partial
        # unmaps leave the tracked list untouched.
        self.mapped_regions = [
            r for r in self.mapped_regions
            if not (r['address'] == address and r['size'] == size)
        ]
        print(f"Unmapped memory: 0x{address:x} - 0x{address + size:x}")

    def protect_memory(self, address, size, perms):
        """Change memory protection and keep the tracked permissions in sync."""
        try:
            self.mu.mem_protect(address, size, perms)
        except UcError as e:
            print(f"Failed to protect memory at 0x{address:x}: {e}")
            return
        # Update tracked regions that lie entirely inside the protected
        # range. Partially covered regions cannot be described by a single
        # 'perms' value and are left as they were.
        end = address + size
        for region in self.mapped_regions:
            if address <= region['address'] and region['address'] + region['size'] <= end:
                region['perms'] = perms
        print(f"Protected memory: 0x{address:x} - 0x{address + size:x} (perms: {perms})")

    def read_memory(self, address, size):
        """Read memory content; return the data, or None on failure."""
        try:
            return self.mu.mem_read(address, size)
        except UcError as e:
            print(f"Failed to read memory at 0x{address:x}: {e}")
            return None

    def write_memory(self, address, data):
        """Write data to memory and log the number of bytes written."""
        try:
            self.mu.mem_write(address, data)
        except UcError as e:
            print(f"Failed to write memory at 0x{address:x}: {e}")
            return
        print(f"Wrote {len(data)} bytes to 0x{address:x}")

    def dump_memory(self, address, size, format='hex'):
        """Dump memory content in various formats.

        Args:
            address: start address of the dump.
            size: number of bytes to read.
            format: 'hex' (16 bytes per row with an ASCII column),
                'ascii' (printable characters, '.' otherwise) or 'raw'.
        """
        data = self.read_memory(address, size)
        if data is None:
            return
        print(f"Memory dump at 0x{address:x} ({size} bytes):")
        if format == 'hex':
            for i in range(0, len(data), 16):
                chunk = data[i:i + 16]
                hex_str = ' '.join(f'{b:02x}' for b in chunk)
                ascii_str = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
                print(f"0x{address + i:08x}: {hex_str:<48} |{ascii_str}|")
        elif format == 'ascii':
            ascii_str = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in data)
            print(ascii_str)
        elif format == 'raw':
            print(data)
        else:
            # Previously an unknown format printed the header and nothing else.
            print(f"Unknown dump format: {format!r}")

    def get_memory_regions(self):
        """Print and return the regions reported by the emulator.

        Returns:
            A list of (start, end, perms) tuples, or [] on failure.
        """
        try:
            # Materialise the result first: the loop below would otherwise
            # consume a lazy iterator and hand the caller an empty one.
            regions = list(self.mu.mem_regions())
        except UcError as e:
            print(f"Failed to get memory regions: {e}")
            return []
        print("Mapped memory regions:")
        for start, end, perms in regions:
            # 'end' is treated as inclusive, hence the +1.
            size = end - start + 1
            print(f" 0x{start:x} - 0x{end:x} (size: {size}, perms: {perms})")
        return regions
def memory_management_example():
    """Walk through mapping, writing, dumping and re-protecting memory.

    Builds a MemoryManager for x86-64, maps code/data/stack/heap pages
    with different permissions, writes sample content, dumps it and then
    changes the protection of the code page.

    Returns:
        The MemoryManager instance used for the demonstration.
    """
    print("Memory Management Example")
    print("=" * 40)
    # Create memory manager
    mm = MemoryManager(UC_ARCH_X86, UC_MODE_64)
    # Map different memory regions
    CODE_BASE = 0x400000
    DATA_BASE = 0x600000
    STACK_BASE = 0x7fff0000
    HEAP_BASE = 0x800000
    # Map code section (read + execute)
    mm.map_memory(CODE_BASE, 0x1000, UC_PROT_READ | UC_PROT_EXEC)
    # Map data section (read + write)
    mm.map_memory(DATA_BASE, 0x1000, UC_PROT_READ | UC_PROT_WRITE)
    # Map stack (read + write)
    mm.map_memory(STACK_BASE, 0x1000, UC_PROT_READ | UC_PROT_WRITE)
    # Map heap (read + write)
    mm.map_memory(HEAP_BASE, 0x1000, UC_PROT_READ | UC_PROT_WRITE)
    # Write some data
    code_data = b"\x48\xc7\xc0\x37\x13\x00\x00\xc3"  # mov rax, 0x1337; ret
    mm.write_memory(CODE_BASE, code_data)
    data_content = b"Hello, Unicorn Engine!"
    mm.write_memory(DATA_BASE, data_content)
    stack_data = b"\x00" * 100 + b"STACK_CANARY"
    mm.write_memory(STACK_BASE + 0x900, stack_data)
    # Read and dump memory
    print("\nCode section:")
    mm.dump_memory(CODE_BASE, len(code_data))
    print("\nData section:")
    mm.dump_memory(DATA_BASE, len(data_content), format='ascii')
    print("\nStack canary:")
    mm.dump_memory(STACK_BASE + 0x900 + 100, 12, format='ascii')
    # Get memory regions
    print("\nMemory layout:")
    mm.get_memory_regions()
    # Test memory protection
    print("\nTesting memory protection:")
    # Page permissions restrict accesses made by emulated code. Writes
    # issued from the host through the Unicorn API are not blocked by them,
    # and write_memory() reports failures itself instead of raising, so the
    # former bare `except:` around this call could never fire.
    mm.write_memory(CODE_BASE, b"\x90\x90\x90\x90")
    print("Host-side write to the R+X code page is allowed (protections apply to emulated code)")
    # Change protection so emulated code may write to the page as well
    mm.protect_memory(CODE_BASE, 0x1000, UC_PROT_ALL)
    mm.write_memory(CODE_BASE, b"\x90\x90\x90\x90")
    return mm
def advanced_memory_operations():
    """Lay out a process-like address space inside a single 1 MB mapping.

    The mapping is split into text/rodata/data/bss/heap segments with
    individual protections, sample content is written to each segment,
    the first 16 bytes of every segment are printed, and the whole
    mapping is searched for a byte pattern.

    Returns:
        The configured Uc instance.
    """
    print("Advanced Memory Operations")
    print("=" * 40)

    def find_pattern(mu, start_addr, size, pattern):
        """Return the absolute address of the first match, or None."""
        try:
            offset = mu.mem_read(start_addr, size).find(pattern)
        except UcError:
            return None
        return None if offset == -1 else start_addr + offset

    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    BASE_ADDR = 0x10000000
    # One large region (1MB) that the segments below carve up
    mu.mem_map(BASE_ADDR, 0x100000)

    data_rw = UC_PROT_READ | UC_PROT_WRITE
    segments = {
        'text': {'offset': 0x0000, 'size': 0x10000, 'perms': UC_PROT_READ | UC_PROT_EXEC},
        'rodata': {'offset': 0x10000, 'size': 0x5000, 'perms': UC_PROT_READ},
        'data': {'offset': 0x15000, 'size': 0x5000, 'perms': data_rw},
        'bss': {'offset': 0x1a000, 'size': 0x5000, 'perms': data_rw},
        'heap': {'offset': 0x20000, 'size': 0x80000, 'perms': data_rw},
    }

    # Apply the per-segment protection on top of the shared mapping
    for name, seg in segments.items():
        addr = BASE_ADDR + seg['offset']
        mu.mem_protect(addr, seg['size'], seg['perms'])
        print(f"Segment {name}: 0x{addr:x} - 0x{addr + seg['size']:x}")

    # Initial content per segment, written in layout order
    initial_content = (
        ('text', b"\x48\x31\xc0\x48\xff\xc0\xc3"),            # xor rax, rax; inc rax; ret
        ('rodata', b"This is read-only data\x00"),
        ('data', b"\x37\x13\x00\x00\x00\x00\x00\x00"),        # global_var = 0x1337
        ('bss', b"\x00" * 100),                               # zero-filled
        ('heap', b"Heap allocated data\x00"),
    )
    for segment_name, blob in initial_content:
        mu.mem_write(BASE_ADDR + segments[segment_name]['offset'], blob)

    print("\nTesting memory access:")
    # Show the first 16 bytes of every segment
    for name, seg in segments.items():
        addr = BASE_ADDR + seg['offset']
        try:
            data = mu.mem_read(addr, 16)
            print(f"{name} segment: {data[:16].hex()}")
        except UcError as e:
            print(f"Failed to read {name} segment: {e}")

    # Look for the marker text across the whole mapping
    pattern = b"read-only"
    found_addr = find_pattern(mu, BASE_ADDR, 0x100000, pattern)
    if found_addr:
        print(f"Found pattern at: 0x{found_addr:x}")
    return mu
# Script entry point: run both memory demos with a ruled separator between
# them; the returned objects are kept in module-level names.
if __name__ == "__main__":
    mm = memory_management_example()
    separator = "\n" + "=" * 60 + "\n"
    print(separator)
    mu = advanced_memory_operations()
## Hooks et callbacks
### Hooks d'instruction et de mémoire
# Hooks and callbacks in Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *
class UnicornHooks:
    """Bundle a Unicorn instance with a collection of tracing hooks.

    Each ``setup_*`` method registers one kind of hook on ``self.mu`` and
    remembers its handle in ``self.hooks`` so that ``remove_all_hooks``
    can unregister everything later.
    """

    def __init__(self, arch, mode):
        """Create the emulator and the bookkeeping containers."""
        self.mu = Uc(arch, mode)
        # Keep our own copy of the architecture and mode: the callbacks and
        # dump_registers() need them, and this class should not rely on the
        # Uc object exposing public `arch`/`mode` attributes.
        self.arch = arch
        self.mode = mode
        # Number of instructions seen by the instruction hook.
        self.instruction_count = 0
        # One dict per observed access: 'type', 'address', 'size', 'value'.
        self.memory_accesses = []
        self.function_calls = []
        # Handles returned by hook_add(), in registration order.
        self.hooks = []

    def _make_disassembler(self):
        """Return a Capstone disassembler for x86 targets, or None.

        None is returned when the target is not x86 or Capstone is not
        installed; callers then skip disassembly.
        """
        if self.arch != UC_ARCH_X86:
            return None
        try:
            import capstone
        except ImportError:
            return None
        cs_mode = capstone.CS_MODE_64 if self.mode == UC_MODE_64 else capstone.CS_MODE_32
        return capstone.Cs(capstone.CS_ARCH_X86, cs_mode)

    def setup_instruction_hook(self):
        """Set up instruction execution hook and return its handle."""
        # Built once here instead of once per executed instruction.
        md = self._make_disassembler()

        def hook_code(uc, address, size, user_data):
            """Callback for instruction execution"""
            self.instruction_count += 1
            # Read instruction bytes
            try:
                instruction_bytes = uc.mem_read(address, size)
            except UcError as e:
                print(f"Failed to read instruction at 0x{address:x}: {e}")
                return
            print(f"Instruction #{self.instruction_count}: 0x{address:x} - {instruction_bytes.hex()}")
            # Disassemble if capstone is available; only the first decoded
            # instruction is printed.
            if md is not None:
                for insn in md.disasm(bytes(instruction_bytes), address):
                    print(f" {insn.mnemonic} {insn.op_str}")
                    break

        # Add hook
        hook_id = self.mu.hook_add(UC_HOOK_CODE, hook_code)
        self.hooks.append(hook_id)
        return hook_id

    def setup_memory_hooks(self):
        """Set up memory access hooks.

        Returns:
            Tuple of (read_hook, write_hook, invalid_hook) handles.
        """
        def hook_mem_read(uc, access, address, size, value, user_data):
            """Callback for memory read"""
            self.memory_accesses.append({
                'type': 'read',
                'address': address,
                'size': size,
                'value': value
            })
            print(f"Memory READ: 0x{address:x} (size: {size}, value: 0x{value:x})")

        def hook_mem_write(uc, access, address, size, value, user_data):
            """Callback for memory write"""
            self.memory_accesses.append({
                'type': 'write',
                'address': address,
                'size': size,
                'value': value
            })
            print(f"Memory WRITE: 0x{address:x} (size: {size}, value: 0x{value:x})")

        def hook_mem_invalid(uc, access, address, size, value, user_data):
            """Callback for invalid memory access"""
            print(f"Invalid memory access: 0x{address:x} (access: {access})")
            # Try to handle invalid access by mapping the missing page
            if access == UC_MEM_WRITE_UNMAPPED or access == UC_MEM_READ_UNMAPPED:
                print(f"Mapping memory at 0x{address:x}")
                # Map the 4 KB page containing the faulting address
                page_size = 0x1000
                page_start = address & ~(page_size - 1)
                uc.mem_map(page_start, page_size)
                return True  # Continue execution
            return False  # Stop execution

        # Add hooks
        read_hook = self.mu.hook_add(UC_HOOK_MEM_READ, hook_mem_read)
        write_hook = self.mu.hook_add(UC_HOOK_MEM_WRITE, hook_mem_write)
        invalid_hook = self.mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, hook_mem_invalid)
        self.hooks.extend([read_hook, write_hook, invalid_hook])
        return read_hook, write_hook, invalid_hook

    def setup_interrupt_hook(self):
        """Set up interrupt hook and return its handle."""
        def hook_interrupt(uc, intno, user_data):
            """Callback for interrupts"""
            print(f"Interrupt: 0x{intno:x}")
            # Only interrupt 0x80 (Linux system call) is interpreted
            if intno != 0x80:
                return
            # Get system call number from EAX
            syscall_num = uc.reg_read(UC_X86_REG_EAX)
            print(f" Linux syscall: {syscall_num}")
            # Handle specific syscalls
            if syscall_num == 1:  # sys_exit
                exit_code = uc.reg_read(UC_X86_REG_EBX)
                print(f" Exit with code: {exit_code}")
                uc.emu_stop()
            elif syscall_num == 4:  # sys_write
                fd = uc.reg_read(UC_X86_REG_EBX)
                buf_addr = uc.reg_read(UC_X86_REG_ECX)
                count = uc.reg_read(UC_X86_REG_EDX)
                try:
                    data = uc.mem_read(buf_addr, count)
                    print(f" Write to fd {fd}: {data.decode('utf-8', errors='ignore')}")
                    # Set return value (number of bytes written)
                    uc.reg_write(UC_X86_REG_EAX, count)
                except UcError:
                    # The guest buffer could not be read: report failure
                    # to the guest instead of aborting the emulation.
                    uc.reg_write(UC_X86_REG_EAX, -1)

        # Add hook
        hook_id = self.mu.hook_add(UC_HOOK_INTR, hook_interrupt)
        self.hooks.append(hook_id)
        return hook_id

    def setup_block_hook(self):
        """Set up basic block hook and return its handle."""
        def hook_block(uc, address, size, user_data):
            """Callback for basic block execution"""
            print(f"Basic block: 0x{address:x} - 0x{address + size:x}")

        # Add hook
        hook_id = self.mu.hook_add(UC_HOOK_BLOCK, hook_block)
        self.hooks.append(hook_id)
        return hook_id

    def setup_conditional_hooks(self):
        """Build a code-hook callback that acts at one specific address.

        The callback is returned, not registered: this method does not
        call hook_add. The callback reads ``user_data['target_address']``
        and the optional keys 'action' ('dump_registers' or
        'modify_register'), 'register' and 'value'.
        """
        def hook_specific_address(uc, address, size, user_data):
            """Hook for specific address"""
            target_addr = user_data['target_address']
            if address != target_addr:
                return
            print(f"Reached target address: 0x{address:x}")
            # Perform specific action
            action = user_data.get('action')
            if action == 'dump_registers':
                self.dump_registers()
            elif action == 'modify_register':
                reg = user_data.get('register', UC_X86_REG_EAX)
                value = user_data.get('value', 0)
                uc.reg_write(reg, value)
                print(f"Modified register to: 0x{value:x}")

        return hook_specific_address

    def dump_registers(self):
        """Dump current register state (x86 targets only)."""
        if self.arch != UC_ARCH_X86:
            # No register table is defined for other architectures.
            print("Register dump is only implemented for x86")
            return
        if self.mode == UC_MODE_64:
            registers = [
                ('RAX', UC_X86_REG_RAX), ('RBX', UC_X86_REG_RBX),
                ('RCX', UC_X86_REG_RCX), ('RDX', UC_X86_REG_RDX),
                ('RSI', UC_X86_REG_RSI), ('RDI', UC_X86_REG_RDI),
                ('RSP', UC_X86_REG_RSP), ('RBP', UC_X86_REG_RBP),
                ('RIP', UC_X86_REG_RIP)
            ]
        else:
            registers = [
                ('EAX', UC_X86_REG_EAX), ('EBX', UC_X86_REG_EBX),
                ('ECX', UC_X86_REG_ECX), ('EDX', UC_X86_REG_EDX),
                ('ESI', UC_X86_REG_ESI), ('EDI', UC_X86_REG_EDI),
                ('ESP', UC_X86_REG_ESP), ('EBP', UC_X86_REG_EBP),
                ('EIP', UC_X86_REG_EIP)
            ]
        print("Register dump:")
        for name, reg_id in registers:
            value = self.mu.reg_read(reg_id)
            print(f" {name}: 0x{value:x}")

    def remove_all_hooks(self):
        """Remove all registered hooks"""
        for hook_id in self.hooks:
            try:
                self.mu.hook_del(hook_id)
            except UcError:
                # Best effort: a hook that cannot be removed must not stop
                # the remaining ones from being removed.
                pass
        self.hooks.clear()
        print("All hooks removed")
def hooks_example():
    """Run a short x86-64 snippet with instruction, memory and block hooks.

    The snippet stores RBX to address 0x1234 and loads it back; that
    address is not mapped up front, so the unmapped-access hook installed
    by setup_memory_hooks() is exercised. All hooks are removed before
    returning, whether or not emulation succeeded.

    Returns:
        The UnicornHooks instance used for the run.
    """
    print("Unicorn Engine Hooks Example")
    print("=" * 40)
    # Create hooks manager
    hooks = UnicornHooks(UC_ARCH_X86, UC_MODE_64)
    # Set up various hooks
    hooks.setup_instruction_hook()
    hooks.setup_memory_hooks()
    hooks.setup_block_hook()
    # Map memory and write code
    ADDRESS = 0x1000000
    hooks.mu.mem_map(ADDRESS, 2 * 1024 * 1024)
    # x86-64 code: mov rax, 0x1234; mov [rax], rbx; mov rcx, [rax]; ret
    X86_CODE = b"\x48\xc7\xc0\x34\x12\x00\x00\x48\x89\x18\x48\x8b\x08\xc3"
    # Size of the trailing `ret`. It must not be emulated: no stack is set
    # up, so it would pop a bogus return address and emulation would carry
    # on at that address instead of stopping.
    RET_SIZE = 1
    hooks.mu.mem_write(ADDRESS, X86_CODE)
    # Initialize registers
    hooks.mu.reg_write(UC_X86_REG_RBX, 0xdeadbeef)
    print("Starting emulation with hooks...")
    try:
        # Emulate with hooks, stopping when the `ret` is reached
        hooks.mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE) - RET_SIZE)
        print(f"\nEmulation completed!")
        print(f"Instructions executed: {hooks.instruction_count}")
        print(f"Memory accesses: {len(hooks.memory_accesses)}")
        # Dump final state
        hooks.dump_registers()
    except UcError as e:
        print(f"Emulation error: {e}")
    finally:
        hooks.remove_all_hooks()
    return hooks
def advanced_hooks_example():
    """Count loop iterations and watch for calls using two code hooks.

    Emulates a 32-bit counting loop (EAX from 0 to 10). One hook counts
    how often the loop head is reached and aborts after 20 iterations;
    the other reports relative ``call`` instructions. Both hooks are
    removed before returning.

    Returns:
        The Uc instance used for the run.
    """
    print("Advanced Hooks Example")
    print("=" * 40)
    mu = Uc(UC_ARCH_X86, UC_MODE_32)
    # Map memory
    ADDRESS = 0x1000000
    mu.mem_map(ADDRESS, 2 * 1024 * 1024)
    # Code with loop: mov eax, 0; loop: inc eax; cmp eax, 10; jl loop; ret
    #   offset 0: b8 00 00 00 00   mov eax, 0
    #   offset 5: 40               inc eax        <- loop
    #   offset 6: 83 f8 0a         cmp eax, 10
    #   offset 9: 7c fa            jl loop        (11 - 6 = offset 5)
    #   offset 11: c3              ret
    # The displacement used to be 0xfb, which jumps back to the `cmp` and
    # therefore never increments EAX again.
    X86_CODE = b"\xb8\x00\x00\x00\x00\x40\x83\xf8\x0a\x7c\xfa\xc3"
    LOOP_HEAD = ADDRESS + 5
    # The trailing `ret` is not emulated: no stack is mapped for it.
    RET_SIZE = 1
    mu.mem_write(ADDRESS, X86_CODE)
    # Hook to count loop iterations
    loop_count = {'count': 0}

    def hook_loop_counter(uc, address, size, user_data):
        """Count loop iterations"""
        # Check if we're at the loop start (inc eax instruction)
        if address != LOOP_HEAD:
            return
        loop_count['count'] += 1
        eax = uc.reg_read(UC_X86_REG_EAX)
        print(f"Loop iteration {loop_count['count']}: EAX = {eax}")
        # Limit iterations to prevent infinite loop
        if loop_count['count'] > 20:
            print("Loop limit reached, stopping emulation")
            uc.emu_stop()

    # Hook to detect function calls
    def hook_function_calls(uc, address, size, user_data):
        """Detect and log function calls"""
        try:
            insn_bytes = uc.mem_read(address, size)
        except UcError:
            # Unreadable instruction bytes: nothing to report here.
            return
        # 0xe8 is the opcode of a relative call with a 32-bit displacement
        if insn_bytes[0] == 0xe8 and len(insn_bytes) >= 5:
            offset = int.from_bytes(insn_bytes[1:5], 'little', signed=True)
            target = address + size + offset
            print(f"Function call from 0x{address:x} to 0x{target:x}")

    # Add hooks
    loop_hook = mu.hook_add(UC_HOOK_CODE, hook_loop_counter)
    call_hook = mu.hook_add(UC_HOOK_CODE, hook_function_calls)
    print("Starting emulation with advanced hooks...")
    try:
        mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE) - RET_SIZE)
        print(f"Emulation completed!")
        print(f"Total loop iterations: {loop_count['count']}")
        # Check final register state
        eax = mu.reg_read(UC_X86_REG_EAX)
        print(f"Final EAX value: {eax}")
    except UcError as e:
        print(f"Emulation error: {e}")
    finally:
        mu.hook_del(loop_hook)
        mu.hook_del(call_hook)
    return mu
# Script entry point: run both hook demos with a ruled separator between
# them; the returned objects are kept in module-level names.
if __name__ == "__main__":
    hooks = hooks_example()
    separator = "\n" + "=" * 60 + "\n"
    print(separator)
    mu = advanced_hooks_example()
## Techniques d'émulation avancées
### Émulation multithread
# Advanced emulation techniques with Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *
import threading
import time
import queue
class UnicornEmulator:
    """Run code snippets in independent Unicorn instances, one per thread.

    Every snippet gets its own emulator with a fixed code/data/stack
    layout; the outcome of each run is stored in ``emulation_results``
    under the instance id supplied by the caller.
    """

    # Upper bound on executed instructions per snippet (runaway-loop guard).
    _MAX_INSTRUCTIONS = 10000

    def __init__(self, arch, mode):
        """Remember the architecture/mode used for every new instance."""
        self.arch = arch
        self.mode = mode
        # instance_id -> result dict (see emulate_code_snippet).
        self.emulation_results = {}
        self.shared_memory = {}

    def create_emulator_instance(self, instance_id):
        """Create a new emulator instance with code, data and stack mapped.

        Returns:
            Tuple (mu, CODE_BASE, DATA_BASE, STACK_BASE).
        """
        mu = Uc(self.arch, self.mode)
        # Set up basic memory layout
        CODE_BASE = 0x400000
        DATA_BASE = 0x600000
        STACK_BASE = 0x7fff0000
        mu.mem_map(CODE_BASE, 0x10000)   # Code section
        mu.mem_map(DATA_BASE, 0x10000)   # Data section
        mu.mem_map(STACK_BASE, 0x10000)  # Stack section
        # Set up stack pointer in the middle of the stack mapping
        if self.arch == UC_ARCH_X86:
            if self.mode == UC_MODE_64:
                mu.reg_write(UC_X86_REG_RSP, STACK_BASE + 0x8000)
            else:
                mu.reg_write(UC_X86_REG_ESP, STACK_BASE + 0x8000)
        return mu, CODE_BASE, DATA_BASE, STACK_BASE

    def emulate_code_snippet(self, instance_id, code, initial_state=None):
        """Emulate a code snippet in separate thread.

        Args:
            instance_id: key under which the result is stored.
            code: machine code written at the code base and executed from
                its first byte until the address just past its last byte.
            initial_state: optional mapping of register id -> value
                applied before the run.

        Returns:
            The started ``threading.Thread``. Once it has finished,
            ``emulation_results[instance_id]`` holds either a success dict
            ('success', 'execution_time', 'instruction_count',
            'final_registers', 'memory_dumps') or a failure dict
            ('success': False, 'error').
        """
        def emulation_thread():
            try:
                mu, code_base, _data_base, _stack_base = self.create_emulator_instance(instance_id)
                # Write code to memory
                mu.mem_write(code_base, code)
                # Set initial register state if provided
                if initial_state:
                    for reg, value in initial_state.items():
                        mu.reg_write(reg, value)
                # Set up hooks for this instance
                instruction_count = {'count': 0}

                def hook_code(uc, address, size, user_data):
                    instruction_count['count'] += 1
                    if instruction_count['count'] > self._MAX_INSTRUCTIONS:  # Prevent infinite loops
                        uc.emu_stop()

                hook_id = mu.hook_add(UC_HOOK_CODE, hook_code)
                try:
                    start_time = time.perf_counter()
                    mu.emu_start(code_base, code_base + len(code))
                    elapsed = time.perf_counter() - start_time
                finally:
                    # Remove the hook even when emulation fails.
                    mu.hook_del(hook_id)

                # Registers to report; stays empty for architectures that
                # have no table here (this name used to be unbound then).
                registers = []
                if self.arch == UC_ARCH_X86:
                    if self.mode == UC_MODE_64:
                        registers = [UC_X86_REG_RAX, UC_X86_REG_RBX, UC_X86_REG_RCX, UC_X86_REG_RDX]
                    else:
                        registers = [UC_X86_REG_EAX, UC_X86_REG_EBX, UC_X86_REG_ECX, UC_X86_REG_EDX]

                # Collect and store results
                self.emulation_results[instance_id] = {
                    'instance_id': instance_id,
                    'success': True,
                    'execution_time': elapsed,
                    'instruction_count': instruction_count['count'],
                    'final_registers': {reg: mu.reg_read(reg) for reg in registers},
                    'memory_dumps': {}
                }
            except Exception as e:
                # Thread boundary: record every failure (emulator errors
                # included) so that each started snippet ends up with a
                # result entry instead of dying silently in its thread.
                self.emulation_results[instance_id] = {
                    'instance_id': instance_id,
                    'success': False,
                    'error': str(e)
                }

        # Start emulation in separate thread
        thread = threading.Thread(target=emulation_thread)
        thread.start()
        return thread

    def parallel_emulation(self, code_variants, max_workers=4):
        """Run multiple code variants in parallel.

        Args:
            code_variants: sequence of (code, initial_state) pairs.
            max_workers: maximum number of threads tracked at once; values
                below 1 are treated as 1.

        Returns:
            The ``emulation_results`` dict, keyed by "variant_<index>".
        """
        print(f"Starting parallel emulation with {len(code_variants)} variants")
        max_workers = max(1, max_workers)
        threads = []
        # Start emulation threads
        for i, (code, initial_state) in enumerate(code_variants):
            if len(threads) >= max_workers:
                # Wait for the oldest thread before starting another one
                threads.pop(0).join()
            threads.append(self.emulate_code_snippet(f"variant_{i}", code, initial_state))
        # Wait for all threads to complete
        for thread in threads:
            thread.join()
        print(f"Parallel emulation completed. Results: {len(self.emulation_results)}")
        return self.emulation_results

    def compare_emulation_results(self):
        """Print a summary of every stored result (needs at least two)."""
        if len(self.emulation_results) < 2:
            print("Need at least 2 results to compare")
            return
        print("Emulation Results Comparison:")
        print("=" * 50)
        for instance_id, result in self.emulation_results.items():
            if result['success']:
                print(f"\nInstance: {instance_id}")
                print(f" Execution time: {result['execution_time']:.6f}s")
                print(f" Instructions: {result['instruction_count']}")
                print(f" Final registers: {result['final_registers']}")
            else:
                print(f"\nInstance: {instance_id} - FAILED")
                print(f" Error: {result['error']}")
def fuzzing_with_unicorn():
    """Use Unicorn for fuzzing and input generation.

    Runs a tiny harness that calls ``process_buffer`` with RDI (buffer
    address) and RSI (input size) taken from each variant's initial
    register state. ``process_buffer`` executes ``int3`` when the size
    exceeds 16, which stands in for a crash.

    Returns:
        Tuple (emulator, vulnerabilities) where ``vulnerabilities`` lists
        the instance ids whose run ended in a CPU exception.
    """
    print("Fuzzing with Unicorn Engine")
    print("=" * 40)
    emulator = UnicornEmulator(UC_ARCH_X86, UC_MODE_64)

    # Code layout. Execution starts at offset 0 and the emulator stops when
    # the program counter reaches the end of the code, which is exactly
    # where `call` returns to.
    #   offset  0: eb 0b            jmp  harness          (-> offset 13)
    #   offset  2: 48 83 fe 10      cmp  rsi, 16          process_buffer
    #   offset  6: 7f 04            jg   vuln             (-> offset 12)
    #   offset  8: 48 31 c0         xor  rax, rax
    #   offset 11: c3               ret
    #   offset 12: cc               int3                  vuln
    #   offset 13: e8 f0 ff ff ff   call process_buffer   (-> offset 2)
    # RDI/RSI are deliberately not loaded by the code: they come from the
    # per-variant initial state, so the fuzzed size actually reaches the
    # comparison.
    process_buffer_code = b"\x48\x83\xfe\x10\x7f\x04\x48\x31\xc0\xc3\xcc"
    code_variant = b"\xeb\x0b" + process_buffer_code + b"\xe8\xf0\xff\xff\xff"

    # Generate fuzzing inputs
    fuzzing_inputs = []
    for size in [8, 16, 24, 32, 64, 128]:
        # NOTE(review): the byte pattern is not written into the emulated
        # buffer, because the emulator interface used here only accepts
        # register state. Variants of one size therefore take the same
        # path; the loop is kept so the variant numbering is unchanged.
        for _pattern in [0x41, 0x00, 0xff, 0x90]:
            initial_state = {
                UC_X86_REG_RDI: 0x602000,  # Input buffer address
                UC_X86_REG_RSI: size       # Input size
            }
            fuzzing_inputs.append((code_variant, initial_state))

    # Run fuzzing
    results = emulator.parallel_emulation(fuzzing_inputs, max_workers=8)

    # Analyze results for crashes/vulnerabilities. An unhandled `int3`
    # surfaces as Unicorn's CPU-exception error, whose text names
    # UC_ERR_EXCEPTION; the former check for 'int3' could never match.
    vulnerabilities = [
        instance_id
        for instance_id, result in results.items()
        if not result['success'] and 'uc_err_exception' in result.get('error', '').lower()
    ]
    print(f"\nFuzzing completed. Found {len(vulnerabilities)} potential vulnerabilities:")
    for vuln in vulnerabilities:
        print(f" {vuln}")
    return emulator, vulnerabilities
def code_coverage_analysis():
    """Implement code coverage analysis with Unicorn.

    Emulates a small function with one conditional branch for several
    inputs and records executed instruction addresses, basic-block start
    addresses and, for each conditional branch, whether it was ever taken.

    Returns:
        Dict with the keys 'executed_addresses' (set of int),
        'basic_blocks' (set of int) and 'branch_coverage' (branch address
        -> True if the branch was taken in at least one run).
    """
    print("Code Coverage Analysis")
    print("=" * 40)
    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    # Map memory. The function stores to its stack frame, so the stack
    # must be mapped as well; without it the first store faults.
    CODE_BASE = 0x400000
    STACK_BASE = 0x7fff0000
    STACK_SIZE = 0x10000
    mu.mem_map(CODE_BASE, 0x10000)
    mu.mem_map(STACK_BASE, STACK_SIZE)

    # Function with a conditional branch (offsets in the comments).
    actual_code = (
        b"\x48\x83\xec\x10"              #  0: sub rsp, 16
        b"\x48\x89\x7c\x24\x08"          #  4: mov [rsp+8], rdi
        b"\x48\x83\xff\x0a"              #  9: cmp rdi, 10
        b"\x7c\x09"                      # 13: jl less_than_10 (-> 24)
        b"\x48\xc7\xc0\x01\x00\x00\x00"  # 15: mov rax, 1
        b"\xeb\x07"                      # 22: jmp end (-> 31)
        b"\x48\xc7\xc0\x00\x00\x00\x00"  # 24: less_than_10: mov rax, 0
        b"\x48\x83\xc4\x10"              # 31: end: add rsp, 16
        b"\xc3"                          # 35: ret
    )
    # The trailing `ret` is not emulated: there is no caller to return to.
    RET_SIZE = 1
    mu.mem_write(CODE_BASE, actual_code)

    # Coverage tracking
    coverage_data = {
        'executed_addresses': set(),
        'basic_blocks': set(),
        'branch_coverage': {}
    }
    # Per-run state: addresses seen in the current run, and the conditional
    # branch (address, fall-through address) awaiting its outcome.
    state = {'run': set(), 'pending': None}

    def hook_coverage(uc, address, size, user_data):
        """Track executed addresses and resolve branch outcomes."""
        coverage_data['executed_addresses'].add(address)
        state['run'].add(address)
        # The instruction following a conditional branch tells us whether
        # the branch was taken: anything but the fall-through means taken.
        if state['pending'] is not None:
            branch_addr, fallthrough = state['pending']
            state['pending'] = None
            taken = address != fallthrough
            branches = coverage_data['branch_coverage']
            branches[branch_addr] = branches.get(branch_addr, False) or taken
        try:
            insn_bytes = uc.mem_read(address, size)
        except UcError:
            return
        if size >= 2 and insn_bytes[0] == 0x7c:  # jl instruction
            coverage_data['branch_coverage'].setdefault(address, False)
            state['pending'] = (address, address + size)

    def hook_block(uc, address, size, user_data):
        """Record the start address of every executed basic block."""
        coverage_data['basic_blocks'].add(address)

    code_hook = mu.hook_add(UC_HOOK_CODE, hook_coverage)
    block_hook = mu.hook_add(UC_HOOK_BLOCK, hook_block)
    try:
        # Test different inputs to achieve different coverage
        test_inputs = [5, 15, 0, 10, -1, 100]
        for test_input in test_inputs:
            print(f"Testing input: {test_input}")
            # Reset per-run state
            state['run'] = set()
            state['pending'] = None
            # Set input and run; negative inputs are passed as their
            # 64-bit two's-complement bit pattern.
            mu.reg_write(UC_X86_REG_RDI, test_input & 0xFFFFFFFFFFFFFFFF)
            mu.reg_write(UC_X86_REG_RSP, STACK_BASE + 0x8000)
            try:
                mu.emu_start(CODE_BASE, CODE_BASE + len(actual_code) - RET_SIZE)
                result = mu.reg_read(UC_X86_REG_RAX)
                print(f" Result: {result}, Coverage: {len(state['run'])} addresses")
            except UcError as e:
                print(f" Error: {e}")
        # Print overall coverage statistics
        print(f"\nOverall Coverage Statistics:")
        print(f" Total addresses executed: {len(coverage_data['executed_addresses'])}")
        print(f" Basic blocks covered: {len(coverage_data['basic_blocks'])}")
        print(f" Branches tracked: {len(coverage_data['branch_coverage'])}")
    finally:
        mu.hook_del(code_hook)
        mu.hook_del(block_hook)
    return coverage_data
def performance_profiling():
    """Profile a small x86-64 counting loop and report the hottest addresses.

    A code hook counts how often every instruction address executes and
    records a ``time.time()`` stamp for each execution. After the run the
    five most frequently executed addresses are printed and stored.

    Returns:
        dict: ``instruction_counts`` maps address -> execution count,
        ``execution_times`` maps address -> list of wall-clock stamps and
        ``hotspots`` holds up to five ``(address, count)`` tuples, busiest
        first (empty when emulation failed).
    """
    print("Performance Profiling")
    print("=" * 40)
    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    # Map memory
    CODE_BASE = 0x400000
    mu.mem_map(CODE_BASE, 0x10000)
    # Simple loop that counts from 0 to 100. Byte offsets:
    #    0: 48 31 c0      xor rax, rax
    #    3: 48 83 f8 64   cmp rax, 100     <- loop
    #    7: 7d 05         jge end          (next ip 9 + 5 = 14)
    #    9: 48 ff c0      inc rax
    #   12: eb f5         jmp loop         (next ip 14 - 11 = 3)
    #   14: c3            ret              <- end
    # rel8 displacements are relative to the address of the NEXT instruction.
    loop_code = b"\x48\x31\xc0\x48\x83\xf8\x64\x7d\x05\x48\xff\xc0\xeb\xf5\xc3"
    mu.mem_write(CODE_BASE, loop_code)
    # Stop on the final ret instead of executing it: no stack is mapped, so
    # popping a return address would fault.
    stop_address = CODE_BASE + len(loop_code) - 1
    # Profiling data
    profile_data = {
        'instruction_counts': {},
        'execution_times': {},
        'hotspots': []
    }

    def hook_profiler(uc, address, size, user_data):
        """Count the instruction at *address* and stamp its execution time."""
        counts = profile_data['instruction_counts']
        counts[address] = counts.get(address, 0) + 1
        # Track execution time (simplified)
        profile_data['execution_times'].setdefault(address, []).append(time.time())

    hook_id = mu.hook_add(UC_HOOK_CODE, hook_profiler)
    # Run profiling
    start_time = time.time()
    try:
        mu.emu_start(CODE_BASE, stop_address)
        end_time = time.time()
        print(f"Execution completed in {end_time - start_time:.6f} seconds")
        # Analyze profiling data
        print("\nInstruction Execution Counts:")
        sorted_counts = sorted(profile_data['instruction_counts'].items(),
                               key=lambda item: item[1], reverse=True)
        for addr, count in sorted_counts[:5]:  # Top 5 hotspots
            print(f" 0x{addr:x}: {count} executions")
            profile_data['hotspots'].append((addr, count))
        total_instructions = sum(profile_data['instruction_counts'].values())
        print(f"\nTotal instructions executed: {total_instructions}")
    except UcError as e:
        print(f"Profiling error: {e}")
    finally:
        # Always detach the hook, even if analysis raised unexpectedly.
        mu.hook_del(hook_id)
    return profile_data
# Script entry point: run the advanced emulation examples one after another
if __name__ == "__main__":
    # Run the fuzzing example, then compare the results held by the returned emulator
    emulator, vulns = fuzzing_with_unicorn()
    emulator.compare_emulation_results()
    print("\n" + "=" * 60 + "\n")

    # Code coverage analysis
    coverage = code_coverage_analysis()
    print("\n" + "=" * 60 + "\n")

    # Performance profiling
    profile = performance_profiling()
Intégration avec d'autres outils
Intégration du framework Qiling
# Qiling Framework integration with Unicorn Engine
# Qiling provides higher-level emulation with OS API support
try:
from qiling import Qiling
from qiling.const import QL_VERBOSE
QILING_AVAILABLE = True
except ImportError:
QILING_AVAILABLE = False
print("Qiling not available. Install with: pip install qiling")
import unicorn
from unicorn import *
from unicorn.x86_const import *
import os
import tempfile
class UnicornQilingBridge:
    """Bridge between Unicorn Engine and Qiling Framework.

    Creates Qiling emulators and copies x86-64 general-purpose register
    state (and, in one direction, memory contents) between a raw Unicorn
    ``Uc`` instance and a Qiling instance.

    NOTE(review): the Qiling attribute names used here (``ql.reg``,
    ``ql.hook_syscall``) are kept from the original code; confirm them
    against the installed Qiling version.
    """

    def __init__(self):
        # Registries for emulator instances; nothing in this class fills them yet.
        self.unicorn_instances = {}
        self.qiling_instances = {}

    @staticmethod
    def _x86_64_register_pairs():
        """Return ``(unicorn_register_id, qiling_register_name)`` pairs."""
        # Built lazily so the Unicorn constants are only needed when called.
        return [
            (UC_X86_REG_RAX, 'rax'), (UC_X86_REG_RBX, 'rbx'),
            (UC_X86_REG_RCX, 'rcx'), (UC_X86_REG_RDX, 'rdx'),
            (UC_X86_REG_RSI, 'rsi'), (UC_X86_REG_RDI, 'rdi'),
            (UC_X86_REG_RSP, 'rsp'), (UC_X86_REG_RBP, 'rbp'),
        ]

    @staticmethod
    def _is_x86_64(mu):
        """Return True when *mu* is an x86 engine in 64-bit mode.

        The engine is asked through ``uc_query`` instead of reading
        ``mu.arch`` / ``mu.mode`` attributes. If the engine cannot answer
        the query, the state transfer is skipped (False is returned).
        """
        try:
            return (mu.query(UC_QUERY_ARCH) == UC_ARCH_X86
                    and mu.query(UC_QUERY_MODE) == UC_MODE_64)
        except UcError:
            return False

    def create_qiling_emulator(self, binary_path, rootfs_path, arch="x8664", ostype="linux"):
        """Create a Qiling emulator for *binary_path* and install the hooks.

        Args:
            binary_path: Path of the executable to emulate.
            rootfs_path: Root filesystem directory handed to Qiling.
            arch: Accepted for interface compatibility; currently unused.
            ostype: Accepted for interface compatibility; currently unused.

        Returns:
            The Qiling instance, or None when creation or hook setup failed
            (the error is printed).

        Raises:
            ImportError: If the Qiling package could not be imported.
        """
        if not QILING_AVAILABLE:
            raise ImportError("Qiling framework not available")
        try:
            ql = Qiling([binary_path], rootfs_path, verbose=QL_VERBOSE.DEFAULT)
            # Set up hooks for integration
            self.setup_qiling_hooks(ql)
            return ql
        except Exception as e:
            print(f"Failed to create Qiling emulator: {e}")
            return None

    def setup_qiling_hooks(self, ql):
        """Register the logging syscall hook on *ql*."""
        def hook_syscall(ql, syscall_num, *args):
            """Hook system calls"""
            print(f"Syscall: {syscall_num} with args: {args}")

        def hook_api_call(ql, api_name, *args):
            """Hook API calls (defined for illustration; not registered)."""
            print(f"API call: {api_name} with args: {args}")

        # Add hooks
        ql.hook_syscall(hook_syscall)

    def unicorn_to_qiling_state(self, mu, ql):
        """Copy registers and mapped memory from Unicorn *mu* into Qiling *ql*.

        Registers are copied only for x86-64 engines. Every region reported
        by ``mu.mem_regions()`` is copied on a best-effort basis: a region
        that cannot be written is reported and the remaining regions are
        still attempted.
        """
        # Transfer register state
        if self._is_x86_64(mu):
            for uc_reg, ql_reg in self._x86_64_register_pairs():
                setattr(ql.reg, ql_reg, mu.reg_read(uc_reg))
        # Transfer memory state (simplified)
        for start, end, _perms in mu.mem_regions():
            size = end - start + 1  # region bounds are inclusive
            try:
                # mem_read returns a bytearray; hand over immutable bytes.
                ql.mem.write(start, bytes(mu.mem_read(start, size)))
            except Exception as e:
                print(f"Failed to copy region 0x{start:x}-0x{end:x} to Qiling: {e}")

    def qiling_to_unicorn_state(self, ql, mu):
        """Copy x86-64 general-purpose registers from Qiling *ql* into *mu*."""
        # Transfer register state
        if self._is_x86_64(mu):
            for uc_reg, ql_reg in self._x86_64_register_pairs():
                mu.reg_write(uc_reg, getattr(ql.reg, ql_reg))
def create_test_binary():
    """Compile a small C program into a temporary executable.

    The C source is written to a temporary ``*.c`` file and compiled with
    ``gcc``. On success the source file is left next to the binary (same
    path plus ``.c``) so the caller can remove both.

    Returns:
        str | None: Path of the compiled binary, or None when gcc is
        missing or compilation failed (the temporary source is removed in
        that case because no path is returned to clean it up later).
    """
    # Local import: keeps the function self-contained within the file.
    import subprocess

    # Simple C program that we'll compile
    c_code = """
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
printf("Hello from test binary!\\n");
if (argc > 1) {
int value = atoi(argv[1]);
printf("Argument value: %d\\n", value);
if (value > 100) {
printf("Large value detected!\\n");
return 1;
}
}
return 0;
}
"""
    # Create temporary files
    with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
        f.write(c_code)
        c_file = f.name
    # Strip only the trailing extension; str.replace would also remove any
    # other ".c" occurring earlier in the path.
    binary_file = os.path.splitext(c_file)[0]
    # Compile the program (argument list, no shell involved)
    try:
        compiled = subprocess.run(["gcc", "-o", binary_file, c_file]).returncode == 0
    except OSError:
        # gcc is not installed or could not be started
        compiled = False
    if compiled:
        print(f"Test binary created: {binary_file}")
        return binary_file
    print("Failed to compile test binary")
    try:
        os.unlink(c_file)
    except OSError:
        pass  # best-effort cleanup of the temporary source
    return None
def unicorn_qiling_comparison():
    """Compare Unicorn and Qiling emulation approaches on one test binary.

    Builds the test binary, loads it with the low-level Unicorn example,
    runs the Qiling example when Qiling is importable, and finally removes
    the binary together with its C source. Returns None; returns early
    when the test binary could not be built.
    """
    print("Unicorn vs Qiling Comparison")
    print("=" * 40)
    # Create test binary
    binary_path = create_test_binary()
    if not binary_path:
        return
    try:
        # Unicorn emulation (low-level)
        print("\n1. Unicorn Engine Emulation:")
        unicorn_emulation_example(binary_path)
        # Qiling emulation (high-level)
        if QILING_AVAILABLE:
            print("\n2. Qiling Framework Emulation:")
            qiling_emulation_example(binary_path)
        else:
            print("\n2. Qiling Framework: Not available")
    finally:
        # Clean up. The C source sits next to the binary under the same
        # name plus ".c". Each file is removed independently so a missing
        # one does not prevent removal of the other.
        for leftover in (binary_path, binary_path + '.c'):
            try:
                os.unlink(leftover)
            except OSError:
                pass  # already gone or not removable; nothing more to do
def unicorn_emulation_example(binary_path):
    """Load the raw bytes of *binary_path* into a fresh x86-64 Unicorn engine.

    The file is copied verbatim to address 0x400000; no ELF parsing,
    relocation or execution takes place.

    Args:
        binary_path: Path of the file to load.
    """
    # Read binary file
    with open(binary_path, 'rb') as f:
        binary_data = f.read()
    # Simple emulation (this is very simplified)
    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    # Map memory for binary
    BASE_ADDR = 0x400000
    PAGE_SIZE = 0x1000
    # Unicorn only maps sizes that are multiples of 4 KiB: round the file
    # size up to whole pages and keep one spare page of slack.
    page_count = (len(binary_data) + PAGE_SIZE - 1) // PAGE_SIZE + 1
    mu.mem_map(BASE_ADDR, page_count * PAGE_SIZE)
    # Write binary to memory
    mu.mem_write(BASE_ADDR, binary_data)
    print(f" Loaded {len(binary_data)} bytes at 0x{BASE_ADDR:x}")
    print(" Note: This is a simplified example - real ELF loading is complex")
def qiling_emulation_example(binary_path):
    """Run *binary_path* under Qiling with a printf hook installed.

    Does nothing when Qiling is not importable. A minimal root filesystem
    (``/tmp/qiling_rootfs`` with a ``lib`` sub-directory) is created first.
    Any exception raised while creating or running the emulator is printed
    rather than propagated.

    NOTE(review): ``ql.set_api``, ``ql.reg`` and ``ql.exit_code`` are used
    as in the original; confirm they match the installed Qiling version.
    """
    if not QILING_AVAILABLE:
        return

    # Qiling needs a rootfs directory; make a minimal one with a lib folder
    rootfs_path = "/tmp/qiling_rootfs"
    for directory in (rootfs_path, os.path.join(rootfs_path, "lib")):
        os.makedirs(directory, exist_ok=True)

    def hook_printf(ql):
        """Print the format string whose address is held in RDI."""
        print(f" printf: {ql.mem.string(ql.reg.rdi)}")

    try:
        emulator = Qiling([binary_path], rootfs_path, verbose=QL_VERBOSE.DEFAULT)
        # Intercept printf before starting the run
        emulator.set_api("printf", hook_printf)
        print(" Starting Qiling emulation...")
        emulator.run()
        print(f" Emulation completed with exit code: {emulator.exit_code}")
    except Exception as e:
        print(f" Qiling emulation failed: {e}")
def advanced_integration_example():
    """Emulate a ``write(1, msg, len)`` syscall with a Unicorn syscall hook.

    A short x86-64 stub loads the sys_write arguments and executes
    ``syscall``. The hook services syscall number 1 by reading the buffer
    from emulated memory, printing it and returning the byte count in RAX.
    Emulation errors are printed, not raised.
    """
    print("Advanced Integration Example")
    print("=" * 40)
    # Example: Use Unicorn for low-level analysis, then switch to Qiling for high-level
    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    # Map memory and set up basic environment
    CODE_BASE = 0x400000
    MESSAGE_ADDR = 0x405000
    mu.mem_map(CODE_BASE, 0x10000)
    message = b"Hello, World!\n"
    # The address and length immediates are derived from the data actually
    # written below so they cannot drift apart.
    syscall_code = (
        b"\x48\xc7\xc0\x01\x00\x00\x00"                           # mov rax, 1 (sys_write)
        + b"\x48\xc7\xc7\x01\x00\x00\x00"                         # mov rdi, 1 (stdout)
        + b"\x48\xc7\xc6" + MESSAGE_ADDR.to_bytes(4, "little")    # mov rsi, msg
        + b"\x48\xc7\xc2" + len(message).to_bytes(4, "little")    # mov rdx, len
        + b"\x0f\x05"                                             # syscall
    )
    mu.mem_write(CODE_BASE, syscall_code)
    # Write message to memory
    mu.mem_write(MESSAGE_ADDR, message)

    # Hook system calls in Unicorn
    def hook_syscall(uc, user_data):
        """Handle system calls in Unicorn"""
        syscall_num = uc.reg_read(UC_X86_REG_RAX)
        if syscall_num == 1:  # sys_write
            buf_addr = uc.reg_read(UC_X86_REG_RSI)
            count = uc.reg_read(UC_X86_REG_RDX)
            try:
                data = uc.mem_read(buf_addr, count)
            except UcError:
                # Buffer not readable: report failure as -1 (unsigned 64-bit)
                uc.reg_write(UC_X86_REG_RAX, 0xFFFFFFFFFFFFFFFF)
                return
            print(f" Unicorn syscall write: {data.decode(errors='replace')}")
            uc.reg_write(UC_X86_REG_RAX, count)  # Return bytes written

    # The x86-64 `syscall` instruction is delivered through an instruction
    # hook (UC_HOOK_INSN + UC_X86_INS_SYSCALL); UC_HOOK_INTR only reports
    # software interrupts and passes an extra interrupt-number argument.
    mu.hook_add(UC_HOOK_INSN, hook_syscall, None, 1, 0, UC_X86_INS_SYSCALL)
    print("Running low-level analysis with Unicorn...")
    try:
        mu.emu_start(CODE_BASE, CODE_BASE + len(syscall_code))
        print(" Unicorn emulation completed")
        # At this point, we could transfer state to Qiling for higher-level analysis
        if QILING_AVAILABLE:
            print(" (State could be transferred to Qiling for high-level analysis)")
    except UcError as e:
        print(f" Unicorn emulation error: {e}")
# Integration with other reverse engineering tools
def unicorn_with_capstone():
    """Trace a small x86-64 routine, disassembling each step with Capstone.

    The routine stores RDI and RSI in a stack frame, adds them and leaves
    the sum in RAX. A code hook disassembles every executed instruction and
    prints its register and memory operands. Returns None; returns early
    with a hint when Capstone is not installed.
    """
    try:
        import capstone
    except ImportError:
        print("Capstone not available. Install with: pip install capstone")
        return
    # Unicorn's own register table, used to translate register names.
    import unicorn.x86_const as uc_x86_const

    print("Unicorn + Capstone Integration")
    print("=" * 40)
    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    # Map memory
    CODE_BASE = 0x400000
    mu.mem_map(CODE_BASE, 0x10000)
    # The routine writes below RBP, so the stack must be mapped as well.
    STACK_BASE = 0x7fff0000
    STACK_TOP = 0x7fff8000
    mu.mem_map(STACK_BASE, 0x10000)
    # Complex code to disassemble and emulate
    complex_code = b"\x48\x89\xe5\x48\x83\xec\x20\x48\x89\x7d\xe8\x48\x89\x75\xe0\x48\x8b\x45\xe8\x48\x8b\x55\xe0\x48\x01\xd0\x48\x89\x45\xf8\x48\x8b\x45\xf8\x48\x83\xc4\x20\x5d\xc3"
    mu.mem_write(CODE_BASE, complex_code)
    # Stop on the final ret: there is no caller, so executing it would jump
    # to whatever value happens to be on the stack.
    stop_address = CODE_BASE + len(complex_code) - 1
    # Set up Capstone disassembler
    md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
    md.detail = True

    # Disassemble and emulate with detailed analysis
    def hook_instruction_analysis(uc, address, size, user_data):
        """Detailed instruction analysis with Capstone"""
        # Only the instruction about to execute is of interest.
        insn = next(md.disasm(bytes(uc.mem_read(address, size)), address), None)
        if insn is None:
            return
        print(f"0x{insn.address:x}: {insn.mnemonic} {insn.op_str}")
        # Analyze operands
        for op in insn.operands:
            if op.type == capstone.x86.X86_OP_REG:
                reg_name = insn.reg_name(op.reg)
                # Capstone register ids are a separate enumeration; resolve
                # the Unicorn id by name and skip registers without one.
                uc_reg = getattr(uc_x86_const, f'UC_X86_REG_{reg_name.upper()}', None)
                if uc_reg is not None:
                    print(f" {reg_name}: 0x{uc.reg_read(uc_reg):x}")
            elif op.type == capstone.x86.X86_OP_MEM:
                print(f" Memory operand: base={op.mem.base}, index={op.mem.index}, disp=0x{op.mem.disp:x}")

    # Add hook
    hook_id = mu.hook_add(UC_HOOK_CODE, hook_instruction_analysis)
    # Set up registers
    mu.reg_write(UC_X86_REG_RDI, 0x1234)
    mu.reg_write(UC_X86_REG_RSI, 0x5678)
    mu.reg_write(UC_X86_REG_RSP, STACK_TOP)
    print("Starting emulation with Capstone analysis...")
    try:
        mu.emu_start(CODE_BASE, stop_address)
        # Check result
        result = mu.reg_read(UC_X86_REG_RAX)
        print(f"\nEmulation result: 0x{result:x}")
    except UcError as e:
        print(f"Emulation error: {e}")
    finally:
        mu.hook_del(hook_id)
# Script entry point: run the three integration examples in order,
# printing a 60-character rule between them
if __name__ == "__main__":
    # Unicorn-Qiling comparison
    unicorn_qiling_comparison()
    print("\n" + "=" * 60 + "\n")

    # Advanced integration
    advanced_integration_example()
    print("\n" + "=" * 60 + "\n")

    # Capstone integration
    unicorn_with_capstone()
Meilleures pratiques et optimisation
Optimisation des performances
# Performance optimization techniques for Unicorn Engine
import unicorn
from unicorn import *
from unicorn.x86_const import *
import time
import psutil
import gc
class UnicornOptimizer:
    """Benchmarking, profiling and tuning helpers for Unicorn emulators.

    The emulation helpers load code at ``CODE_BASE`` inside a mapping of
    ``CODE_REGION_SIZE`` bytes. They map no stack, so code that touches the
    stack (``push``/``pop``/``ret``) ends in an emulation error, which the
    helpers swallow or record as documented per method.
    """

    CODE_BASE = 0x1000000                # load address used by all helpers
    CODE_REGION_SIZE = 2 * 1024 * 1024   # size of the code mapping
    PAGE_SIZE = 0x1000                   # granularity of lazy mappings

    def __init__(self):
        self.performance_metrics = {}
        self.optimization_strategies = {}
        # Filled by implement_memory_pooling()
        self.memory_pools = {}

    def benchmark_emulation(self, arch, mode, code, iterations=1000):
        """Time *iterations* fresh emulations of *code*.

        A new engine is created for every iteration; only the
        ``emu_start`` call is timed. Emulation errors are ignored, so the
        timing of a faulting run is still recorded.

        Args:
            arch: Unicorn architecture constant.
            mode: Unicorn mode constant.
            code: Machine code bytes to run.
            iterations: Number of runs; must be positive.

        Returns:
            dict: ``average``, ``min`` and ``max`` run time in seconds and
            the raw ``times`` list.

        Raises:
            ValueError: If *iterations* is not positive.
        """
        if iterations <= 0:
            raise ValueError("iterations must be a positive integer")
        print(f"Benchmarking {iterations} iterations...")
        # Baseline measurement
        baseline_times = []
        for i in range(iterations):
            mu = Uc(arch, mode)
            mu.mem_map(self.CODE_BASE, self.CODE_REGION_SIZE)
            mu.mem_write(self.CODE_BASE, code)
            start_time = time.perf_counter()
            try:
                mu.emu_start(self.CODE_BASE, self.CODE_BASE + len(code))
            except UcError:
                pass  # faults are expected for some samples; keep timing
            end_time = time.perf_counter()
            baseline_times.append(end_time - start_time)
            # Clean up
            del mu
            if i % 100 == 0:
                gc.collect()
        avg_time = sum(baseline_times) / len(baseline_times)
        min_time = min(baseline_times)
        max_time = max(baseline_times)
        print(f" Average time: {avg_time:.6f}s")
        print(f" Min time: {min_time:.6f}s")
        print(f" Max time: {max_time:.6f}s")
        return {
            'average': avg_time,
            'min': min_time,
            'max': max_time,
            'times': baseline_times
        }

    def optimize_memory_usage(self, mu):
        """Apply every memory strategy to *mu*, printing process RSS after each."""
        # Get current memory usage
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        print(f"Initial memory usage: {initial_memory:.2f} MB")
        # Optimization strategies
        optimizations = {
            'minimal_mapping': self.use_minimal_memory_mapping,
            'lazy_allocation': self.implement_lazy_allocation,
            'memory_pooling': self.implement_memory_pooling,
            'garbage_collection': self.optimize_garbage_collection
        }
        for name, optimization in optimizations.items():
            print(f"Applying {name}...")
            optimization(mu)
            current_memory = process.memory_info().rss / 1024 / 1024
            print(f" Memory after {name}: {current_memory:.2f} MB")

    def use_minimal_memory_mapping(self, mu):
        """Placeholder for a minimal-mapping strategy; currently does nothing."""
        # Only map memory that's actually needed
        # Avoid large contiguous mappings when possible
        # Example: Instead of mapping 1GB, map smaller chunks as needed
        pass

    def implement_lazy_allocation(self, mu):
        """Install a hook on *mu* that maps a page on unmapped read/write."""
        page_size = self.PAGE_SIZE

        def hook_mem_unmapped(uc, access, address, size, value, user_data):
            """Map the page containing *address*; return True if it worked."""
            page_start = address & ~(page_size - 1)
            try:
                uc.mem_map(page_start, page_size)
                return True
            except UcError:
                return False

        # Add lazy allocation hook
        mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, hook_mem_unmapped)

    def implement_memory_pooling(self, mu):
        """Pre-allocate ten zeroed buffers each of 4 KiB, 64 KiB and 1 MiB.

        The buffers are stored in ``self.memory_pools`` under the keys
        ``small``, ``medium`` and ``large``; *mu* is not used.
        """
        self.memory_pools = {
            'small': [bytearray(4096) for _ in range(10)],      # 4KB pages
            'medium': [bytearray(65536) for _ in range(10)],    # 64KB chunks
            'large': [bytearray(1048576) for _ in range(10)],   # 1MB chunks
        }

    def optimize_garbage_collection(self, mu):
        """Run a full collection and set the GC thresholds to (700, 10, 10)."""
        # Force garbage collection
        gc.collect()
        # Adjust GC thresholds for better performance
        gc.set_threshold(700, 10, 10)

    def optimize_hook_performance(self, mu):
        """Install a counting code hook on *mu*.

        Returns:
            dict: ``{'count': n}``, updated in place as instructions execute.
        """
        # Minimize hook overhead
        hook_count = {'count': 0}

        def lightweight_hook(uc, address, size, user_data):
            """Lightweight hook with minimal overhead"""
            hook_count['count'] += 1
            # Only perform expensive operations occasionally
            if hook_count['count'] % 1000 == 0:
                # Expensive operation here
                pass

        # Use specific hooks instead of generic ones
        mu.hook_add(UC_HOOK_CODE, lightweight_hook)
        return hook_count

    def batch_emulation_optimization(self, code_samples):
        """Run every sample on one reused x86-64 engine.

        Args:
            code_samples: Iterable of machine code byte strings.

        Returns:
            list[dict]: One entry per sample with ``sample`` (index) and
            ``success``; successful runs add ``time`` (seconds), failed runs
            add ``error`` (message of the UcError).
        """
        print("Optimizing batch emulation...")
        # Strategy 1: Reuse emulator instances
        mu = Uc(UC_ARCH_X86, UC_MODE_64)
        mu.mem_map(self.CODE_BASE, self.CODE_REGION_SIZE)
        results = []
        for i, code in enumerate(code_samples):
            # Reset state instead of creating new instance
            self.reset_emulator_state(mu)
            # Write new code
            mu.mem_write(self.CODE_BASE, code)
            try:
                start_time = time.perf_counter()
                mu.emu_start(self.CODE_BASE, self.CODE_BASE + len(code))
                end_time = time.perf_counter()
                results.append({
                    'sample': i,
                    'success': True,
                    'time': end_time - start_time
                })
            except UcError as e:
                results.append({
                    'sample': i,
                    'success': False,
                    'error': str(e)
                })
        return results

    @staticmethod
    def _general_purpose_registers(mu):
        """Return the register ids to reset for *mu* (empty if not x86).

        Architecture and mode are obtained through ``uc_query`` instead of
        ``mu.arch`` / ``mu.mode`` attributes. If the mode query is not
        supported, the 32-bit register set is used.
        """
        if mu.query(UC_QUERY_ARCH) != UC_ARCH_X86:
            return []
        try:
            is_64bit = mu.query(UC_QUERY_MODE) == UC_MODE_64
        except UcError:
            is_64bit = False
        if is_64bit:
            return [
                UC_X86_REG_RAX, UC_X86_REG_RBX, UC_X86_REG_RCX, UC_X86_REG_RDX,
                UC_X86_REG_RSI, UC_X86_REG_RDI, UC_X86_REG_RSP, UC_X86_REG_RBP,
                UC_X86_REG_R8, UC_X86_REG_R9, UC_X86_REG_R10, UC_X86_REG_R11,
                UC_X86_REG_R12, UC_X86_REG_R13, UC_X86_REG_R14, UC_X86_REG_R15
            ]
        return [
            UC_X86_REG_EAX, UC_X86_REG_EBX, UC_X86_REG_ECX, UC_X86_REG_EDX,
            UC_X86_REG_ESI, UC_X86_REG_EDI, UC_X86_REG_ESP, UC_X86_REG_EBP
        ]

    def reset_emulator_state(self, mu):
        """Zero the x86 general-purpose registers and all mapped memory of *mu*.

        Engines of other architectures only have their memory cleared.
        A failure while clearing memory is ignored (best effort).
        """
        # Reset registers
        for reg in self._general_purpose_registers(mu):
            mu.reg_write(reg, 0)
        # Clear memory (write zeros to mapped regions)
        try:
            for start, end, _perms in mu.mem_regions():
                size = end - start + 1  # region bounds are inclusive
                mu.mem_write(start, bytes(size))
        except UcError:
            pass

    def profile_emulation(self, mu, code):
        """Count instructions and memory accesses while running *code*.

        *code* must already be written at ``CODE_BASE`` in *mu*. Emulation
        errors are ignored; the counters reflect whatever ran before the
        fault.

        Returns:
            dict: ``instruction_count``, ``memory_accesses``,
            ``hook_overhead`` (always 0) and ``execution_time``; when the
            run took measurable time also ``instructions_per_second`` and
            ``memory_accesses_per_second``.
        """
        profiling_data = {
            'instruction_count': 0,
            'memory_accesses': 0,
            'hook_overhead': 0,
            'execution_time': 0
        }

        def profiling_hook(uc, address, size, user_data):
            """Profiling hook"""
            profiling_data['instruction_count'] += 1

        def memory_hook(uc, access, address, size, value, user_data):
            """Memory access profiling"""
            profiling_data['memory_accesses'] += 1

        # Add profiling hooks
        code_hook = mu.hook_add(UC_HOOK_CODE, profiling_hook)
        mem_hook = mu.hook_add(UC_HOOK_MEM_READ | UC_HOOK_MEM_WRITE, memory_hook)
        try:
            # Run emulation
            start_time = time.perf_counter()
            try:
                mu.emu_start(self.CODE_BASE, self.CODE_BASE + len(code))
            except UcError:
                pass
            end_time = time.perf_counter()
        finally:
            # Clean up hooks even if something unexpected was raised
            mu.hook_del(code_hook)
            mu.hook_del(mem_hook)
        elapsed = end_time - start_time
        profiling_data['execution_time'] = elapsed
        # Calculate metrics
        if elapsed > 0:
            profiling_data['instructions_per_second'] = profiling_data['instruction_count'] / elapsed
            profiling_data['memory_accesses_per_second'] = profiling_data['memory_accesses'] / elapsed
        return profiling_data
def performance_comparison():
    """Benchmark, profile and batch-run three small x86-64 code samples.

    For every sample the benchmark statistics and profile counters are
    printed; afterwards all samples are run ten times each through the
    batch helper and a summary is printed. Returns None.
    """
    print("Performance Comparison")
    print("=" * 40)
    optimizer = UnicornOptimizer()
    # Test code samples
    test_codes = [
        b"\x48\xc7\xc0\x37\x13\x00\x00\xc3",  # mov rax, 0x1337; ret
        b"\x48\x31\xc0\x48\xff\xc0\xc3",  # xor rax, rax; inc rax; ret
        b"\x48\x89\xe5\x48\x83\xc4\x08\x5d\xc3",  # mov rbp, rsp; add rsp, 8; pop rbp; ret
    ]
    # Benchmark each code sample
    for i, code in enumerate(test_codes):
        print(f"\nBenchmarking code sample {i+1}:")
        # benchmark_emulation prints its own statistics
        optimizer.benchmark_emulation(UC_ARCH_X86, UC_MODE_64, code, iterations=100)
        # Profile the emulation
        mu = Uc(UC_ARCH_X86, UC_MODE_64)
        mu.mem_map(0x1000000, 2 * 1024 * 1024)
        mu.mem_write(0x1000000, code)
        profile = optimizer.profile_emulation(mu, code)
        print(f" Instructions: {profile['instruction_count']}")
        print(f" Memory accesses: {profile['memory_accesses']}")
        print(f" Instructions/sec: {profile.get('instructions_per_second', 0):.0f}")
    # Test batch optimization
    print("\nTesting batch optimization:")
    batch_results = optimizer.batch_emulation_optimization(test_codes * 10)
    successful = sum(1 for r in batch_results if r['success'])
    total_time = sum(r.get('time', 0) for r in batch_results if r['success'])
    print(f" Successful emulations: {successful}/{len(batch_results)}")
    print(f" Total time: {total_time:.6f}s")
    # Every sample uses the stack (pop/ret); if the emulator has no stack
    # mapped they all fail, so guard the average against division by zero.
    if successful:
        print(f" Average time per emulation: {total_time/successful:.6f}s")
    else:
        print(" Average time per emulation: n/a (no successful emulations)")
def memory_optimization_example():
    """Apply the optimizer's memory strategies, then exercise a 10 MB mapping.

    After the strategies have been applied to a fresh x86-64 engine, a
    10 MB region is mapped at 0x10000000, filled with a repeating 4 KB
    pattern and its first chunk is read back for verification. Unicorn
    errors are printed; the process RSS is printed at the end.
    """
    print("Memory Optimization Example")
    print("=" * 40)

    tuner = UnicornOptimizer()
    # Create emulator and apply optimizations
    mu = Uc(UC_ARCH_X86, UC_MODE_64)
    tuner.optimize_memory_usage(mu)

    # Test with large memory operations
    region_base = 0x10000000
    LARGE_SIZE = 10 * 1024 * 1024  # 10MB
    try:
        mu.mem_map(region_base, LARGE_SIZE)
        # Fill the region chunk by chunk; a chunk that would overrun the
        # region is skipped
        pattern = b"ABCD" * 1024  # 4KB pattern
        chunk_len = len(pattern)
        offset = 0
        while offset < LARGE_SIZE:
            if offset + chunk_len <= LARGE_SIZE:
                mu.mem_write(region_base + offset, pattern)
            offset += chunk_len
        print(f"Successfully allocated and wrote to {LARGE_SIZE} bytes")
        # Read the first chunk back and compare it with what was written
        if mu.mem_read(region_base, chunk_len) == pattern:
            print("Memory verification successful")
    except UcError as e:
        print(f"Memory operation failed: {e}")

    # Report the resident set size of this process in MB
    final_memory = psutil.Process().memory_info().rss / 1024 / 1024
    print(f"Final memory usage: {final_memory:.2f} MB")
def best_practices_guide():
    """Print the Unicorn Engine best-practice tips and return them.

    Returns:
        dict: Category name -> list of tip strings, in the order
        Performance, Memory Management, Error Handling, Security, Debugging.
    """
    performance_tips = [
        "Reuse emulator instances when possible",
        "Use minimal memory mappings",
        "Implement lazy memory allocation",
        "Optimize hook functions for minimal overhead",
        "Use specific hooks instead of generic ones",
        "Batch similar operations together",
        "Profile your emulation to identify bottlenecks",
    ]
    memory_tips = [
        "Map only necessary memory regions",
        "Use appropriate memory permissions",
        "Implement memory pooling for frequent allocations",
        "Clean up unused memory mappings",
        "Monitor memory usage in long-running emulations",
        "Use memory-mapped files for large data sets",
    ]
    error_tips = [
        "Always handle UcError exceptions",
        "Implement proper cleanup in finally blocks",
        "Use hooks to handle invalid memory accesses",
        "Validate input data before emulation",
        "Set reasonable timeouts for emulation",
        "Log errors with sufficient context",
    ]
    security_tips = [
        "Validate all input data",
        "Use appropriate memory permissions",
        "Implement timeouts to prevent infinite loops",
        "Sanitize hook callback data",
        "Be cautious with self-modifying code",
        "Isolate emulation environments",
    ]
    debugging_tips = [
        "Use instruction hooks for tracing",
        "Implement register and memory dumping",
        "Log emulation state at key points",
        "Use conditional hooks for specific addresses",
        "Implement step-by-step debugging",
        "Validate emulation results",
    ]
    # Insertion order determines the order in which categories are printed
    practices = dict([
        ("Performance", performance_tips),
        ("Memory Management", memory_tips),
        ("Error Handling", error_tips),
        ("Security", security_tips),
        ("Debugging", debugging_tips),
    ])

    print("Unicorn Engine Best Practices")
    print("=" * 40)
    for category in practices:
        print("\n" + category + ":")
        for tip in practices[category]:
            print(" • " + tip)
    return practices
# Script entry point: run the optimization examples in order,
# printing a 60-character rule between them
if __name__ == "__main__":
    # Performance comparison
    performance_comparison()
    print("\n" + "=" * 60 + "\n")

    # Memory optimization
    memory_optimization_example()
    print("\n" + "=" * 60 + "\n")

    # Best practices guide
    best_practices_guide()
Ressources et documentation
Ressources officielles
- [Site Web d'Unicorn Engine](LINK_20) - Site Web officiel et documentation
- [Dépôt GitHub d'Unicorn](LINK_20) - Code source et suivi des tickets (issues)
- [Documentation de l'API Unicorn](LINK_20) - Référence complète de l'API
- [Bindings Unicorn](LINK_20) - Bindings pour les différents langages
Ressources pédagogiques
- [Tutoriel Unicorn Engine](LINK_20) - Tutoriel officiel
- [Exemples Unicorn](LINK_20) - Exemples de code officiels
- [Rétro-ingénierie avec Unicorn](LINK_20) - Tutoriel complet
- [Unicorn pour l'analyse de malwares](LINK_20) - Applications pratiques
Outils connexes et intégration
- [Qiling Framework](LINK_20) - Framework d'émulation de haut niveau construit sur Unicorn
- [Capstone Engine](LINK_20) - Framework de désassemblage (se marie bien avec Unicorn)
- [Keystone Engine](LINK_20) - Framework d'assemblage (complément à Unicorn)
- Radare2 - Framework de rétro-ingénierie avec intégration d'Unicorn
Communauté et soutien
- Slack Unicorn Engine - Discussion communautaire
- [Stack Overflow](LINK_20) - Questions et réponses avec le tag unicorn-engine
- [Reddit r/ReverseEngineering](LINK_20) - Discussions communautaires
- [Liste de diffusion](LINK_20) - Discussions sur le développement
Sujets avancés
- [Fonctionnement interne d'Unicorn Engine](LINK_20) - Comprendre les mécanismes internes
- Prise en charge d'architectures personnalisées - Ajout de nouvelles architectures
- [Optimisation des performances](LINK_20) - Techniques d'optimisation
- [Fuzzing avec Unicorn](LINK_20) - Applications de fuzzing