# Rizin Cheat Sheet
## Overview
Rizin is a free and open-source reverse engineering framework and command-line toolkit for binary analysis. It began as a fork of Radare2, created in response to community concerns about that project's development direction and governance, and has grown into a full platform with a rich command-line interface and extensive scripting support. The framework supports many architectures, including x86, x64, ARM, MIPS, PowerPC, and SPARC, which makes it suitable for analyzing binaries from Windows, Linux, macOS, Android, iOS, and embedded systems.
Rizin's core strength is its modular architecture and a large command set that gives fine-grained control over each step of binary analysis. It covers basic disassembly and hexdump operations as well as function analysis, control-flow graph generation, detection of cryptographic routines, and support for vulnerability research. The plugin system allows customization and integration with external tools, and scripting from languages such as Python, JavaScript, and Lua makes it possible to automate complex analysis tasks.
The command-line interface looks complex at first, but it follows a logical structure that becomes intuitive with practice. Commands are grouped into categories with consistent naming conventions, and the framework ships with extensive built-in help. Rizin handles many file formats, supports in-memory analysis and debugging, and integrates with other security tools, which makes it a useful part of a reverse engineering toolkit. An active development community and a commitment to open-source principles keep it evolving alongside new threats and analysis requirements.
## Installation
### Package Manager Installation
Install Rizin through your system's package manager:
# Ubuntu/Debian installation
sudo apt update
sudo apt install rizin
# Alternative: Add official PPA
sudo add-apt-repository ppa:rizin-team/rizin
sudo apt update
sudo apt install rizin
# Kali Linux (usually pre-installed)
sudo apt install rizin
# CentOS/RHEL installation
sudo yum install epel-release
sudo yum install rizin
# Arch Linux installation
sudo pacman -S rizin
# macOS installation
brew install rizin
# Verify installation (the Rizin tools print their version with -v)
rz-bin -v
rz-asm -v
rizin -v
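The verification step can also be scripted. The sketch below only checks whether the three executables named above are on `PATH`; the helper names `find_tools` and `missing_tools` are illustrative and not part of Rizin.

```python
import shutil

# Tool names taken from the verification commands above.
RIZIN_TOOLS = ("rizin", "rz-bin", "rz-asm")


def find_tools(names=RIZIN_TOOLS, which=shutil.which):
    """Map each tool name to its resolved path, or None when it is not on PATH."""
    return {name: which(name) for name in names}


def missing_tools(found):
    """Return the sorted names of tools that could not be located."""
    return sorted(name for name, path in found.items() if path is None)


if __name__ == "__main__":
    found = find_tools()
    for name, path in found.items():
        print(f"{name}: {path or 'not found'}")
    if missing_tools(found):
        print("Missing:", ", ".join(missing_tools(found)))
```

The `which` parameter exists only so the lookup can be replaced in tests; in normal use the default `shutil.which` is enough.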
### Source Compilation
Build Rizin from source:
# Install build dependencies
sudo apt install git build-essential cmake meson ninja-build
sudo apt install libssl-dev libzip-dev liblz4-dev
# Clone repository
git clone https://github.com/rizinorg/rizin.git
cd rizin
# Configure build with meson
meson setup build
meson configure build
# Compile
ninja -C build
# Install
sudo ninja -C build install
# Update library cache
sudo ldconfig
# Verify installation
rizin -v
### Docker Installation
# Create Rizin Docker environment
cat > Dockerfile << 'EOF'
FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
rizin \
python3 python3-pip \
curl wget git \
&& rm -rf /var/lib/apt/lists/*
# Install additional tools
RUN pip3 install rzpipe
WORKDIR /analysis
CMD ["rizin"]
EOF
# Build container
docker build -t rizin-analysis .
# Run Rizin
docker run -it -v $(pwd)/samples:/analysis rizin-analysis
# Example usage
docker run -it rizin-analysis rizin /analysis/binary_sample
### Plugin Installation
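# Install common plugins (Rizin package names differ from their radare2 counterparts)
# rz-ghidra (Ghidra decompiler)
rz-pm install rz-ghidra
# jsdec (decompiler, the Rizin fork of r2dec)
rz-pm install jsdec
# rz-yara (YARA integration; package name assumed, confirm with `rz-pm list`)
rz-pm install rz-yara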
# List available packages
rz-pm list
# Update package database
rz-pm update
# Install from source
git clone --recurse-submodules https://github.com/rizinorg/rz-ghidra.git
cd rz-ghidra
mkdir build && cd build
cmake ..
make -j$(nproc)
sudo make install
## Basic Usage
### Opening and Loading Files
Basic file operations in Rizin:
# Open file for analysis
rizin binary_file
# Open with specific options
rizin -A binary_file # Auto-analysis
rizin -AA binary_file # More aggressive analysis
rizin -AAA binary_file # Most aggressive analysis
# Open with specific architecture
rizin -a x86 -b 32 binary_file
rizin -a arm -b 64 binary_file
# Open raw binary
rizin -B 0x1000 raw_binary # Set base address
# Open with debugging
rizin -d binary_file
# Open URL
rizin http://example.com/binary
# Open multiple files
rizin file1 file2 file3
# Load file in existing session
o binary_file # Open file
o # List open files
o-1 # Close file descriptor 1
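For batch work it can help to assemble these command lines programmatically. The following is a minimal sketch that builds an argument list from the flags shown above (`-A`, `-a`, `-b`) plus `-q` and `-c` for running commands non-interactively; the function name `build_rizin_argv` and its parameters are hypothetical.

```python
def build_rizin_argv(path, analyze=False, arch=None, bits=None, commands=()):
    """Build an argv list for launching rizin on `path`.

    analyze  -> adds -A (auto-analysis)
    arch     -> adds -a <arch>
    bits     -> adds -b <bits>
    commands -> adds -q -c "<cmd1>; <cmd2>" so rizin runs them and exits
    """
    argv = ["rizin"]
    if analyze:
        argv.append("-A")
    if arch:
        argv += ["-a", arch]
    if bits:
        argv += ["-b", str(bits)]
    if commands:
        argv += ["-q", "-c", "; ".join(commands)]
    argv.append(path)
    return argv


if __name__ == "__main__":
    # Pass the result to subprocess.run(...) on a machine with rizin installed.
    print(build_rizin_argv("binary_file", analyze=True, commands=["afl", "iz"]))
```

Returning a list rather than a single string avoids shell quoting problems when the path contains spaces.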
### Basic Navigation
Navigating through the binary:
# Seek (navigate) commands
s 0x401000 # Seek to address
s main # Seek to function
s +10 # Seek forward 10 bytes
s -5 # Seek backward 5 bytes
s # Show current address
# Print working directory
pwd # Print the filesystem working directory (use `s` with no argument for the current address)
# Navigation history (radare2-style commands; recent Rizin releases use `shu`, `shr` and `sh` instead)
s- # Go back in seek history
s+ # Go forward in seek history
s* # Show seek history
# Bookmarks
f bookmark_name @ 0x401000 # Create bookmark
f # List bookmarks
f-bookmark_name # Delete bookmark
# Search and navigate
/ string # Search for string
/x 4142 # Search for hex bytes
/r # Search for references
### Basic Information
Getting basic information about the binary:
# File information
i # Basic file info
ii # Imports
iE # Exports
is # Symbols
iz # Strings
iS # Sections
ih # Headers
ir # Relocations
# Detailed information
ij # JSON format info
iij # Imports in JSON
iEj # Exports in JSON
isj # Symbols in JSON
# Architecture information
iA # Architectures present in the binary
afb # Basic blocks of the current function
ic # Classes (ObjC/C++)
# Entry points
ie # Entry points
iee # Initial and final entries (constructors/destructors)
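The JSON variants are the easiest to consume from scripts. Here is a small sketch that parses import data and flags names on a watchlist. The sample below is hypothetical and only shaped like `iij` output (real output carries more fields); the `name` key is the one the scripts later in this sheet rely on.

```python
import json

# Hypothetical sample shaped like `iij` output; not captured from a real binary.
SAMPLE_IIJ = """
[
  {"ordinal": 1, "name": "strcpy", "type": "FUNC"},
  {"ordinal": 2, "name": "printf", "type": "FUNC"},
  {"ordinal": 3, "name": "malloc", "type": "FUNC"}
]
"""


def import_names(raw_json):
    """Return the sorted import names found in a JSON array of import records."""
    entries = json.loads(raw_json) or []
    return sorted(entry["name"] for entry in entries if entry.get("name"))


def flag_imports(names, watchlist):
    """Keep only the names that appear in the watchlist, preserving input order."""
    watch = set(watchlist)
    return [name for name in names if name in watch]


if __name__ == "__main__":
    names = import_names(SAMPLE_IIJ)
    print(names)
    print(flag_imports(names, ["strcpy", "gets"]))
```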
## Advanced Analysis
### Function Analysis
Comprehensive function analysis:
# Function analysis
af # Analyze function at current address
afl # List all functions
afi # Function information
afv # Function variables
afc # Function calling convention
# Advanced function analysis
af @@ sym.* # Analyze all symbols as functions
afr # Analyze function recursively
afs # Function signature
aft # Function types
# Function manipulation
afn new_name # Rename function
af- @ 0x401000 # Undefine (delete) the function at this address
af+ 0x401000 100 func_name # Create function manually
# Function statistics
afS # Function statistics
afC # Function complexity
afb # Function basic blocks
# Cross-references
axt # Cross-references to current address
axf # Cross-references from current address
ax # List all cross-references
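Function complexity is usually reported as McCabe's cyclomatic complexity, M = E - N + 2P, where E is the number of control-flow edges, N the number of basic blocks, and P the number of connected components (1 for a single function). The sketch below computes it from a successor map; the dictionary layout is an assumption made for illustration and is not Rizin's output format.

```python
def cyclomatic_complexity(edges, nodes, components=1):
    """McCabe complexity: M = E - N + 2P."""
    return edges - nodes + 2 * components


def complexity_from_blocks(successors):
    """Compute complexity from a {block: [successor blocks]} mapping."""
    nodes = set(successors)
    for targets in successors.values():
        nodes.update(targets)
    edge_count = sum(len(targets) for targets in successors.values())
    return cyclomatic_complexity(edge_count, len(nodes))


if __name__ == "__main__":
    # An if/else diamond: entry branches to two blocks that rejoin at exit.
    diamond = {
        "entry": ["then", "else"],
        "then": ["exit"],
        "else": ["exit"],
        "exit": [],
    }
    print(complexity_from_blocks(diamond))
```

A straight-line function has complexity 1, and each additional two-way branch adds 1.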
### String Analysis
Advanced string analysis:
# String commands
iz # List strings in data sections
izz # List strings found in the whole binary
izzz # Search for strings in whole binary
# String filtering
iz~password # Filter strings containing "password"
iz~^http # Strings starting with "http"
iz|grep -i "error" # Case-insensitive search
# String analysis
iza # Analyze strings
izj # Strings in JSON format
# Custom string search
/s # Search for strings
/s 10 # Search for strings min 10 chars
/w password # Search for wide strings
# String extraction
ps @ str.password # Print string at address
psu @ str.password # Print string UTF-8
psw @ str.password # Print wide string
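To make the idea behind whole-binary string listing concrete, the sketch below scans raw bytes for runs of printable ASCII of a minimum length and returns them with their offsets. It is a simplified stand-in, not Rizin's implementation, and it ignores wide and UTF-8 strings.

```python
import re


def extract_ascii_strings(data, min_len=4):
    """Return (offset, text) pairs for printable-ASCII runs of at least min_len bytes."""
    pattern = re.compile(rb"[\x20-\x7e]{%d,}" % min_len)
    return [(m.start(), m.group().decode("ascii")) for m in pattern.finditer(data)]


def filter_strings(found, needle):
    """Case-insensitive substring filter, similar in spirit to `iz~needle`."""
    needle = needle.lower()
    return [(off, text) for off, text in found if needle in text.lower()]


if __name__ == "__main__":
    blob = b"\x00\x01hello\x00ab\x00password=1\xff"
    strings = extract_ascii_strings(blob)
    print(strings)
    print(filter_strings(strings, "PASS"))
```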
### Disassembly and Code Analysis
Advanced disassembly operations:
# Disassembly commands
pd # Print disassembly
pd 20 # Print 20 instructions
pD 100 # Print 100 bytes as disassembly
pdf # Print disassembly of function
# Disassembly formatting
pdc # Disassembly with C syntax
pdj # Disassembly in JSON
pds # Disassembly with summary
pdt # Disassembly with timestamps
# Code analysis
aa # Analyze all
aaa # Analyze all (more aggressive)
aaaa # Analyze all (most aggressive)
aac # Analyze function calls
aan # Analyze function names
# Control flow analysis
agf # Control-flow graph (basic blocks) of the current function
agc # Call graph of the current function
agg # Basic block graph
agj # Graph in JSON format
# Pattern analysis
/p # Search for patterns
/x 909090 # Search for three consecutive NOP bytes (x86)
/x ff25 # Search for the FF 25 opcode bytes (x86 indirect JMP)
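A hex byte search boils down to finding every offset where a byte sequence occurs. The following sketch does this over an in-memory buffer, including overlapping hits; `find_pattern` and its `base` parameter are illustrative names, and the buffer is made-up data.

```python
def find_pattern(data, hex_pattern, base=0):
    """Return every address (base + offset) where the hex-encoded bytes occur."""
    needle = bytes.fromhex(hex_pattern)
    if not needle:
        raise ValueError("pattern must contain at least one byte")
    hits = []
    pos = data.find(needle)
    while pos != -1:
        hits.append(base + pos)
        # Advance by one byte so overlapping matches are reported too.
        pos = data.find(needle, pos + 1)
    return hits


if __name__ == "__main__":
    code = bytes.fromhex("9090909041ff25")
    print([hex(a) for a in find_pattern(code, "909090", base=0x1000)])
    print([hex(a) for a in find_pattern(code, "ff25", base=0x1000)])
```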
### Memory and Data Analysis
Memory examination and data analysis:
# Memory examination
px # Print hexdump
px 100 # Print 100 bytes hex
pxw # Print hex words (32-bit)
pxq # Print hex qwords (64-bit)
# Data interpretation
pd # Print as disassembly
ps # Print as string
pf # Print formatted data
pt # Print timestamps
# Data types
pf d # Print as DWORD
pf q # Print as QWORD
pf s # Print as string
pf [10]b # Print 10 bytes
# Memory maps
dm # List memory maps
dmm # Memory map details
dmi # Memory map info
dmh # Heap information
# Memory search
/ # Search in memory
/x 41414141 # Search hex pattern
/r esp # Search references to ESP
/R # Search ROP gadgets
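The word-oriented hexdumps (`pxw`, `pxq`) reinterpret the same bytes as 32-bit or 64-bit integers. A short sketch of that reinterpretation with Python's `struct` module follows, assuming little-endian data by default; `read_words` is a hypothetical helper.

```python
import struct


def read_words(data, size=4, little_endian=True):
    """Decode `data` as unsigned integers of `size` bytes (4 or 8).

    Trailing bytes that do not fill a whole word are ignored.
    """
    fmt = {4: "I", 8: "Q"}[size]
    prefix = "<" if little_endian else ">"
    usable = len(data) - len(data) % size
    count = usable // size
    return list(struct.unpack(f"{prefix}{count}{fmt}", data[:usable]))


if __name__ == "__main__":
    raw = bytes.fromhex("0123456789abcdef")
    print([hex(w) for w in read_words(raw, size=4)])
    print([hex(w) for w in read_words(raw, size=8)])
```

The same eight bytes read as two little-endian 32-bit words give `0x67452301` and `0xefcdab89`, which is why byte order matters when searching memory for known constants.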
## Automation Scripts
### Comprehensive Binary Analysis
#!/usr/bin/env python3
# Comprehensive binary analysis with Rizin
import rzpipe  # Rizin's pipe bindings (pip install rzpipe); r2pipe drives radare2 instead
import json
import os
import hashlib
from datetime import datetime
class RizinBinaryAnalyzer:
    def __init__(self, binary_path):
        self.binary_path = binary_path
        self.r2 = rzpipe.open(binary_path)
        self.analysis_results = {}
        # Calculate file hash
        with open(binary_path, 'rb') as f:
            self.file_hash = hashlib.sha256(f.read()).hexdigest()
    def basic_analysis(self):
        """Perform basic binary analysis"""
        print("Performing basic analysis...")
        # Auto-analyze
        self.r2.cmd("aaa")
        # Basic information (cmdj may return None, so fall back to empty containers)
        self.analysis_results["file_info"] = self.r2.cmdj("ij") or {}
        self.analysis_results["sections"] = self.r2.cmdj("iSj") or []
        self.analysis_results["imports"] = self.r2.cmdj("iij") or []
        # iE lists exports; ie lists entry points
        self.analysis_results["exports"] = self.r2.cmdj("iEj") or []
        self.analysis_results["symbols"] = self.r2.cmdj("isj") or []
        self.analysis_results["strings"] = self.r2.cmdj("izj") or []
        print(f"Found {len(self.analysis_results['imports'])} imports")
        print(f"Found {len(self.analysis_results['exports'])} exports")
        print(f"Found {len(self.analysis_results['strings'])} strings")
    def function_analysis(self):
        """Analyze functions in the binary"""
        print("Analyzing functions...")
        # Get function list
        functions = self.r2.cmdj("aflj") or []
        self.analysis_results["functions"] = functions
        # Analyze each function
        function_details = []
        for func in functions:
            func_addr = func.get("offset")
            func_name = func.get("name", "")
            # Get function details
            self.r2.cmd(f"s {func_addr}")
            func_info = self.r2.cmdj("afij")
            if func_info:
                func_detail = func_info[0]
                # Add cross-references
                xrefs_to = self.r2.cmdj(f"axtj @ {func_addr}")
                xrefs_from = self.r2.cmdj(f"axfj @ {func_addr}")
                func_detail["xrefs_to"] = xrefs_to or []
                func_detail["xrefs_from"] = xrefs_from or []
                # Add complexity metrics
                complexity = self.calculate_function_complexity(func_addr)
                func_detail["complexity"] = complexity
                function_details.append(func_detail)
        self.analysis_results["function_details"] = function_details
        print(f"Analyzed {len(function_details)} functions")
    def calculate_function_complexity(self, func_addr):
        """Calculate function complexity metrics"""
        # Get function information
        func_info = self.r2.cmdj(f"afij @ {func_addr}")
        if not func_info:
            return {}
        func_data = func_info[0]
        # Basic metrics
        complexity = {
            "size": func_data.get("size", 0),
            "instructions": func_data.get("ninstr", 0),
            "basic_blocks": func_data.get("nbbs", 0),
            "cyclomatic_complexity": func_data.get("cc", 0),
            "edges": func_data.get("edges", 0)
        }
        # Calculate additional metrics
        if complexity["basic_blocks"] > 0:
            complexity["avg_bb_size"] = complexity["size"] / complexity["basic_blocks"]
        return complexity
    def string_analysis(self):
        """Analyze strings for suspicious content"""
        import re
        print("Analyzing strings...")
        strings = self.analysis_results.get("strings", [])
        suspicious_strings = []
        # Suspicious patterns
        suspicious_patterns = [
            r"http[s]?://",
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            r"[A-Za-z0-9+/]{20,}={0,2}",
            r"cmd\.exe|powershell|bash|sh",
            r"CreateProcess|ShellExecute|WinExec",
            r"RegOpenKey|RegSetValue|RegDeleteKey",
            r"socket|connect|send|recv",
            r"password|secret|key|token"
        ]
        for string_obj in strings:
            string_value = string_obj.get("string", "")
            for pattern in suspicious_patterns:
                if re.search(pattern, string_value, re.IGNORECASE):
                    suspicious_strings.append({
                        "string": string_obj,
                        "pattern": pattern,
                        "category": self.categorize_string_pattern(pattern)
                    })
                    # Record each string once, under the first pattern that matches
                    break
        self.analysis_results["suspicious_strings"] = suspicious_strings
        print(f"Found {len(suspicious_strings)} suspicious strings")
    def categorize_string_pattern(self, pattern):
        """Categorize string patterns"""
        categories = {
            r"http[s]?://": "network",
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b": "network",
            r"[A-Za-z0-9+/]{20,}={0,2}": "encoding",
            r"cmd\.exe|powershell|bash|sh": "execution",
            r"CreateProcess|ShellExecute|WinExec": "execution",
            r"RegOpenKey|RegSetValue|RegDeleteKey": "registry",
            r"socket|connect|send|recv": "network",
            r"password|secret|key|token": "credentials"
        }
        return categories.get(pattern, "unknown")
    def crypto_analysis(self):
        """Analyze for cryptographic functions and constants"""
        print("Analyzing cryptographic indicators...")
        crypto_indicators = []
        # Known crypto constants
        crypto_constants = {
            "0x67452301": "MD5/SHA1 initialization",
            "0xefcdab89": "MD5/SHA1 initialization",
            "0x98badcfe": "MD5/SHA1 initialization",
            "0x10325476": "MD5/SHA1 initialization",
            "0x6a09e667": "SHA-256 initialization",
            "0xbb67ae85": "SHA-256 initialization",
            "0x3c6ef372": "SHA-256 initialization",
            "0xa54ff53a": "SHA-256 initialization"
        }
        # Search for crypto constants
        for constant, description in crypto_constants.items():
            # /x matches bytes in file order, so reverse the constant's bytes.
            # Assumption: the target stores 32-bit values little-endian (x86, most ARM).
            le_hex = bytes.fromhex(constant[2:])[::-1].hex()
            result = self.r2.cmd(f"/x {le_hex}")
            if result.strip():
                crypto_indicators.append({
                    "type": "constant",
                    "value": constant,
                    "description": description,
                    "locations": result.strip().split('\n')
                })
        # Search for crypto function names
        crypto_functions = [
            "md5", "sha1", "sha256", "sha512", "aes", "des", "rsa",
            "encrypt", "decrypt", "cipher", "hash", "hmac"
        ]
        functions = self.analysis_results.get("functions", [])
        for func in functions:
            func_name = func.get("name", "").lower()
            for crypto_func in crypto_functions:
                if crypto_func in func_name:
                    crypto_indicators.append({
                        "type": "function",
                        "name": func.get("name"),
                        "address": func.get("offset"),
                        "crypto_type": crypto_func
                    })
        self.analysis_results["crypto_indicators"] = crypto_indicators
        print(f"Found {len(crypto_indicators)} cryptographic indicators")
    def vulnerability_analysis(self):
        """Analyze for potential vulnerabilities"""
        print("Analyzing for potential vulnerabilities...")
        vulnerabilities = []
        # Dangerous functions
        dangerous_functions = {
            "strcpy": "Buffer overflow risk",
            "strcat": "Buffer overflow risk",
            "sprintf": "Buffer overflow risk",
            "gets": "Buffer overflow risk",
            "scanf": "Format string vulnerability",
            "printf": "Format string vulnerability (if user input)",
            "system": "Command injection risk",
            "exec": "Command injection risk",
            "eval": "Code injection risk"
        }
        imports = self.analysis_results.get("imports", [])
        for import_obj in imports:
            import_name = import_obj.get("name", "")
            for dangerous_func, description in dangerous_functions.items():
                if dangerous_func in import_name.lower():
                    vulnerabilities.append({
                        "type": "dangerous_function",
                        "function": import_name,
                        "risk": description,
                        "severity": self.assess_function_severity(dangerous_func)
                    })
        # Check for stack canaries and security features
        security_features = self.check_security_features()
        vulnerabilities.extend(security_features)
        self.analysis_results["vulnerabilities"] = vulnerabilities
        print(f"Found {len(vulnerabilities)} potential vulnerabilities")
    def assess_function_severity(self, function_name):
        """Assess severity of dangerous function"""
        high_risk = ["gets", "strcpy", "system", "exec"]
        medium_risk = ["strcat", "sprintf", "scanf"]
        if function_name in high_risk:
            return "high"
        elif function_name in medium_risk:
            return "medium"
        else:
            return "low"
    def check_security_features(self):
        """Check for security features and mitigations"""
        security_issues = []
        file_info = self.analysis_results.get("file_info", {})
        # The mitigation flags are nested under the "bin" key of the ij output
        bin_info = file_info.get("bin", {})
        # Check for stack canaries
        if not bin_info.get("canary", False):
            security_issues.append({
                "type": "missing_mitigation",
                "feature": "stack_canary",
                "description": "Stack canary protection not enabled",
                "severity": "medium"
            })
        # Check for NX bit
        if not bin_info.get("nx", False):
            security_issues.append({
                "type": "missing_mitigation",
                "feature": "nx_bit",
                "description": "NX bit protection not enabled",
                "severity": "high"
            })
        # Check for position-independent code, which ASLR needs to relocate the image
        if not bin_info.get("pic", False):
            security_issues.append({
                "type": "missing_mitigation",
                "feature": "aslr",
                "description": "Binary is not position-independent, so ASLR cannot relocate it",
                "severity": "medium"
            })
        return security_issues
    def generate_report(self, output_file=None):
        """Generate comprehensive analysis report"""
        if not output_file:
            output_file = f"rizin_analysis_{self.file_hash[:8]}.json"
        # Calculate summary statistics
        summary = {
            "file_hash": self.file_hash,
            "file_path": self.binary_path,
            "analysis_timestamp": datetime.now().isoformat(),
            "total_functions": len(self.analysis_results.get("functions", [])),
            "total_imports": len(self.analysis_results.get("imports", [])),
            "total_exports": len(self.analysis_results.get("exports", [])),
            "total_strings": len(self.analysis_results.get("strings", [])),
            "suspicious_strings": len(self.analysis_results.get("suspicious_strings", [])),
            "crypto_indicators": len(self.analysis_results.get("crypto_indicators", [])),
            "vulnerabilities": len(self.analysis_results.get("vulnerabilities", []))
        }
        report = {
            "summary": summary,
            "analysis_results": self.analysis_results
        }
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"Analysis report saved: {output_file}")
        return report
    def run_full_analysis(self):
        """Run complete binary analysis"""
        print(f"Starting comprehensive analysis of: {self.binary_path}")
        self.basic_analysis()
        self.function_analysis()
        self.string_analysis()
        self.crypto_analysis()
        self.vulnerability_analysis()
        report = self.generate_report()
        # Close Rizin session
        self.r2.quit()
        print("Analysis completed successfully!")
        return report
# Usage
if __name__ == "__main__":
    binary_path = "/path/to/binary"
    analyzer = RizinBinaryAnalyzer(binary_path)
    report = analyzer.run_full_analysis()
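Once the report exists, its `vulnerabilities` list can be triaged without Rizin. The sketch below orders findings by the `severity` values the script assigns (`high`, `medium`, `low`) and counts them; the `triage` helper and the sample records are illustrative only.

```python
from collections import Counter

# Lower rank sorts first; unknown severities go last.
SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2}


def triage(vulnerabilities):
    """Return (findings sorted by severity, count per severity)."""
    ranked = sorted(
        vulnerabilities,
        key=lambda v: SEVERITY_ORDER.get(v.get("severity"), len(SEVERITY_ORDER)),
    )
    counts = Counter(v.get("severity", "unknown") for v in vulnerabilities)
    return ranked, dict(counts)


if __name__ == "__main__":
    # Hypothetical findings in the shape produced by vulnerability_analysis().
    sample = [
        {"type": "dangerous_function", "function": "printf", "severity": "low"},
        {"type": "missing_mitigation", "feature": "nx_bit", "severity": "high"},
        {"type": "dangerous_function", "function": "strcat", "severity": "medium"},
    ]
    ranked, counts = triage(sample)
    for finding in ranked:
        print(finding["severity"], finding.get("function") or finding.get("feature"))
    print(counts)
```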
### Automated Malware Family Detection
#!/usr/bin/env python3
# Malware family detection using Rizin
import rzpipe  # Rizin's pipe bindings (pip install rzpipe); r2pipe drives radare2 instead
import json
import hashlib
from collections import Counter
class MalwareFamilyDetector:
    def __init__(self):
        self.family_signatures = self.load_family_signatures()
        self.analysis_results = {}
    def load_family_signatures(self):
        """Load malware family signatures"""
        # This would typically load from a database
        # For demo purposes, we'll use hardcoded signatures
        return {
            "wannacry": {
                "strings": [
                    "tasksche.exe",
                    "Wanna Decryptor",
                    ".WNCRY",
                    "msg/m_bulgarian.wnry"
                ],
                "imports": [
                    "CryptAcquireContextA",
                    "CryptGenKey",
                    "CryptEncrypt"
                ],
                "file_patterns": [
                    "taskdl.exe",
                    "taskse.exe"
                ]
            },
            "emotet": {
                "strings": [
                    "RegOpenKeyExW",
                    "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run",
                    "powershell.exe"
                ],
                "imports": [
                    "URLDownloadToFileW",
                    "ShellExecuteW",
                    "CreateProcessW"
                ],
                "network_indicators": [
                    "POST",
                    "User-Agent:",
                    "Content-Type: application/x-www-form-urlencoded"
                ]
            },
            "trickbot": {
                "strings": [
                    "<moduleconfig>",
                    "injectDll",
                    "systeminfo",
                    "nltest /domain_trusts"
                ],
                "imports": [
                    "WinHttpOpen",
                    "WinHttpConnect",
                    "WinHttpSendRequest"
                ],
                "file_patterns": [
                    "client_id",
                    "group_tag"
                ]
            }
        }
    def analyze_binary(self, binary_path):
        """Analyze binary for malware family indicators"""
        print(f"Analyzing binary: {binary_path}")
        r2 = rzpipe.open(binary_path)
        r2.cmd("aaa")  # Auto-analyze
        # Extract features
        features = {
            "strings": self.extract_strings(r2),
            "imports": self.extract_imports(r2),
            "functions": self.extract_functions(r2),
            "sections": self.extract_sections(r2),
            "file_info": r2.cmdj("ij")
        }
        # Calculate file hash
        with open(binary_path, 'rb') as f:
            file_hash = hashlib.sha256(f.read()).hexdigest()
        features["file_hash"] = file_hash
        r2.quit()
        return features
    def extract_strings(self, r2):
        """Extract strings from binary"""
        strings_data = r2.cmdj("izj") or []  # cmdj may return None
        return [s.get("string", "") for s in strings_data if s.get("string")]
    def extract_imports(self, r2):
        """Extract imports from binary"""
        imports_data = r2.cmdj("iij") or []
        return [imp.get("name", "") for imp in imports_data if imp.get("name")]
    def extract_functions(self, r2):
        """Extract function names from binary"""
        functions_data = r2.cmdj("aflj") or []
        return [func.get("name", "") for func in functions_data if func.get("name")]
    def extract_sections(self, r2):
        """Extract section information"""
        sections_data = r2.cmdj("iSj") or []
        return [
            {
                "name": sec.get("name", ""),
                "size": sec.get("size", 0),
                "entropy": sec.get("entropy", 0)
            }
            for sec in sections_data
        ]
    def match_family_signatures(self, features):
        """Match features against malware family signatures"""
        family_scores = {}
        for family_name, signatures in self.family_signatures.items():
            score = 0
            matches = []
            # Check string matches
            if "strings" in signatures:
                for sig_string in signatures["strings"]:
                    for binary_string in features["strings"]:
                        if sig_string.lower() in binary_string.lower():
                            score += 2
                            matches.append({
                                "type": "string",
                                "signature": sig_string,
                                "match": binary_string
                            })
            # Check import matches
            if "imports" in signatures:
                for sig_import in signatures["imports"]:
                    for binary_import in features["imports"]:
                        if sig_import.lower() in binary_import.lower():
                            score += 3
                            matches.append({
                                "type": "import",
                                "signature": sig_import,
                                "match": binary_import
                            })
            # Check function name matches
            if "functions" in signatures:
                for sig_func in signatures["functions"]:
                    for binary_func in features["functions"]:
                        if sig_func.lower() in binary_func.lower():
                            score += 1
                            matches.append({
                                "type": "function",
                                "signature": sig_func,
                                "match": binary_func
                            })
            if score > 0:
                family_scores[family_name] = {
                    "score": score,
                    "matches": matches,
                    "confidence": self.calculate_confidence(score, len(matches))
                }
        return family_scores
    def calculate_confidence(self, score, match_count):
        """Calculate confidence level based on score and matches"""
        if score >= 10 and match_count >= 5:
            return "high"
        elif score >= 5 and match_count >= 3:
            return "medium"
        elif score >= 2 and match_count >= 1:
            return "low"
        else:
            return "very_low"
    def detect_family(self, binary_path):
        """Detect malware family for given binary"""
        # Analyze binary
        features = self.analyze_binary(binary_path)
        # Match against signatures
        family_matches = self.match_family_signatures(features)
        # Determine most likely family
        if family_matches:
            best_match = max(family_matches.items(), key=lambda x: x[1]["score"])
            family_name, match_data = best_match
            result = {
                "binary_path": binary_path,
                "file_hash": features["file_hash"],
                "detected_family": family_name,
                "confidence": match_data["confidence"],
                "score": match_data["score"],
                "matches": match_data["matches"],
                "all_matches": family_matches
            }
        else:
            result = {
                "binary_path": binary_path,
                "file_hash": features["file_hash"],
                "detected_family": "unknown",
                "confidence": "none",
                "score": 0,
                "matches": [],
                "all_matches": {}
            }
        return result
    def batch_detection(self, binary_paths):
        """Perform batch malware family detection"""
        results = []
        for binary_path in binary_paths:
            try:
                result = self.detect_family(binary_path)
                results.append(result)
                print(f"Analyzed: {binary_path}")
                print(f"Family: {result['detected_family']} (confidence: {result['confidence']})")
            except Exception as e:
                print(f"Error analyzing {binary_path}: {e}")
                results.append({
                    "binary_path": binary_path,
                    "error": str(e),
                    "detected_family": "error"
                })
        return results
    def generate_detection_report(self, results, output_file="family_detection_report.json"):
        """Generate family detection report"""
        # Calculate statistics
        family_counts = Counter([r.get("detected_family", "unknown") for r in results])
        confidence_counts = Counter([r.get("confidence", "none") for r in results])
        report = {
            "detection_summary": {
                "total_samples": len(results),
                "family_distribution": dict(family_counts),
                "confidence_distribution": dict(confidence_counts)
            },
            "detection_results": results
        }
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"Detection report saved: {output_file}")
        return report
# Usage
if __name__ == "__main__":
    detector = MalwareFamilyDetector()
    # Single binary detection
    result = detector.detect_family("/path/to/malware/sample")
    print(f"Detected family: {result['detected_family']}")
    # Batch detection
    binary_paths = [
        "/path/to/sample1",
        "/path/to/sample2",
        "/path/to/sample3"
    ]
    results = detector.batch_detection(binary_paths)
    report = detector.generate_detection_report(results)
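The scoring rule in the detector is easy to check by hand. The sketch below isolates it: each string match adds 2, each import match adds 3, each function-name match adds 1, and the confidence thresholds are the ones used in `calculate_confidence`. The standalone function names are illustrative.

```python
# Weights copied from match_family_signatures() above.
WEIGHTS = {"string": 2, "import": 3, "function": 1}


def score_matches(match_types):
    """Sum the weights for a sequence of match types."""
    return sum(WEIGHTS[kind] for kind in match_types)


def confidence(score, match_count):
    """Same thresholds as MalwareFamilyDetector.calculate_confidence()."""
    if score >= 10 and match_count >= 5:
        return "high"
    if score >= 5 and match_count >= 3:
        return "medium"
    if score >= 2 and match_count >= 1:
        return "low"
    return "very_low"


if __name__ == "__main__":
    matches = ["string", "string", "import", "import"]
    total = score_matches(matches)
    print(total, confidence(total, len(matches)))
```

Two string matches and two import matches score 10, but with only four matches the result is `medium`, because `high` also requires at least five matches.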
### ROP Gadget Analysis
#!/usr/bin/env python3
# ROP gadget analysis with Rizin
import rzpipe  # Rizin's pipe bindings (pip install rzpipe); r2pipe drives radare2 instead
import re
import json
class ROPGadgetAnalyzer:
    def __init__(self, binary_path):
        self.binary_path = binary_path
        self.r2 = rzpipe.open(binary_path)
        self.gadgets = []
    def find_rop_gadgets(self, max_gadget_length=5):
        """Find ROP gadgets in the binary"""
        print("Searching for ROP gadgets...")
        # Use Rizin's built-in ROP search
        rop_output = self.r2.cmd("/R")
        if rop_output:
            self.parse_rop_output(rop_output)
        # Custom gadget search
        self.find_custom_gadgets(max_gadget_length)
        print(f"Found {len(self.gadgets)} ROP gadgets")
        return self.gadgets
    def parse_rop_output(self, rop_output):
        """Parse Rizin ROP search output"""
        lines = rop_output.strip().split('\n')
        for line in lines:
            if line.strip():
                # Parse ROP gadget line
                # Format: address: instruction; instruction; ret
                parts = line.split(':', 1)
                if len(parts) == 2:
                    address = parts[0].strip()
                    instructions = parts[1].strip()
                    try:
                        addr_int = int(address, 16)
                        self.gadgets.append({
                            "address": addr_int,
                            "address_hex": address,
                            "instructions": instructions,
                            "type": "rop",
                            "source": "rizin_builtin"
                        })
                    except ValueError:
                        continue
    def find_custom_gadgets(self, max_length):
        """Find custom ROP gadgets"""
        # Search for specific instruction patterns
        patterns = [
            "pop.*ret",
            "mov.*ret",
            "add.*ret",
            "sub.*ret",
            "xor.*ret",
            "call.*",
            "jmp.*"
        ]
        for pattern in patterns:
            self.search_instruction_pattern(pattern, max_length)
    def search_instruction_pattern(self, pattern, max_length):
        """Search for specific instruction patterns"""
        # Get all executable sections
        sections = self.r2.cmdj("iSj") or []
        for section in sections:
            if "x" in section.get("perm", ""):  # Executable section
                section_addr = section.get("vaddr", 0)
                section_size = section.get("vsize", 0)
                if section_size > 0:
                    self.search_section_for_pattern(
                        section_addr,
                        section_size,
                        pattern,
                        max_length
                    )
    def search_section_for_pattern(self, start_addr, size, pattern, max_length):
        """Search section for instruction patterns"""
        # Disassemble section
        self.r2.cmd(f"s {start_addr}")
        disasm = self.r2.cmd(f"pd {size // 4}")  # Approximate instruction count
        lines = disasm.split('\n')
        for i, line in enumerate(lines):
            if re.search(pattern, line, re.IGNORECASE):
                # Found potential gadget start
                gadget = self.extract_gadget(lines, i, max_length)
                if gadget:
                    self.gadgets.append(gadget)
    def extract_gadget(self, disasm_lines, start_index, max_length):
        """Extract gadget from disassembly lines"""
        gadget_instructions = []
        gadget_address = None
        for i in range(start_index, min(start_index + max_length, len(disasm_lines))):
            line = disasm_lines[i].strip()
            if not line:
                continue
            # Parse instruction line
            # Format: address instruction operands
            parts = line.split(None, 2)
            if len(parts) >= 2:
                addr_part = parts[0]
                instr_part = parts[1]
                operands_part = parts[2] if len(parts) > 2 else ""
                if gadget_address is None:
                    try:
                        gadget_address = int(addr_part, 16)
                    except ValueError:
                        continue
                instruction = f"{instr_part} {operands_part}".strip()
                gadget_instructions.append(instruction)
                # Check for gadget terminator
                if instr_part.lower() in ["ret", "retn", "retf"]:
                    break
                elif instr_part.lower().startswith("jmp"):
                    break
                elif instr_part.lower().startswith("call"):
                    break
        # Compare against None so a gadget at address 0 is not discarded
        if gadget_instructions and gadget_address is not None:
            return {
                "address": gadget_address,
                "address_hex": f"0x{gadget_address:x}",
                "instructions": "; ".join(gadget_instructions),
                "instruction_count": len(gadget_instructions),
                "type": "custom",
                "source": "pattern_search"
            }
        return None
    def classify_gadgets(self):
        """Classify gadgets by functionality"""
        classified = {
            "stack_pivot": [],
            "arithmetic": [],
            "memory_access": [],
            "control_flow": [],
            "system_calls": [],
            "other": []
        }
        for gadget in self.gadgets:
            instructions = gadget["instructions"].lower()
            # Stack pivot gadgets
            if any(pattern in instructions for pattern in ["xchg", "mov esp", "mov rsp", "leave"]):
                classified["stack_pivot"].append(gadget)
            # Arithmetic gadgets
            elif any(pattern in instructions for pattern in ["add", "sub", "mul", "div", "xor", "or", "and"]):
                classified["arithmetic"].append(gadget)
            # Memory access gadgets
            elif any(pattern in instructions for pattern in ["mov", "lea", "push", "pop"]):
                classified["memory_access"].append(gadget)
            # Control flow gadgets
            elif any(pattern in instructions for pattern in ["jmp", "call", "ret"]):
                classified["control_flow"].append(gadget)
            # System call gadgets
            elif any(pattern in instructions for pattern in ["int", "syscall", "sysenter"]):
                classified["system_calls"].append(gadget)
            else:
                classified["other"].append(gadget)
        return classified
    def find_rop_chains(self, target_operations):
        """Find potential ROP chains for specific operations"""
        chains = []
        for operation in target_operations:
            chain = self.build_rop_chain(operation)
            if chain:
                chains.append({
                    "operation": operation,
                    "chain": chain
                })
        return chains
    def build_rop_chain(self, operation):
        """Build ROP chain for specific operation"""
        # This is a simplified example
        # Real ROP chain building is much more complex
        if operation == "execve":
            # Look for gadgets to set up execve system call
            required_gadgets = [
                "pop rax",  # System call number
                "pop rdi",  # First argument
                "pop rsi",  # Second argument
                "pop rdx",  # Third argument
                "syscall"   # System call
            ]
            chain = []
            for required in required_gadgets:
                matching_gadget = self.find_matching_gadget(required)
                if matching_gadget:
                    chain.append(matching_gadget)
                else:
                    return None  # Chain incomplete
            return chain
        return None
    def find_matching_gadget(self, pattern):
        """Find gadget matching specific pattern"""
        for gadget in self.gadgets:
            if pattern.lower() in gadget["instructions"].lower():
                return gadget
        return None
    def generate_rop_report(self, output_file="rop_analysis.json"):
        """Generate ROP analysis report"""
        # Classify gadgets
        classified = self.classify_gadgets()
        # Find common ROP chains
        target_operations = ["execve", "mprotect", "mmap"]
        chains = self.find_rop_chains(target_operations)
        report = {
            "binary_path": self.binary_path,
            "total_gadgets": len(self.gadgets),
            "gadget_classification": {
                category: len(gadgets) for category, gadgets in classified.items()
            },
            "classified_gadgets": classified,
            "rop_chains": chains,
            "all_gadgets": self.gadgets
        }
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"ROP analysis report saved: {output_file}")
        return report
def close(self):
"""Close Rizin session"""
self.r2.quit()

# Usage
if __name__ == "__main__":
    binary_path = "/path/to/binary"
    analyzer = ROPGadgetAnalyzer(binary_path)
    # Find ROP gadgets
    gadgets = analyzer.find_rop_gadgets()
    # Generate report
    report = analyzer.generate_rop_report()
    # Close analyzer
    analyzer.close()
    print(f"Found {len(gadgets)} ROP gadgets")
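
`find_matching_gadget` returns the first gadget whose text contains the pattern, so a request for `pop rdi` can be satisfied by a longer gadget such as `pop rdi; pop rsi; ret`, which also overwrites `rsi`. Below is a minimal sketch of a stricter selection. It assumes each gadget is a dict with an `instructions` string separated by `;` (the field name used above). The `address` key, the helper names, and the sample data are hypothetical.

```python
def split_instructions(text):
    """Split 'pop rdi; ret' into ['pop rdi', 'ret'], normalizing case and spacing."""
    return [" ".join(part.lower().split()) for part in text.split(";") if part.strip()]


def pick_gadget(gadgets, pattern):
    """Return the shortest gadget whose first instruction equals `pattern`, or None."""
    wanted = " ".join(pattern.lower().split())
    candidates = [
        g for g in gadgets
        if split_instructions(g["instructions"])[:1] == [wanted]
    ]
    # Fewer instructions means fewer side effects on other registers.
    return min(
        candidates,
        key=lambda g: len(split_instructions(g["instructions"])),
        default=None,
    )


# Hypothetical sample data for illustration only
sample_gadgets = [
    {"address": 0x401000, "instructions": "pop rdi; pop rsi; ret"},
    {"address": 0x401010, "instructions": "pop rdi; ret"},
    {"address": 0x401020, "instructions": "xor eax, eax; pop rdi; ret"},
]

best = pick_gadget(sample_gadgets, "pop rdi")
print(hex(best["address"]))  # 0x401010
```

Preferring the shortest match is only a heuristic. A real chain builder would also have to track which registers each gadget clobbers.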
Integration Examples
Cutter Integration
#!/usr/bin/env python3
# Rizin and Cutter integration
# rzpipe is the Rizin pipe module; r2pipe would launch radare2 instead
import json

import rzpipe


class RizinCutterIntegration:
    def __init__(self, binary_path):
        self.binary_path = binary_path
        self.r2 = rzpipe.open(binary_path)

    def export_for_cutter(self, output_file="cutter_project.json"):
        """Export Rizin analysis for Cutter"""
        # Perform analysis
        self.r2.cmd("aaa")
        # Collect analysis data
        project_data = {
            "file_info": self.r2.cmdj("ij"),
            "functions": self.r2.cmdj("aflj"),
            "imports": self.r2.cmdj("iij"),
            "exports": self.r2.cmdj("iEj"),  # iE lists exports; ie lists entry points
            "strings": self.r2.cmdj("izj"),
            "sections": self.r2.cmdj("iSj"),
            "symbols": self.r2.cmdj("isj"),
            "comments": self.get_all_comments(),
            # Flag and comment command names vary between versions; check f? and CC?
            "flags": self.r2.cmdj("fj")
        }
        with open(output_file, 'w') as f:
            json.dump(project_data, f, indent=2)
        print(f"Cutter project data exported: {output_file}")
        return project_data

    def get_all_comments(self):
        """Get all comments from Rizin"""
        comments_output = self.r2.cmd("CC*") or ""
        comments = []
        for line in comments_output.split('\n'):
            if not line.startswith("CC "):
                continue
            # Split on the last '@' so multi-word comments stay intact
            comment_text, sep, address = line[3:].rpartition('@')
            if not sep:
                continue
            try:
                comments.append({
                    "address": int(address.strip(), 16),
                    "comment": comment_text.strip().strip('"')
                })
            except ValueError:
                continue
        return comments

    def import_cutter_project(self, project_file):
        """Import Cutter project data into Rizin"""
        try:
            with open(project_file, 'r') as f:
                project_data = json.load(f)
            # Import functions ("or []" guards against null entries in the export)
            for func in project_data.get("functions") or []:
                addr = func.get("offset")
                name = func.get("name")
                # Compare against None so that address 0 is not skipped
                if addr is not None and name:
                    self.r2.cmd(f"af @ {addr}")
                    self.r2.cmd(f"afn {name} @ {addr}")
            # Import comments
            for comment in project_data.get("comments") or []:
                addr = comment.get("address")
                text = comment.get("comment")
                if addr is not None and text:
                    self.r2.cmd(f"CC {text} @ {addr}")
            # Import flags
            for flag in project_data.get("flags") or []:
                addr = flag.get("offset")
                name = flag.get("name")
                if addr is not None and name:
                    self.r2.cmd(f"f {name} @ {addr}")
            print("Cutter project imported successfully")
            return True
        except Exception as e:
            print(f"Error importing Cutter project: {e}")
            return False

# Usage
integration = RizinCutterIntegration("/path/to/binary")
project_data = integration.export_for_cutter()
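
The comment export depends on parsing `CC*` output line by line. The sketch below isolates that step as a pure function so it can be tested without a Rizin session. It assumes lines of the form `CC <text> @ <address>`, which is the shape `get_all_comments` expects. The real output format can differ between versions, so treat `parse_comment_line` (a hypothetical helper) as illustrative only.

```python
def parse_comment_line(line):
    """Parse 'CC <text> @ <hex address>' into a dict, or return None."""
    if not line.startswith("CC "):
        return None
    # Split on the last '@' so comment text containing spaces (or '@') survives.
    text, sep, address = line[3:].rpartition("@")
    if not sep:
        return None
    try:
        return {
            "address": int(address.strip(), 16),
            "comment": text.strip().strip('"'),
        }
    except ValueError:
        return None


print(parse_comment_line('CC "check length before copy" @ 0x401136'))
```

Splitting on the last `@` rather than on the first space keeps multi-word comments whole.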
Ghidra Integration
#!/usr/bin/env python3
# Rizin and Ghidra integration
# rzpipe is the Rizin pipe module; r2pipe would launch radare2 instead
import os
import subprocess

import rzpipe


class RizinGhidraIntegration:
    def __init__(self, binary_path, ghidra_path="/opt/ghidra"):
        self.binary_path = binary_path
        self.ghidra_path = ghidra_path
        self.r2 = rzpipe.open(binary_path)

    def export_to_ghidra_script(self, output_script="import_rizin.py"):
        """Generate Ghidra script to import Rizin analysis"""
        # Perform analysis
        self.r2.cmd("aaa")
        # Get analysis data
        functions = self.r2.cmdj("aflj") or []
        comments = self.get_all_comments()
        # Generate Ghidra Python script.
        # The data is embedded with repr() rather than json.dumps(), because JSON
        # true/false/null are not valid Python literals.
        script_content = f'''
# Ghidra script to import Rizin analysis
# Auto-generated from Rizin analysis
from ghidra.program.model.symbol import SourceType
from ghidra.program.model.listing import CodeUnit

# Function data from Rizin
functions_data = {functions!r}
# Comments data from Rizin
comments_data = {comments!r}

# Import functions
for func_data in functions_data:
    addr = func_data.get("offset")
    name = func_data.get("name", "")
    if addr and name:
        address = toAddr(addr)
        # Create function if it doesn't exist
        func = getFunctionAt(address)
        if func is None:
            func = createFunction(address, name)
        # Set function name
        if func:
            func.setName(name, SourceType.USER_DEFINED)

# Import comments
listing = currentProgram.getListing()
for comment_data in comments_data:
    addr = comment_data.get("address")
    comment_text = comment_data.get("comment", "")
    if addr and comment_text:
        address = toAddr(addr)
        codeUnit = listing.getCodeUnitAt(address)
        if codeUnit:
            codeUnit.setComment(CodeUnit.EOL_COMMENT, comment_text)

print("Rizin analysis imported successfully")
'''
        with open(output_script, 'w') as f:
            f.write(script_content)
        print(f"Ghidra import script generated: {output_script}")
        return output_script

    def get_all_comments(self):
        """Get all comments from Rizin"""
        comments_output = self.r2.cmd("CC*") or ""
        comments = []
        for line in comments_output.split('\n'):
            if not line.startswith("CC "):
                continue
            # Split on the last '@' so multi-word comments stay intact
            comment_text, sep, address = line[3:].rpartition('@')
            if not sep:
                continue
            try:
                comments.append({
                    "address": int(address.strip(), 16),
                    "comment": comment_text.strip().strip('"')
                })
            except ValueError:
                continue
        return comments

    def run_ghidra_analysis(self, project_name="rizin_import"):
        """Run Ghidra headless analysis"""
        # Generate import script
        import_script = os.path.abspath(self.export_to_ghidra_script())
        # Make sure the project directory exists before Ghidra uses it
        project_dir = "/tmp/ghidra_projects"
        os.makedirs(project_dir, exist_ok=True)
        # Run Ghidra headless
        cmd = [
            f"{self.ghidra_path}/support/analyzeHeadless",
            project_dir,
            project_name,
            "-import", self.binary_path,
            # Tell Ghidra where the generated script lives
            "-scriptPath", os.path.dirname(import_script),
            "-postScript", os.path.basename(import_script),
            "-deleteProject"  # Clean up after analysis
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                print("Ghidra analysis completed successfully")
                return True
            else:
                print(f"Ghidra analysis failed: {result.stderr}")
                return False
        except Exception as e:
            print(f"Error running Ghidra: {e}")
            return False

# Usage
integration = RizinGhidraIntegration("/path/to/binary")
integration.run_ghidra_analysis()
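
When analysis data is embedded in a generated Python script, as `export_to_ghidra_script` does, keep in mind that JSON text is not always valid Python source: `true`, `false`, and `null` are undefined names there. The sketch below uses a hypothetical function record to compare embedding `json.dumps` output with embedding `repr` output. The `defines_functions_data` helper is illustrative only.

```python
import json

# Hypothetical function record; the boolean and null fields are for illustration
functions = [{"offset": 4198400, "name": "main", "noreturn": False, "signature": None}]

as_json = f"functions_data = {json.dumps(functions)}"
as_repr = f"functions_data = {functions!r}"


def defines_functions_data(source):
    """Execute generated source in an empty namespace and report whether it worked."""
    namespace = {}
    try:
        exec(source, namespace)
    except NameError:
        return False
    return namespace.get("functions_data") == functions


print(defines_functions_data(as_json))  # False: 'false' and 'null' are undefined names
print(defines_functions_data(as_repr))  # True
```

An alternative is to write the data to a separate JSON file and have the generated script load it with `json.load`, which also keeps the script small.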
Troubleshooting
Common Issues
**Installation issues:**
**Analysis issues:**
**Compliance issues:**
Debugging
Enable debugging output and troubleshoot problems:
# Show the installed version
rizin -v
# Debug mode
rizin -d binary_file
# Enable logging
rizin -e log.level=5 binary_file
# Check configuration
rizin -e
rizin -e~anal
# Plugin management
rz-pm list
rz-pm install rz-ghidra
# Execution tracing in the debugger
rizin -d -e dbg.trace=true binary_file
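
Before digging into tool-specific debugging, it can help to confirm which executables are actually on `PATH`. This is a minimal sketch using only the Python standard library. The tool names are the ones mentioned in this document, and `find_tools` is a hypothetical helper.

```python
import shutil


def find_tools(names):
    """Map each executable name to its resolved path, or None if it is not on PATH."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    for name, path in find_tools(["rizin", "rz-pm"]).items():
        print(f"{name}: {path or 'not found'}")
```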
Security Considerations
Safe Analysis Practices
**Malware analysis safety:**
- Use isolated virtual machines for malware analysis
- Disable network connectivity while analyzing malware
- Use snapshots to restore a clean state
- Monitor system behavior during analysis
- Put appropriate containment measures in place

**Data protection:**
- Encrypt sensitive analysis results
- Store binary samples securely
- Apply access controls to analysis tools
- Back up analysis data regularly
- Dispose of temporary files securely
Legal and Ethical Considerations
**Reverse engineering ethics:**
- Respect software licenses and terms of service
- Comply with applicable laws and regulations
- Use reverse engineering only for legitimate purposes
- Avoid copyright infringement
- Follow responsible disclosure practices

**Best practices:**
- Document the analysis methodology and findings
- Maintain chain of custody for evidence
- Implement quality assurance processes
- Pursue regular training and skills development
- Stay current with legal requirements
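
For the documentation and chain-of-custody points above, one common approach is to record a cryptographic hash of each sample before analysis begins. This is a minimal sketch, assuming SHA-256 and a JSON record. The field names and the `custody_record` helper are hypothetical.

```python
import hashlib
import json
import os
import tempfile
from datetime import datetime, timezone


def custody_record(path, analyst):
    """Hash a sample in chunks and return a record suitable for an evidence log."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return {
        "file": os.path.basename(path),
        "size": os.path.getsize(path),
        "sha256": digest.hexdigest(),
        "analyst": analyst,
        "recorded_at": datetime.now(timezone.utc).isoformat(),
    }


# Demonstration on a throwaway file instead of a real sample
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example bytes")
record = custody_record(tmp.name, analyst="analyst-1")
os.remove(tmp.name)
print(json.dumps(record, indent=2))
```

Reading in fixed-size chunks keeps memory use flat for large samples. Comparing the stored hash against a fresh one later shows whether the file changed in between.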