Skip to content

Arpwatch Cheatsheet

- :material-content-copy: **[Copy to Clipboard](#copy-to-clipboard)** - :material-file-pdf-box: **[Download PDF](#download-pdf)**

Overview

Arpwatch is a network monitoring tool that keeps track of Ethernet/IP address pairings on a network. It monitors ARP (Address Resolution Protocol) activity and maintains a database of Ethernet/IP address pairings. When it detects changes in the network topology, such as new stations, changed addresses, or potential ARP spoofing attacks, it can send email notifications and log the events.

Key Features

  • ARP Monitoring: Continuous monitoring of ARP traffic and address mappings
  • Change Detection: Automatic detection of new stations and address changes
  • Security Alerting: Email notifications for suspicious ARP activity
  • Database Maintenance: Persistent storage of Ethernet/IP address pairings
  • Network Mapping: Automatic discovery and tracking of network devices
  • Intrusion Detection: Detection of potential ARP spoofing and MAC address conflicts
  • Historical Tracking: Long-term monitoring of network topology changes
  • Multi-interface Support: Monitoring multiple network interfaces simultaneously

Installation

Linux Systems

# Ubuntu/Debian
sudo apt update
sudo apt install arpwatch

# CentOS/RHEL/Fedora
sudo yum install arpwatch
# or
sudo dnf install arpwatch

# Arch Linux
sudo pacman -S arpwatch

# openSUSE
sudo zypper install arpwatch

# From source
wget https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure
make
sudo make install

# Verify installation
arpwatch -h

FreeBSD/OpenBSD

# FreeBSD
sudo pkg install arpwatch

# OpenBSD
sudo pkg_add arpwatch

# From ports (FreeBSD)
cd /usr/ports/net-mgmt/arpwatch
sudo make install clean

# Verify installation
which arpwatch

macOS

# Using Homebrew
brew install arpwatch

# Using MacPorts
sudo port install arpwatch

# Manual installation
curl -O https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure --prefix=/usr/local
make
sudo make install

# Verify installation
arpwatch -h

Basic Usage

Starting Arpwatch

# Basic startup (requires root privileges)
sudo arpwatch

# Specify interface
sudo arpwatch -i eth0

# Debug mode: stay in the foreground and print reports to stderr instead of emailing them
sudo arpwatch -d

# Specify data file location
sudo arpwatch -f /var/lib/arpwatch/arp.dat

# Enable email notifications
sudo arpwatch -m admin@example.com

# Declare an additional local network (avoids "bogon" warnings when several subnets share the wire)
sudo arpwatch -n 192.168.1.0/24

# Combine options
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -m admin@example.com

Configuration Files

# Main configuration file locations
/etc/arpwatch.conf          # Main configuration
/etc/default/arpwatch        # Ubuntu/Debian defaults
/etc/sysconfig/arpwatch      # RHEL/CentOS defaults

# Data file locations
/var/lib/arpwatch/arp.dat    # Default database file
/var/lib/arpwatch/ethercodes.dat  # Ethernet vendor codes

# Log file locations
/var/log/arpwatch.log        # Main log file
/var/log/messages            # System log (may contain arpwatch entries)

Basic Configuration

# Edit configuration file (Ubuntu/Debian)
sudo nano /etc/default/arpwatch

# Example configuration
INTERFACES="eth0 eth1"
ARGS="-m admin@example.com -s root"
OPTIONS="-N"

# Edit configuration file (RHEL/CentOS)
sudo nano /etc/sysconfig/arpwatch

# Example configuration
OPTIONS="-i eth0 -m admin@example.com -f /var/lib/arpwatch/arp.dat"

Advanced Configuration

Multi-Interface Monitoring

# Create separate instances for multiple interfaces
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -P /var/run/arpwatch_eth0.pid &
sudo arpwatch -i eth1 -f /var/lib/arpwatch/eth1.dat -P /var/run/arpwatch_eth1.pid &
sudo arpwatch -i wlan0 -f /var/lib/arpwatch/wlan0.dat -P /var/run/arpwatch_wlan0.pid &

# Systemd service for multiple interfaces (create separate service files)
sudo cp /lib/systemd/system/arpwatch.service /lib/systemd/system/arpwatch@.service

# Edit the template service file
sudo nano /lib/systemd/system/arpwatch@.service

# Example template content:
[Unit]
Description=Arpwatch daemon for %i
After=network.target

[Service]
Type=forking
ExecStart=/usr/sbin/arpwatch -i %i -f /var/lib/arpwatch/%i.dat -P /var/run/arpwatch_%i.pid
PIDFile=/var/run/arpwatch_%i.pid
User=arpwatch
Group=arpwatch

[Install]
WantedBy=multi-user.target

# Enable services for specific interfaces
sudo systemctl enable arpwatch@eth0.service
sudo systemctl enable arpwatch@eth1.service
sudo systemctl start arpwatch@eth0.service
sudo systemctl start arpwatch@eth1.service

Email Notification Setup

# Configure email notifications
sudo nano /etc/arpwatch.conf

# Example email configuration
MAILTO="admin@example.com,security@example.com"
MAILSERVER="localhost"
MAILFROM="arpwatch@example.com"

# Test email functionality
echo "Test message" | mail -s "Arpwatch Test" admin@example.com

# Configure postfix for email delivery (if needed)
sudo apt install postfix
sudo systemctl enable postfix
sudo systemctl start postfix

# Configure email templates
sudo mkdir -p /etc/arpwatch/templates
sudo nano /etc/arpwatch/templates/new_station.txt

# Example template:
Subject: New station detected
From: arpwatch@example.com

New station detected on network:
IP Address: %ip%
MAC Address: %mac%
Interface: %interface%
Timestamp: %timestamp%
Vendor: %vendor%

Custom Alert Scripts

#!/bin/bash
# /usr/local/bin/arpwatch_alert.sh
#
# Custom alert script for arpwatch.
# Reads event lines on stdin, appends each one to LOG_FILE, and sends
# email/Slack alerts for new stations, MAC changes, and flip-flops.

LOG_FILE="/var/log/arpwatch_custom.log"
ALERT_EMAIL="security@example.com"
WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

# Address patterns (extended regex). MAC octets may be printed without zero
# padding (e.g. 0:c:29:1a:2b:3c), so accept one or two hex digits per octet.
IP_PATTERN='[0-9]{1,3}(\.[0-9]{1,3}){3}'
MAC_PATTERN='[0-9A-Fa-f]{1,2}(:[0-9A-Fa-f]{1,2}){5}'

# Print the first match of pattern $1 in text $2 (prints nothing if no match)
first_match() {
    printf '%s\n' "$2" | grep -oE -- "$1" | head -n 1
}

# Post message $1 (already JSON-escaped) to the Slack webhook.
# A failed post is reported by curl on stderr but does not stop the loop.
post_to_slack() {
    curl -sS -X POST -H 'Content-type: application/json' \
        --data "{\"text\":\"$1\"}" \
        "$WEBHOOK_URL" > /dev/null
}

# Parse arpwatch input. IFS= and -r keep leading whitespace and backslashes
# in the line exactly as they were written.
while IFS= read -r line; do
    TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')

    # Log the event
    echo "[$TIMESTAMP] $line" >> "$LOG_FILE"

    # Parse the alert type and details
    case "$line" in
        *"new station"*)
            ALERT_TYPE="NEW_STATION"
            IP=$(first_match "$IP_PATTERN" "$line")
            MAC=$(first_match "$MAC_PATTERN" "$line")

            # Send email alert
            echo "New station detected: IP=$IP, MAC=$MAC" | \
                mail -s "Arpwatch Alert: New Station" "$ALERT_EMAIL"

            # Send Slack notification
            post_to_slack "🚨 Arpwatch Alert: New station detected\\nIP: $IP\\nMAC: $MAC\\nTime: $TIMESTAMP"
            ;;

        *"changed ethernet address"*)
            ALERT_TYPE="MAC_CHANGE"
            IP=$(first_match "$IP_PATTERN" "$line")

            # High priority alert for MAC changes
            echo "SECURITY ALERT: MAC address change detected for IP $IP" | \
                mail -s "URGENT: Arpwatch Security Alert" "$ALERT_EMAIL"

            # Send Slack notification
            post_to_slack "🔥 URGENT: MAC address change detected\\nIP: $IP\\nTime: $TIMESTAMP"
            ;;

        *"flip flop"*)
            ALERT_TYPE="FLIP_FLOP"
            IP=$(first_match "$IP_PATTERN" "$line")

            # Potential ARP spoofing alert
            echo "SECURITY ALERT: Potential ARP spoofing detected for IP $IP" | \
                mail -s "CRITICAL: Potential ARP Spoofing" "$ALERT_EMAIL"
            ;;
    esac
