Maltego Cheat Sheet

Overview

Maltego is a comprehensive open source intelligence (OSINT) and graphical link analysis tool from Paterva that lets investigators collect, analyze, and visualize relationships between people, organizations, domains, IP addresses, and other entities. Its distinguishing feature is graph-based visualization: it turns disparate pieces of information into actionable intelligence, so complex relationships and patterns become immediately visible to analysts. Maltego has become an industry standard for OSINT investigations and is used worldwide by law enforcement agencies, intelligence services, security teams, and penetration testers.

Maltego's core strength is its transform system, which automates the collection of information from various data sources and presents it in an intuitive graph. Transforms are automated queries that take an input entity (such as a domain name) and return related entities (such as IP addresses, email addresses, or associated domains) together with their relationships to the input. This automation lets investigators rapidly expand their picture of a target's digital footprint while keeping the visual context of how the individual pieces of information connect.
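The transform idea described above can be sketched in a few lines of plain Python. This is only an illustrative model, not Maltego's internal data structure: the `Entity` and `Graph` classes and the `domain_to_ip` lookup table are hypothetical, and the IP addresses come from the documentation range 192.0.2.0/24.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Entity:
    type: str   # e.g. "maltego.Domain"
    value: str  # e.g. "example.com"


@dataclass
class Graph:
    entities: set = field(default_factory=set)
    links: set = field(default_factory=set)  # (source, target) pairs

    def apply(self, transform, source):
        """Run one transform on one entity; store the results and their links."""
        self.entities.add(source)
        results = transform(source)
        for target in results:
            self.entities.add(target)
            self.links.add((source, target))
        return results


def domain_to_ip(entity):
    """Hypothetical transform: a static table stands in for a real DNS lookup."""
    table = {"example.com": ["192.0.2.10", "192.0.2.11"]}
    return [Entity("maltego.IPv4Address", ip) for ip in table.get(entity.value, [])]


graph = Graph()
found = graph.apply(domain_to_ip, Entity("maltego.Domain", "example.com"))
print(len(graph.entities), "entities,", len(graph.links), "links")
```

Each transform call grows the graph by the returned entities plus one link per result, which is what keeps the relationship context visible as an investigation expands.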

Maltego's versatility extends beyond traditional OSINT investigations to threat analysis, fraud investigations, digital forensics support, and social network analysis. The platform supports both manual investigation techniques and automated data collection, which makes it suitable for targeted investigations as well as broad intelligence gathering. Its ability to integrate with numerous data sources through transforms, combined with its visualization engine, makes Maltego a valuable tool for any organization that needs comprehensive intelligence analysis capabilities.

Installation

Windows Installation

Installing Maltego on Windows systems:

```bash
# Download the Maltego installer
# Visit: https://www.maltego.com/downloads/

# Run the installer as administrator
maltego-4.6.0-windows.exe

# Choose the installation directory
# Default: C:\Program Files\Maltego

# Configure Java settings
# Minimum 4GB RAM recommended
# Java 8 or higher required

# Launch Maltego
# Start Menu -> Maltego

# Initial setup
# Create a Maltego account
# Choose license type (Community/Commercial)
# Configure the Transform Hub
```

Linux Installation

Installing Maltego on Linux distributions:

```bash
# Download Linux installer
wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.deb

# Install on Ubuntu/Debian
sudo dpkg -i Maltego.v4.6.0.deb
sudo apt-get install -f

# Install dependencies if needed
sudo apt install openjdk-11-jre

# Alternative: download the zip archive
wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.linux.zip
unzip Maltego.v4.6.0.linux.zip
cd Maltego.v4.6.0

# Make executable
chmod +x bin/maltego

# Run Maltego
./bin/maltego

# Create desktop shortcut
cp maltego.desktop ~/.local/share/applications/
```

macOS Installation

```bash
# Download macOS installer
# Visit: https://www.maltego.com/downloads/

# Install DMG package
# Drag Maltego to Applications folder

# Install Java if needed
brew install openjdk@11

# Launch from Applications
# Or from terminal
/Applications/Maltego.app/Contents/MacOS/maltego

# Configure security settings
# System Preferences -> Security & Privacy
# Allow Maltego to run
```

Docker Installation

```bash
# Create Maltego Docker environment
cat > Dockerfile << 'EOF'
FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    openjdk-11-jre wget unzip \
    xvfb x11vnc fluxbox

# Download and install Maltego
RUN wget https://downloads.maltego.com/maltego-v4/linux/Maltego.v4.6.0.linux.zip
RUN unzip Maltego.v4.6.0.linux.zip
RUN mv Maltego.v4.6.0 /opt/maltego

# Setup VNC
EXPOSE 5900

# Start script
COPY start.sh /start.sh
RUN chmod +x /start.sh

CMD ["/start.sh"]
EOF

# Create start script
cat > start.sh << 'EOF'
#!/bin/bash
Xvfb :1 -screen 0 1024x768x16 &
export DISPLAY=:1
fluxbox &
# x11vnc must listen on all container interfaces, otherwise the published
# port is unreachable; access is restricted on the host side instead.
x11vnc -display :1 -nopw -xkb &
cd /opt/maltego
./bin/maltego
EOF

# Build and run (VNC is published on the host's loopback interface only)
docker build -t maltego-osint .
docker run -p 127.0.0.1:5900:5900 maltego-osint
```

Basic Usage

Initial Setup

Setting up Maltego for first use:

```bash
# Launch Maltego
maltego

# Account Setup
# 1. Create Maltego account or login
# 2. Choose license type:
#    - Community Edition (free, limited)
#    - Commercial License (full features)

# Transform Hub Configuration
# 1. Access Transform Hub
# 2. Install transform packages:
#    - Standard Transforms
#    - Social Networks
#    - Threat Intelligence
#    - Infrastructure

# Machine Configuration
# 1. Configure machines for automated investigations
# 2. Set up API keys for data sources
# 3. Configure proxy settings if needed

# Workspace Setup
# 1. Create new graph
# 2. Configure entity palette
# 3. Set up custom entity types if needed
```

Basic Investigation

Conducting basic OSINT investigations:

```bash
# Starting an Investigation
# 1. Create new graph
# 2. Add initial entity (domain, person, email)
# 3. Right-click entity -> Run Transform

# Domain Investigation
# Add Domain entity: example.com
# Run transforms:
# - To DNS Name [DNS]
# - To IP Address [DNS]
# - To MX Record [DNS]
# - To Website [HTTP]

# Email Investigation
# Add Email Address entity: user@example.com
# Run transforms:
# - To Domain [Email]
# - To Person [Email]
# - To Social Network Profile

# Person Investigation
# Add Person entity: John Doe
# Run transforms:
# - To Email Address [Search Engines]
# - To Social Network Profile
# - To Phone Number
# - To Address
```
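To make the domain step concrete, the following sketch shows roughly what a "domain to IP address" lookup does, using only the Python standard library. It is not how Maltego's own DNS transforms are implemented; the function name `domain_to_ips` and the injectable `resolver` parameter are assumptions made so the logic can be exercised without network access.

```python
import socket


def domain_to_ips(domain, resolver=socket.gethostbyname_ex):
    """Return the sorted, de-duplicated IPv4 addresses for a domain.

    `resolver` defaults to the system resolver and can be replaced by a
    stub (as in the demo below) to run the function offline.
    """
    try:
        _hostname, _aliases, addresses = resolver(domain)
    except OSError:  # socket.gaierror and socket.herror are subclasses
        return []
    return sorted(set(addresses))


def fake_resolver(domain):
    """Offline stand-in that returns documentation-range addresses."""
    return (domain, [], ["192.0.2.2", "192.0.2.1", "192.0.2.1"])


print(domain_to_ips("example.com", resolver=fake_resolver))
```

Calling `domain_to_ips("example.com")` without the `resolver` argument performs a real lookup through the system resolver.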

Entity Management

Working with entities and relationships:

```bash
# Adding Entities
# 1. Drag from Entity Palette
# 2. Right-click graph -> Add Entity
# 3. Import from file (CSV, XML)

# Entity Properties
# 1. Select entity
# 2. View Properties tab
# 3. Edit entity details
# 4. Add custom properties

# Relationship Management
# 1. Select two entities
# 2. Right-click -> Add Link
# 3. Choose link type
# 4. Set link properties

# Entity Grouping
# 1. Select multiple entities
# 2. Right-click -> Group
# 3. Name the group
# 4. Set group properties
```
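Before importing entities from a CSV file, it can help to clean the list first. The sketch below is one way to do that outside Maltego; the column names `type` and `value` are assumptions about the file layout, and treating values as case-insensitive duplicates is a simplification that suits domains and email addresses better than person names.

```python
import csv
import io


def load_entities(csv_text, type_column="type", value_column="value"):
    """Parse CSV text into (type, value) pairs, skipping blanks and duplicates."""
    entities = []
    seen = set()
    for row in csv.DictReader(io.StringIO(csv_text)):
        entity_type = (row.get(type_column) or "").strip()
        value = (row.get(value_column) or "").strip()
        if not entity_type or not value:
            continue  # incomplete row
        key = (entity_type, value.lower())
        if key in seen:
            continue  # duplicate, ignoring case
        seen.add(key)
        entities.append((entity_type, value))
    return entities


sample = """type,value
maltego.Domain,example.com
maltego.Domain,EXAMPLE.com
maltego.EmailAddress,user@example.com
maltego.Person,
"""
entities = load_entities(sample)
print(entities)
```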

Advanced Features

Custom Transforms

Creating and configuring custom transforms:

```python
# Transform Development
# 1. Access Transform Hub
# 2. Create new transform
# 3. Configure input/output entities
# 4. Write transform code

# Python Transform Example
#!/usr/bin/env python3

from maltego_trx.maltego import UIM_TYPES
from maltego_trx.transform import DiscoverableTransform


class DomainToSubdomains(DiscoverableTransform):
    @classmethod
    def create_entities(cls, request, response):
        domain = request.Value

        # Custom subdomain discovery logic
        subdomains = discover_subdomains(domain)

        for subdomain in subdomains:
            response.addEntity("maltego.Domain", subdomain)

        return response


def discover_subdomains(domain):
    # Implementation for subdomain discovery
    # Using DNS queries, certificate transparency, etc.
    # Placeholder: return an empty list so the loop above has something to iterate
    return []

# Transform Configuration
# 1. Set transform name and description
# 2. Configure input entity types
# 3. Set output entity types
# 4. Configure authentication if needed
# 5. Test transform functionality
```

Machine Configuration

Setting up automated investigation machines:

```bash
# Creating Investigation Machines
# 1. Access Machines tab
# 2. Create new machine
# 3. Add transform sequence
# 4. Configure parameters

# Domain Investigation Machine
# Sequence:
# 1. Domain -> DNS Names
# 2. Domain -> IP Addresses
# 3. Domain -> MX Records
# 4. Domain -> Subdomains
# 5. IP Address -> Geolocation
# 6. IP Address -> Network Block

# Person Investigation Machine
# Sequence:
# 1. Person -> Email Addresses
# 2. Email -> Domains
# 3. Person -> Social Profiles
# 4. Person -> Phone Numbers
# 5. Phone -> Location

# Machine Parameters
# - Maximum entities per transform
# - Transform timeout settings
# - API rate limiting
# - Output filtering rules
```
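Conceptually, a machine chains transforms and feeds each result back in as new input until nothing new appears or a limit is hit. The following sketch models that in plain Python. It is not Maltego's machine scripting language; `run_machine`, the short type names, and the three stub transforms are hypothetical, and the cap here is a single overall limit rather than a per-transform one.

```python
from collections import deque


def run_machine(seed, steps, max_entities=100):
    """Apply transforms breadth-first from a seed entity.

    seed:  (type, value) tuple
    steps: dict mapping an entity type to the transforms that accept it
    Returns the set of discovered entities and the list of links.
    """
    seen = {seed}
    links = []
    queue = deque([seed])
    while queue and len(seen) < max_entities:
        entity = queue.popleft()
        for transform in steps.get(entity[0], []):
            for result in transform(entity[1]):
                if result not in seen:
                    if len(seen) >= max_entities:
                        continue  # cap reached: drop the new entity
                    seen.add(result)
                    queue.append(result)
                links.append((entity, result))
    return seen, links


# Stub transforms returning fixed data instead of querying real sources
def domain_to_ips(domain):
    return [("ip", "192.0.2.10")] if domain == "example.com" else []


def domain_to_mx(domain):
    return [("mx", f"mail.{domain}")]


def ip_to_netblock(ip):
    return [("netblock", "192.0.2.0/24")]


steps = {"domain": [domain_to_ips, domain_to_mx], "ip": [ip_to_netblock]}
entities, links = run_machine(("domain", "example.com"), steps)
print(len(entities), "entities,", len(links), "links")
```

The `max_entities` cap plays the role of the entity limit listed under the machine parameters: without it, a chain of productive transforms can grow a graph far beyond what is useful to review.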

Data Integration

Integrating external data sources:

```bash
# API Integration
# 1. Configure API keys in Transform Hub
# 2. Available APIs:
#    - VirusTotal
#    - Shodan
#    - PassiveTotal
#    - ThreatCrowd
#    - Social media APIs

# Custom Data Sources
# 1. Create CSV import templates
# 2. Configure entity mappings
# 3. Import data files
# 4. Validate imported entities

# Database Integration
# 1. Configure database connections
# 2. Create SQL-based transforms
# 3. Map database fields to entities
# 4. Set up automated queries

# File Import/Export
# Supported formats:
# - CSV (entities and links)
# - XML (Maltego format)
# - GraphML
# - JSON
# - KML (geolocation data)
```
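As an illustration of the GraphML option, the sketch below serializes a small set of entities and links to generic GraphML with the standard library. The attribute keys `d0` and `d1` and the function name `to_graphml` are choices made for this example; the output follows the general GraphML structure and is not guaranteed to match the exact attribute layout Maltego expects on import.

```python
import xml.etree.ElementTree as ET

GRAPHML_NS = "http://graphml.graphdrawing.org/xmlns"


def to_graphml(entities, links):
    """Serialize entities and links to a GraphML string.

    entities: dict mapping node id -> (entity type, value)
    links:    list of (source id, target id) pairs
    """
    root = ET.Element("graphml", xmlns=GRAPHML_NS)
    for key_id, name in (("d0", "type"), ("d1", "value")):
        ET.SubElement(root, "key", {
            "id": key_id, "for": "node",
            "attr.name": name, "attr.type": "string",
        })
    graph = ET.SubElement(root, "graph", id="G", edgedefault="directed")
    for node_id, (entity_type, value) in entities.items():
        node = ET.SubElement(graph, "node", id=node_id)
        ET.SubElement(node, "data", key="d0").text = entity_type
        ET.SubElement(node, "data", key="d1").text = value
    for index, (source, target) in enumerate(links):
        ET.SubElement(graph, "edge", id=f"e{index}", source=source, target=target)
    return ET.tostring(root, encoding="unicode")


document = to_graphml(
    {"n0": ("maltego.Domain", "example.com"),
     "n1": ("maltego.IPv4Address", "192.0.2.10")},
    [("n0", "n1")],
)
print(document)
```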

Advanced Visualization

Advanced graph visualization and analysis:

```bash
# Layout Algorithms
# 1. Hierarchical Layout
# 2. Circular Layout
# 3. Organic Layout
# 4. Force-Directed Layout
# 5. Geographic Layout

# Filtering and Selection
# 1. Filter by entity type
# 2. Filter by property values
# 3. Filter by link types
# 4. Time-based filtering
# 5. Custom filter expressions

# Graph Analysis
# 1. Centrality analysis
# 2. Clustering detection
# 3. Path analysis
# 4. Network metrics
# 5. Pattern recognition

# Visualization Customization
# 1. Entity icons and colors
# 2. Link styles and colors
# 3. Label customization
# 4. Size scaling by properties
# 5. Conditional formatting
```
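Two of the graph analysis items, centrality and path analysis, are simple enough to sketch on an exported adjacency list. The code below is a generic illustration in plain Python, not Maltego's implementation; the sample graph and the function names are made up for the example.

```python
from collections import deque


def degree_centrality(adjacency):
    """Fraction of the other nodes each node is directly linked to."""
    n = len(adjacency)
    if n < 2:
        return {node: 0.0 for node in adjacency}
    return {node: len(neighbours) / (n - 1) for node, neighbours in adjacency.items()}


def shortest_path(adjacency, start, goal):
    """Breadth-first search; returns the node list of a shortest path or None."""
    previous = {start: None}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            path = []
            while node is not None:
                path.append(node)
                node = previous[node]
            return path[::-1]
        for neighbour in adjacency.get(node, ()):
            if neighbour not in previous:
                previous[neighbour] = node
                queue.append(neighbour)
    return None


# Undirected sample graph: every link is listed in both directions
graph = {
    "example.com": {"192.0.2.10", "mail.example.com"},
    "192.0.2.10": {"example.com", "192.0.2.0/24"},
    "mail.example.com": {"example.com"},
    "192.0.2.0/24": {"192.0.2.10"},
}
centrality = degree_centrality(graph)
path = shortest_path(graph, "mail.example.com", "192.0.2.0/24")
print(path)
```

High-degree nodes are often shared infrastructure (a hosting IP, a registrar), while the shortest path between two entities shows the most direct chain of evidence connecting them.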

Automation Scripts

Automated OSINT Collection

```python
#!/usr/bin/env python3
# Automated OSINT collection using Maltego API
# NOTE: base_url and the endpoint paths below are illustrative; verify them
# against the API documentation available to you.

import json
import time
from datetime import datetime

import requests


class MaltegoOSINTCollector:
    def __init__(self, api_key, base_url="https://api.maltego.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.results = {}

    def create_graph(self, name, description=""):
        """Create new investigation graph"""
        data = {
            "name": name,
            "description": description,
            "created": datetime.now().isoformat()
        }

        response = requests.post(
            f"{self.base_url}/graphs",
            headers=self.headers,
            json=data,
            timeout=30
        )

        if response.status_code == 201:
            graph_data = response.json()
            return graph_data["id"]
        else:
            print(f"Error creating graph: {response.status_code}")
            return None

    def add_entity(self, graph_id, entity_type, value, properties=None):
        """Add entity to graph"""
        data = {
            "type": entity_type,
            "value": value,
            "properties": properties or {}
        }

        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/entities",
            headers=self.headers,
            json=data,
            timeout=30
        )

        if response.status_code == 201:
            return response.json()["id"]
        else:
            print(f"Error adding entity: {response.status_code}")
            return None

    def run_transform(self, graph_id, entity_id, transform_name):
        """Run transform on entity"""
        data = {
            "transform": transform_name,
            "entity_id": entity_id
        }

        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/transforms",
            headers=self.headers,
            json=data,
            timeout=30
        )

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error running transform: {response.status_code}")
            return None

    def run_machine(self, graph_id, entity_id, machine_name):
        """Run investigation machine"""
        data = {
            "machine": machine_name,
            "entity_id": entity_id,
            "max_entities": 100,
            "timeout": 300
        }

        response = requests.post(
            f"{self.base_url}/graphs/{graph_id}/machines",
            headers=self.headers,
            json=data,
            timeout=30
        )

        if response.status_code == 200:
            job_id = response.json()["job_id"]
            return self.wait_for_job(job_id)
        else:
            print(f"Error running machine: {response.status_code}")
            return None

    def wait_for_job(self, job_id, timeout=600):
        """Wait for job completion"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            response = requests.get(
                f"{self.base_url}/jobs/{job_id}",
                headers=self.headers,
                timeout=30
            )

            if response.status_code == 200:
                job_data = response.json()
                status = job_data["status"]

                if status == "completed":
                    return job_data["results"]
                elif status == "failed":
                    print(f"Job failed: {job_data.get('error', 'Unknown error')}")
                    return None
                else:
                    time.sleep(10)  # Wait 10 seconds before checking again
            else:
                print(f"Error checking job status: {response.status_code}")
                return None

        print("Job timeout")
        return None

    def investigate_domain(self, domain):
        """Comprehensive domain investigation"""
        print(f"Starting investigation of domain: {domain}")

        # Create graph
        graph_id = self.create_graph(f"Domain Investigation: {domain}")
        if not graph_id:
            return None

        # Add domain entity
        entity_id = self.add_entity(graph_id, "maltego.Domain", domain)
        if not entity_id:
            return None

        # Run domain investigation machine
        results = self.run_machine(graph_id, entity_id, "Domain Investigation")

        if results:
            self.results[domain] = {
                "graph_id": graph_id,
                "investigation_results": results,
                "timestamp": datetime.now().isoformat()
            }
            print(f"Investigation completed for {domain}")
            return results
        else:
            print(f"Investigation failed for {domain}")
            return None

    def investigate_email(self, email):
        """Comprehensive email investigation"""
        print(f"Starting investigation of email: {email}")

        # Create graph
        graph_id = self.create_graph(f"Email Investigation: {email}")
        if not graph_id:
            return None

        # Add email entity
        entity_id = self.add_entity(graph_id, "maltego.EmailAddress", email)
        if not entity_id:
            return None

        # Run email investigation transforms
        transforms = [
            "To Domain [Email]",
            "To Person [Email]",
            "To Social Network Profile",
            "To Breach Data"
        ]

        results = {}
        for transform in transforms:
            print(f"Running transform: {transform}")
            result = self.run_transform(graph_id, entity_id, transform)
            if result:
                results[transform] = result
            time.sleep(2)  # Rate limiting

        self.results[email] = {
            "graph_id": graph_id,
            "investigation_results": results,
            "timestamp": datetime.now().isoformat()
        }

        return results

    def investigate_person(self, person_name):
        """Comprehensive person investigation"""
        print(f"Starting investigation of person: {person_name}")

        # Create graph
        graph_id = self.create_graph(f"Person Investigation: {person_name}")
        if not graph_id:
            return None

        # Add person entity
        entity_id = self.add_entity(graph_id, "maltego.Person", person_name)
        if not entity_id:
            return None

        # Run person investigation machine
        results = self.run_machine(graph_id, entity_id, "Person Investigation")

        if results:
            self.results[person_name] = {
                "graph_id": graph_id,
                "investigation_results": results,
                "timestamp": datetime.now().isoformat()
            }
            print(f"Investigation completed for {person_name}")
            return results
        else:
            print(f"Investigation failed for {person_name}")
            return None

    def export_results(self, output_file="maltego_results.json"):
        """Export investigation results"""
        with open(output_file, 'w') as f:
            json.dump(self.results, f, indent=2)

        print(f"Results exported to {output_file}")

    def generate_report(self, output_file="maltego_report.html"):
        """Generate HTML investigation report"""

        # Minimal page skeleton; the placeholders are filled by str.format below
        html_template = """<!DOCTYPE html>
<html>
<head>
    <title>Maltego OSINT Investigation Report</title>
</head>
<body>
    <h1>Maltego OSINT Investigation Report</h1>
    <p>Generated: {timestamp}</p>
    <p>Total Investigations: {total_investigations}</p>
    {investigations}
</body>
</html>
"""

        investigations_html = ""
        for target, data in self.results.items():
            result_count = len(data.get("investigation_results") or {})
            investigations_html += f"""
        <div class="investigation">
            <h2>Investigation: {target}</h2>
            <p>Timestamp: {data['timestamp']}</p>
            <p>Graph ID: {data['graph_id']}</p>
            <div class="results">
                <h3>Results Summary</h3>
                <p>Entities discovered: {result_count}</p>
            </div>
        </div>
        """

        html_content = html_template.format(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            total_investigations=len(self.results),
            investigations=investigations_html
        )

        with open(output_file, 'w') as f:
            f.write(html_content)

        print(f"Report generated: {output_file}")


# Usage
if __name__ == "__main__":
    collector = MaltegoOSINTCollector("your_api_key")

    # Investigate multiple targets
    targets = [
        ("domain", "example.com"),
        ("email", "user@example.com"),
        ("person", "John Doe")
    ]

    for target_type, target_value in targets:
        if target_type == "domain":
            collector.investigate_domain(target_value)
        elif target_type == "email":
            collector.investigate_email(target_value)
        elif target_type == "person":
            collector.investigate_person(target_value)

        time.sleep(5)  # Rate limiting between investigations

    # Export results
    collector.export_results()
    collector.generate_report()
```
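The job-polling step in the script above checks at a fixed 10-second interval. A common variation is to poll with an increasing delay, so short jobs return quickly and long jobs generate fewer requests. The sketch below shows that pattern in isolation; `poll_until_done` and its parameters are hypothetical, and `fetch_status` stands in for whatever call returns the job state as a dictionary with a `status` key.

```python
import time


def poll_until_done(fetch_status, timeout=600.0, initial_delay=1.0, max_delay=30.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() with exponential backoff until the job finishes.

    Returns the job's "results" on completion, raises RuntimeError if the job
    reports failure, and TimeoutError if the deadline passes first.
    """
    deadline = clock() + timeout
    delay = initial_delay
    while clock() < deadline:
        job = fetch_status()
        if job["status"] == "completed":
            return job.get("results")
        if job["status"] == "failed":
            raise RuntimeError(job.get("error", "Unknown error"))
        sleep(delay)
        delay = min(delay * 2, max_delay)  # double the wait, up to the cap
    raise TimeoutError("job did not finish before the timeout")


# Offline demo: a scripted sequence of job states and a recording "sleep"
states = iter([
    {"status": "running"},
    {"status": "running"},
    {"status": "completed", "results": [1, 2]},
])
delays = []
result = poll_until_done(lambda: next(states), sleep=delays.append)
print(result, delays)
```

Passing `sleep` and `clock` as parameters is a design choice that keeps the function testable without real waiting; in normal use the defaults apply.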

Bulk Entity Processing

```python
#!/usr/bin/env python3
# Bulk entity processing for Maltego
# MaltegoOSINTCollector is the class from the automated collection script

import csv
import json
import time
from datetime import datetime


class MaltegoBulkProcessor:
    def __init__(self, maltego_collector):
        self.collector = maltego_collector
        self.processed_entities = []
        self.failed_entities = []

    def process_csv_file(self, csv_file, entity_type_column, entity_value_column):
        """Process entities from CSV file"""

        with open(csv_file, 'r', newline='') as f:
            reader = csv.DictReader(f)

            for row in reader:
                entity_type = row[entity_type_column]
                entity_value = row[entity_value_column]

                print(f"Processing {entity_type}: {entity_value}")

                try:
                    if entity_type.lower() == "domain":
                        result = self.collector.investigate_domain(entity_value)
                    elif entity_type.lower() == "email":
                        result = self.collector.investigate_email(entity_value)
                    elif entity_type.lower() == "person":
                        result = self.collector.investigate_person(entity_value)
                    else:
                        print(f"Unknown entity type: {entity_type}")
                        continue

                    if result:
                        self.processed_entities.append({
                            "type": entity_type,
                            "value": entity_value,
                            "status": "success",
                            "timestamp": datetime.now().isoformat()
                        })
                    else:
                        self.failed_entities.append({
                            "type": entity_type,
                            "value": entity_value,
                            "status": "failed",
                            "timestamp": datetime.now().isoformat()
                        })

                except Exception as e:
                    print(f"Error processing {entity_value}: {e}")
                    self.failed_entities.append({
                        "type": entity_type,
                        "value": entity_value,
                        "status": "error",
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    })

                # Rate limiting
                time.sleep(10)

    def generate_processing_report(self, output_file="bulk_processing_report.json"):
        """Generate bulk processing report"""

        total = len(self.processed_entities) + len(self.failed_entities)
        # Guard against division by zero when nothing was processed
        success_rate = len(self.processed_entities) / total * 100 if total else 0.0

        report = {
            "processing_summary": {
                "total_entities": total,
                "successful": len(self.processed_entities),
                "failed": len(self.failed_entities),
                "success_rate": success_rate
            },
            "processed_entities": self.processed_entities,
            "failed_entities": self.failed_entities,
            "report_timestamp": datetime.now().isoformat()
        }

        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2)

        print(f"Bulk processing report saved: {output_file}")
        return report


# Usage
collector = MaltegoOSINTCollector("your_api_key")
processor = MaltegoBulkProcessor(collector)

# Process entities from CSV
processor.process_csv_file("entities.csv", "type", "value")

# Generate report
processor.generate_processing_report()
```

Transform Development Framework

```python
#!/usr/bin/env python3
# Maltego transform development framework

import socket

import dns.resolver
import requests
import whois
from maltego_trx.maltego import UIM_TYPES
from maltego_trx.transform import DiscoverableTransform


class CustomDomainTransforms:
    """Collection of custom domain-related transforms"""

    class DomainToSubdomains(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value

            # Certificate Transparency subdomain discovery
            subdomains = cls.discover_subdomains_ct(domain)

            for subdomain in subdomains:
                entity = response.addEntity("maltego.Domain", subdomain)
                # addProperty takes (fieldName, displayName, matchingRule, value)
                entity.addProperty(fieldName="source", displayName="Source",
                                   value="Certificate Transparency")

            return response

        @staticmethod
        def discover_subdomains_ct(domain):
            """Discover subdomains using Certificate Transparency"""
            subdomains = set()

            try:
                # %25 is the URL-encoded "%" wildcard used by crt.sh
                url = f"https://crt.sh/?q=%25.{domain}&output=json"
                response = requests.get(url, timeout=30)

                if response.status_code == 200:
                    data = response.json()

                    for cert in data:
                        name_value = cert.get('name_value', '')
                        for name in name_value.split('\n'):
                            name = name.strip()
                            if name.endswith(f'.{domain}') and '*' not in name:
                                subdomains.add(name)

            except Exception as e:
                print(f"Error discovering subdomains: {e}")

            return list(subdomains)

    class DomainToTechnologies(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value

            # Technology detection
            technologies = cls.detect_technologies(domain)

            for tech in technologies:
                entity = response.addEntity("maltego.Website", tech['name'])
                entity.addProperty(fieldName="technology.version", displayName="Version",
                                   value=tech.get('version', ''))
                entity.addProperty(fieldName="technology.confidence", displayName="Confidence",
                                   value=tech.get('confidence', ''))

            return response

        @staticmethod
        def detect_technologies(domain):
            """Detect web technologies"""
            technologies = []

            try:
                # Simple technology detection based on headers
                response = requests.get(f"http://{domain}", timeout=10)
                headers = response.headers

                # Server detection
                server = headers.get('Server', '')
                if server:
                    technologies.append({
                        'name': server,
                        'type': 'Web Server',
                        'confidence': 'high'
                    })

                # Framework detection
                x_powered_by = headers.get('X-Powered-By', '')
                if x_powered_by:
                    technologies.append({
                        'name': x_powered_by,
                        'type': 'Framework',
                        'confidence': 'high'
                    })

                # Content analysis
                content = response.text.lower()

                # WordPress detection
                if 'wp-content' in content or 'wordpress' in content:
                    technologies.append({
                        'name': 'WordPress',
                        'type': 'CMS',
                        'confidence': 'high'
                    })

                # jQuery detection
                if 'jquery' in content:
                    technologies.append({
                        'name': 'jQuery',
                        'type': 'JavaScript Library',
                        'confidence': 'medium'
                    })

            except Exception as e:
                print(f"Error detecting technologies: {e}")

            return technologies

    class DomainToWhoisInfo(DiscoverableTransform):
        @classmethod
        def create_entities(cls, request, response):
            domain = request.Value

            # WHOIS lookup
            whois_info = cls.get_whois_info(domain)

            if whois_info:
                # The keys always exist, so test the values rather than membership

                # Add registrar
                if whois_info.get('registrar'):
                    entity = response.addEntity("maltego.Organization", whois_info['registrar'])
                    entity.addProperty(fieldName="organization.type", displayName="Type",
                                       value="Registrar")

                # Add registrant
                if whois_info.get('registrant'):
                    entity = response.addEntity("maltego.Person", whois_info['registrant'])
                    entity.addProperty(fieldName="person.role", displayName="Role",
                                       value="Registrant")

                # Add creation date
                if whois_info.get('creation_date'):
                    entity = response.addEntity("maltego.Date", str(whois_info['creation_date']))
                    entity.addProperty(fieldName="date.type", displayName="Type",
                                       value="Domain Creation")

            return response

        @staticmethod
        def get_whois_info(domain):
            """Get WHOIS information"""
            try:
                w = whois.whois(domain)

                return {
                    'registrar': w.registrar,
                    'registrant': w.registrant,
                    'creation_date': w.creation_date,
                    'expiration_date': w.expiration_date,
                    'name_servers': w.name_servers
                }

            except Exception as e:
                print(f"Error getting WHOIS info: {e}")
                return None


# Transform registration
def register_transforms():
    """Register custom transforms"""
    transforms = [
        CustomDomainTransforms.DomainToSubdomains,
        CustomDomainTransforms.DomainToTechnologies,
        CustomDomainTransforms.DomainToWhoisInfo
    ]

    return transforms


# Usage in Maltego
if __name__ == "__main__":
    # This would be called by Maltego transform runner
    transforms = register_transforms()
    print(f"Registered {len(transforms)} custom transforms")
```
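The name filtering inside the Certificate Transparency lookup is easy to get subtly wrong, so it is worth isolating as a pure function that can be checked without network access. The sketch below assumes records shaped like the ones the code above reads (a `name_value` field with newline-separated names); the function name `extract_subdomains` and the sample records are made up for the example.

```python
def extract_subdomains(records, domain):
    """Collect unique subdomains of `domain` from CT-style records.

    Wildcard names are skipped, names are compared case-insensitively, and
    the leading dot in the suffix prevents "notexample.com" from matching
    "example.com".
    """
    domain = domain.lower().rstrip(".")
    suffix = "." + domain
    found = set()
    for record in records:
        for name in record.get("name_value", "").splitlines():
            name = name.strip().lower().rstrip(".")
            if "*" in name:
                continue
            if name.endswith(suffix):
                found.add(name)
    return sorted(found)


records = [
    {"name_value": "www.example.com\n*.example.com"},
    {"name_value": "API.example.com\nexample.com"},
    {"name_value": "www.example.com\nnotexample.com"},
]
subdomains = extract_subdomains(records, "example.com")
print(subdomains)
```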

Integration Examples

Threat Intelligence Integration

```python
#!/usr/bin/env python3
# Maltego threat intelligence integration
# MaltegoOSINTCollector is the class from the automated collection script

import json

import requests


class ThreatIntelligenceIntegration:
    def __init__(self, maltego_collector):
        self.collector = maltego_collector
        self.threat_feeds = {
            "virustotal": "https://www.virustotal.com/vtapi/v2",
            "threatcrowd": "https://www.threatcrowd.org/searchApi/v2",
            "otx": "https://otx.alienvault.com/api/v1"
        }

    def enrich_with_threat_intel(self, graph_id, entity_id, entity_value, entity_type):
        """Enrich entity with threat intelligence"""

        threat_data = {}

        if entity_type == "domain":
            threat_data = self.check_domain_reputation(entity_value)
        elif entity_type == "ip":
            threat_data = self.check_ip_reputation(entity_value)
        elif entity_type == "hash":
            threat_data = self.check_file_reputation(entity_value)

        # Add threat intelligence entities to graph
        if threat_data:
            self.add_threat_entities(graph_id, entity_id, threat_data)

    def check_domain_reputation(self, domain):
        """Check domain reputation across threat feeds"""
        reputation_data = {}

        # VirusTotal domain report
        try:
            vt_response = requests.get(
                f"{self.threat_feeds['virustotal']}/domain/report",
                params={"apikey": "your_vt_api_key", "domain": domain},
                timeout=30
            )

            if vt_response.status_code == 200:
                vt_data = vt_response.json()
                reputation_data["virustotal"] = {
                    "positives": vt_data.get("positives", 0),
                    "total": vt_data.get("total", 0),
                    "scan_date": vt_data.get("scan_date", "")
                }
        except Exception as e:
            print(f"Error checking VirusTotal: {e}")

        # ThreatCrowd domain report
        try:
            tc_response = requests.get(
                f"{self.threat_feeds['threatcrowd']}/domain/report",
                params={"domain": domain},
                timeout=30
            )

            if tc_response.status_code == 200:
                tc_data = tc_response.json()
                reputation_data["threatcrowd"] = {
                    "votes": tc_data.get("votes", 0),
                    "references": tc_data.get("references", []),
                    "resolutions": tc_data.get("resolutions", [])
                }
        except Exception as e:
            print(f"Error checking ThreatCrowd: {e}")

        return reputation_data

    def check_ip_reputation(self, ip_address):
        """IP reputation lookup (placeholder, not implemented in this example)"""
        return {}

    def check_file_reputation(self, file_hash):
        """File hash reputation lookup (placeholder, not implemented in this example)"""
        return {}

    def add_threat_entities(self, graph_id, entity_id, threat_data):
        """Add threat intelligence entities to graph"""

        for source, data in threat_data.items():
            # Add threat intelligence source
            ti_entity_id = self.collector.add_entity(
                graph_id,
                "maltego.ThreatIntelligence",
                source,
                {"source": source, "data": json.dumps(data)}
            )

            # Link to original entity; add_link is not defined on the collector
            # shown earlier, so it is only called if the collector provides it
            if ti_entity_id and hasattr(self.collector, "add_link"):
                self.collector.add_link(graph_id, entity_id, ti_entity_id, "threat_intelligence")


# Usage
collector = MaltegoOSINTCollector("your_api_key")
threat_intel = ThreatIntelligenceIntegration(collector)

# Investigate domain with threat intelligence
graph_id = collector.create_graph("Threat Intel Investigation")
entity_id = collector.add_entity(graph_id, "maltego.Domain", "suspicious-domain.com")
threat_intel.enrich_with_threat_intel(graph_id, entity_id, "suspicious-domain.com", "domain")
```

Social Media Integration

```python
#!/usr/bin/env python3
# Maltego social media integration
# MaltegoOSINTCollector is the class from the automated collection script

import facebook
import linkedin
import tweepy


class SocialMediaIntegration:
    def __init__(self, maltego_collector, api_keys):
        self.collector = maltego_collector
        self.api_keys = api_keys
        # Stay None unless the matching credentials are supplied
        self.twitter_api = None
        self.facebook_api = None
        self.setup_apis()

    def setup_apis(self):
        """Setup social media API connections"""

        # Twitter API
        if "twitter" in self.api_keys:
            auth = tweepy.OAuthHandler(
                self.api_keys["twitter"]["consumer_key"],
                self.api_keys["twitter"]["consumer_secret"]
            )
            auth.set_access_token(
                self.api_keys["twitter"]["access_token"],
                self.api_keys["twitter"]["access_token_secret"]
            )
            self.twitter_api = tweepy.API(auth)

        # Facebook API
        if "facebook" in self.api_keys:
            self.facebook_api = facebook.GraphAPI(
                access_token=self.api_keys["facebook"]["access_token"]
            )

    def search_social_profiles(self, person_name):
        """Search for social media profiles"""
        profiles = {}

        # Twitter search
        if self.twitter_api is not None:
            try:
                twitter_users = self.twitter_api.search_users(person_name, count=10)
                profiles["twitter"] = [
                    {
                        "username": user.screen_name,
                        "name": user.name,
                        "followers": user.followers_count,
                        "verified": user.verified
                    }
                    for user in twitter_users
                ]
            except Exception as e:
                print(f"Error searching Twitter: {e}")

        return profiles

    def analyze_social_connections(self, username, platform):
        """Analyze social media connections"""
        connections = []

        if platform == "twitter" and self.twitter_api is not None:
            try:
                # Get followers
                followers = self.twitter_api.get_followers(screen_name=username, count=100)

                for follower in followers:
                    connections.append({
                        "type": "follower",
                        "username": follower.screen_name,
                        "name": follower.name,
                        "relationship": "follows"
                    })

            except Exception as e:
                print(f"Error analyzing Twitter connections: {e}")

        return connections


# Usage
api_keys = {
    "twitter": {
        "consumer_key": "your_consumer_key",
        "consumer_secret": "your_consumer_secret",
        "access_token": "your_access_token",
        "access_token_secret": "your_access_token_secret"
    }
}

collector = MaltegoOSINTCollector("your_api_key")
social_media = SocialMediaIntegration(collector, api_keys)

# Search for social profiles
profiles = social_media.search_social_profiles("John Doe")
```

Troubleshooting

Common Issues

Transform Errors:

```bash
# Check transform configuration
# 1. Verify API keys are correct
# 2. Check transform permissions
# 3. Validate input entity types
# 4. Review transform logs

# Debug transform execution
# 1. Run transform manually
# 2. Check network connectivity
# 3. Verify data source availability
# 4. Review rate limiting settings

# Transform timeout issues
# 1. Increase timeout settings
# 2. Optimize transform code
# 3. Implement proper error handling
# 4. Add retry mechanisms
```
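
The "add retry mechanisms" and "implement proper error handling" items can be sketched as a small wrapper around whatever function calls the data source. This is a minimal illustration, not part of Maltego's API: `run_with_retry` and its parameters are hypothetical names, and the broad `except Exception` should be narrowed to the errors your data source actually raises.

```python
import time


def run_with_retry(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying with exponential backoff when it raises.

    Hypothetical helper: the name and parameters are illustrative only.
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return func()
        except Exception as exc:  # narrow this for a real data source
            last_error = exc
            if attempt < attempts - 1:
                # Wait 1x, 2x, 4x ... the base delay between attempts
                sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"Transform failed after {attempts} attempts") from last_error
```

Passing `sleep` as a parameter keeps the wrapper testable without real waiting; in normal use the default `time.sleep` applies.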

Performance Issues:

```bash
# Optimize graph performance
# 1. Limit entity count per transform
# 2. Use filtering to reduce noise
# 3. Implement entity grouping
# 4. Regular graph cleanup

# Memory management
# 1. Close unused graphs
# 2. Clear transform cache
# 3. Restart Maltego periodically
# 4. Monitor system resources

# Network optimization
# 1. Configure proxy settings
# 2. Implement connection pooling
# 3. Use local data sources when possible
# 4. Optimize API usage patterns
```
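
For custom transforms, "limit entity count per transform" and "use filtering to reduce noise" can be approximated before results ever reach the graph. The sketch below assumes the transform produces a plain list of string values; `limit_results` and `max_entities` are hypothetical names, not Maltego settings.

```python
def limit_results(values, max_entities=50):
    """Drop blanks and case-insensitive duplicates, then cap the result count.

    Hypothetical helper for a custom transform; not a Maltego API.
    """
    seen = set()
    kept = []
    for value in values:
        if len(kept) >= max_entities:
            break  # stop once the cap is reached
        cleaned = value.strip()
        key = cleaned.lower()
        if not key or key in seen:
            continue  # skip empty values and repeats
        seen.add(key)
        kept.append(cleaned)
    return kept
```

Capping and deduplicating in the transform keeps the graph smaller, which tends to matter more for rendering performance than any client-side setting.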

Data Quality Issues:

```bash
# Validate data sources
# 1. Check data freshness
# 2. Verify source reliability
# 3. Cross-reference multiple sources
# 4. Implement data validation

# Handle incomplete data
# 1. Implement fallback sources
# 2. Use data enrichment services
# 3. Manual verification processes
# 4. Document data limitations
```
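
Cross-referencing multiple sources can be as simple as keeping only values that several independent sources agree on. The following is a sketch under the assumption that each source's results are available as a list of strings keyed by source name; `cross_reference` and `min_sources` are hypothetical names.

```python
from collections import Counter


def cross_reference(results_by_source, min_sources=2):
    """Return values reported by at least min_sources distinct sources.

    Hypothetical helper; the input maps a source name to its list of values.
    """
    counts = Counter()
    for values in results_by_source.values():
        # A set ensures each source counts a given value only once
        counts.update({value.strip().lower() for value in values})
    return sorted(value for value, n in counts.items() if n >= min_sources)
```

Values that fall below the threshold are not necessarily wrong; they are candidates for manual verification rather than automatic inclusion.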

Debugging

Enable debugging and logging:

```bash
# Enable Maltego debugging
# 1. Help -> Debug Information
# 2. Enable verbose logging
# 3. Check log files in:
#    - Windows: %APPDATA%\Maltego\logs
#    - Linux: ~/.maltego/logs
#    - macOS: ~/Library/Application Support/Maltego/logs

# Transform debugging
# 1. Add debug prints to transform code
# 2. Use transform testing framework
# 3. Monitor API responses
# 4. Check entity property validation

# Network debugging
# 1. Use network monitoring tools
# 2. Check proxy configurations
# 3. Verify SSL certificates
# 4. Monitor API rate limits
```
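
For transforms written in Python, the standard `logging` module is usually a better fit than bare debug prints, because verbosity can be switched without editing the transform. This sketch assumes the transform wraps some lookup callable; the logger name and the `resolve_domain_debug` function are hypothetical.

```python
import logging

# Hypothetical logger name; choose one that matches your transform project
logger = logging.getLogger("maltego.transform.debug")


def resolve_domain_debug(domain, lookup):
    """Run a lookup callable and log the input, result count, and failures."""
    logger.debug("transform input: %r", domain)
    try:
        results = list(lookup(domain))
    except Exception:
        # logger.exception records the full traceback at ERROR level
        logger.exception("lookup failed for %r", domain)
        return []
    logger.debug("transform returned %d result(s)", len(results))
    return results
```

Calling `logging.basicConfig(level=logging.DEBUG)` once at startup turns the debug messages on; leaving the level at the default hides them while still reporting failures.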

Security Considerations

Operational Security

Data Protection:
- Encrypt sensitive investigation data
- Use secure communication channels
- Implement proper access controls
- Conduct regular security assessments
- Securely dispose of investigation artifacts

API Security:
- Store API keys securely
- Rotate keys regularly
- Monitor API usage patterns
- Implement rate limiting
- Use least-privilege access
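
One common way to keep API keys out of scripts and saved graphs is to read them from environment variables at startup. The sketch below is one possible approach, not a Maltego feature; `load_api_keys` and the variable names in the usage note are hypothetical.

```python
import os


def load_api_keys(names, environ=os.environ):
    """Read API keys from environment variables and fail if any is missing.

    Hypothetical helper; `names` lists the environment variable names to read.
    """
    keys = {}
    missing = []
    for name in names:
        value = environ.get(name, "").strip()
        if value:
            keys[name] = value
        else:
            missing.append(name)
    if missing:
        # Report only the variable names, never the key values
        raise KeyError(f"Missing API key variables: {', '.join(missing)}")
    return keys
```

A call such as `load_api_keys(["TWITTER_CONSUMER_KEY", "TWITTER_CONSUMER_SECRET"])` would then replace hard-coded placeholder strings, assuming those variables are set in the environment that launches the script.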

Legal and Ethical Considerations

Privacy Compliance:
- Comply with data protection laws and regulations
- Obtain proper authorization for investigations
- Implement data minimization principles
- Handle personal data securely
- Conduct regular compliance audits

Ethical OSINT:
- Use only publicly available information
- Respect terms of service
- Avoid social engineering
- Maintain professional standards
- Document the investigation methodology

References

  1. Maltego Official Website
  2. Maltego Documentation
  3. Maltego Transform Hub
  4. OSINT Framework
  5. Open Source Intelligence Techniques