# Maltego Cheat Sheet
Overview
Maltego is a comprehensive open-source intelligence (OSINT) and graphical link analysis tool, originally developed by Paterva, that lets investigators collect, analyze, and visualize relationships between people, organizations, domains, IP addresses, and other entities. The platform excels at turning disparate pieces of information into actionable intelligence through its graph-based visualization, which makes complex relationships and patterns immediately apparent to analysts. Maltego has become a de facto standard for OSINT investigations and is used extensively by law enforcement, intelligence agencies, corporate security teams, and penetration testers worldwide.
Maltego's core strength is its transform system, which automates the collection of information from various data sources and presents it as an intuitive graph. Transforms are automated queries that take an input entity (such as a domain name) and return related entities (such as IP addresses, email addresses, or associated domains) together with their relationships. This automation lets investigators rapidly expand their understanding of a target's digital footprint while keeping the visual context of how the pieces of information connect to one another.
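The input-entity to output-entities idea can be sketched in plain Python. This is only a conceptual model, not Maltego's implementation: the `Entity` class, the `FAKE_DNS` lookup table, and the function names are all hypothetical and stand in for a real data source.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass(frozen=True)
class Entity:
    type: str   # e.g. "maltego.Domain"
    value: str  # e.g. "example.com"


# Hypothetical static lookup standing in for a real data source such as DNS.
FAKE_DNS: Dict[str, List[str]] = {"example.com": ["192.0.2.10"]}


def domain_to_ip(entity: Entity) -> List[Entity]:
    """Toy transform: one Domain entity in, related IPv4Address entities out."""
    return [Entity("maltego.IPv4Address", ip) for ip in FAKE_DNS.get(entity.value, [])]


def run_transform(
    transform: Callable[[Entity], List[Entity]], entity: Entity
) -> List[Tuple[Entity, Entity]]:
    """Apply a transform and return (source, target) links for the graph."""
    return [(entity, result) for result in transform(entity)]


links = run_transform(domain_to_ip, Entity("maltego.Domain", "example.com"))
for source, target in links:
    print(f"{source.value} -> {target.value}")
```

Each returned pair corresponds to one edge that would be drawn on the graph between the input entity and a newly discovered entity.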
Maltego's versatility extends beyond traditional OSINT investigations to threat analysis, fraud investigation, digital forensics support, and social network analysis. The platform supports both manual investigation techniques and automated data extraction, which makes it suitable for targeted investigations as well as broad information-gathering operations. Its ability to integrate with numerous data sources through transforms, combined with its visualization engine, makes Maltego a valuable tool for organizations that need comprehensive intelligence analysis capabilities.
Installation
Windows Installation
Installing Maltego on Windows systems:
# Download Maltego installer
# Visit: https://www.maltego.com/downloads/
# Run installer as administrator
maltego-4.6.0-windows.exe
# Choose installation directory
# Default: C:\Program Files\Maltego
# Configure Java settings
# Minimum 4GB RAM recommended
# Java 8 or higher required
# Launch Maltego
# Start Menu -> Maltego
# Initial setup
# Create Maltego account
# Choose license type (Community/Commercial)
# Configure transform hub
Linux Installation
Installing Maltego on Linux distributions:
# Download Linux installer
wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.deb
# Install on Ubuntu/Debian
sudo dpkg -i Maltego.v4.6.0.deb
sudo apt-get install -f
# Install dependencies if needed
sudo apt install openjdk-11-jre
# Alternative: download the zip archive
wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.linux.zip
unzip Maltego.v4.6.0.linux.zip
cd Maltego.v4.6.0
# Make executable
chmod +x bin/maltego
# Run Maltego
./bin/maltego
# Create desktop shortcut
cp maltego.desktop ~/.local/share/applications/
macOS Installation
# Download macOS installer
# Visit: https://www.maltego.com/downloads/
# Install DMG package
# Drag Maltego to Applications folder
# Install Java if needed
brew install openjdk@11
# Launch from Applications
# Or from terminal
/Applications/Maltego.app/Contents/MacOS/maltego
# Configure security settings
# System Preferences -> Security & Privacy
# Allow Maltego to run
Docker Installation
# Create Maltego Docker environment
cat > Dockerfile << 'EOF'
FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
openjdk-11-jre wget unzip \
xvfb x11vnc fluxbox
# Download and install Maltego
RUN wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.linux.zip
RUN unzip Maltego.v4.6.0.linux.zip
RUN mv Maltego.v4.6.0 /opt/maltego
# Setup VNC
EXPOSE 5900
# Start script
COPY start.sh /start.sh
RUN chmod +x /start.sh
CMD ["/start.sh"]
EOF
# Create start script
cat > start.sh << 'EOF'
#!/bin/bash
Xvfb :1 -screen 0 1024x768x16 &
export DISPLAY=:1
fluxbox &
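# x11vnc must listen on all container interfaces, otherwise the published port is unreachable
x11vnc -display :1 -nopw -forever -xkb &
cd /opt/maltego
./bin/maltego
EOF
# Build and run (VNC has no password, so publish it on the host's loopback only)
docker build -t maltego-osint .
docker run -p 127.0.0.1:5900:5900 maltego-osint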
Basic Usage
Initial Setup
Setting up Maltego for first use:
# Launch Maltego
maltego
# Account Setup
# 1. Create Maltego account or login
# 2. Choose license type:
# - Community Edition (free, limited)
# - Commercial License (full features)
# Transform Hub Configuration
# 1. Access Transform Hub
# 2. Install transform packages:
# - Standard Transforms
# - Social Networks
# - Threat Intelligence
# - Infrastructure
# Machine Configuration
# 1. Configure machines for automated investigations
# 2. Set up API keys for data sources
# 3. Configure proxy settings if needed
# Workspace Setup
# 1. Create new graph
# 2. Configure entity palette
# 3. Set up custom entity types if needed
Basic Investigation
Running basic OSINT investigations:
# Starting an Investigation
# 1. Create new graph
# 2. Add initial entity (domain, person, email)
# 3. Right-click entity -> Run Transform
# Domain Investigation
# Add Domain entity: example.com
# Run transforms:
# - To DNS Name [DNS]
# - To IP Address [DNS]
# - To MX Record [DNS]
# - To Website [HTTP]
# Email Investigation
# Add Email Address entity: user@example.com
# Run transforms:
# - To Domain [Email]
# - To Person [Email]
# - To Social Network Profile
# Person Investigation
# Add Person entity: John Doe
# Run transforms:
# - To Email Address [Search Engines]
# - To Social Network Profile
# - To Phone Number
# - To Address
Entity Management
Working with entities and relationships:
# Adding Entities
# 1. Drag from Entity Palette
# 2. Right-click graph -> Add Entity
# 3. Import from file (CSV, XML)
# Entity Properties
# 1. Select entity
# 2. View Properties tab
# 3. Edit entity details
# 4. Add custom properties
# Relationship Management
# 1. Select two entities
# 2. Right-click -> Add Link
# 3. Choose link type
# 4. Set link properties
# Entity Grouping
# 1. Select multiple entities
# 2. Right-click -> Group
# 3. Name the group
# 4. Set group properties
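For the file-import route listed above, a table of related values can be prepared ahead of time. The sketch below writes domain/IP pairs as CSV text. The column names and the one-link-per-row layout are assumptions for illustration; how columns map to entity types is decided when the file is imported.

```python
import csv
import io


def links_to_csv(links):
    """Serialize (domain, ip) pairs as CSV text; each row describes one link."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["Domain", "IPv4Address"])  # header row (assumed column names)
    writer.writerows(links)
    return buffer.getvalue()


csv_text = links_to_csv([
    ("example.com", "192.0.2.10"),
    ("example.org", "192.0.2.20"),
])
print(csv_text)
```

Writing to a `StringIO` buffer keeps the function easy to test; to produce a file, write `csv_text` to disk.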
Advanced Features
Custom Transforms
Creating and configuring custom transforms:
# Transform Development
# 1. Access Transform Hub
# 2. Create new transform
# 3. Configure input/output entities
# 4. Write transform code
# Python Transform Example
#!/usr/bin/env python3
from maltego_trx.transform import DiscoverableTransform


def discover_subdomains(domain):
    # Placeholder for subdomain discovery
    # (DNS queries, certificate transparency, etc.)
    # Returns an empty list so the transform still runs before this is implemented
    return []


class DomainToSubdomains(DiscoverableTransform):
    @classmethod
    def create_entities(cls, request, response):
        domain = request.Value
        # Custom subdomain discovery logic
        for subdomain in discover_subdomains(domain):
            response.addEntity("maltego.Domain", subdomain)
        return response
# Transform Configuration
# 1. Set transform name and description
# 2. Configure input entity types
# 3. Set output entity types
# 4. Configure authentication if needed
# 5. Test transform functionality
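Whatever data source a subdomain transform queries, the raw names usually need cleaning before they become entities. A minimal sketch of that step follows; `filter_subdomains` is a hypothetical helper, not part of any Maltego library, and it can be unit-tested without network access.

```python
def filter_subdomains(domain, candidates):
    """Keep unique, non-wildcard names that are real subdomains of `domain`."""
    suffix = "." + domain.lower()
    seen = set()
    result = []
    for name in candidates:
        # Normalize whitespace, case, and a trailing root dot
        name = name.strip().lower().rstrip(".")
        if name.endswith(suffix) and "*" not in name and name not in seen:
            seen.add(name)
            result.append(name)
    return result


names = filter_subdomains(
    "example.com",
    ["www.example.com", "*.example.com", "WWW.example.com.",
     "evil-example.com", "mail.example.com "],
)
print(names)
```

Matching on the `"." + domain` suffix rather than on `domain` alone is what rejects look-alike names such as `evil-example.com`.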
Machine Configuration
Setting up automated investigation machines:
# Creating Investigation Machines
# 1. Access Machines tab
# 2. Create new machine
# 3. Add transform sequence
# 4. Configure parameters
# Domain Investigation Machine
# Sequence:
# 1. Domain -> DNS Names
# 2. Domain -> IP Addresses
# 3. Domain -> MX Records
# 4. Domain -> Subdomains
# 5. IP Address -> Geolocation
# 6. IP Address -> Network Block
# Person Investigation Machine
# Sequence:
# 1. Person -> Email Addresses
# 2. Email -> Domains
# 3. Person -> Social Profiles
# 4. Person -> Phone Numbers
# 5. Phone -> Location
# Machine Parameters
# - Maximum entities per transform
# - Transform timeout settings
# - API rate limiting
# - Output filtering rules
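The sequence-plus-limits idea behind a machine can be modelled as a chain of functions with an entity cap. The sketch below is a conceptual illustration in plain Python, not Maltego's machine language; `run_machine` and the two toy transforms are hypothetical.

```python
def run_machine(seed, steps, max_entities=100):
    """Run transforms in order, feeding each step the previous step's new results."""
    discovered = [seed]
    seen = {seed}
    frontier = [seed]
    for step in steps:
        next_frontier = []
        for value in frontier:
            for result in step(value):
                if result in seen:
                    continue
                if len(discovered) >= max_entities:
                    return discovered  # stop once the entity cap is reached
                seen.add(result)
                discovered.append(result)
                next_frontier.append(result)
        frontier = next_frontier
    return discovered


# Toy transforms with canned answers standing in for real lookups
def to_dns_names(domain):
    return [f"www.{domain}", f"mail.{domain}"]


def to_ip(name):
    table = {"www.example.com": ["192.0.2.10"], "mail.example.com": ["192.0.2.20"]}
    return table.get(name, [])


full_run = run_machine("example.com", [to_dns_names, to_ip])
capped_run = run_machine("example.com", [to_dns_names, to_ip], max_entities=3)
print(full_run)
print(capped_run)
```

The cap matters in practice because each step multiplies the number of entities passed to the next one.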
Data Integration
Integrating external data sources:
# API Integration
# 1. Configure API keys in Transform Hub
# 2. Available APIs:
# - VirusTotal
# - Shodan
# - PassiveTotal
# - ThreatCrowd
# - Social media APIs
# Custom Data Sources
# 1. Create CSV import templates
# 2. Configure entity mappings
# 3. Import data files
# 4. Validate imported entities
# Database Integration
# 1. Configure database connections
# 2. Create SQL-based transforms
# 3. Map database fields to entities
# 4. Set up automated queries
# File Import/Export
# Supported formats:
# - CSV (entities and links)
# - XML (Maltego format)
# - GraphML
# - JSON
# - KML (geolocation data)
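As an illustration of one of the listed formats, the sketch below serializes a small node/edge list as generic GraphML using only the standard library. It follows the public GraphML schema; whether a given Maltego version accepts this exact file is an assumption to verify, and `to_graphml` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET

GRAPHML_NS = "http://graphml.graphdrawing.org/xmlns"


def to_graphml(nodes, edges):
    """Build a minimal directed GraphML document from node ids and (source, target) pairs."""
    root = ET.Element("graphml", xmlns=GRAPHML_NS)
    graph = ET.SubElement(root, "graph", id="G", edgedefault="directed")
    for node_id in nodes:
        ET.SubElement(graph, "node", id=node_id)
    for source, target in edges:
        ET.SubElement(graph, "edge", source=source, target=target)
    return ET.tostring(root, encoding="unicode")


graphml_text = to_graphml(
    ["example.com", "192.0.2.10"],
    [("example.com", "192.0.2.10")],
)
print(graphml_text)
```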
Advanced Visualization
Advanced graph visualization and analysis:
# Layout Algorithms
# 1. Hierarchical Layout
# 2. Circular Layout
# 3. Organic Layout
# 4. Force-Directed Layout
# 5. Geographic Layout
# Filtering and Selection
# 1. Filter by entity type
# 2. Filter by property values
# 3. Filter by link types
# 4. Time-based filtering
# 5. Custom filter expressions
# Graph Analysis
# 1. Centrality analysis
# 2. Clustering detection
# 3. Path analysis
# 4. Network metrics
# 5. Pattern recognition
# Visualization Customization
# 1. Entity icons and colors
# 2. Link styles and colors
# 3. Label customization
# 4. Size scaling by properties
# 5. Conditional formatting
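To make the centrality item concrete, here is a minimal sketch of degree centrality computed offline over an exported edge list. It uses the textbook definition (degree divided by n - 1) and makes no claim about how Maltego computes its own metrics; the edge data is made up.

```python
from collections import Counter


def degree_centrality(edges):
    """Return degree / (n - 1) for every node in an undirected edge list."""
    degree = Counter()
    for source, target in edges:
        degree[source] += 1
        degree[target] += 1
    n = len(degree)
    if n < 2:
        return {node: 0.0 for node in degree}
    return {node: count / (n - 1) for node, count in degree.items()}


edges = [
    ("example.com", "192.0.2.10"),
    ("example.org", "192.0.2.10"),
    ("example.com", "ns1.example.net"),
]
centrality = degree_centrality(edges)
for node, score in sorted(centrality.items(), key=lambda item: -item[1]):
    print(f"{node}: {score:.2f}")
```

Nodes with the highest scores, such as an IP address shared by several domains, are often good pivots for the next round of transforms.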
Automation Scripts
Automated OSINT Collection
#!/usr/bin/env python3
# Automated OSINT collection through a Maltego-style REST API.
# The base URL and endpoint paths are illustrative; adjust them to the API
# that is actually available to you.
import json
import time
from datetime import datetime

import requests


class MaltegoOSINTCollector:
    def __init__(self, api_key, base_url="https://api.maltego.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.results = {}

    def create_graph(self, name, description=""):
        """Create new investigation graph"""
        data = {
            "name": name,
            "description": description,
            "created": datetime.now().isoformat()
        }
        response = requests.post(
            f"{self.base_url}/graphs",
            headers=self.headers,
            json=data
        )
        if response.status_code == 201:
            graph_data = response.json()
            return graph_data["id"]
        else:
            print(f"Error creating graph: {response.status_code}")
            return None

    def add_entity(self, graph_id, entity_type, value, properties=None):
        """Add entity to graph"""
        data = {
            "type": entity_type,
            "value": value,
            "properties": properties or {}
        }
        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/entities",
            headers=self.headers,
            json=data
        )
        if response.status_code == 201:
            return response.json()["id"]
        else:
            print(f"Error adding entity: {response.status_code}")
            return None

    def run_transform(self, graph_id, entity_id, transform_name):
        """Run transform on entity"""
        data = {
            "transform": transform_name,
            "entity_id": entity_id
        }
        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/transforms",
            headers=self.headers,
            json=data
        )
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error running transform: {response.status_code}")
            return None

    def run_machine(self, graph_id, entity_id, machine_name):
        """Run investigation machine"""
        data = {
            "machine": machine_name,
            "entity_id": entity_id,
            "max_entities": 100,
            "timeout": 300
        }
        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/machines",
            headers=self.headers,
            json=data
        )
        if response.status_code == 200:
            job_id = response.json()["job_id"]
            return self.wait_for_job(job_id)
        else:
            print(f"Error running machine: {response.status_code}")
            return None

    def wait_for_job(self, job_id, timeout=600):
        """Wait for job completion"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = requests.get(
                f"{self.base_url}/jobs/{job_id}",
                headers=self.headers
            )
            if response.status_code == 200:
                job_data = response.json()
                status = job_data["status"]
                if status == "completed":
                    return job_data["results"]
                elif status == "failed":
                    print(f"Job failed: {job_data.get('error', 'Unknown error')}")
                    return None
                else:
                    time.sleep(10)  # Wait 10 seconds before checking again
            else:
                print(f"Error checking job status: {response.status_code}")
                return None
        print("Job timeout")
        return None

    def investigate_domain(self, domain):
        """Comprehensive domain investigation"""
        print(f"Starting investigation of domain: {domain}")
        # Create graph
        graph_id = self.create_graph(f"Domain Investigation: {domain}")
        if not graph_id:
            return None
        # Add domain entity
        entity_id = self.add_entity(graph_id, "maltego.Domain", domain)
        if not entity_id:
            return None
        # Run domain investigation machine
        results = self.run_machine(graph_id, entity_id, "Domain Investigation")
        if results:
            self.results[domain] = {
                "graph_id": graph_id,
                "investigation_results": results,
                "timestamp": datetime.now().isoformat()
            }
            print(f"Investigation completed for {domain}")
            return results
        else:
            print(f"Investigation failed for {domain}")
            return None

    def investigate_email(self, email):
        """Comprehensive email investigation"""
        print(f"Starting investigation of email: {email}")
        # Create graph
        graph_id = self.create_graph(f"Email Investigation: {email}")
        if not graph_id:
            return None
        # Add email entity
        entity_id = self.add_entity(graph_id, "maltego.EmailAddress", email)
        if not entity_id:
            return None
        # Run email investigation transforms
        transforms = [
            "To Domain [Email]",
            "To Person [Email]",
            "To Social Network Profile",
            "To Breach Data"
        ]
        results = {}
        for transform in transforms:
            print(f"Running transform: {transform}")
            result = self.run_transform(graph_id, entity_id, transform)
            if result:
                results[transform] = result
            time.sleep(2)  # Rate limiting
        self.results[email] = {
            "graph_id": graph_id,
            "investigation_results": results,
            "timestamp": datetime.now().isoformat()
        }
        return results

    def investigate_person(self, person_name):
        """Comprehensive person investigation"""
        print(f"Starting investigation of person: {person_name}")
        # Create graph
        graph_id = self.create_graph(f"Person Investigation: {person_name}")
        if not graph_id:
            return None
        # Add person entity
        entity_id = self.add_entity(graph_id, "maltego.Person", person_name)
        if not entity_id:
            return None
        # Run person investigation machine
        results = self.run_machine(graph_id, entity_id, "Person Investigation")
        if results:
            self.results[person_name] = {
                "graph_id": graph_id,
                "investigation_results": results,
                "timestamp": datetime.now().isoformat()
            }
            print(f"Investigation completed for {person_name}")
            return results
        else:
            print(f"Investigation failed for {person_name}")
            return None

    def export_results(self, output_file="maltego_results.json"):
        """Export investigation results"""
        with open(output_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Results exported to {output_file}")

    def generate_report(self, output_file="maltego_report.html"):
        """Generate HTML investigation report"""
        # Literal CSS braces are doubled because the template goes through str.format()
        html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Maltego OSINT Investigation Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #f0f0f0; padding: 20px; }}
.investigation {{ margin: 20px 0; border: 1px solid #ccc; padding: 15px; }}
.entity {{ margin: 10px 0; padding: 10px; background-color: #f9f9f9; }}
.results {{ margin: 10px 0; }}
</style>
</head>
<body>
<div class="header">
<h1>Maltego OSINT Investigation Report</h1>
<p>Generated: {timestamp}</p>
<p>Total Investigations: {total_investigations}</p>
</div>
{investigations}
</body>
</html>
"""
        investigations_html = ""
        for target, data in self.results.items():
            result_count = len(data.get('investigation_results', {}))
            investigations_html += f"""
<div class="investigation">
<h2>Investigation: {target}</h2>
<p>Timestamp: {data['timestamp']}</p>
<p>Graph ID: {data['graph_id']}</p>
<div class="results">
<h3>Results Summary</h3>
<p>Entities discovered: {result_count}</p>
</div>
</div>
"""
        html_content = html_template.format(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            total_investigations=len(self.results),
            investigations=investigations_html
        )
        with open(output_file, 'w') as f:
            f.write(html_content)
        print(f"Report generated: {output_file}")


# Usage
if __name__ == "__main__":
    collector = MaltegoOSINTCollector("your_api_key")
    # Investigate multiple targets
    targets = [
        ("domain", "example.com"),
        ("email", "user@example.com"),
        ("person", "John Doe")
    ]
    for target_type, target_value in targets:
        if target_type == "domain":
            collector.investigate_domain(target_value)
        elif target_type == "email":
            collector.investigate_email(target_value)
        elif target_type == "person":
            collector.investigate_person(target_value)
        time.sleep(5)  # Rate limiting between investigations
    # Export results
    collector.export_results()
    collector.generate_report()
Bulk Entity Processing
#!/usr/bin/env python3
# Bulk entity processing for Maltego
import csv
import json
import time
from datetime import datetime


class MaltegoBulkProcessor:
    def __init__(self, maltego_collector):
        self.collector = maltego_collector
        self.processed_entities = []
        self.failed_entities = []

    def process_csv_file(self, csv_file, entity_type_column, entity_value_column):
        """Process entities from CSV file"""
        with open(csv_file, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                entity_type = row[entity_type_column]
                entity_value = row[entity_value_column]
                print(f"Processing {entity_type}: {entity_value}")
                try:
                    if entity_type.lower() == "domain":
                        result = self.collector.investigate_domain(entity_value)
                    elif entity_type.lower() == "email":
                        result = self.collector.investigate_email(entity_value)
                    elif entity_type.lower() == "person":
                        result = self.collector.investigate_person(entity_value)
                    else:
                        print(f"Unknown entity type: {entity_type}")
                        continue
                    if result:
                        self.processed_entities.append({
                            "type": entity_type,
                            "value": entity_value,
                            "status": "success",
                            "timestamp": datetime.now().isoformat()
                        })
                    else:
                        self.failed_entities.append({
                            "type": entity_type,
                            "value": entity_value,
                            "status": "failed",
                            "timestamp": datetime.now().isoformat()
                        })
                except Exception as e:
                    print(f"Error processing {entity_value}: {e}")
                    self.failed_entities.append({
                        "type": entity_type,
                        "value": entity_value,
                        "status": "error",
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    })
                # Rate limiting
                time.sleep(10)

    def generate_processing_report(self, output_file="bulk_processing_report.json"):
        """Generate bulk processing report"""
        total = len(self.processed_entities) + len(self.failed_entities)
        # Avoid division by zero when nothing was processed
        success_rate = len(self.processed_entities) / total * 100 if total else 0.0
        report = {
            "processing_summary": {
                "total_entities": total,
                "successful": len(self.processed_entities),
                "failed": len(self.failed_entities),
                "success_rate": success_rate
            },
            "processed_entities": self.processed_entities,
            "failed_entities": self.failed_entities,
            "report_timestamp": datetime.now().isoformat()
        }
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"Bulk processing report saved: {output_file}")
        return report


# Usage (MaltegoOSINTCollector is the class from the previous script)
collector = MaltegoOSINTCollector("your_api_key")
processor = MaltegoBulkProcessor(collector)
# Process entities from CSV
processor.process_csv_file("entities.csv", "type", "value")
# Generate report
processor.generate_processing_report()
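Because each row triggers a slow, rate-limited investigation, it can help to validate and de-duplicate the input before handing it to the processor. The following is a minimal sketch under the assumption that the CSV has `type` and `value` columns, as in the usage above; `load_entities` and `SUPPORTED_TYPES` are hypothetical names.

```python
import csv
import io

SUPPORTED_TYPES = {"domain", "email", "person"}


def load_entities(csv_text):
    """Split CSV rows into unique, supported (type, value) pairs and rejected rows."""
    valid, rejected = [], []
    seen = set()
    for row in csv.DictReader(io.StringIO(csv_text)):
        entity_type = (row.get("type") or "").strip().lower()
        value = (row.get("value") or "").strip()
        key = (entity_type, value.lower())
        if entity_type not in SUPPORTED_TYPES or not value:
            rejected.append(row)
        elif key not in seen:  # silently drop case-insensitive duplicates
            seen.add(key)
            valid.append((entity_type, value))
    return valid, rejected


sample = (
    "type,value\n"
    "domain,example.com\n"
    "Domain,EXAMPLE.com\n"
    "hash,abc123\n"
    "email,user@example.com\n"
)
valid, rejected = load_entities(sample)
print(valid)
print(f"{len(rejected)} row(s) rejected")
```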
Transform Development Framework
#!/usr/bin/env python3
# Maltego transform development framework
from maltego_trx.transform import DiscoverableTransform
import requests
import whois


class CustomDomainTransforms:
    """Collection of custom domain-related transforms"""

    class DomainToSubdomains(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value
            # Certificate Transparency subdomain discovery
            subdomains = cls.discover_subdomains_ct(domain)
            for subdomain in subdomains:
                entity = response.addEntity("maltego.Domain", subdomain)
                # addProperty arguments: field name, display name, matching rule, value
                entity.addProperty("source", "Source", "loose", "Certificate Transparency")
            return response

        @staticmethod
        def discover_subdomains_ct(domain):
            """Discover subdomains using Certificate Transparency"""
            subdomains = set()
            try:
                # Pass the query via params so the "%" wildcard is URL-encoded correctly
                response = requests.get(
                    "https://crt.sh/",
                    params={"q": f"%.{domain}", "output": "json"},
                    timeout=30
                )
                if response.status_code == 200:
                    data = response.json()
                    for cert in data:
                        name_value = cert.get('name_value', '')
                        for name in name_value.split('\n'):
                            name = name.strip()
                            if name.endswith(f'.{domain}') and '*' not in name:
                                subdomains.add(name)
            except Exception as e:
                print(f"Error discovering subdomains: {e}")
            return list(subdomains)

    class DomainToTechnologies(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value
            # Technology detection
            technologies = cls.detect_technologies(domain)
            for tech in technologies:
                entity = response.addEntity("maltego.Website", tech['name'])
                entity.addProperty("technology.version", "Version", "loose", tech.get('version', ''))
                entity.addProperty("technology.confidence", "Confidence", "loose", tech.get('confidence', ''))
            return response

        @staticmethod
        def detect_technologies(domain):
            """Detect web technologies"""
            technologies = []
            try:
                # Simple technology detection based on headers
                response = requests.get(f"http://{domain}", timeout=10)
                headers = response.headers
                # Server detection
                server = headers.get('Server', '')
                if server:
                    technologies.append({
                        'name': server,
                        'type': 'Web Server',
                        'confidence': 'high'
                    })
                # Framework detection
                x_powered_by = headers.get('X-Powered-By', '')
                if x_powered_by:
                    technologies.append({
                        'name': x_powered_by,
                        'type': 'Framework',
                        'confidence': 'high'
                    })
                # Content analysis
                content = response.text.lower()
                # WordPress detection
                if 'wp-content' in content or 'wordpress' in content:
                    technologies.append({
                        'name': 'WordPress',
                        'type': 'CMS',
                        'confidence': 'high'
                    })
                # jQuery detection
                if 'jquery' in content:
                    technologies.append({
                        'name': 'jQuery',
                        'type': 'JavaScript Library',
                        'confidence': 'medium'
                    })
            except Exception as e:
                print(f"Error detecting technologies: {e}")
            return technologies

    class DomainToWhoisInfo(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value
            # WHOIS lookup
            whois_info = cls.get_whois_info(domain)
            if whois_info:
                # Fields missing from the WHOIS record come back as None, so test the values
                # Add registrar
                if whois_info.get('registrar'):
                    entity = response.addEntity("maltego.Organization", whois_info['registrar'])
                    entity.addProperty("organization.type", "Type", "loose", "Registrar")
                # Add registrant
                if whois_info.get('registrant'):
                    entity = response.addEntity("maltego.Person", whois_info['registrant'])
                    entity.addProperty("person.role", "Role", "loose", "Registrant")
                # Add creation date
                if whois_info.get('creation_date'):
                    entity = response.addEntity("maltego.Date", str(whois_info['creation_date']))
                    entity.addProperty("date.type", "Type", "loose", "Domain Creation")
            return response

        @staticmethod
        def get_whois_info(domain):
            """Get WHOIS information"""
            try:
                w = whois.whois(domain)
                return {
                    'registrar': w.registrar,
                    'registrant': w.registrant,
                    'creation_date': w.creation_date,
                    'expiration_date': w.expiration_date,
                    'name_servers': w.name_servers
                }
            except Exception as e:
                print(f"Error getting WHOIS info: {e}")
                return None


# Transform registration
def register_transforms():
    """Register custom transforms"""
    transforms = [
        CustomDomainTransforms.DomainToSubdomains,
        CustomDomainTransforms.DomainToTechnologies,
        CustomDomainTransforms.DomainToWhoisInfo
    ]
    return transforms


# Usage in Maltego
if __name__ == "__main__":
    # This would be called by Maltego transform runner
    transforms = register_transforms()
    print(f"Registered {len(transforms)} custom transforms")
Integration Examples
Threat Intelligence Integration
#!/usr/bin/env python3
# Maltego threat intelligence integration
import requests
import json


class ThreatIntelligenceIntegration:
    def __init__(self, maltego_collector):
        self.collector = maltego_collector
        self.threat_feeds = {
            "virustotal": "https://www.virustotal.com/vtapi/v2",
            "threatcrowd": "https://www.threatcrowd.org/searchApi/v2",
            "otx": "https://otx.alienvault.com/api/v1"
        }

    def enrich_with_threat_intel(self, graph_id, entity_id, entity_value, entity_type):
        """Enrich entity with threat intelligence"""
        threat_data = {}
        if entity_type == "domain":
            threat_data = self.check_domain_reputation(entity_value)
        else:
            # IP and file-hash lookups are not implemented in this example
            print(f"Unsupported entity type for enrichment: {entity_type}")
        # Add threat intelligence entities to graph
        if threat_data:
            self.add_threat_entities(graph_id, entity_id, threat_data)

    def check_domain_reputation(self, domain):
        """Check domain reputation across threat feeds"""
        reputation_data = {}
        # VirusTotal domain report
        try:
            vt_response = requests.get(
                f"{self.threat_feeds['virustotal']}/domain/report",
                params={"apikey": "your_vt_api_key", "domain": domain},
                timeout=30
            )
            if vt_response.status_code == 200:
                vt_data = vt_response.json()
                reputation_data["virustotal"] = {
                    "positives": vt_data.get("positives", 0),
                    "total": vt_data.get("total", 0),
                    "scan_date": vt_data.get("scan_date", "")
                }
        except Exception as e:
            print(f"Error checking VirusTotal: {e}")
        # ThreatCrowd domain report
        try:
            tc_response = requests.get(
                f"{self.threat_feeds['threatcrowd']}/domain/report",
                params={"domain": domain},
                timeout=30
            )
            if tc_response.status_code == 200:
                tc_data = tc_response.json()
                reputation_data["threatcrowd"] = {
                    "votes": tc_data.get("votes", 0),
                    "references": tc_data.get("references", []),
                    "resolutions": tc_data.get("resolutions", [])
                }
        except Exception as e:
            print(f"Error checking ThreatCrowd: {e}")
        return reputation_data

    def add_link(self, graph_id, source_id, target_id, link_type):
        """Link two entities (hypothetical endpoint, mirroring the collector's other calls)"""
        response = requests.post(
            f"{self.collector.base_url}/graphs/{graph_id}/links",
            headers=self.collector.headers,
            json={"source": source_id, "target": target_id, "type": link_type},
            timeout=30
        )
        return response.status_code == 201

    def add_threat_entities(self, graph_id, entity_id, threat_data):
        """Add threat intelligence entities to graph"""
        for source, data in threat_data.items():
            # Add threat intelligence source
            ti_entity_id = self.collector.add_entity(
                graph_id,
                "maltego.ThreatIntelligence",
                source,
                {"source": source, "data": json.dumps(data)}
            )
            # Link to original entity
            if ti_entity_id:
                self.add_link(graph_id, entity_id, ti_entity_id, "threat_intelligence")


# Usage (MaltegoOSINTCollector is the class from the collection script)
collector = MaltegoOSINTCollector("your_api_key")
threat_intel = ThreatIntelligenceIntegration(collector)
# Investigate domain with threat intelligence
graph_id = collector.create_graph("Threat Intel Investigation")
entity_id = collector.add_entity(graph_id, "maltego.Domain", "suspicious-domain.com")
threat_intel.enrich_with_threat_intel(graph_id, entity_id, "suspicious-domain.com", "domain")
Social Media Integration
#!/usr/bin/env python3
# Maltego social media integration
import tweepy
import facebook


class SocialMediaIntegration:
    def __init__(self, maltego_collector, api_keys):
        self.collector = maltego_collector
        self.api_keys = api_keys
        # Clients stay None when no credentials are supplied for a platform
        self.twitter_api = None
        self.facebook_api = None
        self.setup_apis()

    def setup_apis(self):
        """Setup social media API connections"""
        # Twitter API
        if "twitter" in self.api_keys:
            auth = tweepy.OAuthHandler(
                self.api_keys["twitter"]["consumer_key"],
                self.api_keys["twitter"]["consumer_secret"]
            )
            auth.set_access_token(
                self.api_keys["twitter"]["access_token"],
                self.api_keys["twitter"]["access_token_secret"]
            )
            self.twitter_api = tweepy.API(auth)
        # Facebook API
        if "facebook" in self.api_keys:
            self.facebook_api = facebook.GraphAPI(
                access_token=self.api_keys["facebook"]["access_token"]
            )

    def search_social_profiles(self, person_name):
        """Search for social media profiles"""
        profiles = {}
        # Twitter search
        if self.twitter_api is not None:
            try:
                twitter_users = self.twitter_api.search_users(person_name, count=10)
                profiles["twitter"] = [
                    {
                        "username": user.screen_name,
                        "name": user.name,
                        "followers": user.followers_count,
                        "verified": user.verified
                    }
                    for user in twitter_users
                ]
            except Exception as e:
                print(f"Error searching Twitter: {e}")
        return profiles

    def analyze_social_connections(self, username, platform):
        """Analyze social media connections"""
        connections = []
        if platform == "twitter" and self.twitter_api is not None:
            try:
                # Get followers
                followers = self.twitter_api.get_followers(screen_name=username, count=100)
                for follower in followers:
                    connections.append({
                        "type": "follower",
                        "username": follower.screen_name,
                        "name": follower.name,
                        "relationship": "follows"
                    })
            except Exception as e:
                print(f"Error analyzing Twitter connections: {e}")
        return connections


# Usage (MaltegoOSINTCollector is the class from the collection script)
api_keys = {
    "twitter": {
        "consumer_key": "your_consumer_key",
        "consumer_secret": "your_consumer_secret",
        "access_token": "your_access_token",
        "access_token_secret": "your_access_token_secret"
    }
}
collector = MaltegoOSINTCollector("your_api_key")
social_media = SocialMediaIntegration(collector, api_keys)
# Search for social profiles
profiles = social_media.search_social_profiles("John Doe")
Troubleshooting
Common Issues
**Transform errors:**
**Compliance issues:**
**Data quality issues:**
Debugging
Enabling verbose debugging and logging:
# Enable Maltego debugging
# 1. Help -> Debug Information
# 2. Enable verbose logging
# 3. Check log files in:
# - Windows: %APPDATA%\Maltego\logs
# - Linux: ~/.maltego/logs
# - macOS: ~/Library/Application Support/Maltego/logs
# Transform debugging
# 1. Add debug prints to transform code
# 2. Use transform testing framework
# 3. Monitor API responses
# 4. Check entity property validation
# Network debugging
# 1. Use network monitoring tools
# 2. Check proxy configurations
# 3. Verify SSL certificates
# 4. Monitor API rate limits
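When rate limits are the cause of failing transforms, retrying with exponential backoff is a common remedy. Below is a minimal sketch; `RateLimitError` and `call_with_backoff` are hypothetical names, and a real client would raise its own exception (or return HTTP 429) instead.

```python
import time


class RateLimitError(Exception):
    """Hypothetical error raised when a data source signals too many requests."""


def call_with_backoff(func, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call func(), doubling the wait after each rate-limit failure."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # give up after the last attempt
            sleep(base_delay * 2 ** attempt)


# Demonstration with a function that fails twice before succeeding
calls = {"count": 0}
delays = []


def flaky_lookup():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return "ok"


outcome = call_with_backoff(flaky_lookup, sleep=delays.append)
print(outcome, delays)
```

Passing `sleep` as a parameter lets the demonstration record the delays instead of actually waiting.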
Security Considerations
Operational Security
**Data protection:**
- Encrypt sensitive investigation data
- Use secure communication channels
- Implement appropriate access controls
- Conduct regular security assessments
- Securely dispose of investigation artifacts
**API security:**
- Store API keys securely
- Rotate keys regularly
- Monitor API usage patterns
- Enforce rate limiting
- Apply the principle of least privilege
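One simple way to keep API keys out of scripts and version control is to read them from environment variables and mask them in log output. A minimal sketch follows; the variable name `SHODAN_API_KEY` and both helper functions are illustrative assumptions.

```python
import os


def load_api_key(name, environ=os.environ):
    """Read an API key from the environment and fail loudly if it is missing."""
    key = environ.get(name, "").strip()
    if not key:
        raise RuntimeError(f"Environment variable {name} is not set")
    return key


def mask(key, visible=4):
    """Hide all but the last few characters so the key is safe to log."""
    if len(key) <= visible:
        return "*" * len(key)
    return "*" * (len(key) - visible) + key[-visible:]


# Demonstration with an explicit mapping instead of the real environment
fake_environ = {"SHODAN_API_KEY": " abc123456 "}
api_key = load_api_key("SHODAN_API_KEY", fake_environ)
print(f"Loaded key {mask(api_key)}")
```

The scripts in this document pass `"your_api_key"` as a literal; replacing that literal with `load_api_key(...)` is the intended use.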
Legal and Ethical Considerations
**Privacy compliance:**
- Comply with privacy laws and regulations
- Obtain proper authorization for investigations
- Apply data minimization principles
- Handle personal information securely
- Conduct regular compliance audits
**Ethical OSINT:**
- Use only publicly available information
- Respect terms of service
- Avoid social engineering
- Maintain professional standards
- Document the investigation methodology
References
- Maltego official website
- Maltego documentation
- Maltego Transform Hub
- OSINT Framework
- Open Source Intelligence Techniques