done

# Make script executable
sudo chmod +x /usr/local/bin/arpwatch_alert.sh

# Configure arpwatch to use custom script
sudo arpwatch -i eth0 -e /usr/local/bin/arpwatch_alert.sh

Database Management

Database Operations

# View current ARP database
sudo cat /var/lib/arpwatch/arp.dat

# arp.dat is already plain text; show just the MAC, IP, and timestamp columns
sudo awk '{print $1, $2, $3}' /var/lib/arpwatch/arp.dat

# Backup database
sudo cp /var/lib/arpwatch/arp.dat /var/lib/arpwatch/arp.dat.backup.$(date +%Y%m%d)

# Restore database
sudo cp /var/lib/arpwatch/arp.dat.backup.20231201 /var/lib/arpwatch/arp.dat

# Clear database (start fresh)
sudo systemctl stop arpwatch
sudo rm /var/lib/arpwatch/arp.dat
sudo systemctl start arpwatch

# Merge multiple database files
# (a plain ">" redirect is performed by the calling shell, not by sudo,
#  so the output file is written through "sudo tee" instead)
sudo cat /var/lib/arpwatch/eth0.dat /var/lib/arpwatch/eth1.dat | \
    sort -u | sudo tee /var/lib/arpwatch/merged.dat > /dev/null

Database Analysis Scripts

#!/usr/bin/env python3
# arpwatch_analyzer.py

import sys
import time
import socket
import struct
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import argparse

