# Arpwatch Cheat Sheet
## Overview
Arpwatch is a network monitoring tool that tracks Ethernet/IP address pairings on a network. It watches ARP (Address Resolution Protocol) activity and maintains a database of Ethernet/IP pairings. When it detects changes such as new stations, changed addresses, or potential ARP spoofing attacks, it can send email notifications and log the events.
## Key Features
- **ARP monitoring**: Continuous monitoring of ARP traffic and address mapping
- **Change detection**: Automatic detection of new stations and address changes
- **Security alerting**: Email notifications for suspicious ARP activity
- **Database maintenance**: Persistent storage of Ethernet/IP pairings
- **Network mapping**: Automatic discovery and tracking of network devices
- **Intrusion detection**: Detection of potential ARP attacks and MAC conflicts
- **Historical tracking**: Long-term monitoring of network topology changes
- **Multi-interface support**: Simultaneous monitoring of several network interfaces
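Before installing arpwatch it can help to look at the raw data it builds on: the kernel's neighbor (ARP) table and live ARP traffic. A minimal sketch, assuming a Linux host with iproute2 and tcpdump and an interface named eth0:
```bash
# Current IP-to-MAC pairings known to the kernel
ip neigh show
# Legacy equivalent (requires net-tools)
arp -a
# Capture a handful of ARP packets to confirm there is traffic to watch (needs root)
sudo tcpdump -n -c 10 -i eth0 arp
```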
## Installation
### Linux Systems
```bash
# Ubuntu/Debian
sudo apt update
sudo apt install arpwatch
# CentOS/RHEL/Fedora
sudo yum install arpwatch
# or
sudo dnf install arpwatch
# Arch Linux
sudo pacman -S arpwatch
# openSUSE
sudo zypper install arpwatch
# From source
wget https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure
make
sudo make install
# Verify installation
arpwatch -h
```
### FreeBSD/OpenBSD
```bash
# FreeBSD
sudo pkg install arpwatch
# OpenBSD
sudo pkg_add arpwatch
# From ports (FreeBSD)
cd /usr/ports/net-mgmt/arpwatch
sudo make install clean
# Verify installation
which arpwatch
```
### macOS
```bash
# Using Homebrew
brew install arpwatch
# Using MacPorts
sudo port install arpwatch
# Manual installation
curl -O https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure --prefix=/usr/local
make
sudo make install
# Verify installation
arpwatch -h
```
## Basic Usage
### Starting Arpwatch
```bash
# Basic startup (requires root privileges)
sudo arpwatch
# Specify interface
sudo arpwatch -i eth0
# Run in foreground (don't daemonize)
sudo arpwatch -d
# Specify data file location
sudo arpwatch -f /var/lib/arpwatch/arp.dat
# Enable email notifications
sudo arpwatch -m admin@example.com
# Specify network to monitor
sudo arpwatch -n 192.168.1.0/24
# Combine options
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -m admin@example.com
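# On distributions that package arpwatch, the daemon can also run under systemd.
# The unit name "arpwatch" is an assumption; it may differ between distributions.
sudo systemctl enable --now arpwatch
sudo systemctl status arpwatch
sudo journalctl -u arpwatch -f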
```
### Configuration Files
```bash
# Main configuration file locations
/etc/arpwatch.conf # Main configuration
/etc/default/arpwatch # Ubuntu/Debian defaults
/etc/sysconfig/arpwatch # RHEL/CentOS defaults
# Data file locations
/var/lib/arpwatch/arp.dat # Default database file
/var/lib/arpwatch/ethercodes.dat # Ethernet vendor codes
# Log file locations
/var/log/arpwatch.log # Main log file
/var/log/messages # System log (may contain arpwatch entries)
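# Confirm which of these paths your package actually installed (illustrative checks)
dpkg -L arpwatch 2>/dev/null | grep -E '^/etc|^/var'   # Debian/Ubuntu
rpm -ql arpwatch 2>/dev/null | grep -E '^/etc|^/var'   # RHEL/Fedora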
```
### Basic Configuration
```bash
# Edit configuration file (Ubuntu/Debian)
sudo nano /etc/default/arpwatch
# Example configuration
INTERFACES="eth0 eth1"
ARGS="-m admin@example.com -s root"
OPTIONS="-N"
# Edit configuration file (RHEL/CentOS)
sudo nano /etc/sysconfig/arpwatch
# Example configuration
OPTIONS="-i eth0 -m admin@example.com -f /var/lib/arpwatch/arp.dat"
```
## Advanced Configuration
### Multi-Interface Monitoring
```bash
# Create separate instances for multiple interfaces
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -P /var/run/arpwatch_eth0.pid &
sudo arpwatch -i eth1 -f /var/lib/arpwatch/eth1.dat -P /var/run/arpwatch_eth1.pid &
sudo arpwatch -i wlan0 -f /var/lib/arpwatch/wlan0.dat -P /var/run/arpwatch_wlan0.pid &
# Systemd service for multiple interfaces (create separate service files)
sudo cp /lib/systemd/system/arpwatch.service /lib/systemd/system/arpwatch@.service
# Edit the template service file
sudo nano /lib/systemd/system/arpwatch@.service
# Example template content:
[Unit]
Description=Arpwatch daemon for %i
After=network.target
[Service]
Type=forking
ExecStart=/usr/sbin/arpwatch -i %i -f /var/lib/arpwatch/%i.dat -P /var/run/arpwatch_%i.pid
PIDFile=/var/run/arpwatch_%i.pid
User=arpwatch
Group=arpwatch
[Install]
WantedBy=multi-user.target
# Enable services for specific interfaces
sudo systemctl enable arpwatch@eth0.service
sudo systemctl enable arpwatch@eth1.service
sudo systemctl start arpwatch@eth0.service
sudo systemctl start arpwatch@eth1.service
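# Quick check: list the per-interface instances created from the template unit
systemctl list-units 'arpwatch@*' --all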
```
### Email Notification Configuration
```bash
# Configure email notifications
sudo nano /etc/arpwatch.conf
# Example email configuration
MAILTO="admin@example.com,security@example.com"
MAILSERVER="localhost"
MAILFROM="arpwatch@example.com"
# Test email functionality
echo "Test message" | mail -s "Arpwatch Test" admin@example.com
# Configure postfix for email delivery (if needed)
sudo apt install postfix
sudo systemctl enable postfix
sudo systemctl start postfix
# Configure email templates
sudo mkdir -p /etc/arpwatch/templates
sudo nano /etc/arpwatch/templates/new_station.txt
# Example template:
Subject: New station detected
From: arpwatch@example.com
New station detected on network:
IP Address: %ip%
MAC Address: %mac%
Interface: %interface%
Timestamp: %timestamp%
Vendor: %vendor%
```
### Custom Alert Scripts
```bash
#!/bin/bash
# /usr/local/bin/arpwatch_alert.sh
# Custom alert script for arpwatch
LOG_FILE="/var/log/arpwatch_custom.log"
ALERT_EMAIL="security@example.com"
WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
# Parse arpwatch input
while read line; do
TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')
# Log the event
echo "[$TIMESTAMP] $line" >> "$LOG_FILE"
# Parse the alert type and details
if echo "$line" | grep -q "new station"; then
ALERT_TYPE="NEW_STATION"
IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
MAC=$(echo "$line" | grep -o '[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}')
# Send email alert
echo "New station detected: IP=$IP, MAC=$MAC" | \
mail -s "Arpwatch Alert: New Station" "$ALERT_EMAIL"
# Send Slack notification
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"🚨 Arpwatch Alert: New station detected\\nIP: $IP\\nMAC: $MAC\\nTime: $TIMESTAMP\"}" \
"$WEBHOOK_URL"
elif echo "$line" | grep -q "changed ethernet address"; then
ALERT_TYPE="MAC_CHANGE"
IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
# High priority alert for MAC changes
echo "SECURITY ALERT: MAC address change detected for IP $IP" | \
mail -s "URGENT: Arpwatch Security Alert" "$ALERT_EMAIL"
# Send Slack notification
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"🔥 URGENT: MAC address change detected\\nIP: $IP\\nTime: $TIMESTAMP\"}" \
"$WEBHOOK_URL"
elif echo "$line" | grep -q "flip flop"; then
ALERT_TYPE="FLIP_FLOP"
IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
# Potential ARP spoofing alert
echo "SECURITY ALERT: Potential ARP spoofing detected for IP $IP" | \
mail -s "CRITICAL: Potential ARP Spoofing" "$ALERT_EMAIL"
fi
done
# Make script executable
sudo chmod +x /usr/local/bin/arpwatch_alert.sh
# Configure arpwatch to use custom script
sudo arpwatch -i eth0 -e /usr/local/bin/arpwatch_alert.sh
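# Hypothetical dry run: pipe a sample line through the script to exercise the
# "new station" branch. The line below is illustrative, not literal arpwatch output.
echo "new station 192.168.1.42 00:11:22:33:44:55 eth0" | sudo /usr/local/bin/arpwatch_alert.sh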
```
## Database Management
### Database Operations
```bash
# View current ARP database
sudo cat /var/lib/arpwatch/arp.dat
# Convert database to readable format
sudo awk '{print $1, $2, $3}' /var/lib/arpwatch/arp.dat
# Backup database
sudo cp /var/lib/arpwatch/arp.dat /var/lib/arpwatch/arp.dat.backup.$(date +%Y%m%d)
# Restore database
sudo cp /var/lib/arpwatch/arp.dat.backup.20231201 /var/lib/arpwatch/arp.dat
# Clear database (start fresh)
sudo systemctl stop arpwatch
sudo rm /var/lib/arpwatch/arp.dat
sudo systemctl start arpwatch
# Merge multiple database files
sudo cat /var/lib/arpwatch/eth0.dat /var/lib/arpwatch/eth1.dat | \
sort -u > /var/lib/arpwatch/merged.dat
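# arp.dat is plain text; each line holds tab-separated fields:
#   ethernet address, IP address, last-seen UNIX timestamp, optional hostname
# Pretty-print last-seen times (strftime is a gawk extension, so gawk is assumed)
sudo awk -F'\t' '{print $2, $1, strftime("%Y-%m-%d %H:%M:%S", $3), $4}' /var/lib/arpwatch/arp.dat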
```
### Database Analysis Scripts
```python
#!/usr/bin/env python3
# arpwatch_analyzer.py
import sys
import time
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import argparse
class ArpwatchAnalyzer:
def __init__(self, database_file):
self.database_file = database_file
self.entries = []
self.load_database()
    def load_database(self):
        """Load and parse the arpwatch database (plain text: MAC, IP, timestamp, optional hostname)"""
        try:
            with open(self.database_file, 'r') as f:
                for line in f:
                    # arp.dat entries are tab-separated: ethernet address,
                    # IP address, last-seen UNIX timestamp, optional hostname
                    fields = line.rstrip('\n').split('\t')
                    if len(fields) < 3:
                        fields = line.split()
                    if len(fields) < 3:
                        continue
                    timestamp = int(fields[2])
                    self.entries.append({
                        'ip': fields[1],
                        'mac': fields[0].lower(),
                        'hostname': fields[3] if len(fields) > 3 else '',
                        'timestamp': timestamp,
                        'datetime': datetime.fromtimestamp(timestamp)
                    })
except FileNotFoundError:
print(f"Database file not found: {self.database_file}")
sys.exit(1)
except Exception as e:
print(f"Error reading database: {e}")
sys.exit(1)
def get_statistics(self):
"""Generate database statistics"""
if not self.entries:
return "No entries in database"
total_entries = len(self.entries)
unique_ips = len(set(entry['ip'] for entry in self.entries))
unique_macs = len(set(entry['mac'] for entry in self.entries))
# Time range
timestamps = [entry['timestamp'] for entry in self.entries]
oldest = datetime.fromtimestamp(min(timestamps))
newest = datetime.fromtimestamp(max(timestamps))
# Most active IPs
ip_counts = Counter(entry['ip'] for entry in self.entries)
most_active_ips = ip_counts.most_common(5)
# Most active MACs
mac_counts = Counter(entry['mac'] for entry in self.entries)
most_active_macs = mac_counts.most_common(5)
stats = f"""
Arpwatch Database Statistics
============================
Total entries: {total_entries}
Unique IP addresses: {unique_ips}
Unique MAC addresses: {unique_macs}
Date range: {oldest} to {newest}
Most Active IP Addresses:
"""
for ip, count in most_active_ips:
stats += f" {ip}: {count} entries\n"
stats += "\nMost Active MAC Addresses:\n"
for mac, count in most_active_macs:
vendor = self.get_vendor(mac)
stats += f" {mac} ({vendor}): {count} entries\n"
return stats
def find_suspicious_activity(self):
"""Detect potentially suspicious ARP activity"""
suspicious = []
# Group by IP address
ip_groups = defaultdict(list)
for entry in self.entries:
ip_groups[entry['ip']].append(entry)
# Check for IP addresses with multiple MAC addresses
for ip, entries in ip_groups.items():
macs = set(entry['mac'] for entry in entries)
if len(macs) > 1:
suspicious.append({
'type': 'IP_MULTIPLE_MACS',
'ip': ip,
'macs': list(macs),
'count': len(entries),
'description': f"IP {ip} associated with {len(macs)} different MAC addresses"
})
# Group by MAC address
mac_groups = defaultdict(list)
for entry in self.entries:
mac_groups[entry['mac']].append(entry)
# Check for MAC addresses with multiple IP addresses
for mac, entries in mac_groups.items():
ips = set(entry['ip'] for entry in entries)
if len(ips) > 3: # Threshold for suspicious activity
vendor = self.get_vendor(mac)
suspicious.append({
'type': 'MAC_MULTIPLE_IPS',
'mac': mac,
'vendor': vendor,
'ips': list(ips),
'count': len(entries),
'description': f"MAC {mac} ({vendor}) associated with {len(ips)} different IP addresses"
})
# Check for rapid changes (potential ARP spoofing)
for ip, entries in ip_groups.items():
if len(entries) > 1:
sorted_entries = sorted(entries, key=lambda x: x['timestamp'])
for i in range(1, len(sorted_entries)):
time_diff = sorted_entries[i]['timestamp'] - sorted_entries[i-1]['timestamp']
if time_diff < 300 and sorted_entries[i]['mac'] != sorted_entries[i-1]['mac']: # 5 minutes
suspicious.append({
'type': 'RAPID_MAC_CHANGE',
'ip': ip,
'old_mac': sorted_entries[i-1]['mac'],
'new_mac': sorted_entries[i]['mac'],
'time_diff': time_diff,
'description': f"Rapid MAC change for IP {ip} ({time_diff}s between changes)"
})
return suspicious
def get_vendor(self, mac):
"""Get vendor information from MAC address"""
# Simple vendor lookup (in practice, you'd use a full OUI database)
oui_map = {
'00:50:56': 'VMware',
'08:00:27': 'VirtualBox',
'52:54:00': 'QEMU',
'00:0c:29': 'VMware',
'00:1b:21': 'Intel',
'00:1e:68': 'Cisco',
'00:23:69': 'Cisco',
'00:d0:c9': 'Intel',
'b8:27:eb': 'Raspberry Pi',
'dc:a6:32': 'Raspberry Pi'
}
        oui = mac[:8].lower()  # OUI map keys above are lowercase
return oui_map.get(oui, 'Unknown')
def generate_report(self, output_file=None):
"""Generate comprehensive analysis report"""
report = f"""
Arpwatch Database Analysis Report
=================================
Generated: {datetime.now()}
Database: {self.database_file}
{self.get_statistics()}
Suspicious Activity Analysis
============================
"""
suspicious = self.find_suspicious_activity()
if suspicious:
for item in suspicious:
report += f"\n⚠️ {item['type']}: {item['description']}\n"
if 'macs' in item:
report += f" MAC addresses: {', '.join(item['macs'])}\n"
if 'ips' in item:
report += f" IP addresses: {', '.join(item['ips'])}\n"
if 'time_diff' in item:
report += f" Time difference: {item['time_diff']} seconds\n"
else:
report += "\nNo suspicious activity detected.\n"
# Recent activity (last 24 hours)
recent_threshold = time.time() - 86400 # 24 hours
recent_entries = [e for e in self.entries if e['timestamp'] > recent_threshold]
report += f"\nRecent Activity (Last 24 Hours)\n"
report += f"===============================\n"
report += f"Recent entries: {len(recent_entries)}\n"
if recent_entries:
report += "\nRecent IP/MAC pairs:\n"
for entry in sorted(recent_entries, key=lambda x: x['timestamp'], reverse=True)[:10]:
vendor = self.get_vendor(entry['mac'])
report += f" {entry['datetime']}: {entry['ip']} -> {entry['mac']} ({vendor})\n"
if output_file:
with open(output_file, 'w') as f:
f.write(report)
print(f"Report saved to: {output_file}")
else:
print(report)
def main():
parser = argparse.ArgumentParser(description='Arpwatch Database Analyzer')
parser.add_argument('database', help='Path to arpwatch database file')
parser.add_argument('--output', '-o', help='Output file for report')
parser.add_argument('--stats-only', action='store_true', help='Show only statistics')
args = parser.parse_args()
analyzer = ArpwatchAnalyzer(args.database)
if args.stats_only:
print(analyzer.get_statistics())
else:
analyzer.generate_report(args.output)
if __name__ == "__main__":
    main()
```
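A usage sketch for the analyzer above; the paths are examples and the flags come from the script's own argparse definitions:
```bash
# Full report written to a file
sudo python3 arpwatch_analyzer.py /var/lib/arpwatch/arp.dat -o /tmp/arpwatch_report.txt
# Statistics only, printed to stdout
sudo python3 arpwatch_analyzer.py /var/lib/arpwatch/arp.dat --stats-only
```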
## Monitoring and Alerting
### Real-Time Monitoring Script
```bash
#!/bin/bash
# arpwatch_monitor.sh
ARPWATCH_LOG="/var/log/arpwatch.log"
ALERT_EMAIL="admin@example.com"
WHITELIST_FILE="/etc/arpwatch/whitelist.txt"
ALERT_LOG="/var/log/arpwatch_alerts.log"
# Create whitelist if it doesn't exist
if [ ! -f "$WHITELIST_FILE" ]; then
sudo touch "$WHITELIST_FILE"
echo "# Arpwatch whitelist - one MAC address per line" | sudo tee "$WHITELIST_FILE"
fi
# Function to check if MAC is whitelisted
is_whitelisted() {
local mac="$1"
grep -q "^$mac$" "$WHITELIST_FILE"
}
# Function to send alert
send_alert() {
local alert_type="$1"
local message="$2"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
# Log alert
echo "[$timestamp] $alert_type: $message" >> "$ALERT_LOG"
# Send email
echo "$message" | mail -s "Arpwatch Alert: $alert_type" "$ALERT_EMAIL"
# Send to syslog
logger -t arpwatch_monitor "$alert_type: $message"
echo "Alert sent: $alert_type"
}
# Function to analyze log entry
analyze_entry() {
local entry="$1"
if echo "$entry" | grep -q "new station"; then
IP=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
MAC=$(echo "$entry" | grep -o '[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}')
if ! is_whitelisted "$MAC"; then
send_alert "NEW_STATION" "New station detected: IP=$IP, MAC=$MAC"
fi
elif echo "$entry" | grep -q "changed ethernet address"; then
IP=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
OLD_MAC=$(echo "$entry" | sed -n 's/.*from \([0-9a-f:]\{17\}\).*/\1/p')
NEW_MAC=$(echo "$entry" | sed -n 's/.*to \([0-9a-f:]\{17\}\).*/\1/p')
send_alert "MAC_CHANGE" "MAC address changed: IP=$IP, Old MAC=$OLD_MAC, New MAC=$NEW_MAC"
elif echo "$entry" | grep -q "flip flop"; then
IP=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
MAC1=$(echo "$entry" | sed -n 's/.*between \([0-9a-f:]\{17\}\).*/\1/p')
MAC2=$(echo "$entry" | sed -n 's/.*and \([0-9a-f:]\{17\}\).*/\1/p')
send_alert "FLIP_FLOP" "Flip flop detected (potential ARP spoofing): IP=$IP, MAC1=$MAC1, MAC2=$MAC2"
elif echo "$entry" | grep -q "reused old ethernet address"; then
IP=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
MAC=$(echo "$entry" | grep -o '[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}:[0-9a-f]\{2\}')
send_alert "REUSED_ADDRESS" "Reused ethernet address: IP=$IP, MAC=$MAC"
fi
}
echo "Starting Arpwatch monitor..."
echo "Monitoring log: $ARPWATCH_LOG"
echo "Alert email: $ALERT_EMAIL"
echo "Whitelist file: $WHITELIST_FILE"
echo ""
# Monitor arpwatch log file
tail -F "$ARPWATCH_LOG" | while read line; do
# Skip empty lines
if [ -n "$line" ]; then
analyze_entry "$line"
fi
done
```
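The monitor above skips whitelisted MACs. One way to seed the whitelist from hosts the kernel currently knows about is sketched below; review the output before trusting it:
```bash
# Append currently-known MAC addresses to the whitelist (illustrative)
ip neigh show | awk '{for (i = 1; i <= NF; i++) if ($i == "lladdr") print $(i+1)}' \
  | sort -u | sudo tee -a /etc/arpwatch/whitelist.txt
```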
### Network Baseline Creation
```python
#!/usr/bin/env python3
# create_network_baseline.py
import subprocess
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
import argparse
class NetworkBaseline:
def __init__(self, interface='eth0', duration_hours=24):
self.interface = interface
self.duration_hours = duration_hours
self.baseline_data = {
'created': datetime.now().isoformat(),
'interface': interface,
'duration_hours': duration_hours,
'devices': {},
'statistics': {}
}
def scan_network(self):
"""Perform network scan to discover devices"""
print(f"Scanning network on interface {self.interface}...")
# Get network range
try:
# Get IP and netmask of interface
result = subprocess.run(['ip', 'addr', 'show', self.interface],
capture_output=True, text=True)
# Parse network range (simplified)
for line in result.stdout.split('\n'):
if 'inet ' in line and not '127.0.0.1' in line:
ip_info = line.strip().split()[1]
network = ip_info.split('/')[0]
# Assume /24 network for simplicity
network_base = '.'.join(network.split('.')[:-1])
network_range = f"{network_base}.0/24"
break
else:
print(f"Could not determine network range for {self.interface}")
return
print(f"Detected network range: {network_range}")
# Use nmap to scan network
nmap_cmd = ['nmap', '-sn', network_range]
result = subprocess.run(nmap_cmd, capture_output=True, text=True)
# Parse nmap output
current_ip = None
for line in result.stdout.split('\n'):
if 'Nmap scan report for' in line:
current_ip = line.split()[-1].strip('()')
elif 'MAC Address:' in line and current_ip:
mac_info = line.split('MAC Address: ')[1]
mac = mac_info.split()[0]
vendor = mac_info.split('(')[1].rstrip(')') if '(' in mac_info else 'Unknown'
self.baseline_data['devices'][current_ip] = {
'mac': mac,
'vendor': vendor,
'first_seen': datetime.now().isoformat(),
'last_seen': datetime.now().isoformat(),
'status': 'active'
}
print(f"Found device: {current_ip} -> {mac} ({vendor})")
current_ip = None
except Exception as e:
print(f"Error scanning network: {e}")
def monitor_arp_activity(self):
"""Monitor ARP activity for specified duration"""
print(f"Monitoring ARP activity for {self.duration_hours} hours...")
end_time = time.time() + (self.duration_hours * 3600)
# Start tcpdump to capture ARP packets
        tcpdump_cmd = ['tcpdump', '-l', '-n', '-i', self.interface, 'arp']  # -l keeps output line-buffered for the pipe
try:
process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
while time.time() < end_time:
line = process.stdout.readline()
if line:
self.parse_arp_packet(line.strip())
# Check if process is still running
if process.poll() is not None:
break
process.terminate()
except KeyboardInterrupt:
print("\nMonitoring interrupted by user")
if 'process' in locals():
process.terminate()
except Exception as e:
print(f"Error monitoring ARP activity: {e}")
def parse_arp_packet(self, packet_line):
"""Parse ARP packet from tcpdump output"""
try:
# Example: "12:34:56.789012 ARP, Request who-has 192.168.1.1 tell 192.168.1.100, length 28"
# Example: "12:34:56.789012 ARP, Reply 192.168.1.1 is-at aa:bb:cc:dd:ee:ff, length 28"
if 'Reply' in packet_line and 'is-at' in packet_line:
parts = packet_line.split()
ip = None
mac = None
for i, part in enumerate(parts):
if part == 'Reply':
ip = parts[i+1]
elif part == 'is-at':
mac = parts[i+1].rstrip(',')
if ip and mac:
self.update_device_info(ip, mac)
except Exception as e:
print(f"Error parsing ARP packet: {e}")
def update_device_info(self, ip, mac):
"""Update device information in baseline"""
current_time = datetime.now().isoformat()
if ip in self.baseline_data['devices']:
# Update existing device
device = self.baseline_data['devices'][ip]
device['last_seen'] = current_time
# Check for MAC address change
if device['mac'] != mac:
print(f"MAC change detected: {ip} changed from {device['mac']} to {mac}")
device['mac_history'] = device.get('mac_history', [])
device['mac_history'].append({
'old_mac': device['mac'],
'new_mac': mac,
'timestamp': current_time
})
device['mac'] = mac
else:
# New device discovered
print(f"New device discovered: {ip} -> {mac}")
self.baseline_data['devices'][ip] = {
'mac': mac,
'vendor': self.get_vendor(mac),
'first_seen': current_time,
'last_seen': current_time,
'status': 'active'
}
def get_vendor(self, mac):
"""Get vendor information from MAC address"""
# Simplified vendor lookup
oui_map = {
'00:50:56': 'VMware',
'08:00:27': 'VirtualBox',
'52:54:00': 'QEMU',
'00:0c:29': 'VMware',
'00:1b:21': 'Intel',
'00:1e:68': 'Cisco',
'00:23:69': 'Cisco',
'00:d0:c9': 'Intel',
'b8:27:eb': 'Raspberry Pi',
'dc:a6:32': 'Raspberry Pi'
}
        oui = mac[:8].lower()  # OUI map keys above are lowercase
return oui_map.get(oui, 'Unknown')
def generate_statistics(self):
"""Generate baseline statistics"""
devices = self.baseline_data['devices']
self.baseline_data['statistics'] = {
'total_devices': len(devices),
'active_devices': len([d for d in devices.values() if d['status'] == 'active']),
'vendors': {},
'mac_changes': 0
}
# Count vendors
vendor_count = defaultdict(int)
for device in devices.values():
vendor_count[device['vendor']] += 1
if 'mac_history' in device:
self.baseline_data['statistics']['mac_changes'] += len(device['mac_history'])
self.baseline_data['statistics']['vendors'] = dict(vendor_count)
def save_baseline(self, filename):
"""Save baseline to file"""
self.generate_statistics()
with open(filename, 'w') as f:
json.dump(self.baseline_data, f, indent=2)
print(f"Baseline saved to: {filename}")
def create_baseline(self, output_file):
"""Create complete network baseline"""
print("Creating network baseline...")
print("=" * 40)
# Initial network scan
self.scan_network()
# Monitor ARP activity
if self.duration_hours > 0:
self.monitor_arp_activity()
# Save baseline
self.save_baseline(output_file)
# Display summary
self.display_summary()
def display_summary(self):
"""Display baseline summary"""
stats = self.baseline_data['statistics']
print("\nBaseline Summary:")
print("=" * 20)
print(f"Total devices discovered: {stats['total_devices']}")
print(f"Active devices: {stats['active_devices']}")
print(f"MAC address changes detected: {stats['mac_changes']}")
print("\nVendor distribution:")
for vendor, count in stats['vendors'].items():
print(f" {vendor}: {count} devices")
print(f"\nBaseline creation completed for interface: {self.baseline_data['interface']}")
print(f"Monitoring duration: {self.baseline_data['duration_hours']} hours")
def main():
parser = argparse.ArgumentParser(description='Create network baseline for arpwatch')
parser.add_argument('--interface', '-i', default='eth0', help='Network interface to monitor')
parser.add_argument('--duration', '-d', type=int, default=1, help='Monitoring duration in hours')
parser.add_argument('--output', '-o', default='network_baseline.json', help='Output file for baseline')
parser.add_argument('--scan-only', action='store_true', help='Only perform initial scan, no monitoring')
args = parser.parse_args()
if args.scan_only:
duration = 0
else:
duration = args.duration
baseline = NetworkBaseline(args.interface, duration)
baseline.create_baseline(args.output)
if __name__ == "__main__":
    main()
```
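Hypothetical invocations of the baseline script above; the flags match its argparse definitions, and root privileges are needed for tcpdump (and usually for nmap MAC detection):
```bash
# One-off scan without ARP monitoring
sudo python3 create_network_baseline.py --interface eth0 --scan-only --output baseline.json
# Scan plus 24 hours of ARP monitoring
sudo python3 create_network_baseline.py -i eth0 -d 24 -o baseline_24h.json
```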
## Security Applications
### ARP Spoofing Detection
```bash
#!/bin/bash
# arp_spoofing_detector.sh
INTERFACE="eth0"
THRESHOLD_SECONDS=60 # Alert if MAC changes within 60 seconds
LOG_FILE="/var/log/arp_spoofing.log"
ALERT_EMAIL="security@example.com"
# Temporary files for tracking
TEMP_DIR="/tmp/arp_detector"
mkdir -p "$TEMP_DIR"
# Function to log events
log_event() {
local level="$1"
local message="$2"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
echo "[$timestamp] [$level] $message" | tee -a "$LOG_FILE"
if [ "$level" = "ALERT" ]; then
echo "$message" | mail -s "ARP Spoofing Alert" "$ALERT_EMAIL"
logger -t arp_spoofing_detector "ALERT: $message"
fi
}
# Function to check for rapid MAC changes
check_rapid_changes() {
local ip="$1"
local mac="$2"
local current_time=$(date +%s)
local tracking_file="$TEMP_DIR/ip_$ip"
if [ -f "$tracking_file" ]; then
local last_entry=$(tail -1 "$tracking_file")
local last_time=$(echo "$last_entry" | cut -d: -f1)
        local last_mac=$(echo "$last_entry" | cut -d: -f2-)  # the MAC itself contains colons, so take field 2 onward
local time_diff=$((current_time - last_time))
if [ "$time_diff" -lt "$THRESHOLD_SECONDS" ] && [ "$mac" != "$last_mac" ]; then
log_event "ALERT" "Rapid MAC change detected for IP $ip: $last_mac -> $mac (${time_diff}s)"
# Check if this looks like ARP spoofing
if [ "$time_diff" -lt 10 ]; then
log_event "ALERT" "POSSIBLE ARP SPOOFING: Very rapid MAC change for IP $ip"
# Additional checks
check_duplicate_mac "$mac" "$ip"
fi
fi
fi
# Update tracking file
echo "$current_time:$mac" >> "$tracking_file"
# Keep only last 10 entries
tail -10 "$tracking_file" > "$tracking_file.tmp"
mv "$tracking_file.tmp" "$tracking_file"
}
# Function to check for duplicate MAC addresses
check_duplicate_mac() {
local mac="$1"
local current_ip="$2"
# Check if this MAC is associated with other IPs
for tracking_file in "$TEMP_DIR"/ip_*; do
if [ -f "$tracking_file" ]; then
local file_ip=$(basename "$tracking_file" | sed 's/ip_//')
if [ "$file_ip" != "$current_ip" ]; then
local last_entry=$(tail -1 "$tracking_file")
                local last_mac=$(echo "$last_entry" | cut -d: -f2-)  # field 2 onward: the MAC contains colons
if [ "$last_mac" = "$mac" ]; then
log_event "ALERT" "MAC address conflict: $mac is associated with both $current_ip and $file_ip"
fi
fi
fi
done
}
# Function to analyze ARP table
analyze_arp_table() {
log_event "INFO" "Analyzing current ARP table"
# Get current ARP table
arp -a | while read line; do
if echo "$line" | grep -q "at"; then
local hostname=$(echo "$line" | cut -d' ' -f1)
local ip=$(echo "$line" | sed 's/.*(\([^)]*\)).*/\1/')
local mac=$(echo "$line" | awk '{print $4}')
if [ -n "$ip" ] && [ -n "$mac" ]; then
check_rapid_changes "$ip" "$mac"
fi
fi
done
}
# Function to monitor ARP traffic
monitor_arp_traffic() {
log_event "INFO" "Starting ARP traffic monitoring on interface $INTERFACE"
# Use tcpdump to monitor ARP traffic
    tcpdump -l -n -i "$INTERFACE" arp | while read -r line; do  # -l line-buffers output so the pipe sees events promptly
if echo "$line" | grep -q "Reply.*is-at"; then
local ip=$(echo "$line" | sed -n 's/.*Reply \([0-9.]*\) is-at.*/\1/p')
local mac=$(echo "$line" | sed -n 's/.*is-at \([0-9a-f:]*\).*/\1/p')
if [ -n "$ip" ] && [ -n "$mac" ]; then
log_event "DEBUG" "ARP Reply: $ip -> $mac"
check_rapid_changes "$ip" "$mac"
fi
fi
done
}
# Cleanup function
cleanup() {
log_event "INFO" "Shutting down ARP spoofing detector"
rm -rf "$TEMP_DIR"
exit 0
}
# Set up signal handlers
trap cleanup SIGINT SIGTERM
log_event "INFO" "Starting ARP spoofing detector"
log_event "INFO" "Interface: $INTERFACE"
log_event "INFO" "Threshold: $THRESHOLD_SECONDS seconds"
log_event "INFO" "Log file: $LOG_FILE"
# Initial ARP table analysis
analyze_arp_table
# Start monitoring
monitor_arp_traffic
```
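If the detector reports spoofing against a critical host such as the default gateway, one common containment step is pinning that host's MAC with a static neighbor entry. A sketch with example addresses, assuming iproute2:
```bash
# Pin the gateway's MAC so forged ARP replies cannot overwrite it (example values)
sudo ip neigh replace 192.168.1.1 lladdr aa:bb:cc:dd:ee:ff dev eth0 nud permanent
# Verify the entry is now PERMANENT
ip neigh show | grep 192.168.1.1
```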
### Network Security Dashboard
```python
#!/usr/bin/env python3
# arpwatch_dashboard.py
import json
import time
import subprocess
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify
import sqlite3
import threading
app = Flask(__name__)
class ArpwatchDashboard:
def __init__(self, db_file='arpwatch_dashboard.db'):
self.db_file = db_file
self.init_database()
self.running = True
def init_database(self):
"""Initialize SQLite database for dashboard data"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
# Create tables
cursor.execute('''
CREATE TABLE IF NOT EXISTS devices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL UNIQUE,  -- UNIQUE lets INSERT OR REPLACE update existing rows
mac TEXT NOT NULL,
vendor TEXT,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
status TEXT DEFAULT 'active'
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP,
event_type TEXT,
ip TEXT,
mac TEXT,
old_mac TEXT,
description TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP,
alert_type TEXT,
severity TEXT,
ip TEXT,
mac TEXT,
description TEXT,
acknowledged BOOLEAN DEFAULT FALSE
)
''')
conn.commit()
conn.close()
def monitor_arpwatch_log(self, log_file='/var/log/arpwatch.log'):
"""Monitor arpwatch log file for new entries"""
try:
# Follow log file
process = subprocess.Popen(['tail', '-F', log_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
while self.running:
line = process.stdout.readline()
if line:
self.process_log_entry(line.strip())
if process.poll() is not None:
break
process.terminate()
except Exception as e:
print(f"Error monitoring log: {e}")
def process_log_entry(self, log_entry):
"""Process arpwatch log entry and update database"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
timestamp = datetime.now()
try:
if 'new station' in log_entry:
# Extract IP and MAC
import re
ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
mac_match = re.search(r'([0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2})', log_entry)
if ip_match and mac_match:
ip = ip_match.group(1)
mac = mac_match.group(1)
vendor = self.get_vendor(mac)
# Insert or update device
cursor.execute('''
INSERT OR REPLACE INTO devices
(ip, mac, vendor, first_seen, last_seen, status)
VALUES (?, ?, ?, ?, ?, 'active')
''', (ip, mac, vendor, timestamp, timestamp))
# Add event
cursor.execute('''
INSERT INTO events
(timestamp, event_type, ip, mac, description)
VALUES (?, 'new_station', ?, ?, ?)
''', (timestamp, ip, mac, f'New station: {ip} -> {mac}'))
# Add alert for new devices
cursor.execute('''
INSERT INTO alerts
(timestamp, alert_type, severity, ip, mac, description)
VALUES (?, 'new_device', 'info', ?, ?, ?)
''', (timestamp, ip, mac, f'New device detected: {ip} ({vendor})'))
elif 'changed ethernet address' in log_entry:
# MAC address change
import re
ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
old_mac_match = re.search(r'from ([0-9a-f:]{17})', log_entry)
new_mac_match = re.search(r'to ([0-9a-f:]{17})', log_entry)
if ip_match and old_mac_match and new_mac_match:
ip = ip_match.group(1)
old_mac = old_mac_match.group(1)
new_mac = new_mac_match.group(1)
vendor = self.get_vendor(new_mac)
# Update device
cursor.execute('''
UPDATE devices
SET mac = ?, vendor = ?, last_seen = ?
WHERE ip = ?
''', (new_mac, vendor, timestamp, ip))
# Add event
cursor.execute('''
INSERT INTO events
(timestamp, event_type, ip, mac, old_mac, description)
VALUES (?, 'mac_change', ?, ?, ?, ?)
''', (timestamp, ip, new_mac, old_mac, f'MAC changed: {ip} from {old_mac} to {new_mac}'))
# Add high-priority alert
cursor.execute('''
INSERT INTO alerts
(timestamp, alert_type, severity, ip, mac, description)
VALUES (?, 'mac_change', 'warning', ?, ?, ?)
''', (timestamp, ip, new_mac, f'MAC address changed for {ip}'))
elif 'flip flop' in log_entry:
# Potential ARP spoofing
import re
ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
if ip_match:
ip = ip_match.group(1)
# Add critical alert
cursor.execute('''
INSERT INTO alerts
(timestamp, alert_type, severity, ip, description)
VALUES (?, 'flip_flop', 'critical', ?, ?)
''', (timestamp, ip, f'Flip flop detected for {ip} - potential ARP spoofing'))
conn.commit()
except Exception as e:
print(f"Error processing log entry: {e}")
finally:
conn.close()
def get_vendor(self, mac):
"""Get vendor from MAC address"""
oui_map = {
'00:50:56': 'VMware',
'08:00:27': 'VirtualBox',
'52:54:00': 'QEMU',
'00:0c:29': 'VMware',
'00:1b:21': 'Intel',
'00:1e:68': 'Cisco',
'00:23:69': 'Cisco',
'00:d0:c9': 'Intel',
'b8:27:eb': 'Raspberry Pi',
'dc:a6:32': 'Raspberry Pi'
}
        oui = mac[:8].lower()  # OUI map keys above are lowercase
return oui_map.get(oui, 'Unknown')
# Global dashboard instance
dashboard = ArpwatchDashboard()
@app.route('/')
def index():
"""Main dashboard page"""
return render_template('dashboard.html')
@app.route('/api/devices')
def api_devices():
"""API endpoint for device list"""
conn = sqlite3.connect(dashboard.db_file)
cursor = conn.cursor()
cursor.execute('''
SELECT ip, mac, vendor, first_seen, last_seen, status
FROM devices
ORDER BY last_seen DESC
''')
devices = []
for row in cursor.fetchall():
devices.append({
'ip': row[0],
'mac': row[1],
'vendor': row[2],
'first_seen': row[3],
'last_seen': row[4],
'status': row[5]
})
conn.close()
return jsonify(devices)
@app.route('/api/alerts')
def api_alerts():
"""API endpoint for alerts"""
conn = sqlite3.connect(dashboard.db_file)
cursor = conn.cursor()
cursor.execute('''
SELECT timestamp, alert_type, severity, ip, mac, description, acknowledged
FROM alerts
ORDER BY timestamp DESC
LIMIT 50
''')
alerts = []
for row in cursor.fetchall():
alerts.append({
'timestamp': row[0],
'alert_type': row[1],
'severity': row[2],
'ip': row[3],
'mac': row[4],
'description': row[5],
'acknowledged': row[6]
})
conn.close()
return jsonify(alerts)
@app.route('/api/statistics')
def api_statistics():
"""API endpoint for statistics"""
conn = sqlite3.connect(dashboard.db_file)
cursor = conn.cursor()
# Get device count
    cursor.execute("SELECT COUNT(*) FROM devices WHERE status = 'active'")
active_devices = cursor.fetchone()[0]
# Get alert counts
cursor.execute('SELECT COUNT(*) FROM alerts WHERE acknowledged = FALSE')
unacknowledged_alerts = cursor.fetchone()[0]
# Get recent events count
yesterday = datetime.now() - timedelta(days=1)
cursor.execute('SELECT COUNT(*) FROM events WHERE timestamp > ?', (yesterday,))
recent_events = cursor.fetchone()[0]
# Get vendor distribution
cursor.execute('''
SELECT vendor, COUNT(*) as count
FROM devices
        WHERE status = 'active'
GROUP BY vendor
ORDER BY count DESC
''')
vendor_distribution = []
for row in cursor.fetchall():
vendor_distribution.append({
'vendor': row[0],
'count': row[1]
})
conn.close()
return jsonify({
'active_devices': active_devices,
'unacknowledged_alerts': unacknowledged_alerts,
'recent_events': recent_events,
'vendor_distribution': vendor_distribution
})
def start_monitoring():
"""Start monitoring in background thread"""
monitor_thread = threading.Thread(target=dashboard.monitor_arpwatch_log)
monitor_thread.daemon = True
monitor_thread.start()
if __name__ == '__main__':
# Start monitoring
start_monitoring()
# Start web server
    app.run(host='0.0.0.0', port=5000, debug=False)
```
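A launch sketch for the dashboard above. Flask must be installed, the script expects a templates/dashboard.html next to it for the index page (not included here), and the JSON endpoints can be queried directly:
```bash
pip install flask
sudo python3 arpwatch_dashboard.py          # serves on port 5000 and tails /var/log/arpwatch.log
curl http://localhost:5000/api/devices
curl http://localhost:5000/api/statistics
curl http://localhost:5000/api/alerts
```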
This comprehensive Arpwatch cheat sheet provides everything needed for professional network monitoring, security analysis, and intrusion detection, from basic ARP monitoring to advanced security dashboards and automated threat detection.