Arpwatch Cheatsheet
Overview
Arpwatch is a network monitoring tool that keeps track of Ethernet/IP address pairings on a network. It watches ARP (Address Resolution Protocol) traffic and maintains a database of the pairings it has seen. When it detects a change, such as a new station, a changed address, or a pattern that may indicate ARP spoofing, it logs the event and can send an email notification.
Key Features
- **ARP monitoring**: Continuous monitoring of ARP traffic and address mappings
- **Change detection**: Automatic detection of new stations and address changes
- **Security alerting**: Email notifications for suspicious ARP activity
- **Database maintenance**: Persistent storage of Ethernet/IP address pairings
- **Network mapping**: Automatic discovery and tracking of network devices
- **Intrusion detection**: Detection of potential ARP spoofing and MAC address conflicts
- **Historical tracking**: Long-term monitoring of network topology changes
- **Multi-interface support**: Monitoring of several network interfaces at once
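To make the change-detection idea concrete, here is a minimal Python sketch of a pairing database that classifies each observed IP/MAC pair. It is a toy model, not arpwatch's actual implementation: the `PairingTracker` class and its `observe` method are hypothetical names, and the rules are a simplification (a "flip flop" here means the address bounced back to the MAC seen immediately before the current one).

```python
from dataclasses import dataclass, field


@dataclass
class PairingTracker:
    """Toy model of an Ethernet/IP pairing database (hypothetical, not arpwatch code)."""
    current: dict = field(default_factory=dict)   # ip -> MAC currently paired with it
    previous: dict = field(default_factory=dict)  # ip -> MAC paired with it before that

    def observe(self, ip: str, mac: str) -> str:
        """Record one observed pairing and return the event it represents."""
        mac = mac.lower()
        known = self.current.get(ip)
        if known is None:
            self.current[ip] = mac
            return "new station"
        if known == mac:
            return "no change"
        # The MAC changed: did it bounce back to the one seen just before?
        if self.previous.get(ip) == mac:
            event = "flip flop"
        else:
            event = "changed ethernet address"
        self.previous[ip] = known
        self.current[ip] = mac
        return event


tracker = PairingTracker()
observations = [
    ("192.168.1.10", "aa:bb:cc:dd:ee:01"),
    ("192.168.1.10", "aa:bb:cc:dd:ee:01"),
    ("192.168.1.10", "aa:bb:cc:dd:ee:02"),
    ("192.168.1.10", "aa:bb:cc:dd:ee:01"),
]
events = [tracker.observe(ip, mac) for ip, mac in observations]
print(events)
```

A repeated flip flop between two MAC addresses for the same IP is the pattern most worth investigating, since it can indicate two hosts competing for one address.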
Installation
Linux Systems
```bash
# Ubuntu/Debian
sudo apt update
sudo apt install arpwatch

# CentOS/RHEL/Fedora
sudo yum install arpwatch
# or
sudo dnf install arpwatch

# Arch Linux
sudo pacman -S arpwatch

# openSUSE
sudo zypper install arpwatch

# From source
wget https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure
make
sudo make install

# Verify installation
arpwatch -h
```
FreeBSD/OpenBSD
```bash
# FreeBSD
sudo pkg install arpwatch

# OpenBSD
sudo pkg_add arpwatch

# From ports (FreeBSD)
cd /usr/ports/net-mgmt/arpwatch
sudo make install clean

# Verify installation
which arpwatch
```
macOS
```bash
# Using Homebrew
brew install arpwatch

# Using MacPorts
sudo port install arpwatch

# Manual installation
curl -O https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure --prefix=/usr/local
make
sudo make install

# Verify installation
arpwatch -h
```
Basic Usage
Starting Arpwatch
```bash
# Basic startup (requires root privileges)
sudo arpwatch

# Specify interface
sudo arpwatch -i eth0

# Debug mode: stay in the foreground and print reports to stderr instead of mailing them
sudo arpwatch -d

# Specify data file location
sudo arpwatch -f /var/lib/arpwatch/arp.dat

# Send reports to a specific address
# (the flag differs between builds, e.g. -m on Debian-patched 2.1a15 and -w on arpwatch 3.x; see man arpwatch)
sudo arpwatch -m admin@example.com

# Declare an additional local network
sudo arpwatch -n 192.168.1.0/24

# Combine options
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -m admin@example.com
```
Configuration Files
```bash
# Main configuration file locations
/etc/arpwatch.conf         # Main configuration
/etc/default/arpwatch      # Ubuntu/Debian defaults
/etc/sysconfig/arpwatch    # RHEL/CentOS defaults

# Data file locations
/var/lib/arpwatch/arp.dat          # Default database file
/var/lib/arpwatch/ethercodes.dat   # Ethernet vendor codes

# Log file locations
/var/log/arpwatch.log   # Main log file
/var/log/messages       # System log (may contain arpwatch entries)
```
Basic Configuration
```bash
# Edit configuration file (Ubuntu/Debian)
sudo nano /etc/default/arpwatch

# Example configuration
INTERFACES="eth0 eth1"
ARGS="-m admin@example.com -s root"
OPTIONS="-N"

# Edit configuration file (RHEL/CentOS)
sudo nano /etc/sysconfig/arpwatch

# Example configuration
OPTIONS="-i eth0 -m admin@example.com -f /var/lib/arpwatch/arp.dat"
```
Advanced Configuration
Multi-Interface Monitoring
```bash
# Create separate instances for multiple interfaces
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -P /var/run/arpwatch_eth0.pid &
sudo arpwatch -i eth1 -f /var/lib/arpwatch/eth1.dat -P /var/run/arpwatch_eth1.pid &
sudo arpwatch -i wlan0 -f /var/lib/arpwatch/wlan0.dat -P /var/run/arpwatch_wlan0.pid &

# Systemd service for multiple interfaces (create a template unit)
sudo cp /lib/systemd/system/arpwatch.service /lib/systemd/system/arpwatch@.service

# Edit the template service file
sudo nano /lib/systemd/system/arpwatch@.service

# Example template content:
[Unit]
Description=Arpwatch daemon for %i
After=network.target

[Service]
Type=forking
ExecStart=/usr/sbin/arpwatch -i %i -f /var/lib/arpwatch/%i.dat -P /var/run/arpwatch_%i.pid
PIDFile=/var/run/arpwatch_%i.pid
User=arpwatch
Group=arpwatch

[Install]
WantedBy=multi-user.target

# Reload systemd so it picks up the new unit, then enable services for specific interfaces
sudo systemctl daemon-reload
sudo systemctl enable arpwatch@eth0.service
sudo systemctl enable arpwatch@eth1.service
sudo systemctl start arpwatch@eth0.service
sudo systemctl start arpwatch@eth1.service
```
Email Notification Setup
```bash
# Configure email notifications
sudo nano /etc/arpwatch.conf

# Example email configuration
MAILTO="admin@example.com,security@example.com"
MAILSERVER="localhost"
MAILFROM="arpwatch@example.com"

# Test email functionality
echo "Test message" | mail -s "Arpwatch Test" admin@example.com

# Configure postfix for email delivery (if needed)
sudo apt install postfix
sudo systemctl enable postfix
sudo systemctl start postfix

# Configure email templates
sudo mkdir -p /etc/arpwatch/templates
sudo nano /etc/arpwatch/templates/new_station.txt

# Example template:
Subject: New station detected
From: arpwatch@example.com

New station detected on network:
IP Address: %ip%
MAC Address: %mac%
Interface: %interface%
Timestamp: %timestamp%
Vendor: %vendor%
```
Custom Alert Scripts
```bash
#!/bin/bash
# /usr/local/bin/arpwatch_alert.sh
# Custom alert script for arpwatch: reads report lines on stdin

LOG_FILE="/var/log/arpwatch_custom.log"
ALERT_EMAIL="security@example.com"
WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

# arpwatch may print MAC octets without zero padding (e.g. 0:c:29:...), so allow 1-2 hex digits
MAC_RE='([0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}'
IP_RE='([0-9]{1,3}\.){3}[0-9]{1,3}'

# Post a message to Slack
notify_slack() {
    curl -s -X POST -H 'Content-type: application/json' \
        --data "{\"text\":\"$1\"}" \
        "$WEBHOOK_URL"
}

# Parse arpwatch input
while read -r line; do
    TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')

    # Log the event
    echo "[$TIMESTAMP] $line" >> "$LOG_FILE"

    # Extract the first IP and MAC address on the line
    IP=$(echo "$line" | grep -oE "$IP_RE" | head -n 1)
    MAC=$(echo "$line" | grep -oE "$MAC_RE" | head -n 1)

    if echo "$line" | grep -q "new station"; then
        ALERT_TYPE="NEW_STATION"

        # Send email alert
        echo "New station detected: IP=$IP, MAC=$MAC" | \
            mail -s "Arpwatch Alert: New Station" "$ALERT_EMAIL"

        # Send Slack notification
        notify_slack "🚨 Arpwatch Alert: New station detected\\nIP: $IP\\nMAC: $MAC\\nTime: $TIMESTAMP"

    elif echo "$line" | grep -q "changed ethernet address"; then
        ALERT_TYPE="MAC_CHANGE"

        # High priority alert for MAC changes
        echo "SECURITY ALERT: MAC address change detected for IP $IP" | \
            mail -s "URGENT: Arpwatch Security Alert" "$ALERT_EMAIL"

        # Send Slack notification
        notify_slack "🔥 URGENT: MAC address change detected\\nIP: $IP\\nTime: $TIMESTAMP"

    elif echo "$line" | grep -q "flip flop"; then
        ALERT_TYPE="FLIP_FLOP"

        # Potential ARP spoofing alert
        echo "SECURITY ALERT: Potential ARP spoofing detected for IP $IP" | \
            mail -s "CRITICAL: Potential ARP Spoofing" "$ALERT_EMAIL"
    fi
done

# Make script executable
sudo chmod +x /usr/local/bin/arpwatch_alert.sh

# Feed arpwatch events to the script. The arpwatch builds we know of have no
# script hook (-e, where a build provides it, sets a mail address), so one
# option is to follow the log file instead:
tail -F /var/log/arpwatch.log | /usr/local/bin/arpwatch_alert.sh
```
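The same line parsing can be sketched in Python, which makes it easier to normalize MAC addresses before comparing them. The sketch assumes that log lines follow the shape `<event> <ip> <mac> (<previous mac>)` and that MAC octets may lack zero padding; both are assumptions about your arpwatch build's output, and `parse_event` and `normalize_mac` are hypothetical helper names.

```python
import re
from typing import Optional

# One or two hex digits per octet, because octets may be printed without zero padding
MAC_RE = re.compile(r"\b(?:[0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}\b")
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
EVENTS = (
    "new station",
    "changed ethernet address",
    "flip flop",
    "reused old ethernet address",
)


def normalize_mac(mac: str) -> str:
    """Return the MAC in lowercase with zero-padded octets (0:c:29 -> 00:0c:29)."""
    return ":".join(f"{int(octet, 16):02x}" for octet in mac.split(":"))


def parse_event(line: str) -> Optional[dict]:
    """Extract event type, IP, current MAC and previous MAC from one log line."""
    event = next((name for name in EVENTS if name in line), None)
    ip_match = IP_RE.search(line)
    macs = [normalize_mac(m) for m in MAC_RE.findall(line)]
    if event is None or ip_match is None or not macs:
        return None
    return {
        "event": event,
        "ip": ip_match.group(0),
        "mac": macs[0],
        "old_mac": macs[1] if len(macs) > 1 else None,
    }


# Sample line in the assumed format (not captured from a real system)
sample = ("Dec  1 10:15:02 gw arpwatch: changed ethernet address "
          "192.168.1.10 0:c:29:ab:cd:ef (0:50:56:1:2:3) eth0")
print(parse_event(sample))
```

Normalizing first means a whitelist or vendor lookup written with padded addresses still matches what arpwatch reports.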
Database Management
Database Operations
```bash
# View current ARP database
sudo cat /var/lib/arpwatch/arp.dat

# Show the first three columns (MAC address, IP address, Unix timestamp)
sudo awk '{print $1, $2, $3}' /var/lib/arpwatch/arp.dat

# Backup database
sudo cp /var/lib/arpwatch/arp.dat /var/lib/arpwatch/arp.dat.backup.$(date +%Y%m%d)

# Restore database
sudo cp /var/lib/arpwatch/arp.dat.backup.20231201 /var/lib/arpwatch/arp.dat

# Clear database (start fresh)
# truncate empties the file but keeps it in place with its ownership,
# since arpwatch may refuse to start when the file is missing
sudo systemctl stop arpwatch
sudo truncate -s 0 /var/lib/arpwatch/arp.dat
sudo systemctl start arpwatch

# Merge multiple database files (tee runs as root, unlike a plain shell redirect)
sudo cat /var/lib/arpwatch/eth0.dat /var/lib/arpwatch/eth1.dat | \
    sort -u | sudo tee /var/lib/arpwatch/merged.dat > /dev/null
```
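A line-based `sort -u` keeps two lines for the same pairing whenever their timestamps differ. The following Python sketch merges on the pairing itself and keeps the most recent line. It assumes the text layout of `arp.dat` described above (MAC, IP, Unix timestamp, then optional extra fields); `merge_arp_dat` is a hypothetical helper, and the sample data is made up.

```python
def merge_arp_dat(*texts: str) -> str:
    """Merge arp.dat contents, keeping the newest line per (MAC, IP) pairing."""
    newest = {}
    for text in texts:
        for line in text.splitlines():
            fields = line.split()
            # Skip blank or malformed lines
            if len(fields) < 3 or not fields[2].isdigit():
                continue
            key = (fields[0].lower(), fields[1])
            if key not in newest or int(fields[2]) > int(newest[key][2]):
                newest[key] = fields
    # Order by IP, then MAC, so the output is stable between runs
    ordered = sorted(newest.values(), key=lambda f: (f[1], f[0]))
    return "".join("\t".join(fields) + "\n" for fields in ordered)


# Made-up sample contents for two interface databases
eth0 = "0:c:29:ab:cd:ef\t192.168.1.10\t1700000000\thost-a\n"
eth1 = (
    "0:c:29:ab:cd:ef\t192.168.1.10\t1700000500\thost-a\n"
    "8:0:27:1:2:3\t192.168.1.20\t1700000100\n"
)
merged = merge_arp_dat(eth0, eth1)
print(merged, end="")
```

To use it on real files, read each database with `open(path).read()` and write the returned string to the merged file while arpwatch is stopped.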
Database Analysis Scripts
```python
#!/usr/bin/env python3
# arpwatch_analyzer.py

import argparse
import sys
import time
from collections import Counter, defaultdict
from datetime import datetime


class ArpwatchAnalyzer:
    def __init__(self, database_file):
        self.database_file = database_file
        self.entries = []
        self.load_database()

    @staticmethod
    def normalize_mac(mac):
        """Lowercase and zero-pad a MAC address (arp.dat may store 0:c:29:...)"""
        return ':'.join(f'{int(octet, 16):02x}' for octet in mac.split(':'))

    def load_database(self):
        """Load and parse arpwatch database"""
        # arp.dat is plain text, one pairing per line:
        # MAC address, IP address, Unix timestamp, then optional extra fields
        try:
            with open(self.database_file, 'r') as f:
                for line in f:
                    fields = line.split()
                    if len(fields) < 3:
                        continue
                    try:
                        mac = self.normalize_mac(fields[0])
                        timestamp = int(fields[2])
                    except ValueError:
                        continue  # Skip malformed lines

                    self.entries.append({
                        'ip': fields[1],
                        'mac': mac,
                        'timestamp': timestamp,
                        'datetime': datetime.fromtimestamp(timestamp)
                    })

        except FileNotFoundError:
            print(f"Database file not found: {self.database_file}")
            sys.exit(1)
        except Exception as e:
            print(f"Error reading database: {e}")
            sys.exit(1)

    def get_statistics(self):
        """Generate database statistics"""
        if not self.entries:
            return "No entries in database"

        total_entries = len(self.entries)
        unique_ips = len(set(entry['ip'] for entry in self.entries))
        unique_macs = len(set(entry['mac'] for entry in self.entries))

        # Time range
        timestamps = [entry['timestamp'] for entry in self.entries]
        oldest = datetime.fromtimestamp(min(timestamps))
        newest = datetime.fromtimestamp(max(timestamps))

        # Most active IPs
        ip_counts = Counter(entry['ip'] for entry in self.entries)
        most_active_ips = ip_counts.most_common(5)

        # Most active MACs
        mac_counts = Counter(entry['mac'] for entry in self.entries)
        most_active_macs = mac_counts.most_common(5)

        stats = f"""
Arpwatch Database Statistics
============================
Total entries: {total_entries}
Unique IP addresses: {unique_ips}
Unique MAC addresses: {unique_macs}
Date range: {oldest} to {newest}

Most Active IP Addresses:
"""
        for ip, count in most_active_ips:
            stats += f"  {ip}: {count} entries\n"

        stats += "\nMost Active MAC Addresses:\n"
        for mac, count in most_active_macs:
            vendor = self.get_vendor(mac)
            stats += f"  {mac} ({vendor}): {count} entries\n"

        return stats

    def find_suspicious_activity(self):
        """Detect potentially suspicious ARP activity"""
        suspicious = []

        # Group by IP address
        ip_groups = defaultdict(list)
        for entry in self.entries:
            ip_groups[entry['ip']].append(entry)

        # Check for IP addresses with multiple MAC addresses
        for ip, entries in ip_groups.items():
            macs = set(entry['mac'] for entry in entries)
            if len(macs) > 1:
                suspicious.append({
                    'type': 'IP_MULTIPLE_MACS',
                    'ip': ip,
                    'macs': list(macs),
                    'count': len(entries),
                    'description': f"IP {ip} associated with {len(macs)} different MAC addresses"
                })

        # Group by MAC address
        mac_groups = defaultdict(list)
        for entry in self.entries:
            mac_groups[entry['mac']].append(entry)

        # Check for MAC addresses with multiple IP addresses
        for mac, entries in mac_groups.items():
            ips = set(entry['ip'] for entry in entries)
            if len(ips) > 3:  # Threshold for suspicious activity
                vendor = self.get_vendor(mac)
                suspicious.append({
                    'type': 'MAC_MULTIPLE_IPS',
                    'mac': mac,
                    'vendor': vendor,
                    'ips': list(ips),
                    'count': len(entries),
                    'description': f"MAC {mac} ({vendor}) associated with {len(ips)} different IP addresses"
                })

        # Check for rapid changes (potential ARP spoofing)
        for ip, entries in ip_groups.items():
            if len(entries) > 1:
                sorted_entries = sorted(entries, key=lambda x: x['timestamp'])
                for i in range(1, len(sorted_entries)):
                    time_diff = sorted_entries[i]['timestamp'] - sorted_entries[i-1]['timestamp']
                    if time_diff < 300 and sorted_entries[i]['mac'] != sorted_entries[i-1]['mac']:  # 5 minutes
                        suspicious.append({
                            'type': 'RAPID_MAC_CHANGE',
                            'ip': ip,
                            'old_mac': sorted_entries[i-1]['mac'],
                            'new_mac': sorted_entries[i]['mac'],
                            'time_diff': time_diff,
                            'description': f"Rapid MAC change for IP {ip} ({time_diff}s between changes)"
                        })

        return suspicious

    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simple vendor lookup (in practice, you'd use a full OUI database)
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys are lowercase, so the prefix must be lowercased as well
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

    def generate_report(self, output_file=None):
        """Generate comprehensive analysis report"""
        report = f"""
Arpwatch Database Analysis Report
=================================
Generated: {datetime.now()}
Database: {self.database_file}
{self.get_statistics()}
Suspicious Activity Analysis
============================
"""

        suspicious = self.find_suspicious_activity()
        if suspicious:
            for item in suspicious:
                report += f"\n⚠️  {item['type']}: {item['description']}\n"
                if 'macs' in item:
                    report += f"   MAC addresses: {', '.join(item['macs'])}\n"
                if 'ips' in item:
                    report += f"   IP addresses: {', '.join(item['ips'])}\n"
                if 'time_diff' in item:
                    report += f"   Time difference: {item['time_diff']} seconds\n"
        else:
            report += "\nNo suspicious activity detected.\n"

        # Recent activity (last 24 hours)
        recent_threshold = time.time() - 86400  # 24 hours
        recent_entries = [e for e in self.entries if e['timestamp'] > recent_threshold]

        report += "\nRecent Activity (Last 24 Hours)\n"
        report += "===============================\n"
        report += f"Recent entries: {len(recent_entries)}\n"

        if recent_entries:
            report += "\nRecent IP/MAC pairs:\n"
            for entry in sorted(recent_entries, key=lambda x: x['timestamp'], reverse=True)[:10]:
                vendor = self.get_vendor(entry['mac'])
                report += f"  {entry['datetime']}: {entry['ip']} -> {entry['mac']} ({vendor})\n"

        if output_file:
            with open(output_file, 'w') as f:
                f.write(report)
            print(f"Report saved to: {output_file}")
        else:
            print(report)


def main():
    parser = argparse.ArgumentParser(description='Arpwatch Database Analyzer')
    parser.add_argument('database', help='Path to arpwatch database file')
    parser.add_argument('--output', '-o', help='Output file for report')
    parser.add_argument('--stats-only', action='store_true', help='Show only statistics')

    args = parser.parse_args()

    analyzer = ArpwatchAnalyzer(args.database)

    if args.stats_only:
        print(analyzer.get_statistics())
    else:
        analyzer.generate_report(args.output)


if __name__ == "__main__":
    main()
```
Monitoring and Alerting
Real-Time Monitoring Script
```bash
#!/bin/bash
# arpwatch_monitor.sh

ARPWATCH_LOG="/var/log/arpwatch.log"
ALERT_EMAIL="admin@example.com"
WHITELIST_FILE="/etc/arpwatch/whitelist.txt"
ALERT_LOG="/var/log/arpwatch_alerts.log"

# MAC octets may appear without zero padding (e.g. 0:c:29:...)
MAC_RE='([0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}'
IP_RE='([0-9]{1,3}\.){3}[0-9]{1,3}'

# Create whitelist if it doesn't exist
if [ ! -f "$WHITELIST_FILE" ]; then
    sudo mkdir -p "$(dirname "$WHITELIST_FILE")"
    echo "# Arpwatch whitelist - one MAC address per line" | sudo tee "$WHITELIST_FILE" > /dev/null
fi

# Function to check if MAC is whitelisted (exact, case-insensitive line match)
is_whitelisted() {
    local mac="$1"
    grep -qixF -- "$mac" "$WHITELIST_FILE"
}

# Function to send alert
send_alert() {
    local alert_type="$1"
    local message="$2"
    local timestamp
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    # Log alert
    echo "[$timestamp] $alert_type: $message" >> "$ALERT_LOG"

    # Send email
    echo "$message" | mail -s "Arpwatch Alert: $alert_type" "$ALERT_EMAIL"

    # Send to syslog
    logger -t arpwatch_monitor "$alert_type: $message"

    echo "Alert sent: $alert_type"
}

# Function to analyze log entry
analyze_entry() {
    local entry="$1"
    local ip mac old_mac

    # Log lines usually read "<event> <ip> <mac> (<previous mac>)":
    # the first MAC is the current one, the second (if any) the previous one
    ip=$(echo "$entry" | grep -oE "$IP_RE" | head -n 1)
    mac=$(echo "$entry" | grep -oE "$MAC_RE" | sed -n '1p')
    old_mac=$(echo "$entry" | grep -oE "$MAC_RE" | sed -n '2p')

    if echo "$entry" | grep -q "new station"; then
        if ! is_whitelisted "$mac"; then
            send_alert "NEW_STATION" "New station detected: IP=$ip, MAC=$mac"
        fi

    elif echo "$entry" | grep -q "changed ethernet address"; then
        send_alert "MAC_CHANGE" "MAC address changed: IP=$ip, Old MAC=$old_mac, New MAC=$mac"

    elif echo "$entry" | grep -q "flip flop"; then
        send_alert "FLIP_FLOP" "Flip flop detected (potential ARP spoofing): IP=$ip, MAC1=$mac, MAC2=$old_mac"

    elif echo "$entry" | grep -q "reused old ethernet address"; then
        send_alert "REUSED_ADDRESS" "Reused ethernet address: IP=$ip, MAC=$mac"
    fi
}

echo "Starting Arpwatch monitor..."
echo "Monitoring log: $ARPWATCH_LOG"
echo "Alert email: $ALERT_EMAIL"
echo "Whitelist file: $WHITELIST_FILE"
echo ""

# Monitor arpwatch log file
tail -F "$ARPWATCH_LOG" | while read -r line; do
    # Skip empty lines
    if [ -n "$line" ]; then
        analyze_entry "$line"
    fi
done
```
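The whitelist check in the script above compares MAC addresses as plain text, so `0:c:29:ab:cd:ef` and `00:0C:29:AB:CD:EF` would count as different devices. A minimal Python sketch of a more forgiving lookup follows; it assumes the whitelist format used above (one MAC per line, `#` for comments), and `load_whitelist`, `normalize_mac` and `is_whitelisted` are hypothetical helper names.

```python
import re

# Accept 1-2 hex digits per octet, separated by ':' or '-'
_MAC = re.compile(r"(?:[0-9a-fA-F]{1,2}[:-]){5}[0-9a-fA-F]{1,2}")


def normalize_mac(mac: str) -> str:
    """Return the MAC as lowercase, zero-padded, colon-separated text."""
    mac = mac.strip()
    if not _MAC.fullmatch(mac):
        raise ValueError(f"not a MAC address: {mac!r}")
    return ":".join(f"{int(octet, 16):02x}" for octet in re.split(r"[:-]", mac))


def load_whitelist(text: str) -> set:
    """Parse whitelist text, ignoring comments, blank lines and invalid entries."""
    allowed = set()
    for raw in text.splitlines():
        entry = raw.split("#", 1)[0].strip()
        if not entry:
            continue
        try:
            allowed.add(normalize_mac(entry))
        except ValueError:
            continue  # Skip lines that are not MAC addresses
    return allowed


def is_whitelisted(mac: str, allowed: set) -> bool:
    """Check a MAC in any supported notation against the normalized whitelist."""
    return normalize_mac(mac) in allowed


# Made-up whitelist contents for illustration
whitelist_text = """# Arpwatch whitelist - one MAC address per line
00:0C:29:AB:CD:EF  # hypervisor guest

not-a-mac
8:0:27:1:2:3
"""
allowed = load_whitelist(whitelist_text)
print(sorted(allowed))
print(is_whitelisted("0:c:29:ab:cd:ef", allowed))
```

Normalizing both the whitelist entries and the observed address means the notation used by whoever edits the file no longer matters.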
Network Baseline Creation
```python
#!/usr/bin/env python3
# create_network_baseline.py

import argparse
import json
import subprocess
import time
from collections import defaultdict
from datetime import datetime


class NetworkBaseline:
    def __init__(self, interface='eth0', duration_hours=24):
        self.interface = interface
        self.duration_hours = duration_hours
        self.baseline_data = {
            'created': datetime.now().isoformat(),
            'interface': interface,
            'duration_hours': duration_hours,
            'devices': {},
            'statistics': {}
        }

    def scan_network(self):
        """Perform network scan to discover devices"""
        print(f"Scanning network on interface {self.interface}...")

        try:
            # Get IP and netmask of interface
            result = subprocess.run(['ip', 'addr', 'show', self.interface],
                                    capture_output=True, text=True)

            # Parse network range (simplified)
            network_range = None
            for line in result.stdout.split('\n'):
                if 'inet ' in line and '127.0.0.1' not in line:
                    ip_info = line.strip().split()[1]
                    network = ip_info.split('/')[0]
                    # Assume /24 network for simplicity
                    network_base = '.'.join(network.split('.')[:-1])
                    network_range = f"{network_base}.0/24"
                    break

            if network_range is None:
                print(f"Could not determine network range for {self.interface}")
                return

            print(f"Detected network range: {network_range}")

            # Use nmap to scan network (MAC addresses are only reported when run as root)
            nmap_cmd = ['nmap', '-sn', network_range]
            result = subprocess.run(nmap_cmd, capture_output=True, text=True)

            # Parse nmap output
            current_ip = None
            for line in result.stdout.split('\n'):
                if 'Nmap scan report for' in line:
                    current_ip = line.split()[-1].strip('()')
                elif 'MAC Address:' in line and current_ip:
                    mac_info = line.split('MAC Address: ')[1]
                    # nmap prints uppercase MACs, tcpdump lowercase: store lowercase
                    mac = mac_info.split()[0].lower()
                    vendor = mac_info.split('(')[1].rstrip(')') if '(' in mac_info else 'Unknown'

                    self.baseline_data['devices'][current_ip] = {
                        'mac': mac,
                        'vendor': vendor,
                        'first_seen': datetime.now().isoformat(),
                        'last_seen': datetime.now().isoformat(),
                        'status': 'active'
                    }

                    print(f"Found device: {current_ip} -> {mac} ({vendor})")
                    current_ip = None

        except Exception as e:
            print(f"Error scanning network: {e}")

    def monitor_arp_activity(self):
        """Monitor ARP activity for specified duration"""
        print(f"Monitoring ARP activity for {self.duration_hours} hours...")

        end_time = time.time() + (self.duration_hours * 3600)

        # Start tcpdump to capture ARP packets (-l makes its output line-buffered)
        tcpdump_cmd = ['tcpdump', '-l', '-i', self.interface, '-n', 'arp']
        process = None

        try:
            process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE, text=True)

            # Note: readline() blocks, so the deadline is only checked when a packet arrives
            while time.time() < end_time:
                line = process.stdout.readline()
                if line:
                    self.parse_arp_packet(line.strip())

                # Check if process is still running
                if process.poll() is not None:
                    break

            process.terminate()

        except KeyboardInterrupt:
            print("\nMonitoring interrupted by user")
            if process is not None:
                process.terminate()
        except Exception as e:
            print(f"Error monitoring ARP activity: {e}")

    def parse_arp_packet(self, packet_line):
        """Parse ARP packet from tcpdump output"""
        try:
            # Example: "12:34:56.789012 ARP, Request who-has 192.168.1.1 tell 192.168.1.100, length 28"
            # Example: "12:34:56.789012 ARP, Reply 192.168.1.1 is-at aa:bb:cc:dd:ee:ff, length 28"
            if 'Reply' in packet_line and 'is-at' in packet_line:
                parts = packet_line.split()
                ip = None
                mac = None

                for i, part in enumerate(parts[:-1]):
                    if part == 'Reply':
                        ip = parts[i+1]
                    elif part == 'is-at':
                        mac = parts[i+1].rstrip(',').lower()

                if ip and mac:
                    self.update_device_info(ip, mac)

        except Exception as e:
            print(f"Error parsing ARP packet: {e}")

    def update_device_info(self, ip, mac):
        """Update device information in baseline"""
        current_time = datetime.now().isoformat()

        if ip in self.baseline_data['devices']:
            # Update existing device
            device = self.baseline_data['devices'][ip]
            device['last_seen'] = current_time

            # Check for MAC address change
            if device['mac'] != mac:
                print(f"MAC change detected: {ip} changed from {device['mac']} to {mac}")
                device['mac_history'] = device.get('mac_history', [])
                device['mac_history'].append({
                    'old_mac': device['mac'],
                    'new_mac': mac,
                    'timestamp': current_time
                })
                device['mac'] = mac
        else:
            # New device discovered
            print(f"New device discovered: {ip} -> {mac}")
            self.baseline_data['devices'][ip] = {
                'mac': mac,
                'vendor': self.get_vendor(mac),
                'first_seen': current_time,
                'last_seen': current_time,
                'status': 'active'
            }

    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simplified vendor lookup
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys are lowercase, so the prefix must be lowercased as well
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')

    def generate_statistics(self):
        """Generate baseline statistics"""
        devices = self.baseline_data['devices']

        self.baseline_data['statistics'] = {
            'total_devices': len(devices),
            'active_devices': len([d for d in devices.values() if d['status'] == 'active']),
            'vendors': {},
            'mac_changes': 0
        }

        # Count vendors
        vendor_count = defaultdict(int)
        for device in devices.values():
            vendor_count[device['vendor']] += 1
            if 'mac_history' in device:
                self.baseline_data['statistics']['mac_changes'] += len(device['mac_history'])

        self.baseline_data['statistics']['vendors'] = dict(vendor_count)

    def save_baseline(self, filename):
        """Save baseline to file"""
        self.generate_statistics()

        with open(filename, 'w') as f:
            json.dump(self.baseline_data, f, indent=2)

        print(f"Baseline saved to: {filename}")

    def create_baseline(self, output_file):
        """Create complete network baseline"""
        print("Creating network baseline...")
        print("=" * 40)

        # Initial network scan
        self.scan_network()

        # Monitor ARP activity
        if self.duration_hours > 0:
            self.monitor_arp_activity()

        # Save baseline
        self.save_baseline(output_file)

        # Display summary
        self.display_summary()

    def display_summary(self):
        """Display baseline summary"""
        stats = self.baseline_data['statistics']

        print("\nBaseline Summary:")
        print("=" * 20)
        print(f"Total devices discovered: {stats['total_devices']}")
        print(f"Active devices: {stats['active_devices']}")
        print(f"MAC address changes detected: {stats['mac_changes']}")

        print("\nVendor distribution:")
        for vendor, count in stats['vendors'].items():
            print(f"  {vendor}: {count} devices")

        print(f"\nBaseline creation completed for interface: {self.baseline_data['interface']}")
        print(f"Monitoring duration: {self.baseline_data['duration_hours']} hours")


def main():
    parser = argparse.ArgumentParser(description='Create network baseline for arpwatch')
    parser.add_argument('--interface', '-i', default='eth0', help='Network interface to monitor')
    parser.add_argument('--duration', '-d', type=int, default=1, help='Monitoring duration in hours')
    parser.add_argument('--output', '-o', default='network_baseline.json', help='Output file for baseline')
    parser.add_argument('--scan-only', action='store_true', help='Only perform initial scan, no monitoring')

    args = parser.parse_args()

    if args.scan_only:
        duration = 0
    else:
        duration = args.duration

    baseline = NetworkBaseline(args.interface, duration)
    baseline.create_baseline(args.output)


if __name__ == "__main__":
    main()
```
Security Applications
ARP Spoofing Detection
```bash
#!/bin/bash
# arp_spoofing_detector.sh

INTERFACE="eth0"
THRESHOLD_SECONDS=60   # Alert if MAC changes within 60 seconds
LOG_FILE="/var/log/arp_spoofing.log"
ALERT_EMAIL="security@example.com"

# Temporary files for tracking
TEMP_DIR="/tmp/arp_detector"
mkdir -p "$TEMP_DIR"

# Function to log events
log_event() {
    local level="$1"
    local message="$2"
    local timestamp
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    echo "[$timestamp] [$level] $message" | tee -a "$LOG_FILE"

    if [ "$level" = "ALERT" ]; then
        echo "$message" | mail -s "ARP Spoofing Alert" "$ALERT_EMAIL"
        logger -t arp_spoofing_detector "ALERT: $message"
    fi
}

# Function to check for rapid MAC changes
check_rapid_changes() {
    local ip="$1"
    local mac="$2"
    local current_time
    current_time=$(date +%s)

    local tracking_file="$TEMP_DIR/ip_$ip"

    if [ -f "$tracking_file" ]; then
        local last_entry last_time last_mac time_diff
        last_entry=$(tail -1 "$tracking_file")
        # Entries are "epoch:mac"; the MAC itself contains colons, so take fields 2 onward
        last_time=$(echo "$last_entry" | cut -d: -f1)
        last_mac=$(echo "$last_entry" | cut -d: -f2-)

        time_diff=$((current_time - last_time))

        if [ "$time_diff" -lt "$THRESHOLD_SECONDS" ] && [ "$mac" != "$last_mac" ]; then
            log_event "ALERT" "Rapid MAC change detected for IP $ip: $last_mac -> $mac (${time_diff}s)"

            # Check if this looks like ARP spoofing
            if [ "$time_diff" -lt 10 ]; then
                log_event "ALERT" "POSSIBLE ARP SPOOFING: Very rapid MAC change for IP $ip"

                # Additional checks
                check_duplicate_mac "$mac" "$ip"
            fi
        fi
    fi

    # Update tracking file
    echo "$current_time:$mac" >> "$tracking_file"

    # Keep only last 10 entries
    tail -10 "$tracking_file" > "$tracking_file.tmp"
    mv "$tracking_file.tmp" "$tracking_file"
}

# Function to check for duplicate MAC addresses
check_duplicate_mac() {
    local mac="$1"
    local current_ip="$2"
    local tracking_file file_ip last_entry last_mac

    # Check if this MAC is associated with other IPs
    for tracking_file in "$TEMP_DIR"/ip_*; do
        if [ -f "$tracking_file" ]; then
            file_ip=$(basename "$tracking_file" | sed 's/ip_//')

            if [ "$file_ip" != "$current_ip" ]; then
                last_entry=$(tail -1 "$tracking_file")
                last_mac=$(echo "$last_entry" | cut -d: -f2-)

                if [ "$last_mac" = "$mac" ]; then
                    log_event "ALERT" "MAC address conflict: $mac is associated with both $current_ip and $file_ip"
                fi
            fi
        fi
    done
}

# Function to analyze ARP table
analyze_arp_table() {
    log_event "INFO" "Analyzing current ARP table"

    # Get current ARP table (lines look like "host (ip) at mac [ether] on iface")
    arp -a | while read -r line; do
        if echo "$line" | grep -q " at "; then
            local ip mac
            ip=$(echo "$line" | sed 's/.*(\([^)]*\)).*/\1/')
            mac=$(echo "$line" | awk '{print $4}')

            # Skip unresolved entries such as "<incomplete>"
            if [ -n "$ip" ] && echo "$mac" | grep -qE '^([0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}$'; then
                check_rapid_changes "$ip" "$mac"
            fi
        fi
    done
}

# Function to monitor ARP traffic
monitor_arp_traffic() {
    log_event "INFO" "Starting ARP traffic monitoring on interface $INTERFACE"

    # Use tcpdump to monitor ARP traffic (-l makes its output line-buffered)
    tcpdump -l -i "$INTERFACE" -n arp | while read -r line; do
        if echo "$line" | grep -q "Reply.*is-at"; then
            local ip mac
            ip=$(echo "$line" | sed -n 's/.*Reply \([0-9.]*\) is-at.*/\1/p')
            mac=$(echo "$line" | sed -n 's/.*is-at \([0-9a-f:]*\).*/\1/p')

            if [ -n "$ip" ] && [ -n "$mac" ]; then
                log_event "DEBUG" "ARP Reply: $ip -> $mac"
                check_rapid_changes "$ip" "$mac"
            fi
        fi
    done
}

# Cleanup function
cleanup() {
    log_event "INFO" "Shutting down ARP spoofing detector"
    rm -rf "$TEMP_DIR"
    exit 0
}

# Set up signal handlers
trap cleanup SIGINT SIGTERM

log_event "INFO" "Starting ARP spoofing detector"
log_event "INFO" "Interface: $INTERFACE"
log_event "INFO" "Threshold: $THRESHOLD_SECONDS seconds"
log_event "INFO" "Log file: $LOG_FILE"

# Initial ARP table analysis
analyze_arp_table

# Start monitoring
monitor_arp_traffic
```
Network Security Dashboard
```python
#!/usr/bin/env python3
# arpwatch_dashboard.py

import re
import sqlite3
import subprocess
import threading
from datetime import datetime, timedelta

from flask import Flask, jsonify, render_template

app = Flask(__name__)

IP_RE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
# MAC octets may appear without zero padding (e.g. 0:c:29:...)
MAC_RE = re.compile(r'\b(?:[0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}\b')


def normalize_mac(mac):
    """Lowercase and zero-pad a MAC address"""
    return ':'.join(f'{int(octet, 16):02x}' for octet in mac.split(':'))


class ArpwatchDashboard:
    def __init__(self, db_file='arpwatch_dashboard.db'):
        self.db_file = db_file
        self.init_database()
        self.running = True

    def init_database(self):
        """Initialize SQLite database for dashboard data"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        # Create tables (ip is UNIQUE so that INSERT OR REPLACE updates a device in place)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS devices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL UNIQUE,
                mac TEXT NOT NULL,
                vendor TEXT,
                first_seen TIMESTAMP,
                last_seen TIMESTAMP,
                status TEXT DEFAULT 'active'
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                event_type TEXT,
                ip TEXT,
                mac TEXT,
                old_mac TEXT,
                description TEXT
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                alert_type TEXT,
                severity TEXT,
                ip TEXT,
                mac TEXT,
                description TEXT,
                acknowledged BOOLEAN DEFAULT 0
            )
        ''')

        conn.commit()
        conn.close()

    def monitor_arpwatch_log(self, log_file='/var/log/arpwatch.log'):
        """Monitor arpwatch log file for new entries"""
        try:
            # Follow log file
            process = subprocess.Popen(['tail', '-F', log_file],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       text=True)

            while self.running:
                line = process.stdout.readline()
                if line:
                    self.process_log_entry(line.strip())

                if process.poll() is not None:
                    break

            process.terminate()

        except Exception as e:
            print(f"Error monitoring log: {e}")

    def process_log_entry(self, log_entry):
        """Process arpwatch log entry and update database"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        # Store timestamps as text so they sort and compare consistently
        timestamp = datetime.now().isoformat(sep=' ')

        try:
            # Log lines usually read "<event> <ip> <mac> (<previous mac>)"
            ip_match = IP_RE.search(log_entry)
            macs = [normalize_mac(m) for m in MAC_RE.findall(log_entry)]
            if not ip_match:
                return
            ip = ip_match.group(0)

            if 'new station' in log_entry and macs:
                mac = macs[0]
                vendor = self.get_vendor(mac)

                # Insert or update device
                cursor.execute('''
                    INSERT OR REPLACE INTO devices
                    (ip, mac, vendor, first_seen, last_seen, status)
                    VALUES (?, ?, ?, ?, ?, 'active')
                ''', (ip, mac, vendor, timestamp, timestamp))

                # Add event
                cursor.execute('''
                    INSERT INTO events
                    (timestamp, event_type, ip, mac, description)
                    VALUES (?, 'new_station', ?, ?, ?)
                ''', (timestamp, ip, mac, f'New station: {ip} -> {mac}'))

                # Add alert for new devices
                cursor.execute('''
                    INSERT INTO alerts
                    (timestamp, alert_type, severity, ip, mac, description)
                    VALUES (?, 'new_device', 'info', ?, ?, ?)
                ''', (timestamp, ip, mac, f'New device detected: {ip} ({vendor})'))

            elif 'changed ethernet address' in log_entry and len(macs) >= 2:
                # MAC address change: first MAC is the new one, second the old one
                new_mac, old_mac = macs[0], macs[1]
                vendor = self.get_vendor(new_mac)

                # Update device
                cursor.execute('''
                    UPDATE devices
                    SET mac = ?, vendor = ?, last_seen = ?
                    WHERE ip = ?
                ''', (new_mac, vendor, timestamp, ip))

                # Add event
                cursor.execute('''
                    INSERT INTO events
                    (timestamp, event_type, ip, mac, old_mac, description)
                    VALUES (?, 'mac_change', ?, ?, ?, ?)
                ''', (timestamp, ip, new_mac, old_mac,
                      f'MAC changed: {ip} from {old_mac} to {new_mac}'))

                # Add high-priority alert
                cursor.execute('''
                    INSERT INTO alerts
                    (timestamp, alert_type, severity, ip, mac, description)
                    VALUES (?, 'mac_change', 'warning', ?, ?, ?)
                ''', (timestamp, ip, new_mac, f'MAC address changed for {ip}'))

            elif 'flip flop' in log_entry:
                # Potential ARP spoofing: add critical alert
                cursor.execute('''
                    INSERT INTO alerts
                    (timestamp, alert_type, severity, ip, description)
                    VALUES (?, 'flip_flop', 'critical', ?, ?)
                ''', (timestamp, ip, f'Flip flop detected for {ip} - potential ARP spoofing'))

            conn.commit()

        except Exception as e:
            print(f"Error processing log entry: {e}")
        finally:
            conn.close()

    def get_vendor(self, mac):
        """Get vendor from MAC address"""
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }

        # Keys are lowercase, so the prefix must be lowercased as well
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')


# Global dashboard instance
dashboard = ArpwatchDashboard()


def query_rows(sql, columns, params=()):
    """Run a SELECT and return the rows as a list of dicts"""
    conn = sqlite3.connect(dashboard.db_file)
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        conn.close()
    return [dict(zip(columns, row)) for row in rows]


def query_value(sql, params=()):
    """Run a SELECT that returns a single value"""
    conn = sqlite3.connect(dashboard.db_file)
    try:
        return conn.execute(sql, params).fetchone()[0]
    finally:
        conn.close()


@app.route('/')
def index():
    """Main dashboard page (expects templates/dashboard.html, not shown here)"""
    return render_template('dashboard.html')


@app.route('/api/devices')
def api_devices():
    """API endpoint for device list"""
    return jsonify(query_rows('''
        SELECT ip, mac, vendor, first_seen, last_seen, status
        FROM devices
        ORDER BY last_seen DESC
    ''', ('ip', 'mac', 'vendor', 'first_seen', 'last_seen', 'status')))


@app.route('/api/alerts')
def api_alerts():
    """API endpoint for alerts"""
    return jsonify(query_rows('''
        SELECT timestamp, alert_type, severity, ip, mac, description, acknowledged
        FROM alerts
        ORDER BY timestamp DESC
        LIMIT 50
    ''', ('timestamp', 'alert_type', 'severity', 'ip', 'mac', 'description', 'acknowledged')))


@app.route('/api/statistics')
def api_statistics():
    """API endpoint for statistics"""
    # Get device count
    active_devices = query_value("SELECT COUNT(*) FROM devices WHERE status = 'active'")

    # Get alert counts
    unacknowledged_alerts = query_value('SELECT COUNT(*) FROM alerts WHERE acknowledged = 0')

    # Get recent events count
    yesterday = (datetime.now() - timedelta(days=1)).isoformat(sep=' ')
    recent_events = query_value('SELECT COUNT(*) FROM events WHERE timestamp > ?', (yesterday,))

    # Get vendor distribution
    vendor_distribution = query_rows('''
        SELECT vendor, COUNT(*) as count
        FROM devices
        WHERE status = 'active'
        GROUP BY vendor
        ORDER BY count DESC
    ''', ('vendor', 'count'))

    return jsonify({
        'active_devices': active_devices,
        'unacknowledged_alerts': unacknowledged_alerts,
        'recent_events': recent_events,
        'vendor_distribution': vendor_distribution
    })


def start_monitoring():
    """Start monitoring in background thread"""
    monitor_thread = threading.Thread(target=dashboard.monitor_arpwatch_log)
    monitor_thread.daemon = True
    monitor_thread.start()


if __name__ == '__main__':
    # Start monitoring
    start_monitoring()

    # Start web server
    app.run(host='0.0.0.0', port=5000, debug=False)
```
This Arpwatch cheatsheet covers network monitoring, security analysis, and intrusion detection with arpwatch, from basic ARP monitoring to a security dashboard and automated threat detection.