Arpwatch Cheatsheet

Overview

Arpwatch is a network monitoring tool that tracks Ethernet/IP address pairings on a network. It watches ARP (Address Resolution Protocol) traffic and maintains a database of the pairings it observes. When it detects a change, such as a new station, a changed address, or a possible ARP spoofing attack, it can send an email notification and log the event.

Key Features

  • ARP Monitoring: Continuous monitoring of ARP traffic and address mappings
  • Change Detection: Automatic detection of new stations and address changes
  • Security Alerts: Email notifications for suspicious ARP activity
  • Database Maintenance: Persistent storage of Ethernet/IP address pairs
  • Network Mapping: Automatic discovery and tracking of network devices
  • Intrusion Detection: Detection of potential ARP spoofing and address conflicts
  • Historical Tracking: Long-term monitoring of network topology changes
  • Multi-Interface Support: Monitoring of multiple network interfaces (one arpwatch instance per interface)
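
The change detection described above can be illustrated with a small model. The following is a minimal sketch, not arpwatch's actual implementation: `PairTracker` and its method names are hypothetical, and only the report names ("new station", "changed ethernet address", "flip flop") are taken from arpwatch itself. It assumes a flip flop means an IP address returning to the MAC it used immediately before the current one.

```python
from typing import Dict, Optional


class PairTracker:
    """Toy model of an IP -> MAC pairing table (illustrative only)."""

    def __init__(self) -> None:
        self.current: Dict[str, str] = {}   # IP -> most recently seen MAC
        self.previous: Dict[str, str] = {}  # IP -> MAC seen just before that

    def observe(self, ip: str, mac: str) -> Optional[str]:
        """Record one ARP observation and return a report name, or None."""
        mac = mac.lower()
        known = self.current.get(ip)
        if known is None:
            # First time this IP address is seen
            self.current[ip] = mac
            return "new station"
        if known == mac:
            # Pairing unchanged, nothing to report
            return None
        # The MAC changed: going back to the previous MAC is a flip flop
        if self.previous.get(ip) == mac:
            event = "flip flop"
        else:
            event = "changed ethernet address"
        self.previous[ip] = known
        self.current[ip] = mac
        return event


if __name__ == "__main__":
    tracker = PairTracker()
    for ip, mac in [
        ("192.168.1.10", "aa:bb:cc:00:00:01"),
        ("192.168.1.10", "aa:bb:cc:00:00:02"),
        ("192.168.1.10", "aa:bb:cc:00:00:01"),
    ]:
        print(ip, mac, tracker.observe(ip, mac))
```

Repeated flip flops for a gateway address are the pattern most often associated with ARP spoofing, which is why the scripts later in this cheatsheet treat them as high priority.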

Installation

Linux Systems

# Ubuntu/Debian
sudo apt update
sudo apt install arpwatch

# CentOS/RHEL/Fedora
sudo yum install arpwatch
# or
sudo dnf install arpwatch

# Arch Linux
sudo pacman -S arpwatch

# openSUSE
sudo zypper install arpwatch

# From source
wget https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure
make
sudo make install

# Verify installation
arpwatch -h

FreeBSD/OpenBSD

# FreeBSD
sudo pkg install arpwatch

# OpenBSD
sudo pkg_add arpwatch

# From ports (FreeBSD)
cd /usr/ports/net-mgmt/arpwatch
sudo make install clean

# Verify installation
which arpwatch

macOS

# Using Homebrew
brew install arpwatch

# Using MacPorts
sudo port install arpwatch

# Manual installation
curl -O https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure --prefix=/usr/local
make
sudo make install

# Verify installation
arpwatch -h

Basic Usage

Starting Arpwatch

# Basic startup (requires root privileges)
sudo arpwatch

# Specify interface
sudo arpwatch -i eth0

# Debug mode: stay in the foreground and print reports to stderr instead of emailing them
sudo arpwatch -d

# Specify data file location
sudo arpwatch -f /var/lib/arpwatch/arp.dat

# Enable email notifications
sudo arpwatch -m admin@example.com

# Declare an additional local network (avoids bogon warnings when several subnets share the wire)
sudo arpwatch -n 192.168.1.0/24

# Combine options
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -m admin@example.com

Configuration Files

# Main configuration file locations
/etc/arpwatch.conf          # Main configuration
/etc/default/arpwatch        # Ubuntu/Debian defaults
/etc/sysconfig/arpwatch      # RHEL/CentOS defaults

# Data file locations
/var/lib/arpwatch/arp.dat    # Default database file
/var/lib/arpwatch/ethercodes.dat  # Ethernet vendor codes

# Log file locations
/var/log/arpwatch.log        # Main log file
/var/log/messages            # System log (may contain arpwatch entries)

Basic Configuration

# Edit configuration file (Ubuntu/Debian)
sudo nano /etc/default/arpwatch

# Example configuration
INTERFACES="eth0 eth1"
ARGS="-m admin@example.com -s root"
OPTIONS="-N"

# Edit configuration file (RHEL/CentOS)
sudo nano /etc/sysconfig/arpwatch

# Example configuration
OPTIONS="-i eth0 -m admin@example.com -f /var/lib/arpwatch/arp.dat"

Advanced Configuration

Multi-Interface Monitoring

# Create separate instances for multiple interfaces
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -P /var/run/arpwatch_eth0.pid &
sudo arpwatch -i eth1 -f /var/lib/arpwatch/eth1.dat -P /var/run/arpwatch_eth1.pid &
sudo arpwatch -i wlan0 -f /var/lib/arpwatch/wlan0.dat -P /var/run/arpwatch_wlan0.pid &

# Systemd service for multiple interfaces (create separate service files)
sudo cp /lib/systemd/system/arpwatch.service /lib/systemd/system/arpwatch@.service

# Edit the template service file
sudo nano /lib/systemd/system/arpwatch@.service

# Example template content:
[Unit]
Description=Arpwatch daemon for %i
After=network.target

[Service]
Type=forking
ExecStart=/usr/sbin/arpwatch -i %i -f /var/lib/arpwatch/%i.dat -P /var/run/arpwatch_%i.pid
PIDFile=/var/run/arpwatch_%i.pid
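# Note: packet capture needs raw-socket access; if the daemon fails to start as an
# unprivileged user, remove these two lines or grant the unit CAP_NET_RAW.
User=arpwatch
Group=arpwatch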

[Install]
WantedBy=multi-user.target

# Reload systemd so it picks up the new template, then enable services for specific interfaces
sudo systemctl daemon-reload
sudo systemctl enable arpwatch@eth0.service
sudo systemctl enable arpwatch@eth1.service
sudo systemctl start arpwatch@eth0.service
sudo systemctl start arpwatch@eth1.service

Email Notification Configuration

# Configure email notifications
sudo nano /etc/arpwatch.conf

# Example email configuration
MAILTO="admin@example.com,security@example.com"
MAILSERVER="localhost"
MAILFROM="arpwatch@example.com"

# Test email functionality
echo "Test message" | mail -s "Arpwatch Test" admin@example.com

# Configure postfix for email delivery (if needed)
sudo apt install postfix
sudo systemctl enable postfix
sudo systemctl start postfix

# Configure email templates
sudo mkdir -p /etc/arpwatch/templates
sudo nano /etc/arpwatch/templates/new_station.txt

# Example template:
Subject: New station detected
From: arpwatch@example.com

New station detected on network:
IP Address: %ip%
MAC Address: %mac%
Interface: %interface%
Timestamp: %timestamp%
Vendor: %vendor%

Custom Alert Scripts

#!/bin/bash
# /usr/local/bin/arpwatch_alert.sh

# Custom alert script for arpwatch
LOG_FILE="/var/log/arpwatch_custom.log"
ALERT_EMAIL="security@example.com"
WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

# Parse arpwatch input
while IFS= read -r line; do
    TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')

    # Log the event
    echo "[$TIMESTAMP] $line" >> "$LOG_FILE"

    # Parse the alert type and details
    if echo "$line" | grep -q "new station"; then
        ALERT_TYPE="NEW_STATION"
        # Take the first match only; MAC octets may be logged without leading zeros
        IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+' | head -n1)
        MAC=$(echo "$line" | grep -oi '[0-9a-f]\{1,2\}\(:[0-9a-f]\{1,2\}\)\{5\}' | head -n1)

        # Send email alert
        echo "New station detected: IP=$IP, MAC=$MAC" | \
            mail -s "Arpwatch Alert: New Station" "$ALERT_EMAIL"

        # Send Slack notification
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"🚨 Arpwatch Alert: New station detected\\nIP: $IP\\nMAC: $MAC\\nTime: $TIMESTAMP\"}" \
            "$WEBHOOK_URL"

    elif echo "$line" | grep -q "changed ethernet address"; then
        ALERT_TYPE="MAC_CHANGE"
        IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')

        # High priority alert for MAC changes
        echo "SECURITY ALERT: MAC address change detected for IP $IP" | \
            mail -s "URGENT: Arpwatch Security Alert" "$ALERT_EMAIL"

        # Send Slack notification
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"🔥 URGENT: MAC address change detected\\nIP: $IP\\nTime: $TIMESTAMP\"}" \
            "$WEBHOOK_URL"

    elif echo "$line" | grep -q "flip flop"; then
        ALERT_TYPE="FLIP_FLOP"
        IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')

        # Potential ARP spoofing alert
        echo "SECURITY ALERT: Potential ARP spoofing detected for IP $IP" | \
            mail -s "CRITICAL: Potential ARP Spoofing" "$ALERT_EMAIL"
    fi
done

# Make script executable
sudo chmod +x /usr/local/bin/arpwatch_alert.sh

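# Configure arpwatch to use custom script
# (option letters differ between arpwatch builds; confirm that -e is supported with `man arpwatch`)
sudo arpwatch -i eth0 -e /usr/local/bin/arpwatch_alert.sh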

Database Management

Database Operations

# View current ARP database
sudo cat /var/lib/arpwatch/arp.dat

# Convert database to readable format
sudo awk '{print $1, $2, $3}' /var/lib/arpwatch/arp.dat

# Backup database
sudo cp /var/lib/arpwatch/arp.dat /var/lib/arpwatch/arp.dat.backup.$(date +%Y%m%d)

# Restore database
sudo cp /var/lib/arpwatch/arp.dat.backup.20231201 /var/lib/arpwatch/arp.dat

# Clear database (start fresh)
sudo systemctl stop arpwatch
sudo rm /var/lib/arpwatch/arp.dat
sudo systemctl start arpwatch

# Merge multiple database files (tee runs under sudo so the write to /var/lib succeeds)
sudo cat /var/lib/arpwatch/eth0.dat /var/lib/arpwatch/eth1.dat | \
    sort -u | sudo tee /var/lib/arpwatch/merged.dat > /dev/null
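
Because `sort -u` only drops byte-identical lines, the same pair can survive a merge twice with different timestamps. The following is a minimal sketch of a merge that keeps the newest record per pair. It assumes each line starts with three whitespace-separated fields (MAC, IP, epoch timestamp), as the awk command above implies; `merge_arp_lines` is a hypothetical helper, not part of arpwatch.

```python
import sys


def merge_arp_lines(lines):
    """Keep one line per (MAC, IP) pair, preferring the newest timestamp."""
    newest = {}
    for line in lines:
        fields = line.split()
        # Skip blank or malformed lines instead of failing the whole merge
        if len(fields) < 3 or not fields[2].isdigit():
            continue
        key = (fields[0].lower(), fields[1])
        stamp = int(fields[2])
        if key not in newest or stamp > newest[key][0]:
            newest[key] = (stamp, line.rstrip("\n"))
    # Sort by (MAC, IP) so the output is stable between runs
    return [entry[1] for _, entry in sorted(newest.items())]


if __name__ == "__main__":
    # Usage: python3 merge_arp.py eth0.dat eth1.dat > merged.dat
    collected = []
    for path in sys.argv[1:]:
        with open(path, "r") as handle:
            collected.extend(handle.readlines())
    for merged_line in merge_arp_lines(collected):
        print(merged_line)
```

Stop arpwatch before replacing a live database file with the merged result, as in the "Clear database" step above.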

Database Analysis Scripts

#!/usr/bin/env python3
# arpwatch_analyzer.py

import sys
import time
import socket
import struct
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import argparse

class ArpwatchAnalyzer:
    def __init__(self, database_file):
        self.database_file = database_file
        self.entries = []
        self.load_database()

    def load_database(self):
        """Load and parse the arpwatch database (plain text, one pair per line)"""
        try:
            with open(self.database_file, 'r') as f:
                for line in f:
                    # Fields: MAC address, IP address, epoch timestamp,
                    # optionally followed by a hostname
                    fields = line.split()
                    if len(fields) < 3:
                        continue

                    # MAC octets may be stored without leading zeros; pad them
                    mac = ':'.join(part.zfill(2) for part in fields[0].lower().split(':'))
                    ip = fields[1]
                    try:
                        timestamp = int(fields[2])
                    except ValueError:
                        continue  # Skip malformed lines

                    self.entries.append({
                        'ip': ip,
                        'mac': mac,
                        'timestamp': timestamp,
                        'datetime': datetime.fromtimestamp(timestamp)
                    })

        except FileNotFoundError:
            print(f"Database file not found: {self.database_file}")
            sys.exit(1)
        except Exception as e:
            print(f"Error reading database: {e}")
            sys.exit(1)

    def get_statistics(self):
        """Generate database statistics"""
        if not self.entries:
            return "No entries in database"

        total_entries = len(self.entries)
        unique_ips = len(set(entry['ip'] for entry in self.entries))
        unique_macs = len(set(entry['mac'] for entry in self.entries))

        # Time range
        timestamps = [entry['timestamp'] for entry in self.entries]
        oldest = datetime.fromtimestamp(min(timestamps))
        newest = datetime.fromtimestamp(max(timestamps))

        # Most active IPs
        ip_counts = Counter(entry['ip'] for entry in self.entries)
        most_active_ips = ip_counts.most_common(5)

        # Most active MACs
        mac_counts = Counter(entry['mac'] for entry in self.entries)
        most_active_macs = mac_counts.most_common(5)

        stats = f"""
Arpwatch Database Statistics
============================
Total entries: {total_entries}
Unique IP addresses: {unique_ips}
Unique MAC addresses: {unique_macs}
Date range: {oldest} to {newest}

Most Active IP Addresses:
"""
        for ip, count in most_active_ips:
            stats += f"  {ip}: {count} entries\n"

        stats += "\nMost Active MAC Addresses:\n"
        for mac, count in most_active_macs:
            vendor = self.get_vendor(mac)
            stats += f"  {mac} ({vendor}): {count} entries\n"

        return stats

    def find_suspicious_activity(self):
        """Detect potentially suspicious ARP activity"""
        suspicious = []

        # Group by IP address
        ip_groups = defaultdict(list)
        for entry in self.entries:
            ip_groups[entry['ip']].append(entry)

        # Check for IP addresses with multiple MAC addresses
        for ip, entries in ip_groups.items():
            macs = set(entry['mac'] for entry in entries)
            if len(macs) > 1:
                suspicious.append({
                    'type': 'IP_MULTIPLE_MACS',
                    'ip': ip,
                    'macs': list(macs),
                    'count': len(entries),
                    'description': f"IP {ip} associated with {len(macs)} different MAC addresses"
                })

        # Group by MAC address
        mac_groups = defaultdict(list)
        for entry in self.entries:
            mac_groups[entry['mac']].append(entry)

        # Check for MAC addresses with multiple IP addresses
        for mac, entries in mac_groups.items():
            ips = set(entry['ip'] for entry in entries)
            if len(ips) > 3:  # Threshold for suspicious activity
                vendor = self.get_vendor(mac)
                suspicious.append({
                    'type': 'MAC_MULTIPLE_IPS',
                    'mac': mac,
                    'vendor': vendor,
                    'ips': list(ips),
                    'count': len(entries),
                    'description': f"MAC {mac} ({vendor}) associated with {len(ips)} different IP addresses"
                })

        # Check for rapid changes (potential ARP spoofing)
        for ip, entries in ip_groups.items():
            if len(entries) > 1:
                sorted_entries = sorted(entries, key=lambda x: x['timestamp'])
                for i in range(1, len(sorted_entries)):
                    time_diff = sorted_entries[i]['timestamp'] - sorted_entries[i-1]['timestamp']
                    if time_diff < 300 and sorted_entries[i]['mac'] != sorted_entries[i-1]['mac']:  # 5 minutes
                        suspicious.append({
                            'type': 'RAPID_MAC_CHANGE',
                            'ip': ip,
                            'old_mac': sorted_entries[i-1]['mac'],
                            'new_mac': sorted_entries[i]['mac'],
                            'time_diff': time_diff,
                            'description': f"Rapid MAC change for IP {ip} ({time_diff}s between changes)"
                        })

        return suspicious

    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simple vendor lookup (in practice, you'd use a full OUI database)
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys in oui_map are lowercase, so normalize the prefix the same way
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

    def generate_report(self, output_file=None):
        """Generate comprehensive analysis report"""
        report = f"""
Arpwatch Database Analysis Report
=================================
Generated: {datetime.now()}
Database: {self.database_file}

{self.get_statistics()}

Suspicious Activity Analysis
============================
"""

        suspicious = self.find_suspicious_activity()
        if suspicious:
            for item in suspicious:
                report += f"\n⚠️  {item['type']}: {item['description']}\n"
                if 'macs' in item:
                    report += f"   MAC addresses: {', '.join(item['macs'])}\n"
                if 'ips' in item:
                    report += f"   IP addresses: {', '.join(item['ips'])}\n"
                if 'time_diff' in item:
                    report += f"   Time difference: {item['time_diff']} seconds\n"
        else:
            report += "\nNo suspicious activity detected.\n"

        # Recent activity (last 24 hours)
        recent_threshold = time.time() - 86400  # 24 hours
        recent_entries = [e for e in self.entries if e['timestamp'] > recent_threshold]

        report += f"\nRecent Activity (Last 24 Hours)\n"
        report += f"===============================\n"
        report += f"Recent entries: {len(recent_entries)}\n"

        if recent_entries:
            report += "\nRecent IP/MAC pairs:\n"
            for entry in sorted(recent_entries, key=lambda x: x['timestamp'], reverse=True)[:10]:
                vendor = self.get_vendor(entry['mac'])
                report += f"  {entry['datetime']}: {entry['ip']} -> {entry['mac']} ({vendor})\n"

        if output_file:
            with open(output_file, 'w') as f:
                f.write(report)
            print(f"Report saved to: {output_file}")
        else:
            print(report)

def main():
    parser = argparse.ArgumentParser(description='Arpwatch Database Analyzer')
    parser.add_argument('database', help='Path to arpwatch database file')
    parser.add_argument('--output', '-o', help='Output file for report')
    parser.add_argument('--stats-only', action='store_true', help='Show only statistics')

    args = parser.parse_args()

    analyzer = ArpwatchAnalyzer(args.database)

    if args.stats_only:
        print(analyzer.get_statistics())
    else:
        analyzer.generate_report(args.output)

if __name__ == "__main__":
    main()

Monitoring and Alerting

Real-Time Monitoring Script

#!/bin/bash
# arpwatch_monitor.sh

ARPWATCH_LOG="/var/log/arpwatch.log"
ALERT_EMAIL="admin@example.com"
WHITELIST_FILE="/etc/arpwatch/whitelist.txt"
ALERT_LOG="/var/log/arpwatch_alerts.log"

# Create whitelist if it doesn't exist
if [ ! -f "$WHITELIST_FILE" ]; then
    sudo touch "$WHITELIST_FILE"
    echo "# Arpwatch whitelist - one MAC address per line" | sudo tee "$WHITELIST_FILE"
fi

# Function to check if MAC is whitelisted (case-insensitive, exact whole-line match)
is_whitelisted() {
    local mac="$1"
    grep -qixF -- "$mac" "$WHITELIST_FILE"
}

# Function to send alert
send_alert() {
    local alert_type="$1"
    local message="$2"
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    # Log alert
    echo "[$timestamp] $alert_type: $message" >> "$ALERT_LOG"

    # Send email
    echo "$message" | mail -s "Arpwatch Alert: $alert_type" "$ALERT_EMAIL"

    # Send to syslog
    logger -t arpwatch_monitor "$alert_type: $message"

    echo "Alert sent: $alert_type"
}

# Function to analyze log entry
analyze_entry() {
    local entry="$1"
    # arpwatch may log MAC octets without leading zeros, so accept 1-2 hex digits per octet
    local mac_re='[0-9a-f]\{1,2\}\(:[0-9a-f]\{1,2\}\)\{5\}'
    local ip macs mac other_mac

    ip=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+' | head -n1)
    # Reports involving two MACs usually list the current one first and the previous one in parentheses
    macs=$(echo "$entry" | grep -oi "$mac_re")
    mac=$(echo "$macs" | sed -n '1p')
    other_mac=$(echo "$macs" | sed -n '2p')

    if echo "$entry" | grep -q "new station"; then
        if ! is_whitelisted "$mac"; then
            send_alert "NEW_STATION" "New station detected: IP=$ip, MAC=$mac"
        fi

    elif echo "$entry" | grep -q "changed ethernet address"; then
        send_alert "MAC_CHANGE" "MAC address changed: IP=$ip, Old MAC=$other_mac, New MAC=$mac"

    elif echo "$entry" | grep -q "flip flop"; then
        send_alert "FLIP_FLOP" "Flip flop detected (potential ARP spoofing): IP=$ip, MAC1=$mac, MAC2=$other_mac"

    elif echo "$entry" | grep -q "reused old ethernet address"; then
        send_alert "REUSED_ADDRESS" "Reused ethernet address: IP=$ip, MAC=$mac"
    fi
}

echo "Starting Arpwatch monitor..."
echo "Monitoring log: $ARPWATCH_LOG"
echo "Alert email: $ALERT_EMAIL"
echo "Whitelist file: $WHITELIST_FILE"
echo ""

# Monitor arpwatch log file
tail -F "$ARPWATCH_LOG" | while IFS= read -r line; do
    # Skip empty lines
    if [ -n "$line" ]; then
        analyze_entry "$line"
    fi
done
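
A whitelist comparison is only reliable if both sides use the same MAC notation. The following is a minimal sketch of normalizing addresses before comparing them, assuming whitelist entries may be written with upper-case letters, dashes, or unpadded octets; `normalize_mac` and `load_whitelist` are hypothetical helper names, not part of arpwatch.

```python
HEX_DIGITS = set("0123456789abcdef")


def normalize_mac(mac):
    """Return a MAC address as six lowercase, zero-padded, colon-separated octets."""
    parts = mac.strip().lower().replace("-", ":").split(":")
    if len(parts) != 6:
        raise ValueError(f"not a MAC address: {mac!r}")
    octets = []
    for part in parts:
        if not 1 <= len(part) <= 2 or not set(part) <= HEX_DIGITS:
            raise ValueError(f"not a MAC address: {mac!r}")
        octets.append(part.zfill(2))
    return ":".join(octets)


def load_whitelist(lines):
    """Build a set of normalized MACs, ignoring blank lines and # comments."""
    allowed = set()
    for line in lines:
        entry = line.split("#", 1)[0].strip()
        if entry:
            allowed.add(normalize_mac(entry))
    return allowed


if __name__ == "__main__":
    sample = ["# Arpwatch whitelist - one MAC address per line", "00-0A-95-9D-68-16"]
    whitelist = load_whitelist(sample)
    print(normalize_mac("0:a:95:9d:68:16") in whitelist)
```

The same normalization can be applied once to the whitelist file itself, so the shell script's exact-match lookup keeps working.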

Network Baseline Creation

#!/usr/bin/env python3
# create_network_baseline.py

import subprocess
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
import argparse

class NetworkBaseline:
    def __init__(self, interface='eth0', duration_hours=24):
        self.interface = interface
        self.duration_hours = duration_hours
        self.baseline_data = {
            'created': datetime.now().isoformat(),
            'interface': interface,
            'duration_hours': duration_hours,
            'devices': {},
            'statistics': {}
        }

    def scan_network(self):
        """Perform network scan to discover devices"""
        print(f"Scanning network on interface {self.interface}...")

        # Get network range
        try:
            # Get IP and netmask of interface
            result = subprocess.run(['ip', 'addr', 'show', self.interface], 
                                  capture_output=True, text=True)

            # Parse network range (simplified)
            for line in result.stdout.split('\n'):
                if 'inet ' in line and '127.0.0.1' not in line:
                    ip_info = line.strip().split()[1]
                    network = ip_info.split('/')[0]
                    # Assume /24 network for simplicity
                    network_base = '.'.join(network.split('.')[:-1])
                    network_range = f"{network_base}.0/24"
                    break
            else:
                print(f"Could not determine network range for {self.interface}")
                return

            print(f"Detected network range: {network_range}")

            # Use nmap to scan network
            nmap_cmd = ['nmap', '-sn', network_range]
            result = subprocess.run(nmap_cmd, capture_output=True, text=True)

            # Parse nmap output
            current_ip = None
            for line in result.stdout.split('\n'):
                if 'Nmap scan report for' in line:
                    current_ip = line.split()[-1].strip('()')
                elif 'MAC Address:' in line and current_ip:
                    mac_info = line.split('MAC Address: ')[1]
                    mac = mac_info.split()[0]
                    vendor = mac_info.split('(')[1].rstrip(')') if '(' in mac_info else 'Unknown'

                    self.baseline_data['devices'][current_ip] = {
                        'mac': mac,
                        'vendor': vendor,
                        'first_seen': datetime.now().isoformat(),
                        'last_seen': datetime.now().isoformat(),
                        'status': 'active'
                    }

                    print(f"Found device: {current_ip} -> {mac} ({vendor})")
                    current_ip = None

        except Exception as e:
            print(f"Error scanning network: {e}")

    def monitor_arp_activity(self):
        """Monitor ARP activity for specified duration"""
        print(f"Monitoring ARP activity for {self.duration_hours} hours...")

        end_time = time.time() + (self.duration_hours * 3600)

        # Start tcpdump to capture ARP packets (-l line-buffers output for the pipe)
        tcpdump_cmd = ['tcpdump', '-l', '-i', self.interface, '-n', 'arp']

        try:
            process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE, 
                                     stderr=subprocess.PIPE, text=True)

            while time.time() < end_time:
                line = process.stdout.readline()
                if line:
                    self.parse_arp_packet(line.strip())

                # Check if process is still running
                if process.poll() is not None:
                    break

            process.terminate()

        except KeyboardInterrupt:
            print("\nMonitoring interrupted by user")
            if 'process' in locals():
                process.terminate()
        except Exception as e:
            print(f"Error monitoring ARP activity: {e}")

    def parse_arp_packet(self, packet_line):
        """Parse ARP packet from tcpdump output"""
        try:
            # Example: "12:34:56.789012 ARP, Request who-has 192.168.1.1 tell 192.168.1.100, length 28"
            # Example: "12:34:56.789012 ARP, Reply 192.168.1.1 is-at aa:bb:cc:dd:ee:ff, length 28"

            if 'Reply' in packet_line and 'is-at' in packet_line:
                parts = packet_line.split()
                ip = None
                mac = None

                for i, part in enumerate(parts):
                    if part == 'Reply':
                        ip = parts[i+1]
                    elif part == 'is-at':
                        mac = parts[i+1].rstrip(',')

                if ip and mac:
                    self.update_device_info(ip, mac)

        except Exception as e:
            print(f"Error parsing ARP packet: {e}")

    def update_device_info(self, ip, mac):
        """Update device information in baseline"""
        current_time = datetime.now().isoformat()

        if ip in self.baseline_data['devices']:
            # Update existing device
            device = self.baseline_data['devices'][ip]
            device['last_seen'] = current_time

            # Check for MAC address change
            if device['mac'] != mac:
                print(f"MAC change detected: {ip} changed from {device['mac']} to {mac}")
                device['mac_history'] = device.get('mac_history', [])
                device['mac_history'].append({
                    'old_mac': device['mac'],
                    'new_mac': mac,
                    'timestamp': current_time
                })
                device['mac'] = mac
        else:
            # New device discovered
            print(f"New device discovered: {ip} -> {mac}")
            self.baseline_data['devices'][ip] = {
                'mac': mac,
                'vendor': self.get_vendor(mac),
                'first_seen': current_time,
                'last_seen': current_time,
                'status': 'active'
            }

    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simplified vendor lookup
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys in oui_map are lowercase, so normalize the prefix the same way
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

    def generate_statistics(self):
        """Generate baseline statistics"""
        devices = self.baseline_data['devices']

        self.baseline_data['statistics'] = {
            'total_devices': len(devices),
            'active_devices': len([d for d in devices.values() if d['status'] == 'active']),
            'vendors': {},
            'mac_changes': 0
        }

        # Count vendors
        vendor_count = defaultdict(int)
        for device in devices.values():
            vendor_count[device['vendor']] += 1
            if 'mac_history' in device:
                self.baseline_data['statistics']['mac_changes'] += len(device['mac_history'])

        self.baseline_data['statistics']['vendors'] = dict(vendor_count)

    def save_baseline(self, filename):
        """Save baseline to file"""
        self.generate_statistics()

        with open(filename, 'w') as f:
            json.dump(self.baseline_data, f, indent=2)

        print(f"Baseline saved to: {filename}")

    def create_baseline(self, output_file):
        """Create complete network baseline"""
        print("Creating network baseline...")
        print("=" * 40)

        # Initial network scan
        self.scan_network()

        # Monitor ARP activity
        if self.duration_hours > 0:
            self.monitor_arp_activity()

        # Save baseline
        self.save_baseline(output_file)

        # Display summary
        self.display_summary()

    def display_summary(self):
        """Display baseline summary"""
        stats = self.baseline_data['statistics']

        print("\nBaseline Summary:")
        print("=" * 20)
        print(f"Total devices discovered: {stats['total_devices']}")
        print(f"Active devices: {stats['active_devices']}")
        print(f"MAC address changes detected: {stats['mac_changes']}")

        print("\nVendor distribution:")
        for vendor, count in stats['vendors'].items():
            print(f"  {vendor}: {count} devices")

        print(f"\nBaseline creation completed for interface: {self.baseline_data['interface']}")
        print(f"Monitoring duration: {self.baseline_data['duration_hours']} hours")

def main():
    parser = argparse.ArgumentParser(description='Create network baseline for arpwatch')
    parser.add_argument('--interface', '-i', default='eth0', help='Network interface to monitor')
    parser.add_argument('--duration', '-d', type=int, default=1, help='Monitoring duration in hours')
    parser.add_argument('--output', '-o', default='network_baseline.json', help='Output file for baseline')
    parser.add_argument('--scan-only', action='store_true', help='Only perform initial scan, no monitoring')

    args = parser.parse_args()

    if args.scan_only:
        duration = 0
    else:
        duration = args.duration

    baseline = NetworkBaseline(args.interface, duration)
    baseline.create_baseline(args.output)

if __name__ == "__main__":
    main()
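
The scan in the script above assumes a /24 network. The following is a minimal sketch of deriving the real range from the address and prefix length that `ip addr show` prints, using the standard-library `ipaddress` module; `network_from_cidr` is a hypothetical helper name.

```python
import ipaddress


def network_from_cidr(addr_with_prefix):
    """Return the network range for an interface address such as '192.168.1.100/24'."""
    interface = ipaddress.ip_interface(addr_with_prefix)
    # .network masks off the host bits according to the prefix length
    return str(interface.network)


if __name__ == "__main__":
    for example in ("192.168.1.100/24", "10.20.30.40/16", "172.16.5.130/26"):
        print(example, "->", network_from_cidr(example))
```

The returned string can be passed to nmap in place of the hard-coded `.0/24` range.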

Security Applications

ARP Spoofing Detection

#!/bin/bash
# arp_spoofing_detector.sh

INTERFACE="eth0"
THRESHOLD_SECONDS=60  # Alert if MAC changes within 60 seconds
LOG_FILE="/var/log/arp_spoofing.log"
ALERT_EMAIL="security@example.com"

# Temporary files for tracking
TEMP_DIR="/tmp/arp_detector"
mkdir -p "$TEMP_DIR"

# Function to log events
log_event() {
    local level="$1"
    local message="$2"
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    echo "[$timestamp] [$level] $message" | tee -a "$LOG_FILE"

    if [ "$level" = "ALERT" ]; then
        echo "$message" | mail -s "ARP Spoofing Alert" "$ALERT_EMAIL"
        logger -t arp_spoofing_detector "ALERT: $message"
    fi
}

# Function to check for rapid MAC changes
check_rapid_changes() {
    local ip="$1"
    local mac="$2"
    local current_time=$(date +%s)

    local tracking_file="$TEMP_DIR/ip_$ip"

    if [ -f "$tracking_file" ]; then
        local last_entry=$(tail -1 "$tracking_file")
        local last_time=$(echo "$last_entry" | cut -d: -f1)
        # The MAC itself contains colons, so keep every field after the timestamp
        local last_mac=$(echo "$last_entry" | cut -d: -f2-)

        local time_diff=$((current_time - last_time))

        if [ "$time_diff" -lt "$THRESHOLD_SECONDS" ] && [ "$mac" != "$last_mac" ]; then
            log_event "ALERT" "Rapid MAC change detected for IP $ip: $last_mac -> $mac (${time_diff}s)"

            # Check if this looks like ARP spoofing
            if [ "$time_diff" -lt 10 ]; then
                log_event "ALERT" "POSSIBLE ARP SPOOFING: Very rapid MAC change for IP $ip"

                # Additional checks
                check_duplicate_mac "$mac" "$ip"
            fi
        fi
    fi

    # Update tracking file
    echo "$current_time:$mac" >> "$tracking_file"

    # Keep only last 10 entries
    tail -10 "$tracking_file" > "$tracking_file.tmp"
    mv "$tracking_file.tmp" "$tracking_file"
}

# Function to check for duplicate MAC addresses
check_duplicate_mac() {
    local mac="$1"
    local current_ip="$2"

    # Check if this MAC is associated with other IPs
    for tracking_file in "$TEMP_DIR"/ip_*; do
        if [ -f "$tracking_file" ]; then
            local file_ip=$(basename "$tracking_file" | sed 's/ip_//')

            if [ "$file_ip" != "$current_ip" ]; then
                local last_entry=$(tail -1 "$tracking_file")
                local last_mac=$(echo "$last_entry" | cut -d: -f2-)

                if [ "$last_mac" = "$mac" ]; then
                    log_event "ALERT" "MAC address conflict: $mac is associated with both $current_ip and $file_ip"
                fi
            fi
        fi
    done
}

# Function to analyze ARP table
analyze_arp_table() {
    log_event "INFO" "Analyzing current ARP table"

    # Get current ARP table
    arp -a | while read -r line; do
        # Only consider resolved entries of the form "host (ip) at mac ... on iface"
        if echo "$line" | grep -q " at " && ! echo "$line" | grep -q "incomplete"; then
            local ip=$(echo "$line" | sed 's/.*(\([^)]*\)).*/\1/')
            local mac=$(echo "$line" | awk '{print $4}')

            if [ -n "$ip" ] && [ -n "$mac" ]; then
                check_rapid_changes "$ip" "$mac"
            fi
        fi
    done
}

# Function to monitor ARP traffic
monitor_arp_traffic() {
    log_event "INFO" "Starting ARP traffic monitoring on interface $INTERFACE"

    # Use tcpdump to monitor ARP traffic (-l line-buffers output so each reply reaches the loop immediately)
    tcpdump -l -i "$INTERFACE" -n arp | while read -r line; do
        if echo "$line" | grep -q "Reply.*is-at"; then
            local ip=$(echo "$line" | sed -n 's/.*Reply \([0-9.]*\) is-at.*/\1/p')
            local mac=$(echo "$line" | sed -n 's/.*is-at \([0-9a-f:]*\).*/\1/p')

            if [ -n "$ip" ] && [ -n "$mac" ]; then
                log_event "DEBUG" "ARP Reply: $ip -> $mac"
                check_rapid_changes "$ip" "$mac"
            fi
        fi
    done
}

# Cleanup function
cleanup() {
    log_event "INFO" "Shutting down ARP spoofing detector"
    rm -rf "$TEMP_DIR"
    exit 0
}

# Set up signal handlers
trap cleanup SIGINT SIGTERM

log_event "INFO" "Starting ARP spoofing detector"
log_event "INFO" "Interface: $INTERFACE"
log_event "INFO" "Threshold: $THRESHOLD_SECONDS seconds"
log_event "INFO" "Log file: $LOG_FILE"

# Initial ARP table analysis
analyze_arp_table

# Start monitoring
monitor_arp_traffic
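
The tracking files used by the script hold one `timestamp:mac` record per line, and the MAC itself contains colons, so a record has to be split on the first colon only. The sketch below restates the rapid-change check in Python under that assumption. The names `parse_record` and `is_rapid_change` and the 60-second default threshold are illustrative and not part of the script.

```python
def parse_record(record):
    """Split a 'timestamp:mac' record on the first colon only."""
    timestamp, mac = record.strip().split(":", 1)
    return int(timestamp), mac.lower()


def is_rapid_change(last_record, now, mac, threshold_seconds=60):
    """Return True when the MAC differs from the last record within the threshold."""
    last_time, last_mac = parse_record(last_record)
    return (now - last_time) < threshold_seconds and mac.lower() != last_mac


if __name__ == "__main__":
    last = "1700000000:aa:bb:cc:dd:ee:ff"
    print(parse_record(last))
    print(is_rapid_change(last, 1700000005, "11:22:33:44:55:66"))
```

Comparing MACs in lowercase avoids false alerts when two tools report the same address in different letter case.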

Network Security Dashboard

#!/usr/bin/env python3
# arpwatch_dashboard.py

import re
import sqlite3
import subprocess
import threading
from datetime import datetime, timedelta

from flask import Flask, render_template, jsonify

app = Flask(__name__)

class ArpwatchDashboard:
    def __init__(self, db_file='arpwatch_dashboard.db'):
        self.db_file = db_file
        self.init_database()
        self.running = True

    def init_database(self):
        """Initialize SQLite database for dashboard data"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS devices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL UNIQUE,  -- one row per IP, so INSERT OR REPLACE updates instead of duplicating
                mac TEXT NOT NULL,
                vendor TEXT,
                first_seen TIMESTAMP,
                last_seen TIMESTAMP,
                status TEXT DEFAULT 'active'
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                event_type TEXT,
                ip TEXT,
                mac TEXT,
                old_mac TEXT,
                description TEXT
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                alert_type TEXT,
                severity TEXT,
                ip TEXT,
                mac TEXT,
                description TEXT,
                acknowledged BOOLEAN DEFAULT FALSE
            )
        ''')

        conn.commit()
        conn.close()

    def monitor_arpwatch_log(self, log_file='/var/log/arpwatch.log'):
        """Monitor arpwatch log file for new entries"""
        try:
            # Follow log file
            process = subprocess.Popen(['tail', '-F', log_file],
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.DEVNULL,  # an unread stderr pipe could fill and stall tail
                                     text=True)

            while self.running:
                line = process.stdout.readline()
                if line:
                    self.process_log_entry(line.strip())

                if process.poll() is not None:
                    break

            process.terminate()

        except Exception as e:
            print(f"Error monitoring log: {e}")

    def process_log_entry(self, log_entry):
        """Process arpwatch log entry and update database"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        timestamp = datetime.now()

        try:
            if 'new station' in log_entry:
                # Extract IP and MAC
                import re
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                mac_match = re.search(r'([0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2})', log_entry)

                if ip_match and mac_match:
                    ip = ip_match.group(1)
                    mac = mac_match.group(1)
                    vendor = self.get_vendor(mac)

                    # Insert or update device
                    cursor.execute('''
                        INSERT OR REPLACE INTO devices 
                        (ip, mac, vendor, first_seen, last_seen, status)
                        VALUES (?, ?, ?, ?, ?, 'active')
                    ''', (ip, mac, vendor, timestamp, timestamp))

                    # Add event
                    cursor.execute('''
                        INSERT INTO events 
                        (timestamp, event_type, ip, mac, description)
                        VALUES (?, 'new_station', ?, ?, ?)
                    ''', (timestamp, ip, mac, f'New station: {ip} -> {mac}'))

                    # Add alert for new devices
                    cursor.execute('''
                        INSERT INTO alerts 
                        (timestamp, alert_type, severity, ip, mac, description)
                        VALUES (?, 'new_device', 'info', ?, ?, ?)
                    ''', (timestamp, ip, mac, f'New device detected: {ip} ({vendor})'))

            elif 'changed ethernet address' in log_entry:
                # MAC address change
                import re
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                old_mac_match = re.search(r'from ([0-9a-f:]{17})', log_entry)
                new_mac_match = re.search(r'to ([0-9a-f:]{17})', log_entry)

                if ip_match and old_mac_match and new_mac_match:
                    ip = ip_match.group(1)
                    old_mac = old_mac_match.group(1)
                    new_mac = new_mac_match.group(1)
                    vendor = self.get_vendor(new_mac)

                    # Update device
                    cursor.execute('''
                        UPDATE devices 
                        SET mac = ?, vendor = ?, last_seen = ?
                        WHERE ip = ?
                    ''', (new_mac, vendor, timestamp, ip))

                    # Add event
                    cursor.execute('''
                        INSERT INTO events 
                        (timestamp, event_type, ip, mac, old_mac, description)
                        VALUES (?, 'mac_change', ?, ?, ?, ?)
                    ''', (timestamp, ip, new_mac, old_mac, f'MAC changed: {ip} from {old_mac} to {new_mac}'))

                    # Add high-priority alert
                    cursor.execute('''
                        INSERT INTO alerts 
                        (timestamp, alert_type, severity, ip, mac, description)
                        VALUES (?, 'mac_change', 'warning', ?, ?, ?)
                    ''', (timestamp, ip, new_mac, f'MAC address changed for {ip}'))

            elif 'flip flop' in log_entry:
                # Potential ARP spoofing
                import re
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)

                if ip_match:
                    ip = ip_match.group(1)

                    # Add critical alert
                    cursor.execute('''
                        INSERT INTO alerts 
                        (timestamp, alert_type, severity, ip, description)
                        VALUES (?, 'flip_flop', 'critical', ?, ?)
                    ''', (timestamp, ip, f'Flip flop detected for {ip} - potential ARP spoofing'))

            conn.commit()

        except Exception as e:
            print(f"Error processing log entry: {e}")
        finally:
            conn.close()

    def get_vendor(self, mac):
        """Get vendor from MAC address"""
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys in oui_map are lowercase, so normalize the prefix to lowercase
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

# Global dashboard instance
dashboard = ArpwatchDashboard()

@app.route('/')
def index():
    """Main dashboard page"""
    return render_template('dashboard.html')

@app.route('/api/devices')
def api_devices():
    """API endpoint for device list"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT ip, mac, vendor, first_seen, last_seen, status
        FROM devices
        ORDER BY last_seen DESC
    ''')

    devices = []
    for row in cursor.fetchall():
        devices.append({
            'ip': row[0],
            'mac': row[1],
            'vendor': row[2],
            'first_seen': row[3],
            'last_seen': row[4],
            'status': row[5]
        })

    conn.close()
    return jsonify(devices)

@app.route('/api/alerts')
def api_alerts():
    """API endpoint for alerts"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT timestamp, alert_type, severity, ip, mac, description, acknowledged
        FROM alerts
        ORDER BY timestamp DESC
        LIMIT 50
    ''')

    alerts = []
    for row in cursor.fetchall():
        alerts.append({
            'timestamp': row[0],
            'alert_type': row[1],
            'severity': row[2],
            'ip': row[3],
            'mac': row[4],
            'description': row[5],
            'acknowledged': row[6]
        })

    conn.close()
    return jsonify(alerts)

@app.route('/api/statistics')
def api_statistics():
    """API endpoint for statistics"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()

    # Get device count
    cursor.execute("SELECT COUNT(*) FROM devices WHERE status = 'active'")
    active_devices = cursor.fetchone()[0]

    # Get alert counts
    cursor.execute('SELECT COUNT(*) FROM alerts WHERE acknowledged = FALSE')
    unacknowledged_alerts = cursor.fetchone()[0]

    # Get recent events count
    yesterday = datetime.now() - timedelta(days=1)
    cursor.execute('SELECT COUNT(*) FROM events WHERE timestamp > ?', (yesterday,))
    recent_events = cursor.fetchone()[0]

    # Get vendor distribution
    cursor.execute('''
        SELECT vendor, COUNT(*) as count
        FROM devices
        WHERE status = 'active'
        GROUP BY vendor
        ORDER BY count DESC
    ''')

    vendor_distribution = []
    for row in cursor.fetchall():
        vendor_distribution.append({
            'vendor': row[0],
            'count': row[1]
        })

    conn.close()

    return jsonify({
        'active_devices': active_devices,
        'unacknowledged_alerts': unacknowledged_alerts,
        'recent_events': recent_events,
        'vendor_distribution': vendor_distribution
    })

def start_monitoring():
    """Start monitoring in background thread"""
    monitor_thread = threading.Thread(target=dashboard.monitor_arpwatch_log)
    monitor_thread.daemon = True
    monitor_thread.start()

if __name__ == '__main__':
    # Start monitoring
    start_monitoring()

    # Start web server (0.0.0.0 listens on all interfaces; use 127.0.0.1 to restrict access to this host)
    app.run(host='0.0.0.0', port=5000, debug=False)
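
The dashboard records devices with `INSERT OR REPLACE`. In SQLite that statement replaces an existing row only when the insert would violate a `PRIMARY KEY` or `UNIQUE` constraint; otherwise it adds a new row. The following sketch uses a hypothetical in-memory table and a helper named `upsert_device` (neither is part of the dashboard) to show one way to keep a single row per IP while preserving `first_seen`.

```python
import sqlite3


def make_demo_db():
    """Create an in-memory table with a UNIQUE constraint on ip (demo only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE devices ("
        " ip TEXT NOT NULL UNIQUE,"
        " mac TEXT NOT NULL,"
        " first_seen TEXT,"
        " last_seen TEXT)"
    )
    return conn


def upsert_device(conn, ip, mac, seen):
    """Insert the device if its IP is new, then refresh mac and last_seen."""
    # INSERT OR IGNORE is a no-op when the ip already exists, so first_seen is kept
    conn.execute(
        "INSERT OR IGNORE INTO devices (ip, mac, first_seen, last_seen) "
        "VALUES (?, ?, ?, ?)",
        (ip, mac, seen, seen),
    )
    conn.execute(
        "UPDATE devices SET mac = ?, last_seen = ? WHERE ip = ?",
        (mac, seen, ip),
    )
    conn.commit()


if __name__ == "__main__":
    conn = make_demo_db()
    upsert_device(conn, "192.168.1.10", "aa:bb:cc:dd:ee:01", "2024-01-01 10:00:00")
    upsert_device(conn, "192.168.1.10", "aa:bb:cc:dd:ee:02", "2024-01-01 10:05:00")
    print(conn.execute("SELECT ip, mac, first_seen, last_seen FROM devices").fetchall())
```

Storing timestamps as ISO 8601 text keeps string comparison consistent with chronological order, which suits queries that filter on a time window.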

This Arpwatch cheat sheet covers network monitoring, security analysis, and intrusion detection, from basic ARP monitoring through automated threat detection to a security dashboard.