
Arpwatch Cheatsheet


Overview

Arpwatch is a network monitoring tool that keeps track of Ethernet/IP address pairings on a network. It monitors ARP (Address Resolution Protocol) activity and maintains a database of the pairings it has seen. When it detects changes such as new stations, changed addresses, or activity that may indicate ARP spoofing, it can send email notifications and log the events.

Key Features

  • ARP Monitoring: Continuous monitoring of ARP traffic and address mappings
  • Change Detection: Automatic detection of new stations and address changes
  • Security Alerting: Email notifications for suspicious ARP activity
  • Database Maintenance: Persistent storage of Ethernet/IP address pairings
  • Network Mapping: Automatic discovery and tracking of network devices
  • Intrusion Detection: Detection of potential ARP spoofing and MAC address conflicts
  • Historical Tracking: Long-term monitoring of network topology changes
  • Multi-Interface Support: Monitor several network interfaces at the same time
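
To make the change-detection idea concrete, the following is a minimal sketch of how a pairing table can label each ARP sighting. The class `PairingTable` and its method `observe` are hypothetical names used only for illustration; this is not arpwatch's actual implementation. The labels follow the report names arpwatch uses ("new station", "changed ethernet address", "flip flop").

```python
from dataclasses import dataclass, field


@dataclass
class PairingTable:
    """In-memory stand-in for an IP -> MAC pairing database (illustrative only)."""
    current: dict = field(default_factory=dict)   # ip -> MAC seen most recently
    previous: dict = field(default_factory=dict)  # ip -> MAC seen before that

    def observe(self, ip: str, mac: str) -> str:
        """Record one ARP sighting and return a label for what changed."""
        mac = mac.lower()
        known = self.current.get(ip)
        if known is None:
            self.current[ip] = mac
            return "new station"
        if known == mac:
            return "no change"
        # Switching back to the MAC seen just before the current one is a flip flop.
        if self.previous.get(ip) == mac:
            label = "flip flop"
        else:
            label = "changed ethernet address"
        self.previous[ip] = known
        self.current[ip] = mac
        return label
```

Repeated flip flops between two MAC addresses for one IP are the pattern most often associated with ARP spoofing, which is why they deserve a higher alert priority than a single change.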

Installation

Linux Systems

```bash
# Ubuntu/Debian
sudo apt update
sudo apt install arpwatch

# CentOS/RHEL/Fedora
sudo yum install arpwatch
# or
sudo dnf install arpwatch

# Arch Linux
sudo pacman -S arpwatch

# openSUSE
sudo zypper install arpwatch

# From source
wget https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure
make
sudo make install

# Verify installation
arpwatch -h
```

FreeBSD/OpenBSD

```bash
# FreeBSD
sudo pkg install arpwatch

# OpenBSD
sudo pkg_add arpwatch

# From ports (FreeBSD)
cd /usr/ports/net-mgmt/arpwatch
sudo make install clean

# Verify installation
which arpwatch
```

macOS

```bash
# Using Homebrew
brew install arpwatch

# Using MacPorts
sudo port install arpwatch

# Manual installation
curl -O https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure --prefix=/usr/local
make
sudo make install

# Verify installation
arpwatch -h
```

Basic Usage

Starting Arpwatch

```bash
# Note: option letters differ between arpwatch versions and distribution
# patches; check `man arpwatch` on your system before relying on them.

# Basic startup (requires root privileges)
sudo arpwatch

# Specify interface
sudo arpwatch -i eth0

# Run in the foreground in debug mode (does not daemonize)
sudo arpwatch -d

# Specify data file location
sudo arpwatch -f /var/lib/arpwatch/arp.dat

# Enable email notifications
sudo arpwatch -m admin@example.com

# Specify network to monitor
sudo arpwatch -n 192.168.1.0/24

# Combine options
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -m admin@example.com
```

Configuration Files

```bash
# Main configuration file locations
/etc/arpwatch.conf                # Main configuration
/etc/default/arpwatch             # Ubuntu/Debian defaults
/etc/sysconfig/arpwatch           # RHEL/CentOS defaults

# Data file locations
/var/lib/arpwatch/arp.dat         # Default database file
/var/lib/arpwatch/ethercodes.dat  # Ethernet vendor codes

# Log file locations
/var/log/arpwatch.log             # Main log file
/var/log/messages                 # System log (may contain arpwatch entries)
```
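
The database file listed above can be read with a few lines of Python. The sketch below assumes the common `arp.dat` layout of one whitespace-separated record per line (MAC address, IP address, Unix timestamp, optional hostname) and that MAC octets may be written without leading zeros; verify both against a file from your own installation. `ArpEntry`, `normalize_mac`, and `parse_arp_dat_line` are illustrative names, not part of arpwatch.

```python
from datetime import datetime, timezone
from typing import NamedTuple, Optional


class ArpEntry(NamedTuple):
    mac: str
    ip: str
    seen: datetime
    hostname: Optional[str]


def normalize_mac(mac: str) -> str:
    """Zero-pad and lowercase each octet: '0:C:29:1:2:3' -> '00:0c:29:01:02:03'."""
    return ":".join(f"{int(part, 16):02x}" for part in mac.split(":"))


def parse_arp_dat_line(line: str) -> Optional[ArpEntry]:
    """Parse one record; return None for blank or malformed lines."""
    fields = line.split()
    if len(fields) < 3:
        return None
    try:
        mac = normalize_mac(fields[0])
        seen = datetime.fromtimestamp(int(fields[2]), tz=timezone.utc)
    except ValueError:
        return None
    hostname = fields[3] if len(fields) > 3 else None
    return ArpEntry(mac, fields[1], seen, hostname)
```

Normalizing the MAC address up front keeps later comparisons (whitelists, vendor lookups) from failing on purely cosmetic differences.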

Basic Configuration

```bash
# Edit configuration file (Ubuntu/Debian)
sudo nano /etc/default/arpwatch

# Example configuration
INTERFACES="eth0 eth1"
ARGS="-m admin@example.com -s root"
OPTIONS="-N"

# Edit configuration file (RHEL/CentOS)
sudo nano /etc/sysconfig/arpwatch

# Example configuration
OPTIONS="-i eth0 -m admin@example.com -f /var/lib/arpwatch/arp.dat"
```

Advanced Configuration

Multi-Interface Monitoring

```bash
# Create separate instances for multiple interfaces
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -P /var/run/arpwatch_eth0.pid &
sudo arpwatch -i eth1 -f /var/lib/arpwatch/eth1.dat -P /var/run/arpwatch_eth1.pid &
sudo arpwatch -i wlan0 -f /var/lib/arpwatch/wlan0.dat -P /var/run/arpwatch_wlan0.pid &

# Systemd template service for multiple interfaces
sudo cp /lib/systemd/system/arpwatch.service /lib/systemd/system/arpwatch@.service

# Edit the template service file
sudo nano /lib/systemd/system/arpwatch@.service

# Example template content:
[Unit]
Description=Arpwatch daemon for %i
After=network.target

[Service]
Type=forking
ExecStart=/usr/sbin/arpwatch -i %i -f /var/lib/arpwatch/%i.dat -P /var/run/arpwatch_%i.pid
PIDFile=/var/run/arpwatch_%i.pid
User=arpwatch
Group=arpwatch

[Install]
WantedBy=multi-user.target

# Reload systemd so it picks up the new unit file
sudo systemctl daemon-reload

# Enable and start services for specific interfaces
sudo systemctl enable arpwatch@eth0.service
sudo systemctl enable arpwatch@eth1.service
sudo systemctl start arpwatch@eth0.service
sudo systemctl start arpwatch@eth1.service
```

Email Notification Setup

```bash
# Configure email notifications
# (whether these settings and templates are honored depends on the packaged
# arpwatch build; verify against its documentation)
sudo nano /etc/arpwatch.conf

# Example email configuration
MAILTO="admin@example.com,security@example.com"
MAILSERVER="localhost"
MAILFROM="arpwatch@example.com"

# Test email functionality
echo "Test message" | mail -s "Arpwatch Test" admin@example.com

# Configure postfix for email delivery (if needed)
sudo apt install postfix
sudo systemctl enable postfix
sudo systemctl start postfix

# Configure email templates
sudo mkdir -p /etc/arpwatch/templates
sudo nano /etc/arpwatch/templates/new_station.txt

# Example template:
Subject: New station detected
From: arpwatch@example.com

New station detected on network:
IP Address: %ip%
MAC Address: %mac%
Interface: %interface%
Timestamp: %timestamp%
Vendor: %vendor%
```

Custom Alert Scripts

```bash
#!/bin/bash
# /usr/local/bin/arpwatch_alert.sh
# Custom alert script for arpwatch: reads one event per line on stdin

LOG_FILE="/var/log/arpwatch_custom.log"
ALERT_EMAIL="security@example.com"
WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

# Octets may appear without leading zeros (e.g. 0:c:29:...), so allow 1-2 hex digits
IP_RE='[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+'
MAC_RE='[0-9a-f]\{1,2\}\(:[0-9a-f]\{1,2\}\)\{5\}'

# Parse arpwatch input
while IFS= read -r line; do
    TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')

    # Log the event
    echo "[$TIMESTAMP] $line" >> "$LOG_FILE"

    # Parse the alert type and details (first IP and first MAC on the line)
    IP=$(echo "$line" | grep -o "$IP_RE" | head -n1)
    MAC=$(echo "$line" | grep -io "$MAC_RE" | head -n1)

    if echo "$line" | grep -q "new station"; then
        ALERT_TYPE="NEW_STATION"

        # Send email alert
        echo "New station detected: IP=$IP, MAC=$MAC" | \
            mail -s "Arpwatch Alert: New Station" "$ALERT_EMAIL"

        # Send Slack notification
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"🚨 Arpwatch Alert: New station detected\\nIP: $IP\\nMAC: $MAC\\nTime: $TIMESTAMP\"}" \
            "$WEBHOOK_URL"

    elif echo "$line" | grep -q "changed ethernet address"; then
        ALERT_TYPE="MAC_CHANGE"

        # High priority alert for MAC changes
        echo "SECURITY ALERT: MAC address change detected for IP $IP" | \
            mail -s "URGENT: Arpwatch Security Alert" "$ALERT_EMAIL"

        # Send Slack notification
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"🔥 URGENT: MAC address change detected\\nIP: $IP\\nTime: $TIMESTAMP\"}" \
            "$WEBHOOK_URL"

    elif echo "$line" | grep -q "flip flop"; then
        ALERT_TYPE="FLIP_FLOP"

        # Potential ARP spoofing alert
        echo "SECURITY ALERT: Potential ARP spoofing detected for IP $IP" | \
            mail -s "CRITICAL: Potential ARP Spoofing" "$ALERT_EMAIL"
    fi
done

# Make script executable
sudo chmod +x /usr/local/bin/arpwatch_alert.sh

# Configure arpwatch to use custom script
# (the meaning of -e depends on the arpwatch build; verify with `man arpwatch`)
sudo arpwatch -i eth0 -e /usr/local/bin/arpwatch_alert.sh
```
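
The same event parsing can be sketched in Python, which makes the matching rules easier to test. The sample line layout (event keyword, then IP, then one or two MAC addresses) is an assumption based on typical arpwatch syslog output, and `classify` is a hypothetical helper; adjust the patterns to the lines your installation actually writes.

```python
import re
from typing import Optional

EVENTS = (
    "new station",
    "changed ethernet address",
    "flip flop",
    "reused old ethernet address",
)
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# Allow one or two hex digits per octet, since leading zeros may be omitted.
MAC_RE = re.compile(r"\b(?:[0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}\b")


def classify(line: str) -> Optional[dict]:
    """Return the event name, IP, and MAC addresses found in a log line."""
    for event in EVENTS:
        if event in line:
            break
    else:
        return None  # Not an event line we recognize
    # Only look at the text after the keyword so syslog prefixes are ignored.
    tail = line.split(event, 1)[1]
    ip = IP_RE.search(tail)
    return {
        "event": event,
        "ip": ip.group(0) if ip else None,
        "macs": [m.lower() for m in MAC_RE.findall(tail)],
    }
```

Returning every MAC on the line, rather than only the first, preserves both the new and the old address for change and flip-flop events.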

Database Management

Database Operations

```bash
# View current ARP database
sudo cat /var/lib/arpwatch/arp.dat

# Show only the first three columns of each record
sudo awk '{print $1, $2, $3}' /var/lib/arpwatch/arp.dat

# Backup database
sudo cp /var/lib/arpwatch/arp.dat /var/lib/arpwatch/arp.dat.backup.$(date +%Y%m%d)

# Restore database
sudo cp /var/lib/arpwatch/arp.dat.backup.20231201 /var/lib/arpwatch/arp.dat

# Clear database (start fresh)
sudo systemctl stop arpwatch
sudo rm /var/lib/arpwatch/arp.dat
sudo systemctl start arpwatch

# Merge multiple database files
# (tee runs under sudo so the output file can be written in /var/lib/arpwatch)
sudo cat /var/lib/arpwatch/eth0.dat /var/lib/arpwatch/eth1.dat | \
    sort -u | sudo tee /var/lib/arpwatch/merged.dat > /dev/null
```
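
A plain `sort -u` merge keeps two records for the same MAC/IP pair whenever their timestamps differ. The sketch below shows one way to keep only the newest record per pair. It assumes tab-separated records with the timestamp in the third field, and `merge_arp_dat` is a hypothetical helper name.

```python
from typing import Dict, List, Tuple


def merge_arp_dat(*texts: str) -> List[str]:
    """Merge arp.dat contents, keeping the newest record per (MAC, IP) pair."""
    newest: Dict[Tuple[str, str], Tuple[int, str]] = {}
    for text in texts:
        for line in text.splitlines():
            fields = line.split("\t")
            if len(fields) < 3 or not fields[2].isdigit():
                continue  # Skip blank or malformed records
            key = (fields[0].lower(), fields[1])
            timestamp = int(fields[2])
            if key not in newest or timestamp > newest[key][0]:
                newest[key] = (timestamp, line)
    # Oldest first, so the result reads chronologically.
    return [line for _, line in sorted(newest.values())]
```

To use it on real files, read each file's text, pass the strings to `merge_arp_dat`, and write the returned lines to a new file while arpwatch is stopped.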

Database Analysis Scripts

```python
#!/usr/bin/env python3
# arpwatch_analyzer.py

import argparse
import sys
import time
from collections import Counter, defaultdict
from datetime import datetime


class ArpwatchAnalyzer:
    def __init__(self, database_file):
        self.database_file = database_file
        self.entries = []
        self.load_database()

    def load_database(self):
        """Load and parse the arpwatch database (plain text, one record per line)"""
        try:
            with open(self.database_file, 'r') as f:
                for line in f:
                    # Fields: MAC, IP, Unix timestamp, then optional extras
                    fields = line.split()
                    if len(fields) < 3:
                        continue
                    try:
                        # Zero-pad octets; arpwatch may write them as 0:c:29:...
                        mac = ':'.join(f'{int(p, 16):02x}' for p in fields[0].split(':'))
                        timestamp = int(fields[2])
                    except ValueError:
                        continue  # Skip malformed records

                    self.entries.append({
                        'ip': fields[1],
                        'mac': mac,
                        'timestamp': timestamp,
                        'datetime': datetime.fromtimestamp(timestamp)
                    })

        except FileNotFoundError:
            print(f"Database file not found: {self.database_file}")
            sys.exit(1)
        except Exception as e:
            print(f"Error reading database: {e}")
            sys.exit(1)

    def get_statistics(self):
        """Generate database statistics"""
        if not self.entries:
            return "No entries in database"

        total_entries = len(self.entries)
        unique_ips = len(set(entry['ip'] for entry in self.entries))
        unique_macs = len(set(entry['mac'] for entry in self.entries))

        # Time range
        timestamps = [entry['timestamp'] for entry in self.entries]
        oldest = datetime.fromtimestamp(min(timestamps))
        newest = datetime.fromtimestamp(max(timestamps))

        # Most active IPs
        ip_counts = Counter(entry['ip'] for entry in self.entries)
        most_active_ips = ip_counts.most_common(5)

        # Most active MACs
        mac_counts = Counter(entry['mac'] for entry in self.entries)
        most_active_macs = mac_counts.most_common(5)

        stats = f"""
Arpwatch Database Statistics
============================
Total entries: {total_entries}
Unique IP addresses: {unique_ips}
Unique MAC addresses: {unique_macs}
Date range: {oldest} to {newest}

Most Active IP Addresses:
"""
        for ip, count in most_active_ips:
            stats += f"  {ip}: {count} entries\n"

        stats += "\nMost Active MAC Addresses:\n"
        for mac, count in most_active_macs:
            vendor = self.get_vendor(mac)
            stats += f"  {mac} ({vendor}): {count} entries\n"

        return stats

    def find_suspicious_activity(self):
        """Detect potentially suspicious ARP activity"""
        suspicious = []

        # Group by IP address
        ip_groups = defaultdict(list)
        for entry in self.entries:
            ip_groups[entry['ip']].append(entry)

        # Check for IP addresses with multiple MAC addresses
        for ip, entries in ip_groups.items():
            macs = set(entry['mac'] for entry in entries)
            if len(macs) > 1:
                suspicious.append({
                    'type': 'IP_MULTIPLE_MACS',
                    'ip': ip,
                    'macs': list(macs),
                    'count': len(entries),
                    'description': f"IP {ip} associated with {len(macs)} different MAC addresses"
                })

        # Group by MAC address
        mac_groups = defaultdict(list)
        for entry in self.entries:
            mac_groups[entry['mac']].append(entry)

        # Check for MAC addresses with multiple IP addresses
        for mac, entries in mac_groups.items():
            ips = set(entry['ip'] for entry in entries)
            if len(ips) > 3:  # Threshold for suspicious activity
                vendor = self.get_vendor(mac)
                suspicious.append({
                    'type': 'MAC_MULTIPLE_IPS',
                    'mac': mac,
                    'vendor': vendor,
                    'ips': list(ips),
                    'count': len(entries),
                    'description': f"MAC {mac} ({vendor}) associated with {len(ips)} different IP addresses"
                })

        # Check for rapid changes (potential ARP spoofing)
        for ip, entries in ip_groups.items():
            if len(entries) > 1:
                sorted_entries = sorted(entries, key=lambda x: x['timestamp'])
                for i in range(1, len(sorted_entries)):
                    time_diff = sorted_entries[i]['timestamp'] - sorted_entries[i-1]['timestamp']
                    if time_diff < 300 and sorted_entries[i]['mac'] != sorted_entries[i-1]['mac']:  # 5 minutes
                        suspicious.append({
                            'type': 'RAPID_MAC_CHANGE',
                            'ip': ip,
                            'old_mac': sorted_entries[i-1]['mac'],
                            'new_mac': sorted_entries[i]['mac'],
                            'time_diff': time_diff,
                            'description': f"Rapid MAC change for IP {ip} ({time_diff}s between changes)"
                        })

        return suspicious

    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simple vendor lookup (in practice, you'd use a full OUI database)
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys are lowercase, so the prefix must be lowercased as well
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

    def generate_report(self, output_file=None):
        """Generate comprehensive analysis report"""
        report = f"""
Arpwatch Database Analysis Report
=================================
Generated: {datetime.now()}
Database: {self.database_file}

{self.get_statistics()}

Suspicious Activity Analysis
============================
"""

        suspicious = self.find_suspicious_activity()
        if suspicious:
            for item in suspicious:
                report += f"\n⚠️  {item['type']}: {item['description']}\n"
                if 'macs' in item:
                    report += f"   MAC addresses: {', '.join(item['macs'])}\n"
                if 'ips' in item:
                    report += f"   IP addresses: {', '.join(item['ips'])}\n"
                if 'time_diff' in item:
                    report += f"   Time difference: {item['time_diff']} seconds\n"
        else:
            report += "\nNo suspicious activity detected.\n"

        # Recent activity (last 24 hours)
        recent_threshold = time.time() - 86400  # 24 hours
        recent_entries = [e for e in self.entries if e['timestamp'] > recent_threshold]

        report += "\nRecent Activity (Last 24 Hours)\n"
        report += "===============================\n"
        report += f"Recent entries: {len(recent_entries)}\n"

        if recent_entries:
            report += "\nRecent IP/MAC pairs:\n"
            for entry in sorted(recent_entries, key=lambda x: x['timestamp'], reverse=True)[:10]:
                vendor = self.get_vendor(entry['mac'])
                report += f"  {entry['datetime']}: {entry['ip']} -> {entry['mac']} ({vendor})\n"

        if output_file:
            with open(output_file, 'w') as f:
                f.write(report)
            print(f"Report saved to: {output_file}")
        else:
            print(report)


def main():
    parser = argparse.ArgumentParser(description='Arpwatch Database Analyzer')
    parser.add_argument('database', help='Path to arpwatch database file')
    parser.add_argument('--output', '-o', help='Output file for report')
    parser.add_argument('--stats-only', action='store_true', help='Show only statistics')

    args = parser.parse_args()

    analyzer = ArpwatchAnalyzer(args.database)

    if args.stats_only:
        print(analyzer.get_statistics())
    else:
        analyzer.generate_report(args.output)


if __name__ == "__main__":
    main()
```

Monitoring and Alerting

Real-Time Monitoring Script

```bash
#!/bin/bash
# arpwatch_monitor.sh

ARPWATCH_LOG="/var/log/arpwatch.log"
ALERT_EMAIL="admin@example.com"
WHITELIST_FILE="/etc/arpwatch/whitelist.txt"
ALERT_LOG="/var/log/arpwatch_alerts.log"

# Octets may appear without leading zeros, so allow 1-2 hex digits each
IP_RE='[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+'
MAC_RE='[0-9a-f]\{1,2\}\(:[0-9a-f]\{1,2\}\)\{5\}'

# Create whitelist if it doesn't exist
if [ ! -f "$WHITELIST_FILE" ]; then
    echo "# Arpwatch whitelist - one MAC address per line" | sudo tee "$WHITELIST_FILE" > /dev/null
fi

# Function to check if MAC is whitelisted (whole-line, case-insensitive match)
is_whitelisted() {
    local mac="$1"
    grep -qixF -- "$mac" "$WHITELIST_FILE"
}

# Function to send alert
send_alert() {
    local alert_type="$1"
    local message="$2"
    local timestamp
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    # Log alert
    echo "[$timestamp] $alert_type: $message" >> "$ALERT_LOG"

    # Send email
    echo "$message" | mail -s "Arpwatch Alert: $alert_type" "$ALERT_EMAIL"

    # Send to syslog
    logger -t arpwatch_monitor "$alert_type: $message"

    echo "Alert sent: $alert_type"
}

# Function to analyze log entry
# The 'from'/'to' and 'between'/'and' patterns assume log lines that contain
# those keywords; adjust them to the format your arpwatch build writes.
analyze_entry() {
    local entry="$1"
    local IP MAC OLD_MAC NEW_MAC MAC1 MAC2

    IP=$(echo "$entry" | grep -o "$IP_RE" | head -n1)

    if echo "$entry" | grep -q "new station"; then
        MAC=$(echo "$entry" | grep -io "$MAC_RE" | head -n1)

        if ! is_whitelisted "$MAC"; then
            send_alert "NEW_STATION" "New station detected: IP=$IP, MAC=$MAC"
        fi

    elif echo "$entry" | grep -q "changed ethernet address"; then
        OLD_MAC=$(echo "$entry" | sed -n 's/.*from \([0-9a-f:]\{11,17\}\).*/\1/p')
        NEW_MAC=$(echo "$entry" | sed -n 's/.*to \([0-9a-f:]\{11,17\}\).*/\1/p')

        send_alert "MAC_CHANGE" "MAC address changed: IP=$IP, Old MAC=$OLD_MAC, New MAC=$NEW_MAC"

    elif echo "$entry" | grep -q "flip flop"; then
        MAC1=$(echo "$entry" | sed -n 's/.*between \([0-9a-f:]\{11,17\}\).*/\1/p')
        MAC2=$(echo "$entry" | sed -n 's/.*and \([0-9a-f:]\{11,17\}\).*/\1/p')

        send_alert "FLIP_FLOP" "Flip flop detected (potential ARP spoofing): IP=$IP, MAC1=$MAC1, MAC2=$MAC2"

    elif echo "$entry" | grep -q "reused old ethernet address"; then
        MAC=$(echo "$entry" | grep -io "$MAC_RE" | head -n1)

        send_alert "REUSED_ADDRESS" "Reused ethernet address: IP=$IP, MAC=$MAC"
    fi
}

echo "Starting Arpwatch monitor..."
echo "Monitoring log: $ARPWATCH_LOG"
echo "Alert email: $ALERT_EMAIL"
echo "Whitelist file: $WHITELIST_FILE"
echo ""

# Monitor arpwatch log file
tail -F "$ARPWATCH_LOG" | while IFS= read -r line; do
    # Skip empty lines
    if [ -n "$line" ]; then
        analyze_entry "$line"
    fi
done
```
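
Whitelist matching is sensitive to how MAC addresses are written (case, leading zeros, trailing comments). The sketch below shows one way to normalize both sides before comparing. It assumes a whitelist with one MAC per line and `#` comments, as in the script above; `load_whitelist` and `is_whitelisted` here are illustrative Python helpers, not part of arpwatch.

```python
def normalize_mac(mac: str) -> str:
    """Zero-pad and lowercase each octet so equivalent spellings compare equal."""
    return ":".join(f"{int(part, 16):02x}" for part in mac.strip().split(":"))


def load_whitelist(text: str) -> set:
    """Parse whitelist text: one MAC per line, '#' starts a comment."""
    macs = set()
    for raw in text.splitlines():
        entry = raw.split("#", 1)[0].strip()
        if entry:
            macs.add(normalize_mac(entry))
    return macs


def is_whitelisted(mac: str, whitelist: set) -> bool:
    """Return True if the MAC, in any spelling, is on the whitelist."""
    return normalize_mac(mac) in whitelist
```

With this approach `0:c:29:1:2:3` in a log line matches `00:0C:29:01:02:03` in the whitelist, which a literal string comparison would miss.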

Network Baseline Creation

```python
#!/usr/bin/env python3
# create_network_baseline.py

import argparse
import ipaddress
import json
import subprocess
import time
from collections import defaultdict
from datetime import datetime


class NetworkBaseline:
    def __init__(self, interface='eth0', duration_hours=24):
        self.interface = interface
        self.duration_hours = duration_hours
        self.baseline_data = {
            'created': datetime.now().isoformat(),
            'interface': interface,
            'duration_hours': duration_hours,
            'devices': {},
            'statistics': {}
        }

    def scan_network(self):
        """Perform network scan to discover devices"""
        print(f"Scanning network on interface {self.interface}...")

        try:
            # Get IP and prefix length of interface
            result = subprocess.run(['ip', 'addr', 'show', self.interface],
                                    capture_output=True, text=True)

            # Derive the network range from the first IPv4 address
            for line in result.stdout.split('\n'):
                if 'inet ' in line and '127.0.0.1' not in line:
                    ip_info = line.strip().split()[1]  # e.g. 192.168.1.5/24
                    network_range = str(ipaddress.ip_interface(ip_info).network)
                    break
            else:
                print(f"Could not determine network range for {self.interface}")
                return

            print(f"Detected network range: {network_range}")

            # Use nmap to scan network (MAC addresses are only shown when run as root)
            nmap_cmd = ['nmap', '-sn', network_range]
            result = subprocess.run(nmap_cmd, capture_output=True, text=True)

            # Parse nmap output
            current_ip = None
            for line in result.stdout.split('\n'):
                if 'Nmap scan report for' in line:
                    current_ip = line.split()[-1].strip('()')
                elif 'MAC Address:' in line and current_ip:
                    mac_info = line.split('MAC Address: ')[1]
                    mac = mac_info.split()[0].lower()  # Match tcpdump's lowercase form
                    vendor = mac_info.split('(')[1].rstrip(')') if '(' in mac_info else 'Unknown'

                    self.baseline_data['devices'][current_ip] = {
                        'mac': mac,
                        'vendor': vendor,
                        'first_seen': datetime.now().isoformat(),
                        'last_seen': datetime.now().isoformat(),
                        'status': 'active'
                    }

                    print(f"Found device: {current_ip} -> {mac} ({vendor})")
                    current_ip = None

        except Exception as e:
            print(f"Error scanning network: {e}")

    def monitor_arp_activity(self):
        """Monitor ARP activity for specified duration"""
        print(f"Monitoring ARP activity for {self.duration_hours} hours...")

        end_time = time.time() + (self.duration_hours * 3600)

        # Start tcpdump to capture ARP packets (-l makes output line-buffered)
        tcpdump_cmd = ['tcpdump', '-l', '-i', self.interface, '-n', 'arp']
        process = None

        try:
            process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE, text=True)

            while time.time() < end_time:
                line = process.stdout.readline()
                if line:
                    self.parse_arp_packet(line.strip())

                # Check if process is still running
                if process.poll() is not None:
                    break

            process.terminate()

        except KeyboardInterrupt:
            print("\nMonitoring interrupted by user")
            if process is not None:
                process.terminate()
        except Exception as e:
            print(f"Error monitoring ARP activity: {e}")

    def parse_arp_packet(self, packet_line):
        """Parse ARP packet from tcpdump output"""
        try:
            # Example: "12:34:56.789012 ARP, Request who-has 192.168.1.1 tell 192.168.1.100, length 28"
            # Example: "12:34:56.789012 ARP, Reply 192.168.1.1 is-at aa:bb:cc:dd:ee:ff, length 28"

            if 'Reply' in packet_line and 'is-at' in packet_line:
                parts = packet_line.split()
                ip = None
                mac = None

                for i, part in enumerate(parts):
                    if part == 'Reply':
                        ip = parts[i+1]
                    elif part == 'is-at':
                        mac = parts[i+1].rstrip(',')

                if ip and mac:
                    self.update_device_info(ip, mac)

        except Exception as e:
            print(f"Error parsing ARP packet: {e}")

    def update_device_info(self, ip, mac):
        """Update device information in baseline"""
        current_time = datetime.now().isoformat()

        if ip in self.baseline_data['devices']:
            # Update existing device
            device = self.baseline_data['devices'][ip]
            device['last_seen'] = current_time

            # Check for MAC address change
            if device['mac'] != mac:
                print(f"MAC change detected: {ip} changed from {device['mac']} to {mac}")
                device['mac_history'] = device.get('mac_history', [])
                device['mac_history'].append({
                    'old_mac': device['mac'],
                    'new_mac': mac,
                    'timestamp': current_time
                })
                device['mac'] = mac
        else:
            # New device discovered
            print(f"New device discovered: {ip} -> {mac}")
            self.baseline_data['devices'][ip] = {
                'mac': mac,
                'vendor': self.get_vendor(mac),
                'first_seen': current_time,
                'last_seen': current_time,
                'status': 'active'
            }

    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simplified vendor lookup
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys are lowercase, so the prefix must be lowercased as well
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

    def generate_statistics(self):
        """Generate baseline statistics"""
        devices = self.baseline_data['devices']

        self.baseline_data['statistics'] = {
            'total_devices': len(devices),
            'active_devices': len([d for d in devices.values() if d['status'] == 'active']),
            'vendors': {},
            'mac_changes': 0
        }

        # Count vendors
        vendor_count = defaultdict(int)
        for device in devices.values():
            vendor_count[device['vendor']] += 1
            if 'mac_history' in device:
                self.baseline_data['statistics']['mac_changes'] += len(device['mac_history'])

        self.baseline_data['statistics']['vendors'] = dict(vendor_count)

    def save_baseline(self, filename):
        """Save baseline to file"""
        self.generate_statistics()

        with open(filename, 'w') as f:
            json.dump(self.baseline_data, f, indent=2)

        print(f"Baseline saved to: {filename}")

    def create_baseline(self, output_file):
        """Create complete network baseline"""
        print("Creating network baseline...")
        print("=" * 40)

        # Initial network scan
        self.scan_network()

        # Monitor ARP activity
        if self.duration_hours > 0:
            self.monitor_arp_activity()

        # Save baseline
        self.save_baseline(output_file)

        # Display summary
        self.display_summary()

    def display_summary(self):
        """Display baseline summary"""
        stats = self.baseline_data['statistics']

        print("\nBaseline Summary:")
        print("=" * 20)
        print(f"Total devices discovered: {stats['total_devices']}")
        print(f"Active devices: {stats['active_devices']}")
        print(f"MAC address changes detected: {stats['mac_changes']}")

        print("\nVendor distribution:")
        for vendor, count in stats['vendors'].items():
            print(f"  {vendor}: {count} devices")

        print(f"\nBaseline creation completed for interface: {self.baseline_data['interface']}")
        print(f"Monitoring duration: {self.baseline_data['duration_hours']} hours")


def main():
    parser = argparse.ArgumentParser(description='Create network baseline for arpwatch')
    parser.add_argument('--interface', '-i', default='eth0', help='Network interface to monitor')
    parser.add_argument('--duration', '-d', type=int, default=1, help='Monitoring duration in hours')
    parser.add_argument('--output', '-o', default='network_baseline.json', help='Output file for baseline')
    parser.add_argument('--scan-only', action='store_true', help='Only perform initial scan, no monitoring')

    args = parser.parse_args()

    # A duration of 0 skips the monitoring phase
    duration = 0 if args.scan_only else args.duration

    baseline = NetworkBaseline(args.interface, duration)
    baseline.create_baseline(args.output)


if __name__ == "__main__":
    main()
```
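
A baseline becomes useful once later observations are compared against it. The following is a minimal sketch of such a comparison, assuming the `devices` mapping from the baseline JSON above (IP address to a record with a `mac` key) and a separate, hypothetical `observed` mapping of IP address to MAC address collected later. `diff_baseline` is an illustrative name.

```python
def diff_baseline(baseline_devices: dict, observed: dict) -> dict:
    """Compare current IP -> MAC observations against baseline device records."""
    report = {"new": [], "changed": [], "missing": []}
    for ip, mac in observed.items():
        known = baseline_devices.get(ip)
        if known is None:
            report["new"].append(ip)          # Not in the baseline at all
        elif known["mac"].lower() != mac.lower():
            report["changed"].append(ip)      # Same IP, different MAC
    # Baseline devices that were not seen this time
    report["missing"] = [ip for ip in baseline_devices if ip not in observed]
    for ips in report.values():
        ips.sort()
    return report
```

In practice the `changed` list is the one to review first, since an IP address answering with a different MAC address is what an ARP spoofing attempt looks like.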

Security Applications

ARP Spoofing Detection

```bash
#!/bin/bash
# arp_spoofing_detector.sh

INTERFACE="eth0"
THRESHOLD_SECONDS=60  # Alert if MAC changes within 60 seconds
LOG_FILE="/var/log/arp_spoofing.log"
ALERT_EMAIL="security@example.com"

# Temporary files for tracking
TEMP_DIR="/tmp/arp_detector"
mkdir -p "$TEMP_DIR"

# Function to log events
log_event() {
    local level="$1"
    local message="$2"
    local timestamp
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    echo "[$timestamp] [$level] $message" | tee -a "$LOG_FILE"

    if [ "$level" = "ALERT" ]; then
        echo "$message" | mail -s "ARP Spoofing Alert" "$ALERT_EMAIL"
        logger -t arp_spoofing_detector "ALERT: $message"
    fi
}

# Function to check for rapid MAC changes
check_rapid_changes() {
    local ip="$1"
    local mac="$2"
    local current_time
    current_time=$(date +%s)

    local tracking_file="$TEMP_DIR/ip_$ip"

    if [ -f "$tracking_file" ]; then
        # Entries are stored as "epoch:mac"; the MAC itself contains colons,
        # so take everything after the first field with -f2-
        local last_entry last_time last_mac
        last_entry=$(tail -1 "$tracking_file")
        last_time=$(echo "$last_entry" | cut -d: -f1)
        last_mac=$(echo "$last_entry" | cut -d: -f2-)

        local time_diff=$((current_time - last_time))

        if [ "$time_diff" -lt "$THRESHOLD_SECONDS" ] && [ "$mac" != "$last_mac" ]; then
            log_event "ALERT" "Rapid MAC change detected for IP $ip: $last_mac -> $mac (${time_diff}s)"

            # Check if this looks like ARP spoofing
            if [ "$time_diff" -lt 10 ]; then
                log_event "ALERT" "POSSIBLE ARP SPOOFING: Very rapid MAC change for IP $ip"

                # Additional checks
                check_duplicate_mac "$mac" "$ip"
            fi
        fi
    fi

    # Update tracking file
    echo "$current_time:$mac" >> "$tracking_file"

    # Keep only last 10 entries
    tail -10 "$tracking_file" > "$tracking_file.tmp"
    mv "$tracking_file.tmp" "$tracking_file"
}

# Function to check for duplicate MAC addresses
check_duplicate_mac() {
    local mac="$1"
    local current_ip="$2"
    local tracking_file file_ip last_entry last_mac

    # Check if this MAC is associated with other IPs
    for tracking_file in "$TEMP_DIR"/ip_*; do
        if [ -f "$tracking_file" ]; then
            file_ip=$(basename "$tracking_file" | sed 's/ip_//')

            if [ "$file_ip" != "$current_ip" ]; then
                last_entry=$(tail -1 "$tracking_file")
                last_mac=$(echo "$last_entry" | cut -d: -f2-)

                if [ "$last_mac" = "$mac" ]; then
                    log_event "ALERT" "MAC address conflict: $mac is associated with both $current_ip and $file_ip"
                fi
            fi
        fi
    done
}

# Function to analyze ARP table
analyze_arp_table() {
    log_event "INFO" "Analyzing current ARP table"

    # Get current ARP table
    arp -a | while IFS= read -r line; do
        if echo "$line" | grep -q " at "; then
            local ip mac
            ip=$(echo "$line" | sed 's/.*(\([^)]*\)).*/\1/')
            mac=$(echo "$line" | awk '{print $4}')

            if [ -n "$ip" ] && [ -n "$mac" ]; then
                check_rapid_changes "$ip" "$mac"
            fi
        fi
    done
}

# Function to monitor ARP traffic
monitor_arp_traffic() {
    log_event "INFO" "Starting ARP traffic monitoring on interface $INTERFACE"

    # Use tcpdump to monitor ARP traffic (-l makes output line-buffered)
    tcpdump -l -i "$INTERFACE" -n arp | while IFS= read -r line; do
        if echo "$line" | grep -q "Reply.*is-at"; then
            local ip mac
            ip=$(echo "$line" | sed -n 's/.*Reply \([0-9.]*\) is-at.*/\1/p')
            mac=$(echo "$line" | sed -n 's/.*is-at \([0-9a-f:]*\).*/\1/p')

            if [ -n "$ip" ] && [ -n "$mac" ]; then
                log_event "DEBUG" "ARP Reply: $ip -> $mac"
                check_rapid_changes "$ip" "$mac"
            fi
        fi
    done
}

# Cleanup function
cleanup() {
    log_event "INFO" "Shutting down ARP spoofing detector"
    rm -rf "$TEMP_DIR"
    exit 0
}

# Set up signal handlers
trap cleanup SIGINT SIGTERM

log_event "INFO" "Starting ARP spoofing detector"
log_event "INFO" "Interface: $INTERFACE"
log_event "INFO" "Threshold: $THRESHOLD_SECONDS seconds"
log_event "INFO" "Log file: $LOG_FILE"

# Initial ARP table analysis
analyze_arp_table

# Start monitoring
monitor_arp_traffic
```
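
The threshold logic of the detector can also be expressed without temporary files. The sketch below keeps a short in-memory history per IP address and reports how quickly a MAC address changed. `RapidChangeDetector` is a hypothetical class for illustration, and the caller supplies timestamps explicitly so the behavior is easy to test.

```python
from collections import defaultdict, deque
from typing import Optional


class RapidChangeDetector:
    """Flag MAC changes for one IP that happen within a time threshold."""

    def __init__(self, threshold_seconds: float = 60, history: int = 10):
        self.threshold = threshold_seconds
        # ip -> most recent (timestamp, mac) sightings, oldest dropped first
        self.sightings = defaultdict(lambda: deque(maxlen=history))

    def observe(self, ip: str, mac: str, now: float) -> Optional[float]:
        """Record a sighting; return seconds since the last one if it is a rapid change."""
        mac = mac.lower()
        history = self.sightings[ip]
        alert = None
        if history:
            last_time, last_mac = history[-1]
            elapsed = now - last_time
            if last_mac != mac and elapsed < self.threshold:
                alert = elapsed
        history.append((now, mac))
        return alert
```

A caller would pass `time.time()` as `now` for each parsed ARP reply and raise an alert whenever `observe` returns a number instead of `None`.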

Netzwerk Sicherheit Dashboard

```python

!/usr/bin/env python3

arpwatch_dashboard.py

import json import time import subprocess from datetime import datetime, timedelta from flask import Flask, render_template, jsonify import sqlite3 import threading

app = Flask(name)

class ArpwatchDashboard:
    def __init__(self, db_file='arpwatch_dashboard.db'):
        self.db_file = db_file
        self.init_database()
        self.running = True

    def init_database(self):
        """Initialize SQLite database for dashboard data"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS devices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL,
                mac TEXT NOT NULL,
                vendor TEXT,
                first_seen TIMESTAMP,
                last_seen TIMESTAMP,
                status TEXT DEFAULT 'active'
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                event_type TEXT,
                ip TEXT,
                mac TEXT,
                old_mac TEXT,
                description TEXT
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                alert_type TEXT,
                severity TEXT,
                ip TEXT,
                mac TEXT,
                description TEXT,
                acknowledged BOOLEAN DEFAULT FALSE
            )
        ''')

        conn.commit()
        conn.close()

    def monitor_arpwatch_log(self, log_file='/var/log/arpwatch.log'):
        """Monitor arpwatch log file for new entries"""
        try:
            # Follow log file (tail -F keeps following across log rotation)
            process = subprocess.Popen(['tail', '-F', log_file],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       text=True)

            while self.running:
                line = process.stdout.readline()
                if line:
                    self.process_log_entry(line.strip())

                if process.poll() is not None:
                    break

            process.terminate()

        except Exception as e:
            print(f"Error monitoring log: {e}")

    def process_log_entry(self, log_entry):
        """Process arpwatch log entry and update database"""
        import re

        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        timestamp = datetime.now()

        try:
            if 'new station' in log_entry:
                # Extract IP and MAC
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                mac_match = re.search(r'([0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2})', log_entry)

                if ip_match and mac_match:
                    ip = ip_match.group(1)
                    mac = mac_match.group(1)
                    vendor = self.get_vendor(mac)

                    # Insert or update device.
                    # Note: the devices table has no UNIQUE constraint on ip,
                    # so OR REPLACE never triggers and a new row is always added.
                    cursor.execute('''
                        INSERT OR REPLACE INTO devices
                        (ip, mac, vendor, first_seen, last_seen, status)
                        VALUES (?, ?, ?, ?, ?, 'active')
                    ''', (ip, mac, vendor, timestamp, timestamp))

                    # Add event
                    cursor.execute('''
                        INSERT INTO events
                        (timestamp, event_type, ip, mac, description)
                        VALUES (?, 'new_station', ?, ?, ?)
                    ''', (timestamp, ip, mac, f'New station: {ip} -> {mac}'))

                    # Add alert for new devices
                    cursor.execute('''
                        INSERT INTO alerts
                        (timestamp, alert_type, severity, ip, mac, description)
                        VALUES (?, 'new_device', 'info', ?, ?, ?)
                    ''', (timestamp, ip, mac, f'New device detected: {ip} ({vendor})'))

            elif 'changed ethernet address' in log_entry:
                # MAC address change
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                old_mac_match = re.search(r'from ([0-9a-f:]{17})', log_entry)
                new_mac_match = re.search(r'to ([0-9a-f:]{17})', log_entry)

                if ip_match and old_mac_match and new_mac_match:
                    ip = ip_match.group(1)
                    old_mac = old_mac_match.group(1)
                    new_mac = new_mac_match.group(1)
                    vendor = self.get_vendor(new_mac)

                    # Update device
                    cursor.execute('''
                        UPDATE devices
                        SET mac = ?, vendor = ?, last_seen = ?
                        WHERE ip = ?
                    ''', (new_mac, vendor, timestamp, ip))

                    # Add event
                    cursor.execute('''
                        INSERT INTO events
                        (timestamp, event_type, ip, mac, old_mac, description)
                        VALUES (?, 'mac_change', ?, ?, ?, ?)
                    ''', (timestamp, ip, new_mac, old_mac, f'MAC changed: {ip} from {old_mac} to {new_mac}'))

                    # Add high-priority alert
                    cursor.execute('''
                        INSERT INTO alerts
                        (timestamp, alert_type, severity, ip, mac, description)
                        VALUES (?, 'mac_change', 'warning', ?, ?, ?)
                    ''', (timestamp, ip, new_mac, f'MAC address changed for {ip}'))

            elif 'flip flop' in log_entry:
                # Potential ARP spoofing
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)

                if ip_match:
                    ip = ip_match.group(1)

                    # Add critical alert
                    cursor.execute('''
                        INSERT INTO alerts
                        (timestamp, alert_type, severity, ip, description)
                        VALUES (?, 'flip_flop', 'critical', ?, ?)
                    ''', (timestamp, ip, f'Flip flop detected for {ip} - potential ARP spoofing'))

            conn.commit()

        except Exception as e:
            print(f"Error processing log entry: {e}")
        finally:
            conn.close()

    def get_vendor(self, mac):
        """Get vendor from MAC address"""
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # The map keys are lowercase, so normalize the OUI prefix to lowercase
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

# Global dashboard instance

dashboard = ArpwatchDashboard()

@app.route('/')
def index():
    """Main dashboard page"""
    return render_template('dashboard.html')

@app.route('/api/devices')
def api_devices():
    """API endpoint for device list"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT ip, mac, vendor, first_seen, last_seen, status
        FROM devices
        ORDER BY last_seen DESC
    ''')

    devices = []
    for row in cursor.fetchall():
        devices.append({
            'ip': row[0],
            'mac': row[1],
            'vendor': row[2],
            'first_seen': row[3],
            'last_seen': row[4],
            'status': row[5]
        })

    conn.close()
    return jsonify(devices)

@app.route('/api/alerts')
def api_alerts():
    """API endpoint for alerts"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT timestamp, alert_type, severity, ip, mac, description, acknowledged
        FROM alerts
        ORDER BY timestamp DESC
        LIMIT 50
    ''')

    alerts = []
    for row in cursor.fetchall():
        alerts.append({
            'timestamp': row[0],
            'alert_type': row[1],
            'severity': row[2],
            'ip': row[3],
            'mac': row[4],
            'description': row[5],
            'acknowledged': row[6]
        })

    conn.close()
    return jsonify(alerts)

@app.route('/api/statistics')
def api_statistics():
    """API endpoint for statistics"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()

    # Get device count (SQL string literals use single quotes;
    # double quotes denote identifiers)
    cursor.execute("SELECT COUNT(*) FROM devices WHERE status = 'active'")
    active_devices = cursor.fetchone()[0]

    # Get alert counts
    cursor.execute('SELECT COUNT(*) FROM alerts WHERE acknowledged = FALSE')
    unacknowledged_alerts = cursor.fetchone()[0]

    # Get recent events count
    yesterday = datetime.now() - timedelta(days=1)
    cursor.execute('SELECT COUNT(*) FROM events WHERE timestamp > ?', (yesterday,))
    recent_events = cursor.fetchone()[0]

    # Get vendor distribution
    cursor.execute('''
        SELECT vendor, COUNT(*) as count
        FROM devices
        WHERE status = 'active'
        GROUP BY vendor
        ORDER BY count DESC
    ''')

    vendor_distribution = []
    for row in cursor.fetchall():
        vendor_distribution.append({
            'vendor': row[0],
            'count': row[1]
        })

    conn.close()

    return jsonify({
        'active_devices': active_devices,
        'unacknowledged_alerts': unacknowledged_alerts,
        'recent_events': recent_events,
        'vendor_distribution': vendor_distribution
    })

def start_monitoring():
    """Start monitoring in background thread"""
    monitor_thread = threading.Thread(target=dashboard.monitor_arpwatch_log)
    monitor_thread.daemon = True
    monitor_thread.start()

if __name__ == '__main__':
    # Start monitoring
    start_monitoring()

    # Start web server (0.0.0.0 listens on all interfaces)
    app.run(host='0.0.0.0', port=5000, debug=False)
```
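The branch logic of `process_log_entry` can be checked without Flask, SQLite, or a running arpwatch by separating classification from storage. The sketch below is one possible refactoring, not part of the dashboard: `classify_log_entry` is a hypothetical helper, and the sample lines in the usage note are illustrative inputs shaped after the keywords the dashboard looks for, not verbatim arpwatch output.

```python
import re
from typing import Dict, Optional

IP_RE = re.compile(r"(\d+\.\d+\.\d+\.\d+)")
# Case-insensitive so upper-case MACs are accepted too; normalized below.
MAC_RE = re.compile(r"((?:[0-9a-f]{2}:){5}[0-9a-f]{2})", re.IGNORECASE)

# keyword in the log line -> (event type, alert severity), as used by the dashboard
EVENT_RULES = (
    ("new station", "new_station", "info"),
    ("changed ethernet address", "mac_change", "warning"),
    ("flip flop", "flip_flop", "critical"),
)


def classify_log_entry(entry: str) -> Optional[Dict[str, str]]:
    """Map a log line to event type, severity, IP and (if present) first MAC.

    Returns None when the line matches no known keyword or contains no IP.
    """
    for keyword, event_type, severity in EVENT_RULES:
        if keyword in entry:
            break
    else:
        return None  # no rule matched

    ip_match = IP_RE.search(entry)
    if ip_match is None:
        return None

    result = {"event_type": event_type, "severity": severity, "ip": ip_match.group(1)}
    mac_match = MAC_RE.search(entry)
    if mac_match is not None:
        result["mac"] = mac_match.group(1).lower()
    return result
```

For example, `classify_log_entry("new station 192.168.1.50 B8:27:EB:12:34:56")` yields a `new_station` event with the MAC lowercased, while a line without any of the three keywords yields `None`. Keeping this step as a pure function means the database writes only need to handle already-validated dictionaries.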

This Arpwatch cheatsheet covers network monitoring, security analysis, and intrusion detection with arpwatch, from basic ARP monitoring to a security dashboard and automated threat detection.