class ArpwatchAnalyzer:
    """Load an arpwatch ``arp.dat`` database and report on its contents.

    ``arp.dat`` is a plain-text file with one record per line:
    MAC address, IP address, epoch timestamp, then optional extra columns
    (hostname, interface), separated by whitespace. Each parsed record is
    stored in ``self.entries`` as a dict with the keys ``ip``, ``mac``
    (lower-case, zero-padded), ``timestamp`` (int) and ``datetime``.
    """

    # Small built-in OUI table keyed by lower-case, zero-padded prefix.
    # In practice you would use a full OUI database.
    OUI_MAP = {
        '00:50:56': 'VMware',
        '08:00:27': 'VirtualBox',
        '52:54:00': 'QEMU',
        '00:0c:29': 'VMware',
        '00:1b:21': 'Intel',
        '00:1e:68': 'Cisco',
        '00:23:69': 'Cisco',
        '00:d0:c9': 'Intel',
        'b8:27:eb': 'Raspberry Pi',
        'dc:a6:32': 'Raspberry Pi'
    }

    def __init__(self, database_file):
        """Remember the database path and load it immediately."""
        self.database_file = database_file
        self.entries = []
        self.load_database()

    @staticmethod
    def _normalize_mac(mac):
        """Return *mac* as six lower-case, zero-padded, colon-separated octets.

        Raises ValueError if *mac* is not a six-octet hexadecimal address.
        """
        octets = mac.strip().replace('-', ':').split(':')
        if len(octets) != 6:
            raise ValueError(f"not a MAC address: {mac!r}")
        values = [int(octet, 16) for octet in octets]
        if any(value < 0 or value > 0xff for value in values):
            raise ValueError(f"octet out of range in MAC address: {mac!r}")
        return ':'.join(f'{value:02x}' for value in values)

    def _parse_line(self, line):
        """Turn one arp.dat line into an entry dict, or None if it is not a record."""
        fields = line.split()
        if len(fields) < 3 or fields[0].startswith('#'):
            return None
        try:
            mac = self._normalize_mac(fields[0])
            timestamp = int(fields[2])
            when = datetime.fromtimestamp(timestamp)
        except (ValueError, OverflowError, OSError):
            # Malformed record: skip it rather than abort the whole load
            return None
        return {
            'ip': fields[1],
            'mac': mac,
            'timestamp': timestamp,
            'datetime': when
        }

    def load_database(self):
        """Load and parse the arpwatch database into ``self.entries``.

        Blank, comment, and malformed lines are skipped. If the file is
        missing or unreadable, a message is printed and the process exits
        with status 1.
        """
        try:
            with open(self.database_file, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    entry = self._parse_line(line)
                    if entry is not None:
                        self.entries.append(entry)

        except FileNotFoundError:
            print(f"Database file not found: {self.database_file}")
            sys.exit(1)
        except OSError as e:
            print(f"Error reading database: {e}")
            sys.exit(1)

    def get_statistics(self):
        """Return a text summary: totals, date range, and the five most common IPs and MACs."""
        if not self.entries:
            return "No entries in database"

        total_entries = len(self.entries)
        ip_counts = Counter(entry['ip'] for entry in self.entries)
        mac_counts = Counter(entry['mac'] for entry in self.entries)
        unique_ips = len(ip_counts)
        unique_macs = len(mac_counts)

        # Time range
        timestamps = [entry['timestamp'] for entry in self.entries]
        oldest = datetime.fromtimestamp(min(timestamps))
        newest = datetime.fromtimestamp(max(timestamps))

        stats = f"""
Arpwatch Database Statistics
============================
Total entries: {total_entries}
Unique IP addresses: {unique_ips}
Unique MAC addresses: {unique_macs}
Date range: {oldest} to {newest}

Most Active IP Addresses:
"""
        for ip, count in ip_counts.most_common(5):
            stats += f"  {ip}: {count} entries\n"

        stats += "\nMost Active MAC Addresses:\n"
        for mac, count in mac_counts.most_common(5):
            vendor = self.get_vendor(mac)
            stats += f"  {mac} ({vendor}): {count} entries\n"

        return stats

    def find_suspicious_activity(self):
        """Return a list of findings (dicts) describing potentially suspicious ARP activity.

        Three checks are made: an IP seen with more than one MAC, a MAC seen
        with more than three IPs, and an IP whose MAC changed between two
        consecutive records less than 300 seconds apart.
        """
        suspicious = []

        # Group by IP address
        ip_groups = defaultdict(list)
        for entry in self.entries:
            ip_groups[entry['ip']].append(entry)

        # Check for IP addresses with multiple MAC addresses
        for ip, entries in ip_groups.items():
            macs = set(entry['mac'] for entry in entries)
            if len(macs) > 1:
                suspicious.append({
                    'type': 'IP_MULTIPLE_MACS',
                    'ip': ip,
                    'macs': list(macs),
                    'count': len(entries),
                    'description': f"IP {ip} associated with {len(macs)} different MAC addresses"
                })

        # Group by MAC address
        mac_groups = defaultdict(list)
        for entry in self.entries:
            mac_groups[entry['mac']].append(entry)

        # Check for MAC addresses with multiple IP addresses
        for mac, entries in mac_groups.items():
            ips = set(entry['ip'] for entry in entries)
            if len(ips) > 3:  # Threshold for suspicious activity
                vendor = self.get_vendor(mac)
                suspicious.append({
                    'type': 'MAC_MULTIPLE_IPS',
                    'mac': mac,
                    'vendor': vendor,
                    'ips': list(ips),
                    'count': len(entries),
                    'description': f"MAC {mac} ({vendor}) associated with {len(ips)} different IP addresses"
                })

        # Check for rapid changes (potential ARP spoofing)
        for ip, entries in ip_groups.items():
            ordered = sorted(entries, key=lambda x: x['timestamp'])
            for previous, current in zip(ordered, ordered[1:]):
                time_diff = current['timestamp'] - previous['timestamp']
                if time_diff < 300 and current['mac'] != previous['mac']:  # 5 minutes
                    suspicious.append({
                        'type': 'RAPID_MAC_CHANGE',
                        'ip': ip,
                        'old_mac': previous['mac'],
                        'new_mac': current['mac'],
                        'time_diff': time_diff,
                        'description': f"Rapid MAC change for IP {ip} ({time_diff}s between changes)"
                    })

        return suspicious

    def get_vendor(self, mac):
        """Return the vendor name for *mac* from the built-in OUI table, or 'Unknown'.

        The lookup ignores letter case and missing zero padding.
        """
        try:
            oui = self._normalize_mac(mac)[:8]
        except (ValueError, AttributeError):
            return 'Unknown'
        return self.OUI_MAP.get(oui, 'Unknown')

    def generate_report(self, output_file=None):
        """Build the full analysis report and print it, or write it to *output_file*.

        The report contains the statistics, the suspicious-activity findings,
        and up to ten of the newest records from the last 24 hours.
        """
        report = f"""
Arpwatch Database Analysis Report
=================================
Generated: {datetime.now()}
Database: {self.database_file}

{self.get_statistics()}

Suspicious Activity Analysis
============================
"""

        suspicious = self.find_suspicious_activity()
        if suspicious:
            for item in suspicious:
                report += f"\n⚠️  {item['type']}: {item['description']}\n"
                if 'macs' in item:
                    report += f"   MAC addresses: {', '.join(item['macs'])}\n"
                if 'ips' in item:
                    report += f"   IP addresses: {', '.join(item['ips'])}\n"
                if 'time_diff' in item:
                    report += f"   Time difference: {item['time_diff']} seconds\n"
        else:
            report += "\nNo suspicious activity detected.\n"

        # Recent activity (last 24 hours)
        recent_threshold = time.time() - 86400  # 24 hours
        recent_entries = [e for e in self.entries if e['timestamp'] > recent_threshold]

        report += "\nRecent Activity (Last 24 Hours)\n"
        report += "===============================\n"
        report += f"Recent entries: {len(recent_entries)}\n"

        if recent_entries:
            report += "\nRecent IP/MAC pairs:\n"
            for entry in sorted(recent_entries, key=lambda x: x['timestamp'], reverse=True)[:10]:
                vendor = self.get_vendor(entry['mac'])
                report += f"  {entry['datetime']}: {entry['ip']} -> {entry['mac']} ({vendor})\n"

        if output_file:
            # The report contains non-ASCII characters, so do not rely on the locale encoding
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"Report saved to: {output_file}")
        else:
            print(report)

def main():
    """Command-line entry point: print the statistics or the full analysis report."""
    cli = argparse.ArgumentParser(description='Arpwatch Database Analyzer')
    cli.add_argument('database', help='Path to arpwatch database file')
    cli.add_argument('--output', '-o', help='Output file for report')
    cli.add_argument('--stats-only', action='store_true', help='Show only statistics')
    options = cli.parse_args()

    analyzer = ArpwatchAnalyzer(options.database)

    # Full report is the default; --stats-only short-circuits to the summary
    if not options.stats_only:
        analyzer.generate_report(options.output)
        return
    print(analyzer.get_statistics())

if __name__ == "__main__":
    main()

Monitoring and Alerting

Real-time Monitoring Script

#!/bin/bash
# arpwatch_monitor.sh

ARPWATCH_LOG="/var/log/arpwatch.log"
ALERT_EMAIL="admin@example.com"
WHITELIST_FILE="/etc/arpwatch/whitelist.txt"
ALERT_LOG="/var/log/arpwatch_alerts.log"

# Create whitelist if it doesn't exist. The parent directory is created
# first, because writing the file fails when /etc/arpwatch is missing.
if [ ! -f "$WHITELIST_FILE" ]; then
    sudo mkdir -p "$(dirname "$WHITELIST_FILE")"
    echo "# Arpwatch whitelist - one MAC address per line" | sudo tee "$WHITELIST_FILE" > /dev/null
fi

# Function to check if MAC is whitelisted
is_whitelisted() {
    # Succeeds (status 0) when MAC address $1 appears on its own line in
    # WHITELIST_FILE; fails otherwise.
    local mac="$1"

    # An empty MAC must never count as whitelisted (as a regex it would
    # become "^$" and match any blank line in the file)
    [ -n "$mac" ] || return 1

    # -F literal match, -x whole line, -i ignore the case of hex digits
    grep -qixF -- "$mac" "$WHITELIST_FILE"
}

