Maltego Cheat Sheet

Overview

Maltego is an open source intelligence (OSINT) and graphical link analysis tool, originally developed by Paterva (now Maltego Technologies), that enables investigators to gather, analyze, and visualize relationships between people, organizations, domains, IP addresses, and other entities. The platform turns disparate pieces of information into actionable intelligence through graph-based visualization, which makes complex relationships and patterns easier for analysts to spot. Maltego is widely used for OSINT investigations by law enforcement, intelligence agencies, corporate security teams, and penetration testers.

The core strength of Maltego lies in its transform system, which automates the process of gathering information from various data sources and presenting it in an intuitive graphical format. Transforms are automated queries that take an input entity (such as a domain name) and return related entities (such as IP addresses, email addresses, or associated domains) along with their relationships. This automation allows investigators to rapidly expand their understanding of a target's digital footprint while keeping visual context of how different pieces of information connect to each other.
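
The input-entity-to-output-entities idea can be sketched in a few lines of plain Python. This is only a toy model, not Maltego's API: the `Entity` class, the `domain_to_mail_hosts` function, and the in-memory `records` table are hypothetical stand-ins for a real transform and its data source.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entity:
    type: str   # entity type name, e.g. "maltego.Domain"
    value: str  # entity value, e.g. "example.com"


def domain_to_mail_hosts(entity, mx_records):
    """Toy transform: map a Domain entity to entities for its mail hosts.

    mx_records is a hypothetical lookup table standing in for a real data source.
    """
    if entity.type != "maltego.Domain":
        return []  # a transform only applies to its declared input type
    return [Entity("maltego.MXRecord", host) for host in mx_records.get(entity.value, [])]


records = {"example.com": ["mx1.example.com", "mx2.example.com"]}
results = domain_to_mail_hosts(Entity("maltego.Domain", "example.com"), records)
for r in results:
    print(f"example.com -> {r.type}: {r.value}")
```

Each returned entity would appear on the graph linked to the input entity, which is how a graph grows as transforms are run.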

Maltego's versatility extends beyond traditional OSINT investigations to include threat intelligence analysis, fraud investigation, digital forensics support, and social network analysis. The platform supports both manual investigation techniques and automated data mining, making it suitable for both targeted investigations and broad intelligence gathering operations. Its ability to integrate with numerous data sources through transforms, combined with its visualization engine, makes Maltego a valuable tool for organizations that need comprehensive intelligence analysis capabilities.

Installation

Windows Installation

Installing Maltego on Windows systems:

# Download Maltego installer
# Visit: https://www.maltego.com/downloads/

# Run installer as administrator
maltego-4.6.0-windows.exe

# Choose installation directory
# Default: C:\Program Files\Maltego

# Configure Java settings
# Minimum 4GB RAM recommended
# Java 8 or higher required

# Launch Maltego
# Start Menu -> Maltego

# Initial setup
# Create Maltego account
# Choose license type (Community/Commercial)
# Configure transform hub

Linux Installation

Installing Maltego on Linux distributions:

# Download Linux installer
wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.deb

# Install on Ubuntu/Debian
sudo dpkg -i Maltego.v4.6.0.deb
sudo apt-get install -f

# Install dependencies if needed
sudo apt install openjdk-11-jre

# Alternative: Download zip archive
wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.linux.zip
unzip Maltego.v4.6.0.linux.zip
cd Maltego.v4.6.0

# Make executable
chmod +x bin/maltego

# Run Maltego
./bin/maltego

# Create desktop shortcut
cp maltego.desktop ~/.local/share/applications/

macOS Installation

# Download macOS installer
# Visit: https://www.maltego.com/downloads/

# Install DMG package
# Drag Maltego to Applications folder

# Install Java if needed
brew install openjdk@11

# Launch from Applications
# Or from terminal
/Applications/Maltego.app/Contents/MacOS/maltego

# Configure security settings
# System Preferences -> Security & Privacy
# Allow Maltego to run

Docker Installation

# Create Maltego Docker environment
cat > Dockerfile << 'EOF'
FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    openjdk-11-jre wget unzip \
    xvfb x11vnc fluxbox

# Download and install Maltego
RUN wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.linux.zip
RUN unzip Maltego.v4.6.0.linux.zip
RUN mv Maltego.v4.6.0 /opt/maltego

# Setup VNC
EXPOSE 5900

# Start script
COPY start.sh /start.sh
RUN chmod +x /start.sh

CMD ["/start.sh"]
EOF

# Create start script
cat > start.sh << 'EOF'
#!/bin/bash
Xvfb :1 -screen 0 1024x768x16 &
export DISPLAY=:1
fluxbox &
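# Listen on all container interfaces so the published port 5900 is reachable
# (binding to localhost inside the container would block the port mapping).
# -nopw disables authentication, so only publish the port to trusted hosts.
x11vnc -display :1 -nopw -forever -xkb &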
cd /opt/maltego
./bin/maltego
EOF

# Build and run
docker build -t maltego-osint .
docker run -p 5900:5900 maltego-osint

Basic Usage

Initial Setup

Setting up Maltego for first use:

# Launch Maltego
maltego

# Account Setup
# 1. Create Maltego account or login
# 2. Choose license type:
#    - Community Edition (free, limited)
#    - Commercial License (full features)

# Transform Hub Configuration
# 1. Access Transform Hub
# 2. Install transform packages:
#    - Standard Transforms
#    - Social Networks
#    - Threat Intelligence
#    - Infrastructure

# Machine Configuration
# 1. Configure machines for automated investigations
# 2. Set up API keys for data sources
# 3. Configure proxy settings if needed

# Workspace Setup
# 1. Create new graph
# 2. Configure entity palette
# 3. Set up custom entity types if needed

Basic Investigation

Performing basic OSINT investigations:

# Starting an Investigation
# 1. Create new graph
# 2. Add initial entity (domain, person, email)
# 3. Right-click entity -> Run Transform

# Domain Investigation
# Add Domain entity: example.com
# Run transforms:
# - To DNS Name [DNS]
# - To IP Address [DNS]
# - To MX Record [DNS]
# - To Website [HTTP]

# Email Investigation
# Add Email Address entity: user@example.com
# Run transforms:
# - To Domain [Email]
# - To Person [Email]
# - To Social Network Profile

# Person Investigation
# Add Person entity: John Doe
# Run transforms:
# - To Email Address [Search Engines]
# - To Social Network Profile
# - To Phone Number
# - To Address

Entity Management

Working with entities and relationships:

# Adding Entities
# 1. Drag from Entity Palette
# 2. Right-click graph -> Add Entity
# 3. Import from file (CSV, XML)

# Entity Properties
# 1. Select entity
# 2. View Properties tab
# 3. Edit entity details
# 4. Add custom properties

# Relationship Management
# 1. Select two entities
# 2. Right-click -> Add Link
# 3. Choose link type
# 4. Set link properties

# Entity Grouping
# 1. Select multiple entities
# 2. Right-click -> Group
# 3. Name the group
# 4. Set group properties
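
The entity, link, and group concepts above map naturally onto a small graph structure. The following is a minimal sketch for reasoning about the model; `InvestigationGraph` and its methods are hypothetical and unrelated to Maltego's internal graph format.

```python
from collections import defaultdict


class InvestigationGraph:
    """Toy in-memory model of entities, links, and named groups."""

    def __init__(self):
        self.entities = {}               # entity id -> {"type", "value", "properties"}
        self.links = []                  # (source id, target id, label)
        self.groups = defaultdict(set)   # group name -> set of entity ids
        self._next_id = 1

    def add_entity(self, entity_type, value, **properties):
        entity_id = self._next_id
        self._next_id += 1
        self.entities[entity_id] = {
            "type": entity_type,
            "value": value,
            "properties": properties,
        }
        return entity_id

    def add_link(self, source_id, target_id, label=""):
        if source_id not in self.entities or target_id not in self.entities:
            raise KeyError("both ends of a link must already be on the graph")
        self.links.append((source_id, target_id, label))

    def group(self, name, entity_ids):
        self.groups[name].update(entity_ids)


graph = InvestigationGraph()
domain = graph.add_entity("maltego.Domain", "example.com")
email = graph.add_entity("maltego.EmailAddress", "user@example.com", note="contact page")
graph.add_link(domain, email, label="contact address")
graph.group("example.com footprint", [domain, email])
print(len(graph.entities), "entities,", len(graph.links), "link")
```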

Advanced Features

Custom Transforms

Creating and configuring custom transforms:

# Transform Development
# 1. Access Transform Hub
# 2. Create new transform
# 3. Configure input/output entities
# 4. Write transform code

# Python Transform Example
#!/usr/bin/env python3
from maltego_trx.maltego import UIM_TYPES
from maltego_trx.transform import DiscoverableTransform

class DomainToSubdomains(DiscoverableTransform):
    @classmethod
    def create_entities(cls, request, response):
        domain = request.Value

        # Custom subdomain discovery logic
        subdomains = discover_subdomains(domain)

        for subdomain in subdomains:
            response.addEntity("maltego.Domain", subdomain)

        return response

def discover_subdomains(domain):
    # Placeholder for subdomain discovery
    # (DNS queries, certificate transparency, etc.)
    # Returns an empty list so the transform above runs without errors
    return []

# Transform Configuration
# 1. Set transform name and description
# 2. Configure input entity types
# 3. Set output entity types
# 4. Configure authentication if needed
# 5. Test transform functionality

Machine Configuration

Setting up automated investigation machines:

# Creating Investigation Machines
# 1. Access Machines tab
# 2. Create new machine
# 3. Add transform sequence
# 4. Configure parameters

# Domain Investigation Machine
# Sequence:
# 1. Domain -> DNS Names
# 2. Domain -> IP Addresses
# 3. Domain -> MX Records
# 4. Domain -> Subdomains
# 5. IP Address -> Geolocation
# 6. IP Address -> Network Block

# Person Investigation Machine
# Sequence:
# 1. Person -> Email Addresses
# 2. Email -> Domains
# 3. Person -> Social Profiles
# 4. Person -> Phone Numbers
# 5. Phone -> Location

# Machine Parameters
# - Maximum entities per transform
# - Transform timeout settings
# - API rate limiting
# - Output filtering rules
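
A machine is essentially an ordered sequence of transforms with limits applied. The sketch below models that idea in plain Python under stated assumptions: `run_machine`, the step format, and the three stand-in transforms with fixed results are all hypothetical, and Maltego's own machines are defined in its own scripting format rather than Python.

```python
def run_machine(seed, steps, max_entities=100):
    """Run transform steps in order and collect the entities they produce.

    seed:  (type, value) tuple for the starting entity
    steps: list of (input_type, transform); transform(value) -> list of (type, value)
    """
    found = [seed]
    seen = {seed}
    for input_type, transform in steps:
        # Iterate over a snapshot so new entities are only used by later steps
        for entity_type, value in list(found):
            if entity_type != input_type:
                continue
            for result in transform(value):
                if len(found) >= max_entities:
                    return found  # enforce the "maximum entities" parameter
                if result not in seen:
                    seen.add(result)
                    found.append(result)
    return found


# Hypothetical stand-in transforms that return fixed data
def to_ip(domain):
    return [("ip", "192.0.2.10")]


def to_mx(domain):
    return [("mx", f"mail.{domain}")]


def to_netblock(ip):
    return [("netblock", "192.0.2.0/24")]


steps = [("domain", to_ip), ("domain", to_mx), ("ip", to_netblock)]
for entity_type, value in run_machine(("domain", "example.com"), steps):
    print(entity_type, value)
```

Ordering matters here: the netblock step only has something to work on because the IP step ran earlier in the sequence.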

Data Integration

Integrating external data sources:

# API Integration
# 1. Configure API keys in Transform Hub
# 2. Available APIs:
#    - VirusTotal
#    - Shodan
#    - PassiveTotal
#    - ThreatCrowd
#    - Social media APIs

# Custom Data Sources
# 1. Create CSV import templates
# 2. Configure entity mappings
# 3. Import data files
# 4. Validate imported entities

# Database Integration
# 1. Configure database connections
# 2. Create SQL-based transforms
# 3. Map database fields to entities
# 4. Set up automated queries

# File Import/Export
# Supported formats:
# - CSV (entities and links)
# - XML (Maltego format)
# - GraphML
# - JSON
# - KML (geolocation data)
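
The "configure entity mappings" step for CSV imports amounts to deciding which column becomes which entity type and which cells get linked. A minimal sketch of that mapping follows; the `COLUMN_TO_ENTITY` table, the `rows_to_entities` function, and the rule of linking the first cell in a row to the others are assumptions for illustration, not Maltego's import wizard behavior.

```python
import csv
import io

# Hypothetical mapping from CSV column names to entity types
COLUMN_TO_ENTITY = {
    "domain": "maltego.Domain",
    "email": "maltego.EmailAddress",
    "ip": "maltego.IPv4Address",
}


def rows_to_entities(csv_text, mapping=COLUMN_TO_ENTITY):
    """Convert mapped, non-empty cells to (type, value) pairs and link cells in a row."""
    entities, links = [], []
    for row in csv.DictReader(io.StringIO(csv_text)):
        row_entities = [
            (mapping[column], value.strip())
            for column, value in row.items()
            if column in mapping and value and value.strip()
        ]
        for entity in row_entities:
            if entity not in entities:
                entities.append(entity)
        # Link the first entity in the row to every other entity in that row
        links.extend((row_entities[0], other) for other in row_entities[1:])
    return entities, links


sample = (
    "domain,email,ip\n"
    "example.com,user@example.com,192.0.2.10\n"
    "example.org,,192.0.2.20\n"
)
entities, links = rows_to_entities(sample)
print(len(entities), "entities,", len(links), "links")
```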

Advanced Visualization

Advanced graph visualization and analysis:

# Layout Algorithms
# 1. Hierarchical Layout
# 2. Circular Layout
# 3. Organic Layout
# 4. Force-Directed Layout
# 5. Geographic Layout

# Filtering and Selection
# 1. Filter by entity type
# 2. Filter by property values
# 3. Filter by link types
# 4. Time-based filtering
# 5. Custom filter expressions

# Graph Analysis
# 1. Centrality analysis
# 2. Clustering detection
# 3. Path analysis
# 4. Network metrics
# 5. Pattern recognition

# Visualization Customization
# 1. Entity icons and colors
# 2. Link styles and colors
# 3. Label customization
# 4. Size scaling by properties
# 5. Conditional formatting
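
Two of the graph analysis ideas listed above, centrality and path analysis, can be illustrated on an exported link list with plain Python. This is a sketch using degree centrality and breadth-first search; the function names and the sample links are hypothetical, and Maltego's built-in views may compute these differently.

```python
from collections import defaultdict, deque


def build_adjacency(links):
    """Build an undirected adjacency map from (a, b) link pairs."""
    adjacency = defaultdict(set)
    for a, b in links:
        adjacency[a].add(b)
        adjacency[b].add(a)
    return adjacency


def degree_centrality(adjacency):
    """Fraction of the other nodes each node is directly connected to."""
    n = len(adjacency)
    if n < 2:
        return {node: 0.0 for node in adjacency}
    return {node: len(neighbours) / (n - 1) for node, neighbours in adjacency.items()}


def shortest_path(adjacency, start, goal):
    """Breadth-first search; returns the node list of a shortest path or None."""
    if start not in adjacency or goal not in adjacency:
        return None
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == goal:
            return path
        for neighbour in sorted(adjacency[node]):
            if neighbour not in visited:
                visited.add(neighbour)
                queue.append(path + [neighbour])
    return None


links = [
    ("example.com", "192.0.2.10"),
    ("example.com", "mail.example.com"),
    ("mail.example.com", "user@example.com"),
    ("example.org", "192.0.2.10"),
]
adjacency = build_adjacency(links)
centrality = degree_centrality(adjacency)
path = shortest_path(adjacency, "example.org", "user@example.com")
print(" -> ".join(path))
```

In this sample the shared IP address is what connects the two domains, which is the kind of pivot point path analysis is meant to surface.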

Automation Scripts

Automated OSINT Collection

#!/usr/bin/env python3
# Automated OSINT collection using Maltego API

import requests
import json
import time
from datetime import datetime

class MaltegoOSINTCollector:
    def __init__(self, api_key, base_url="https://api.maltego.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.results = {}

    def create_graph(self, name, description=""):
        """Create new investigation graph"""
        data = {
            "name": name,
            "description": description,
            "created": datetime.now().isoformat()
        }

        response = requests.post(
            f"{self.base_url}/graphs",
            headers=self.headers,
            json=data
        )

        if response.status_code == 201:
            graph_data = response.json()
            return graph_data["id"]
        else:
            print(f"Error creating graph: {response.status_code}")
            return None

    def add_entity(self, graph_id, entity_type, value, properties=None):
        """Add entity to graph"""
        data = {
            "type": entity_type,
            "value": value,
            "properties": properties or {}
        }

        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/entities",
            headers=self.headers,
            json=data
        )

        if response.status_code == 201:
            return response.json()["id"]
        else:
            print(f"Error adding entity: {response.status_code}")
            return None

    def run_transform(self, graph_id, entity_id, transform_name):
        """Run transform on entity"""
        data = {
            "transform": transform_name,
            "entity_id": entity_id
        }

        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/transforms",
            headers=self.headers,
            json=data
        )

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error running transform: {response.status_code}")
            return None

    def run_machine(self, graph_id, entity_id, machine_name):
        """Run investigation machine"""
        data = {
            "machine": machine_name,
            "entity_id": entity_id,
            "max_entities": 100,
            "timeout": 300
        }

        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/machines",
            headers=self.headers,
            json=data
        )

        if response.status_code == 200:
            job_id = response.json()["job_id"]
            return self.wait_for_job(job_id)
        else:
            print(f"Error running machine: {response.status_code}")
            return None

    def wait_for_job(self, job_id, timeout=600):
        """Wait for job completion"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            response = requests.get(
                f"{self.base_url}/jobs/{job_id}",
                headers=self.headers
            )

            if response.status_code == 200:
                job_data = response.json()
                status = job_data["status"]

                if status == "completed":
                    return job_data["results"]
                elif status == "failed":
                    print(f"Job failed: {job_data.get('error', 'Unknown error')}")
                    return None
                else:
                    time.sleep(10)  # Wait 10 seconds before checking again
            else:
                print(f"Error checking job status: {response.status_code}")
                return None

        print("Job timeout")
        return None

    def investigate_domain(self, domain):
        """Comprehensive domain investigation"""
        print(f"Starting investigation of domain: {domain}")

        # Create graph
        graph_id = self.create_graph(f"Domain Investigation: {domain}")

        if not graph_id:
            return None

        # Add domain entity
        entity_id = self.add_entity(graph_id, "maltego.Domain", domain)

        if not entity_id:
            return None

        # Run domain investigation machine
        results = self.run_machine(graph_id, entity_id, "Domain Investigation")

        if results:
            self.results[domain] = {
                "graph_id": graph_id,
                "investigation_results": results,
                "timestamp": datetime.now().isoformat()
            }

            print(f"Investigation completed for {domain}")
            return results
        else:
            print(f"Investigation failed for {domain}")
            return None

    def investigate_email(self, email):
        """Comprehensive email investigation"""
        print(f"Starting investigation of email: {email}")

        # Create graph
        graph_id = self.create_graph(f"Email Investigation: {email}")

        if not graph_id:
            return None

        # Add email entity
        entity_id = self.add_entity(graph_id, "maltego.EmailAddress", email)

        if not entity_id:
            return None

        # Run email investigation transforms
        transforms = [
            "To Domain [Email]",
            "To Person [Email]",
            "To Social Network Profile",
            "To Breach Data"
        ]

        results = {}
        for transform in transforms:
            print(f"Running transform: {transform}")
            result = self.run_transform(graph_id, entity_id, transform)
            if result:
                results[transform] = result
            time.sleep(2)  # Rate limiting

        self.results[email] = {
            "graph_id": graph_id,
            "investigation_results": results,
            "timestamp": datetime.now().isoformat()
        }

        return results

    def investigate_person(self, person_name):
        """Comprehensive person investigation"""
        print(f"Starting investigation of person: {person_name}")

        # Create graph
        graph_id = self.create_graph(f"Person Investigation: {person_name}")

        if not graph_id:
            return None

        # Add person entity
        entity_id = self.add_entity(graph_id, "maltego.Person", person_name)

        if not entity_id:
            return None

        # Run person investigation machine
        results = self.run_machine(graph_id, entity_id, "Person Investigation")

        if results:
            self.results[person_name] = {
                "graph_id": graph_id,
                "investigation_results": results,
                "timestamp": datetime.now().isoformat()
            }

            print(f"Investigation completed for {person_name}")
            return results
        else:
            print(f"Investigation failed for {person_name}")
            return None

    def export_results(self, output_file="maltego_results.json"):
        """Export investigation results"""
        with open(output_file, 'w') as f:
            json.dump(self.results, f, indent=2)

        print(f"Results exported to {output_file}")

    def generate_report(self, output_file="maltego_report.html"):
        """Generate HTML investigation report"""

        # Literal CSS braces are doubled because the template is filled with str.format()
        html_template = """
<!DOCTYPE html>
<html>
<head>
    <title>Maltego OSINT Investigation Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .header {{ background-color: #f0f0f0; padding: 20px; }}
        .investigation {{ margin: 20px 0; border: 1px solid #ccc; padding: 15px; }}
        .entity {{ margin: 10px 0; padding: 10px; background-color: #f9f9f9; }}
        .results {{ margin: 10px 0; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>Maltego OSINT Investigation Report</h1>
        <p>Generated: {timestamp}</p>
        <p>Total Investigations: {total_investigations}</p>
    </div>

    {investigations}
</body>
</html>
        """

        investigations_html = ""
        for target, data in self.results.items():
            investigations_html += f"""
            <div class="investigation">
                <h2>Investigation: {target}</h2>
                <p>Timestamp: {data['timestamp']}</p>
                <p>Graph ID: {data['graph_id']}</p>
                <div class="results">
                    <h3>Results Summary</h3>
                    <p>Entities discovered: {len(data.get('investigation_results', {}))}</p>
                </div>
            </div>
            """

        html_content = html_template.format(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            total_investigations=len(self.results),
            investigations=investigations_html
        )

        with open(output_file, 'w') as f:
            f.write(html_content)

        print(f"Report generated: {output_file}")

# Usage
if __name__ == "__main__":
    collector = MaltegoOSINTCollector("your_api_key")

    # Investigate multiple targets
    targets = [
        ("domain", "example.com"),
        ("email", "user@example.com"),
        ("person", "John Doe")
    ]

    for target_type, target_value in targets:
        if target_type == "domain":
            collector.investigate_domain(target_value)
        elif target_type == "email":
            collector.investigate_email(target_value)
        elif target_type == "person":
            collector.investigate_person(target_value)

        time.sleep(5)  # Rate limiting between investigations

    # Export results
    collector.export_results()
    collector.generate_report()

Bulk Entity Processing

#!/usr/bin/env python3
# Bulk entity processing for Maltego
# Assumes MaltegoOSINTCollector from the previous script is in scope

import csv
import json
import time
from datetime import datetime

class MaltegoBulkProcessor:
    def __init__(self, maltego_collector):
        self.collector = maltego_collector
        self.processed_entities = []
        self.failed_entities = []

    def process_csv_file(self, csv_file, entity_type_column, entity_value_column):
        """Process entities from CSV file"""

        with open(csv_file, 'r', newline='') as f:
            reader = csv.DictReader(f)

            for row in reader:
                entity_type = row[entity_type_column]
                entity_value = row[entity_value_column]

                print(f"Processing {entity_type}: {entity_value}")

                try:
                    if entity_type.lower() == "domain":
                        result = self.collector.investigate_domain(entity_value)
                    elif entity_type.lower() == "email":
                        result = self.collector.investigate_email(entity_value)
                    elif entity_type.lower() == "person":
                        result = self.collector.investigate_person(entity_value)
                    else:
                        print(f"Unknown entity type: {entity_type}")
                        continue

                    if result:
                        self.processed_entities.append({
                            "type": entity_type,
                            "value": entity_value,
                            "status": "success",
                            "timestamp": datetime.now().isoformat()
                        })
                    else:
                        self.failed_entities.append({
                            "type": entity_type,
                            "value": entity_value,
                            "status": "failed",
                            "timestamp": datetime.now().isoformat()
                        })

                except Exception as e:
                    print(f"Error processing {entity_value}: {e}")
                    self.failed_entities.append({
                        "type": entity_type,
                        "value": entity_value,
                        "status": "error",
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    })

                # Rate limiting
                time.sleep(10)

    def generate_processing_report(self, output_file="bulk_processing_report.json"):
        """Generate bulk processing report"""

        total = len(self.processed_entities) + len(self.failed_entities)

        report = {
            "processing_summary": {
                "total_entities": total,
                "successful": len(self.processed_entities),
                "failed": len(self.failed_entities),
                # Guard against division by zero when nothing was processed
                "success_rate": (len(self.processed_entities) / total * 100) if total else 0.0
            },
            "processed_entities": self.processed_entities,
            "failed_entities": self.failed_entities,
            "report_timestamp": datetime.now().isoformat()
        }

        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)

        print(f"Bulk processing report saved: {output_file}")
        return report

# Usage
collector = MaltegoOSINTCollector("your_api_key")
processor = MaltegoBulkProcessor(collector)

# Process entities from CSV
processor.process_csv_file("entities.csv", "type", "value")

# Generate report
processor.generate_processing_report()
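
The bulk script above reads a file named entities.csv with a `type` column and a `value` column. A small sketch for producing such a file is shown here; the `write_entities_csv` helper and the sample rows are hypothetical, and only the three type names handled by the script (domain, email, person) are used.

```python
import csv

# Sample rows using the three entity types the bulk script recognizes
rows = [
    {"type": "domain", "value": "example.com"},
    {"type": "email", "value": "user@example.com"},
    {"type": "person", "value": "John Doe"},
]


def write_entities_csv(path, rows):
    """Write rows to a CSV file with a type,value header."""
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["type", "value"])
        writer.writeheader()
        writer.writerows(rows)


write_entities_csv("entities.csv", rows)
```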

Transform Development Framework

#!/usr/bin/env python3
# Maltego transform development framework

from maltego_trx.maltego import UIM_TYPES
from maltego_trx.transform import DiscoverableTransform
import requests
import dns.resolver
import whois
import socket

class CustomDomainTransforms:
    """Collection of custom domain-related transforms"""

    class DomainToSubdomains(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value

            # Certificate Transparency subdomain discovery
            subdomains = cls.discover_subdomains_ct(domain)

            for subdomain in subdomains:
                entity = response.addEntity("maltego.Domain", subdomain)
                # addProperty takes fieldName, displayName, matchingRule, value
                entity.addProperty(fieldName="source", displayName="Source", value="Certificate Transparency")

            return response

        @staticmethod
        def discover_subdomains_ct(domain):
            """Discover subdomains using Certificate Transparency"""
            subdomains = set()

            try:
                # %25 is the URL-encoded "%" wildcard used by crt.sh
                url = f"https://crt.sh/?q=%25.{domain}&output=json"
                response = requests.get(url, timeout=30)

                if response.status_code == 200:
                    data = response.json()

                    for cert in data:
                        name_value = cert.get('name_value', '')
                        for name in name_value.split('\n'):
                            name = name.strip()
                            if name.endswith(f'.{domain}') and '*' not in name:
                                subdomains.add(name)

            except Exception as e:
                print(f"Error discovering subdomains: {e}")

            return list(subdomains)

    class DomainToTechnologies(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value

            # Technology detection
            technologies = cls.detect_technologies(domain)

            for tech in technologies:
                entity = response.addEntity("maltego.Website", tech['name'])
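                # addProperty takes fieldName, displayName, matchingRule, value
                entity.addProperty(fieldName="technology.version", displayName="Version", value=tech.get('version', ''))
                entity.addProperty(fieldName="technology.confidence", displayName="Confidence", value=tech.get('confidence', ''))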

            return response

        @staticmethod
        def detect_technologies(domain):
            """Detect web technologies"""
            technologies = []

            try:
                # Simple technology detection based on headers
                response = requests.get(f"http://{domain}", timeout=10)
                headers = response.headers

                # Server detection
                server = headers.get('Server', '')
                if server:
                    technologies.append({
                        'name': server,
                        'type': 'Web Server',
                        'confidence': 'high'
                    })

                # Framework detection
                x_powered_by = headers.get('X-Powered-By', '')
                if x_powered_by:
                    technologies.append({
                        'name': x_powered_by,
                        'type': 'Framework',
                        'confidence': 'high'
                    })

                # Content analysis
                content = response.text.lower()

                # WordPress detection
                if 'wp-content' in content or 'wordpress' in content:
                    technologies.append({
                        'name': 'WordPress',
                        'type': 'CMS',
                        'confidence': 'high'
                    })

                # jQuery detection
                if 'jquery' in content:
                    technologies.append({
                        'name': 'jQuery',
                        'type': 'JavaScript Library',
                        'confidence': 'medium'
                    })

            except Exception as e:
                print(f"Error detecting technologies: {e}")

            return technologies

    class DomainToWhoisInfo(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value

            # WHOIS lookup
            whois_info = cls.get_whois_info(domain)

            if whois_info:
                # WHOIS fields may be present but empty, so test the value, not the key
                # Add registrar
                if whois_info.get('registrar'):
                    entity = response.addEntity("maltego.Organization", whois_info['registrar'])
                    entity.addProperty(fieldName="organization.type", displayName="Organization Type", value="Registrar")

                # Add registrant
                if whois_info.get('registrant'):
                    entity = response.addEntity("maltego.Person", whois_info['registrant'])
                    entity.addProperty(fieldName="person.role", displayName="Role", value="Registrant")

                # Add creation date
                if whois_info.get('creation_date'):
                    entity = response.addEntity("maltego.Date", str(whois_info['creation_date']))
                    entity.addProperty(fieldName="date.type", displayName="Date Type", value="Domain Creation")

            return response

        @staticmethod
        def get_whois_info(domain):
            """Get WHOIS information"""
            try:
                w = whois.whois(domain)

                return {
                    'registrar': w.registrar,
                    'registrant': w.registrant,
                    'creation_date': w.creation_date,
                    'expiration_date': w.expiration_date,
                    'name_servers': w.name_servers
                }

            except Exception as e:
                print(f"Error getting WHOIS info: {e}")
                return None

# Transform registration
def register_transforms():
    """Register custom transforms"""
    transforms = [
        CustomDomainTransforms.DomainToSubdomains,
        CustomDomainTransforms.DomainToTechnologies,
        CustomDomainTransforms.DomainToWhoisInfo
    ]

    return transforms

# Usage in Maltego
if __name__ == "__main__":
    # This would be called by Maltego transform runner
    transforms = register_transforms()
    print(f"Registered {len(transforms)} custom transforms")

Integration Examples

Threat Intelligence Integration

#!/usr/bin/env python3
# Maltego threat intelligence integration

import requests
import json

class ThreatIntelligenceIntegration:
    def __init__(self, maltego_collector):
        self.collector = maltego_collector
        self.threat_feeds = {
            "virustotal": "https://www.virustotal.com/vtapi/v2",
            "threatcrowd": "https://www.threatcrowd.org/searchApi/v2",
            "otx": "https://otx.alienvault.com/api/v1"
        }

    def enrich_with_threat_intel(self, graph_id, entity_id, entity_value, entity_type):
        """Enrich entity with threat intelligence"""

        threat_data = {}

        # NOTE: check_ip_reputation and check_file_reputation are not defined in
        # this example; implement them along the lines of check_domain_reputation
        # before using the "ip" and "hash" branches.
        if entity_type == "domain":
            threat_data = self.check_domain_reputation(entity_value)
        elif entity_type == "ip":
            threat_data = self.check_ip_reputation(entity_value)
        elif entity_type == "hash":
            threat_data = self.check_file_reputation(entity_value)

        # Add threat intelligence entities to graph
        if threat_data:
            self.add_threat_entities(graph_id, entity_id, threat_data)

    def check_domain_reputation(self, domain):
        """Check domain reputation across threat feeds"""
        reputation_data = {}

        # VirusTotal domain report
        try:
            vt_response = requests.get(
                f"{self.threat_feeds['virustotal']}/domain/report",
                params={"apikey": "your_vt_api_key", "domain": domain},
                timeout=30
            )

            if vt_response.status_code == 200:
                vt_data = vt_response.json()
                reputation_data["virustotal"] = {
                    "positives": vt_data.get("positives", 0),
                    "total": vt_data.get("total", 0),
                    "scan_date": vt_data.get("scan_date", "")
                }
        except Exception as e:
            print(f"Error checking VirusTotal: {e}")

        # ThreatCrowd domain report
        try:
            tc_response = requests.get(
                f"{self.threat_feeds['threatcrowd']}/domain/report",
                params={"domain": domain},
                timeout=30
            )

            if tc_response.status_code == 200:
                tc_data = tc_response.json()
                reputation_data["threatcrowd"] = {
                    "votes": tc_data.get("votes", 0),
                    "references": tc_data.get("references", []),
                    "resolutions": tc_data.get("resolutions", [])
                }
        except Exception as e:
            print(f"Error checking ThreatCrowd: {e}")

        return reputation_data

    def add_threat_entities(self, graph_id, entity_id, threat_data):
        """Add threat intelligence entities to graph"""

        for source, data in threat_data.items():
            # Add one entity per threat intelligence source
            ti_entity_id = self.collector.add_entity(
                graph_id,
                "maltego.ThreatIntelligence",
                source,
                {"source": source, "data": json.dumps(data)}
            )

            # Link to original entity
            if ti_entity_id:
                self.collector.add_link(graph_id, entity_id, ti_entity_id, "threat_intelligence")

# Usage
collector = MaltegoOSINTCollector("your_api_key")
threat_intel = ThreatIntelligenceIntegration(collector)

# Investigate domain with threat intelligence
graph_id = collector.create_graph("Threat Intel Investigation")
entity_id = collector.add_entity(graph_id, "maltego.Domain", "suspicious-domain.com")
threat_intel.enrich_with_threat_intel(graph_id, entity_id, "suspicious-domain.com", "domain")

Social Media Integration

#!/usr/bin/env python3
# Maltego social media integration

import tweepy
import facebook
import linkedin

class SocialMediaIntegration:
    def __init__(self, maltego_collector, api_keys):
        self.collector = maltego_collector
        self.api_keys = api_keys
        self.setup_apis()

    def setup_apis(self):
        """Set up social media API connections"""

        # Twitter API
        if "twitter" in self.api_keys:
            auth = tweepy.OAuthHandler(
                self.api_keys["twitter"]["consumer_key"],
                self.api_keys["twitter"]["consumer_secret"]
            )
            auth.set_access_token(
                self.api_keys["twitter"]["access_token"],
                self.api_keys["twitter"]["access_token_secret"]
            )
            self.twitter_api = tweepy.API(auth)

        # Facebook API
        if "facebook" in self.api_keys:
            self.facebook_api = facebook.GraphAPI(
                access_token=self.api_keys["facebook"]["access_token"]
            )

    def search_social_profiles(self, person_name):
        """Search for social media profiles"""
        profiles = {}

        # Twitter search
        try:
            twitter_users = self.twitter_api.search_users(person_name, count=10)
            profiles["twitter"] = [
                {
                    "username": user.screen_name,
                    "name": user.name,
                    "followers": user.followers_count,
                    "verified": user.verified
                }
                for user in twitter_users
            ]
        except Exception as e:
            print(f"Error searching Twitter: {e}")

        return profiles

    def analyze_social_connections(self, username, platform):
        """Analyze social media connections"""
        connections = []

        if platform == "twitter":
            try:
                # Get followers
                followers = self.twitter_api.get_followers(screen_name=username, count=100)

                for follower in followers:
                    connections.append({
                        "type": "follower",
                        "username": follower.screen_name,
                        "name": follower.name,
                        "relationship": "follows"
                    })

            except Exception as e:
                print(f"Error analyzing Twitter connections: {e}")

        return connections

# Usage
api_keys = {
    "twitter": {
        "consumer_key": "your_consumer_key",
        "consumer_secret": "your_consumer_secret",
        "access_token": "your_access_token",
        "access_token_secret": "your_access_token_secret"
    }
}

collector = MaltegoOSINTCollector("your_api_key")
social_media = SocialMediaIntegration(collector, api_keys)

# Search for social profiles
profiles = social_media.search_social_profiles("John Doe")

Troubleshooting

Common Issues

Transform Errors:

# Check transform configuration
# 1. Verify API keys are correct
# 2. Check transform permissions
# 3. Validate input entity types
# 4. Review transform logs

# Debug transform execution
# 1. Run transform manually
# 2. Check network connectivity
# 3. Verify data source availability
# 4. Review rate limiting settings

# Transform timeout issues
# 1. Increase timeout settings
# 2. Optimize transform code
# 3. Implement proper error handling
# 4. Add retry mechanisms
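
The retry and error-handling items above can be sketched as a small wrapper around any function that calls a data source. This is a minimal illustration, not part of Maltego's API: `run_with_retry` and its parameters are hypothetical names, and the exponential backoff schedule is an assumption you would tune to the rate limits of the source you query.

```python
import time


def run_with_retry(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); after each failure wait base_delay * 2**n, then retry.

    Hypothetical helper for illustration only. The last error is re-raised
    once all attempts are used up.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    last_error = None
    for attempt in range(attempts):
        try:
            return func()
        except Exception as exc:  # real transform code should catch narrower errors
            last_error = exc
            if attempt < attempts - 1:
                # Back off: base_delay, 2 * base_delay, 4 * base_delay, ...
                sleep(base_delay * (2 ** attempt))
    raise last_error
```

A call such as `run_with_retry(lambda: requests.get(url, timeout=30))` would then combine a per-request timeout with a bounded number of retries.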

Performance Issues:

# Optimize graph performance
# 1. Limit entity count per transform
# 2. Use filtering to reduce noise
# 3. Implement entity grouping
# 4. Regular graph cleanup

# Memory management
# 1. Close unused graphs
# 2. Clear transform cache
# 3. Restart Maltego periodically
# 4. Monitor system resources

# Network optimization
# 1. Configure proxy settings
# 2. Implement connection pooling
# 3. Use local data sources when possible
# 4. Optimize API usage patterns
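
As one way to apply the "limit entity count" and "filter noise" advice in custom transform code, the sketch below removes duplicates and caps the result list before entities are returned to the graph. The function name `limit_entities` and the default cap of 12 are assumptions chosen for illustration, not Maltego settings.

```python
def limit_entities(values, max_entities=12):
    """Drop blank and duplicate values (case-insensitive), then cap the count.

    Hypothetical helper: keeps the first occurrence of each value in order.
    """
    seen = set()
    unique = []
    for value in values:
        cleaned = value.strip()
        key = cleaned.lower()
        if key and key not in seen:
            seen.add(key)
            unique.append(cleaned)
    return unique[:max_entities]
```

Capping on the transform side keeps graphs small even when a data source returns hundreds of near-identical results.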

Data Quality Issues:

# Validate data sources
# 1. Check data freshness
# 2. Verify source reliability
# 3. Cross-reference multiple sources
# 4. Implement data validation

# Handle incomplete data
# 1. Implement fallback sources
# 2. Use data enrichment services
# 3. Manual verification processes
# 4. Document data limitations
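
Cross-referencing multiple sources can be approximated by keeping only the values that at least two independent sources report. The sketch below assumes results have already been collected into a plain dictionary keyed by source name. `cross_reference`, the source names, and the threshold of two are illustrative assumptions.

```python
from collections import defaultdict


def cross_reference(results_by_source, min_sources=2):
    """Return values reported by at least min_sources distinct sources.

    results_by_source maps a source name to the list of values it returned.
    The result maps each confirmed value to the sorted list of its sources.
    """
    sources_for_value = defaultdict(set)
    for source, values in results_by_source.items():
        for value in values:
            sources_for_value[value.lower()].add(source)

    return {
        value: sorted(sources)
        for value, sources in sources_for_value.items()
        if len(sources) >= min_sources
    }
```

Values seen by only one source are not necessarily wrong. They are simply candidates for the manual verification step listed above.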

Debugging

Enable detailed debugging and logging:

# Enable Maltego debugging
# 1. Help -> Debug Information
# 2. Enable verbose logging
# 3. Check log files in:
#    - Windows: %APPDATA%\Maltego\logs
#    - Linux: ~/.maltego/logs
#    - macOS: ~/Library/Application Support/Maltego/logs

# Transform debugging
# 1. Add debug prints to transform code
# 2. Use transform testing framework
# 3. Monitor API responses
# 4. Check entity property validation

# Network debugging
# 1. Use network monitoring tools
# 2. Check proxy configurations
# 3. Verify SSL certificates
# 4. Monitor API rate limits
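
For the "add debug prints to transform code" step, Python's standard `logging` module is usually easier to switch on and off than bare `print` calls. The sketch below is an assumption about how custom transform code might be instrumented. `get_transform_logger` is a hypothetical helper, and the logger name and log file path are placeholders.

```python
import logging


def get_transform_logger(name, log_file=None, level=logging.DEBUG):
    """Return a logger that writes to log_file, or to stderr if none is given.

    Handlers are attached only once, so repeated calls do not duplicate output.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        handler = logging.FileHandler(log_file) if log_file else logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger


# Example: log the input entity and the HTTP status of each API response
log = get_transform_logger("transforms.domain_reputation")
log.debug("input entity: %s", "suspicious-domain.com")
```

Logging the input value and the status code of each API response covers the "monitor API responses" item without changing what the transform returns.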

Security Considerations

Operational Security

Data Protection:
- Encrypt sensitive investigation data
- Use secure communication channels
- Implement proper access controls
- Conduct regular security assessments
- Securely dispose of investigation artifacts

API Security:
- Store API keys securely
- Rotate keys regularly
- Monitor API usage patterns
- Implement rate limiting
- Use least privilege principles
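
One common way to keep API keys out of scripts is to read them from environment variables instead of hard-coding placeholder strings. The sketch below is a minimal illustration. `load_api_key` and the variable name `VT_API_KEY` are hypothetical, and a dedicated secrets manager may suit team environments better.

```python
import os


def load_api_key(name, environ=os.environ):
    """Read an API key from the environment and fail loudly if it is missing."""
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing API key: set the {name} environment variable")
    return value


# Example (assumes VT_API_KEY was exported in the shell beforehand):
# vt_key = load_api_key("VT_API_KEY")
```

Failing at startup when a key is missing avoids sending unauthenticated requests partway through an investigation.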

Privacy Compliance:
- Respect privacy laws and regulations
- Obtain proper authorization for investigations
- Implement data minimization principles
- Handle personal information securely
- Conduct regular compliance audits

Ethical OSINT:
- Use only publicly available information
- Respect terms of service
- Avoid social engineering
- Maintain professional standards
- Document investigation methodology