# Function to send alert
send_alert() {
    # Record alert type $1 with text $2 in ALERT_LOG, then deliver it by
    # email and syslog and confirm on stdout.
    local kind="$1"
    local body="$2"
    local now
    now=$(date '+%Y-%m-%d %H:%M:%S')

    # Alert log first, so the event is kept even if delivery fails
    echo "[$now] $kind: $body" >> "$ALERT_LOG"

    # Email
    echo "$body" | mail -s "Arpwatch Alert: $kind" "$ALERT_EMAIL"

    # Syslog
    logger -t arpwatch_monitor "$kind: $body"

    echo "Alert sent: $kind"
}

# Function to analyze log entry
analyze_entry() {
    # Classify one arpwatch log line ($1) and raise the matching alert.
    local entry="$1"
    local ip macs mac1 mac2 old_mac new_mac

    # First IPv4 address on the line
    ip=$(printf '%s\n' "$entry" | grep -oE '[0-9]{1,3}(\.[0-9]{1,3}){3}' | head -n 1)

    # All MAC addresses on the line, in order. Octets may be printed without
    # zero padding (e.g. 0:c:29:1a:2b:3c), so accept 1-2 hex digits each.
    macs=$(printf '%s\n' "$entry" | grep -oiE '[0-9a-f]{1,2}(:[0-9a-f]{1,2}){5}')
    mac1=$(printf '%s\n' "$macs" | sed -n '1p')
    mac2=$(printf '%s\n' "$macs" | sed -n '2p')

    case "$entry" in
        *"new station"*)
            if ! is_whitelisted "$mac1"; then
                send_alert "NEW_STATION" "New station detected: IP=$ip, MAC=$mac1"
            fi
            ;;

        *"changed ethernet address"*)
            # Use an explicit "from <old> ... to <new>" wording when the line has one
            old_mac=$(printf '%s\n' "$entry" | sed -n 's/.*from \([0-9A-Fa-f:]\{11,17\}\).*/\1/p')
            new_mac=$(printf '%s\n' "$entry" | sed -n 's/.*to \([0-9A-Fa-f:]\{11,17\}\).*/\1/p')

            # NOTE(review): arpwatch syslog lines usually read
            # "<event> <ip> <new-mac> (<old-mac>)" - confirm against your version
            [ -n "$new_mac" ] || new_mac="$mac1"
            [ -n "$old_mac" ] || old_mac="$mac2"

            send_alert "MAC_CHANGE" "MAC address changed: IP=$ip, Old MAC=$old_mac, New MAC=$new_mac"
            ;;

        *"flip flop"*)
            send_alert "FLIP_FLOP" "Flip flop detected (potential ARP spoofing): IP=$ip, MAC1=$mac1, MAC2=$mac2"
            ;;

        *"reused old ethernet address"*)
            send_alert "REUSED_ADDRESS" "Reused ethernet address: IP=$ip, MAC=$mac1"
            ;;
    esac
}

echo "Starting Arpwatch monitor..."
echo "Monitoring log: $ARPWATCH_LOG"
echo "Alert email: $ALERT_EMAIL"
echo "Whitelist file: $WHITELIST_FILE"
echo ""

# Monitor arpwatch log file. tail -F keeps following across log rotation;
# IFS= and -r hand each line to analyze_entry exactly as it was written
# (no backslash processing, no whitespace trimming).
tail -F "$ARPWATCH_LOG" | while IFS= read -r line; do
    # Skip empty lines
    if [ -n "$line" ]; then
        analyze_entry "$line"
    fi
done

Network Baseline Creation

#!/usr/bin/env python3
# create_network_baseline.py

import subprocess
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
import argparse

class NetworkBaseline:
    """Build a JSON baseline of the IP/MAC pairs seen on one interface.

    The baseline is seeded by an nmap ping scan of the interface's network
    and then refined by watching ARP replies with tcpdump for
    ``duration_hours``. MAC addresses are stored in lower case so that
    the two sources compare equal.
    """

    # Simplified vendor lookup keyed by lower-case OUI prefix
    OUI_MAP = {
        '00:50:56': 'VMware',
        '08:00:27': 'VirtualBox',
        '52:54:00': 'QEMU',
        '00:0c:29': 'VMware',
        '00:1b:21': 'Intel',
        '00:1e:68': 'Cisco',
        '00:23:69': 'Cisco',
        '00:d0:c9': 'Intel',
        'b8:27:eb': 'Raspberry Pi',
        'dc:a6:32': 'Raspberry Pi'
    }

    def __init__(self, interface='eth0', duration_hours=24):
        """Create an empty baseline for *interface*; nothing is scanned yet."""
        self.interface = interface
        self.duration_hours = duration_hours
        self.baseline_data = {
            'created': datetime.now().isoformat(),
            'interface': interface,
            'duration_hours': duration_hours,
            'devices': {},
            'statistics': {}
        }

    @staticmethod
    def _detect_network_range(ip_addr_output):
        """Return the CIDR network of the first non-loopback IPv4 address in
        ``ip addr show`` output, or None if there is none."""
        import ipaddress  # local import: only needed for this helper

        for line in ip_addr_output.splitlines():
            fields = line.split()
            if len(fields) < 2 or fields[0] != 'inet':
                continue
            try:
                address = ipaddress.ip_interface(fields[1])
            except ValueError:
                continue
            if address.ip.is_loopback:
                continue
            # Use the interface's real prefix length instead of assuming /24
            return str(address.network)
        return None

    def _record_nmap_hosts(self, nmap_output):
        """Add every host that nmap reported together with a MAC address."""
        current_ip = None
        for line in nmap_output.splitlines():
            if 'Nmap scan report for' in line:
                current_ip = line.split()[-1].strip('()')
            elif 'MAC Address:' in line and current_ip:
                mac_info = line.partition('MAC Address:')[2].strip()
                if not mac_info:
                    continue
                # Stored lower-case so it compares equal to tcpdump's output
                mac = mac_info.split()[0].lower()
                vendor = mac_info.split('(', 1)[1].rstrip(')') if '(' in mac_info else 'Unknown'
                seen = datetime.now().isoformat()

                self.baseline_data['devices'][current_ip] = {
                    'mac': mac,
                    'vendor': vendor,
                    'first_seen': seen,
                    'last_seen': seen,
                    'status': 'active'
                }

                print(f"Found device: {current_ip} -> {mac} ({vendor})")
                current_ip = None

    def scan_network(self):
        """Perform network scan to discover devices.

        Runs ``ip addr show`` to find the interface's network, then
        ``nmap -sn`` on it. Failures are printed, not raised.
        """
        print(f"Scanning network on interface {self.interface}...")

        try:
            result = subprocess.run(['ip', 'addr', 'show', self.interface],
                                    capture_output=True, text=True)
            network_range = self._detect_network_range(result.stdout)
            if network_range is None:
                print(f"Could not determine network range for {self.interface}")
                return

            print(f"Detected network range: {network_range}")

            # Use nmap to scan network
            result = subprocess.run(['nmap', '-sn', network_range],
                                    capture_output=True, text=True)
            self._record_nmap_hosts(result.stdout)

        except (OSError, subprocess.SubprocessError) as e:
            print(f"Error scanning network: {e}")

    def monitor_arp_activity(self):
        """Monitor ARP activity with tcpdump for ``duration_hours``.

        A timer terminates tcpdump when the time is up, so the deadline is
        honoured even while no ARP traffic arrives.
        """
        import threading  # local import: only needed for the stop timer

        print(f"Monitoring ARP activity for {self.duration_hours} hours...")

        # -l makes tcpdump line-buffer its output so packets arrive as they happen
        tcpdump_cmd = ['tcpdump', '-l', '-i', self.interface, '-n', 'arp']
        process = None
        stop_timer = None

        try:
            # stderr is discarded: an unread pipe could fill up and stall tcpdump
            process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.DEVNULL, text=True)

            stop_timer = threading.Timer(self.duration_hours * 3600, process.terminate)
            stop_timer.daemon = True
            stop_timer.start()

            # Ends at EOF, i.e. when tcpdump exits or the timer terminates it
            for line in process.stdout:
                self.parse_arp_packet(line.strip())

        except KeyboardInterrupt:
            print("\nMonitoring interrupted by user")
        except OSError as e:
            print(f"Error monitoring ARP activity: {e}")
        finally:
            if stop_timer is not None:
                stop_timer.cancel()
            if process is not None:
                if process.poll() is None:
                    process.terminate()
                process.wait()  # reap the child so it does not linger as a zombie

    def parse_arp_packet(self, packet_line):
        """Parse one line of tcpdump output and record it if it is an ARP reply."""
        try:
            # Example: "12:34:56.789012 ARP, Request who-has 192.168.1.1 tell 192.168.1.100, length 28"
            # Example: "12:34:56.789012 ARP, Reply 192.168.1.1 is-at aa:bb:cc:dd:ee:ff, length 28"

            if 'Reply' in packet_line and 'is-at' in packet_line:
                parts = packet_line.split()
                ip = None
                mac = None

                for i, part in enumerate(parts):
                    if part == 'Reply':
                        ip = parts[i+1]
                    elif part == 'is-at':
                        mac = parts[i+1].rstrip(',')

                if ip and mac:
                    self.update_device_info(ip, mac)

        except Exception as e:
            print(f"Error parsing ARP packet: {e}")

    def update_device_info(self, ip, mac):
        """Record a sighting of *ip* with *mac*, tracking MAC changes in ``mac_history``."""
        current_time = datetime.now().isoformat()
        mac = mac.lower()  # case-insensitive comparison with stored MACs
        devices = self.baseline_data['devices']

        if ip in devices:
            # Update existing device
            device = devices[ip]
            device['last_seen'] = current_time

            # Check for MAC address change
            if device['mac'].lower() != mac:
                print(f"MAC change detected: {ip} changed from {device['mac']} to {mac}")
                device.setdefault('mac_history', []).append({
                    'old_mac': device['mac'],
                    'new_mac': mac,
                    'timestamp': current_time
                })
                device['mac'] = mac
        else:
            # New device discovered
            print(f"New device discovered: {ip} -> {mac}")
            devices[ip] = {
                'mac': mac,
                'vendor': self.get_vendor(mac),
                'first_seen': current_time,
                'last_seen': current_time,
                'status': 'active'
            }

    def get_vendor(self, mac):
        """Return the vendor for *mac* from the built-in OUI table, or 'Unknown'.

        The lookup is case-insensitive.
        """
        return self.OUI_MAP.get(mac[:8].lower(), 'Unknown')

    def generate_statistics(self):
        """Recompute ``baseline_data['statistics']`` from the current device table."""
        devices = self.baseline_data['devices']

        # Count vendors and MAC changes in one pass
        vendor_count = defaultdict(int)
        mac_changes = 0
        for device in devices.values():
            vendor_count[device['vendor']] += 1
            mac_changes += len(device.get('mac_history', []))

        self.baseline_data['statistics'] = {
            'total_devices': len(devices),
            'active_devices': sum(1 for d in devices.values() if d['status'] == 'active'),
            'vendors': dict(vendor_count),
            'mac_changes': mac_changes
        }

    def save_baseline(self, filename):
        """Refresh the statistics and write the baseline to *filename* as JSON."""
        self.generate_statistics()

        with open(filename, 'w') as f:
            json.dump(self.baseline_data, f, indent=2)

        print(f"Baseline saved to: {filename}")

    def create_baseline(self, output_file):
        """Create complete network baseline: scan, optionally monitor, save, summarize."""
        print("Creating network baseline...")
        print("=" * 40)

        # Initial network scan
        self.scan_network()

        # Monitor ARP activity (skipped when the duration is zero)
        if self.duration_hours > 0:
            self.monitor_arp_activity()

        # Save baseline
        self.save_baseline(output_file)

        # Display summary
        self.display_summary()

    def display_summary(self):
        """Print the statistics computed by ``generate_statistics``."""
        stats = self.baseline_data['statistics']

        print("\nBaseline Summary:")
        print("=" * 20)
        print(f"Total devices discovered: {stats['total_devices']}")
        print(f"Active devices: {stats['active_devices']}")
        print(f"MAC address changes detected: {stats['mac_changes']}")

        print("\nVendor distribution:")
        for vendor, count in stats['vendors'].items():
            print(f"  {vendor}: {count} devices")

        print(f"\nBaseline creation completed for interface: {self.baseline_data['interface']}")
        print(f"Monitoring duration: {self.baseline_data['duration_hours']} hours")

def main():
    """Command-line entry point: build a network baseline and save it as JSON."""
    cli = argparse.ArgumentParser(description='Create network baseline for arpwatch')
    cli.add_argument('--interface', '-i', default='eth0', help='Network interface to monitor')
    cli.add_argument('--duration', '-d', type=int, default=1, help='Monitoring duration in hours')
    cli.add_argument('--output', '-o', default='network_baseline.json', help='Output file for baseline')
    cli.add_argument('--scan-only', action='store_true', help='Only perform initial scan, no monitoring')
    options = cli.parse_args()

    # A zero duration tells the baseline to skip the monitoring phase
    hours = 0 if options.scan_only else options.duration

    NetworkBaseline(options.interface, hours).create_baseline(options.output)

if __name__ == "__main__":
    main()

Security Applications

ARP Spoofing Detection

#!/bin/bash
# arp_spoofing_detector.sh

INTERFACE="eth0"
THRESHOLD_SECONDS=60  # Alert if MAC changes within 60 seconds
LOG_FILE="/var/log/arp_spoofing.log"
ALERT_EMAIL="security@example.com"

# Temporary directory for tracking files. mktemp creates a fresh, private
# directory with an unpredictable name; a fixed path under /tmp could be
# pre-created (or symlinked) by another local user before this script runs.
TEMP_DIR=$(mktemp -d "${TMPDIR:-/tmp}/arp_detector.XXXXXX") || {
    echo "Could not create temporary directory" >&2
    exit 1
}

# Function to log events
log_event() {
    # Print message $2 with severity $1 to stdout and append it to LOG_FILE;
    # ALERT-level messages are additionally emailed and sent to syslog.
    local severity="$1"
    local text="$2"
    local stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')

    echo "[$stamp] [$severity] $text" | tee -a "$LOG_FILE"

    # Everything below applies to alerts only
    [ "$severity" = "ALERT" ] || return 0

    echo "$text" | mail -s "ARP Spoofing Alert" "$ALERT_EMAIL"
    logger -t arp_spoofing_detector "ALERT: $text"
}

# Function to check for rapid MAC changes
check_rapid_changes() {
    # Record that IP $1 was seen with MAC $2 and alert when the MAC differs
    # from the previous sighting made less than THRESHOLD_SECONDS ago.
    local ip="$1"
    local mac="$2"
    local current_time
    current_time=$(date +%s)

    local tracking_file="$TEMP_DIR/ip_$ip"

    # -s: the file exists and already holds at least one entry
    if [ -s "$tracking_file" ]; then
        local last_entry last_time last_mac time_diff
        last_entry=$(tail -n 1 "$tracking_file")

        # Entries are "epoch:mac". The MAC itself contains colons, so split
        # on the FIRST colon only (cut -d: -f2 would keep just one octet).
        last_time=${last_entry%%:*}
        last_mac=${last_entry#*:}

        time_diff=$((current_time - last_time))

        if [ "$time_diff" -lt "$THRESHOLD_SECONDS" ] && [ "$mac" != "$last_mac" ]; then
            log_event "ALERT" "Rapid MAC change detected for IP $ip: $last_mac -> $mac (${time_diff}s)"

            # Check if this looks like ARP spoofing
            if [ "$time_diff" -lt 10 ]; then
                log_event "ALERT" "POSSIBLE ARP SPOOFING: Very rapid MAC change for IP $ip"

                # Additional checks
                check_duplicate_mac "$mac" "$ip"
            fi
        fi
    fi

    # Update tracking file
    echo "$current_time:$mac" >> "$tracking_file"

    # Keep only last 10 entries
    tail -n 10 "$tracking_file" > "$tracking_file.tmp"
    mv "$tracking_file.tmp" "$tracking_file"
}

# Function to check for duplicate MAC addresses
check_duplicate_mac() {
    # Alert when MAC $1 is also the most recent MAC recorded for an IP other
    # than $2.
    local mac="$1"
    local current_ip="$2"
    local tracking_file file_ip last_entry last_mac

    # Check if this MAC is associated with other IPs
    for tracking_file in "$TEMP_DIR"/ip_*; do
        # Also skips the unexpanded pattern when no tracking file exists yet
        [ -f "$tracking_file" ] || continue

        file_ip=$(basename "$tracking_file")
        file_ip=${file_ip#ip_}
        [ "$file_ip" != "$current_ip" ] || continue

        last_entry=$(tail -n 1 "$tracking_file")

        # Entries are "epoch:mac"; drop everything up to the FIRST colon so
        # the whole MAC is kept (cut -d: -f2 would return a single octet)
        last_mac=${last_entry#*:}

        if [ "$last_mac" = "$mac" ]; then
            log_event "ALERT" "MAC address conflict: $mac is associated with both $current_ip and $file_ip"
        fi
    done
}

# Function to analyze ARP table
analyze_arp_table() {
    # Seed the tracking files from the current kernel ARP table.
    log_event "INFO" "Analyzing current ARP table"

    local line ip mac

    # Expects "arp -a" lines of the form
    #   host (192.168.1.1) at aa:bb:cc:dd:ee:ff [ether] on eth0
    arp -a | while IFS= read -r line; do
        # Only lines with a standalone "at" carry an address mapping
        case "$line" in
            *" at "*) ;;
            *) continue ;;
        esac

        ip=$(printf '%s\n' "$line" | sed -n 's/.*(\([^)]*\)).*/\1/p')
        mac=$(printf '%s\n' "$line" | awk '{print $4}')

        # Skip unresolved entries such as "<incomplete>": recording them as a
        # MAC would make the first real reply look like a MAC change
        if [ -n "$ip" ] && printf '%s\n' "$mac" | grep -qiE '^[0-9a-f]{1,2}(:[0-9a-f]{1,2}){5}$'; then
            check_rapid_changes "$ip" "$mac"
        fi
    done
}

# Function to monitor ARP traffic
monitor_arp_traffic() {
    # Follow live ARP replies on INTERFACE and feed each IP/MAC pair to
    # check_rapid_changes. Runs until tcpdump exits.
    log_event "INFO" "Starting ARP traffic monitoring on interface $INTERFACE"

    local line ip mac

    # -l makes tcpdump line-buffer stdout; without it, output written to a
    # pipe is held back in a block buffer and replies arrive late, in bursts,
    # which defeats the time-based checks.
    tcpdump -l -i "$INTERFACE" -n arp | while IFS= read -r line; do
        case "$line" in
            *Reply*is-at*) ;;
            *) continue ;;
        esac

        ip=$(printf '%s\n' "$line" | sed -n 's/.*Reply \([0-9.]*\) is-at.*/\1/p')
        mac=$(printf '%s\n' "$line" | sed -n 's/.*is-at \([0-9a-f:]*\).*/\1/p')

        if [ -n "$ip" ] && [ -n "$mac" ]; then
            log_event "DEBUG" "ARP Reply: $ip -> $mac"
            check_rapid_changes "$ip" "$mac"
        fi
    done
}

# Cleanup function
cleanup() {
    # Signal handler: log the shutdown, remove the tracking directory, exit 0.
    log_event "INFO" "Shutting down ARP spoofing detector"
    rm -rf "$TEMP_DIR"
    exit 0
}

# Run cleanup on Ctrl-C and on termination requests
trap cleanup INT TERM

# Startup banner, one log line per setting
for banner in \
    "Starting ARP spoofing detector" \
    "Interface: $INTERFACE" \
    "Threshold: $THRESHOLD_SECONDS seconds" \
    "Log file: $LOG_FILE"
do
    log_event "INFO" "$banner"
done

# Seed the tracking files from the current ARP table, then watch live
# traffic until interrupted
analyze_arp_table
monitor_arp_traffic

Network Security Dashboard

#!/usr/bin/env python3
# arpwatch_dashboard.py

import json
import time
import subprocess
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify
import sqlite3
import threading

# Flask application object; the @app.route handlers below register on it.
app = Flask(__name__)

class ArpwatchDashboard:
    """Store arpwatch log events in a SQLite database for the dashboard.

    Constructing an instance creates the ``devices``, ``events`` and
    ``alerts`` tables in ``db_file`` if they do not exist yet.
    ``monitor_arpwatch_log`` follows a log file with ``tail -F`` and passes
    each line to ``process_log_entry``, which records devices, events and
    alerts.

    Attributes:
        db_file: Path of the SQLite database file.
        running: Loop flag read by ``monitor_arpwatch_log``; set it to
            ``False`` to stop following the log once the current read returns.
    """

    # OUI prefix (first three octets, lowercase hex) -> vendor name.
    _OUI_VENDORS = {
        '00:50:56': 'VMware',
        '08:00:27': 'VirtualBox',
        '52:54:00': 'QEMU',
        '00:0c:29': 'VMware',
        '00:1b:21': 'Intel',
        '00:1e:68': 'Cisco',
        '00:23:69': 'Cisco',
        '00:d0:c9': 'Intel',
        'b8:27:eb': 'Raspberry Pi',
        'dc:a6:32': 'Raspberry Pi',
    }

    def __init__(self, db_file='arpwatch_dashboard.db'):
        """Remember the database path and make sure the schema exists.

        Args:
            db_file: Path of the SQLite database file to create or open.
        """
        self.db_file = db_file
        self.init_database()
        self.running = True

    def init_database(self):
        """Initialize SQLite database for dashboard data.

        Creates the ``devices``, ``events`` and ``alerts`` tables when they
        are missing; existing tables and rows are left untouched.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS devices (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ip TEXT NOT NULL,
                    mac TEXT NOT NULL,
                    vendor TEXT,
                    first_seen TIMESTAMP,
                    last_seen TIMESTAMP,
                    status TEXT DEFAULT 'active'
                )
            ''')

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TIMESTAMP,
                    event_type TEXT,
                    ip TEXT,
                    mac TEXT,
                    old_mac TEXT,
                    description TEXT
                )
            ''')

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TIMESTAMP,
                    alert_type TEXT,
                    severity TEXT,
                    ip TEXT,
                    mac TEXT,
                    description TEXT,
                    acknowledged BOOLEAN DEFAULT FALSE
                )
            ''')

            conn.commit()
        finally:
            # Close the handle even when a statement fails.
            conn.close()

    def monitor_arpwatch_log(self, log_file='/var/log/arpwatch.log'):
        """Follow ``log_file`` with ``tail -F`` and process every new line.

        Blocks until ``self.running`` becomes false or ``tail`` closes its
        output. Errors are printed rather than raised, so a monitoring thread
        ends quietly instead of with a traceback.

        Args:
            log_file: Path of the arpwatch log file to follow.
        """
        process = None
        try:
            process = subprocess.Popen(
                ['tail', '-F', log_file],
                stdout=subprocess.PIPE,
                # tail's diagnostics are never read here; an undrained PIPE
                # could fill up and stall tail, so discard them instead.
                stderr=subprocess.DEVNULL,
                text=True,
            )

            while self.running:
                line = process.stdout.readline()
                if not line:
                    # Empty read means EOF: tail has closed its stdout.
                    break
                self.process_log_entry(line.strip())

        except Exception as e:
            print(f"Error monitoring log: {e}")
        finally:
            if process is not None:
                # Stop and reap tail so no child process is left behind.
                process.terminate()
                if process.stdout is not None:
                    process.stdout.close()
                process.wait()

    def process_log_entry(self, log_entry):
        """Process arpwatch log entry and update database.

        Three kinds of entry are recognised by substring:

        * ``new station`` - upsert the device, add an event and an
          ``info`` alert.
        * ``changed ethernet address`` - update the device's MAC, add an
          event and a ``warning`` alert.
        * ``flip flop`` - add a ``critical`` alert.

        Any other entry, or one whose addresses cannot be extracted, leaves
        the database unchanged. Errors are printed, not raised.

        Args:
            log_entry: One line of log text.
        """
        # Function-scope import: the module header does not import re.
        import re

        # Same text form sqlite3's default datetime adapter produces
        # ("YYYY-MM-DD HH:MM:SS.ffffff"), bound explicitly because that
        # adapter is deprecated since Python 3.12.
        timestamp = datetime.now().isoformat(sep=' ')

        conn = sqlite3.connect(self.db_file)
        try:
            cursor = conn.cursor()
            ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)

            if 'new station' in log_entry:
                mac_match = re.search(
                    r'([0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2})',
                    log_entry,
                )
                if ip_match and mac_match:
                    self._record_new_station(
                        cursor, timestamp, ip_match.group(1), mac_match.group(1)
                    )

            elif 'changed ethernet address' in log_entry:
                old_mac_match = re.search(r'from ([0-9a-f:]{17})', log_entry)
                new_mac_match = re.search(r'to ([0-9a-f:]{17})', log_entry)
                if ip_match and old_mac_match and new_mac_match:
                    self._record_mac_change(
                        cursor,
                        timestamp,
                        ip_match.group(1),
                        old_mac_match.group(1),
                        new_mac_match.group(1),
                    )

            elif 'flip flop' in log_entry:
                if ip_match:
                    self._record_flip_flop(cursor, timestamp, ip_match.group(1))

            conn.commit()

        except Exception as e:
            print(f"Error processing log entry: {e}")
        finally:
            conn.close()

    def _record_new_station(self, cursor, timestamp, ip, mac):
        """Upsert the device for ``ip`` and log a new-station event and alert."""
        vendor = self.get_vendor(mac)

        # The devices table has no unique key, so INSERT OR REPLACE would
        # never replace anything and every repeat sighting would add a
        # duplicate row. Update the existing row (keeping first_seen) and
        # insert only when the IP is not known yet.
        cursor.execute('''
            UPDATE devices
            SET mac = ?, vendor = ?, last_seen = ?, status = 'active'
            WHERE ip = ?
        ''', (mac, vendor, timestamp, ip))
        if cursor.rowcount == 0:
            cursor.execute('''
                INSERT INTO devices
                (ip, mac, vendor, first_seen, last_seen, status)
                VALUES (?, ?, ?, ?, ?, 'active')
            ''', (ip, mac, vendor, timestamp, timestamp))

        cursor.execute('''
            INSERT INTO events
            (timestamp, event_type, ip, mac, description)
            VALUES (?, 'new_station', ?, ?, ?)
        ''', (timestamp, ip, mac, f'New station: {ip} -> {mac}'))

        cursor.execute('''
            INSERT INTO alerts
            (timestamp, alert_type, severity, ip, mac, description)
            VALUES (?, 'new_device', 'info', ?, ?, ?)
        ''', (timestamp, ip, mac, f'New device detected: {ip} ({vendor})'))

    def _record_mac_change(self, cursor, timestamp, ip, old_mac, new_mac):
        """Point the device for ``ip`` at ``new_mac`` and log a warning alert."""
        vendor = self.get_vendor(new_mac)

        cursor.execute('''
            UPDATE devices
            SET mac = ?, vendor = ?, last_seen = ?
            WHERE ip = ?
        ''', (new_mac, vendor, timestamp, ip))

        cursor.execute('''
            INSERT INTO events
            (timestamp, event_type, ip, mac, old_mac, description)
            VALUES (?, 'mac_change', ?, ?, ?, ?)
        ''', (timestamp, ip, new_mac, old_mac,
              f'MAC changed: {ip} from {old_mac} to {new_mac}'))

        cursor.execute('''
            INSERT INTO alerts
            (timestamp, alert_type, severity, ip, mac, description)
            VALUES (?, 'mac_change', 'warning', ?, ?, ?)
        ''', (timestamp, ip, new_mac, f'MAC address changed for {ip}'))

    def _record_flip_flop(self, cursor, timestamp, ip):
        """Log a critical alert for a flip-flop entry concerning ``ip``."""
        cursor.execute('''
            INSERT INTO alerts
            (timestamp, alert_type, severity, ip, description)
            VALUES (?, 'flip_flop', 'critical', ?, ?)
        ''', (timestamp, ip,
              f'Flip flop detected for {ip} - potential ARP spoofing'))

    def get_vendor(self, mac):
        """Get vendor from MAC address.

        Args:
            mac: MAC address in colon-separated hex notation, any letter case.

        Returns:
            The vendor name for the address's first three octets, or
            ``'Unknown'`` when the prefix is not in the built-in table.
        """
        # Table keys are lowercase, so normalise to lowercase; upper-casing
        # the prefix made every OUI containing a-f miss the table.
        oui = mac[:8].lower()
        return self._OUI_VENDORS.get(oui, 'Unknown')

# Global dashboard instance shared by the route handlers and the monitor
# thread. Created at import time with the default database file, so importing
# this module creates/opens that file and ensures the tables exist.
dashboard = ArpwatchDashboard()

@app.route('/')
def index():
    """Serve the main dashboard page.

    Returns:
        The rendered ``dashboard.html`` template.
    """
    return render_template('dashboard.html')

@app.route('/api/devices')
def api_devices():
    """API endpoint for device list.

    Returns:
        A JSON array with one object per row of ``devices``, most recently
        seen first, with the keys ``ip``, ``mac``, ``vendor``,
        ``first_seen``, ``last_seen`` and ``status``.
    """
    # JSON keys, in the same order as the SELECT column list below.
    columns = ('ip', 'mac', 'vendor', 'first_seen', 'last_seen', 'status')

    conn = sqlite3.connect(dashboard.db_file)
    try:
        rows = conn.execute('''
            SELECT ip, mac, vendor, first_seen, last_seen, status
            FROM devices
            ORDER BY last_seen DESC
        ''').fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    return jsonify([dict(zip(columns, row)) for row in rows])

@app.route('/api/alerts')
def api_alerts():
    """API endpoint for alerts.

    Returns:
        A JSON array of at most 50 alerts, newest first, each with the keys
        ``timestamp``, ``alert_type``, ``severity``, ``ip``, ``mac``,
        ``description`` and ``acknowledged``.
    """
    # JSON keys, in the same order as the SELECT column list below.
    columns = (
        'timestamp', 'alert_type', 'severity', 'ip', 'mac',
        'description', 'acknowledged',
    )

    conn = sqlite3.connect(dashboard.db_file)
    try:
        rows = conn.execute('''
            SELECT timestamp, alert_type, severity, ip, mac, description, acknowledged
            FROM alerts
            ORDER BY timestamp DESC
            LIMIT 50
        ''').fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    return jsonify([dict(zip(columns, row)) for row in rows])

@app.route('/api/statistics')
def api_statistics():
    """API endpoint for statistics.

    Returns:
        A JSON object with:

        * ``active_devices`` - number of devices whose status is ``active``.
        * ``unacknowledged_alerts`` - number of alerts not yet acknowledged.
        * ``recent_events`` - number of events from the last 24 hours.
        * ``vendor_distribution`` - list of ``{"vendor", "count"}`` objects
          for active devices, largest count first.
    """
    # Bind the cutoff as text in the form sqlite3's default datetime adapter
    # writes ("YYYY-MM-DD HH:MM:SS.ffffff"); that adapter is deprecated since
    # Python 3.12, so do the conversion explicitly.
    cutoff = (datetime.now() - timedelta(days=1)).isoformat(sep=' ')

    conn = sqlite3.connect(dashboard.db_file)
    try:
        cursor = conn.cursor()

        # String literals use single quotes: SQLite reads "active" as an
        # identifier first and only falls back to a string through a legacy
        # compatibility quirk that can be compiled out.
        cursor.execute("SELECT COUNT(*) FROM devices WHERE status = 'active'")
        active_devices = cursor.fetchone()[0]

        cursor.execute('SELECT COUNT(*) FROM alerts WHERE acknowledged = FALSE')
        unacknowledged_alerts = cursor.fetchone()[0]

        cursor.execute('SELECT COUNT(*) FROM events WHERE timestamp > ?', (cutoff,))
        recent_events = cursor.fetchone()[0]

        cursor.execute('''
            SELECT vendor, COUNT(*) as count
            FROM devices
            WHERE status = 'active'
            GROUP BY vendor
            ORDER BY count DESC
        ''')
        vendor_distribution = [
            {'vendor': vendor, 'count': count}
            for vendor, count in cursor.fetchall()
        ]
    finally:
        # Release the connection even if a query raises.
        conn.close()

    return jsonify({
        'active_devices': active_devices,
        'unacknowledged_alerts': unacknowledged_alerts,
        'recent_events': recent_events,
        'vendor_distribution': vendor_distribution
    })

def start_monitoring():
    """Launch the arpwatch log follower on a daemon thread.

    The thread runs ``dashboard.monitor_arpwatch_log`` and, being a daemon,
    does not keep the interpreter alive on exit.
    """
    threading.Thread(
        target=dashboard.monitor_arpwatch_log,
        daemon=True,
    ).start()

if __name__ == '__main__':
    # Start following the arpwatch log in a background thread
    start_monitoring()

    # Start the web server on port 5000 with debug mode off (blocks here).
    # NOTE(review): host '0.0.0.0' listens on every network interface and the
    # routes in this script perform no authentication - confirm this exposure
    # is intended, or bind to 127.0.0.1.
    app.run(host='0.0.0.0', port=5000, debug=False)

This comprehensive Arpwatch cheatsheet provides everything needed for professional network monitoring, security analysis, and intrusion detection, from basic ARP monitoring to advanced security dashboard implementation and automated threat detection.