Arpwatch Cheatsheet
Overview
Arpwatch is a network monitoring tool that tracks Ethernet/IP address pairings on a network. It watches ARP (Address Resolution Protocol) traffic and maintains a database of the Ethernet/IP address pairs it has seen. When it detects a change, such as a new station, a changed address, or a possible ARP spoofing attack, it can send an email notification and log the event.
Key Features
- **ARP monitoring**: Continuous monitoring of ARP traffic and address mappings
- **Change detection**: Automatic detection of new stations and address changes
- **Security alerts**: Email notifications for suspicious ARP activity
- **Database maintenance**: Persistent storage of Ethernet/IP address pairs
- **Network mapping**: Automatic discovery and tracking of network devices
- **Intrusion detection**: Detection of possible ARP spoofing and address conflicts
- **Historical tracking**: Long-term monitoring of network topology changes
- **Multi-interface support**: Monitoring of multiple network interfaces simultaneously
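The pair-tracking idea behind these features can be sketched in a few lines of Python. This is an illustrative model only, not arpwatch's actual implementation: the class name `PairTracker` is hypothetical, and the event labels are borrowed from the report names that appear later in this cheatsheet.

```python
from typing import Dict, Optional


class PairTracker:
    """Toy model of IP -> MAC tracking (hypothetical; not arpwatch's code)."""

    def __init__(self) -> None:
        self.current: Dict[str, str] = {}   # ip -> MAC seen most recently
        self.previous: Dict[str, str] = {}  # ip -> MAC seen before the current one

    def observe(self, ip: str, mac: str) -> Optional[str]:
        """Record one sighting; return an event label, or None if nothing changed."""
        mac = mac.lower()
        known = self.current.get(ip)
        if known is None:
            self.current[ip] = mac
            return "new station"
        if known == mac:
            return None
        # The address changed: did it switch back to the MAC seen just before?
        if self.previous.get(ip) == mac:
            event = "flip flop"
        else:
            event = "changed ethernet address"
        self.previous[ip] = known
        self.current[ip] = mac
        return event


tracker = PairTracker()
events = [
    tracker.observe("192.168.1.10", "aa:bb:cc:dd:ee:01"),
    tracker.observe("192.168.1.10", "aa:bb:cc:dd:ee:01"),
    tracker.observe("192.168.1.10", "aa:bb:cc:dd:ee:02"),
    tracker.observe("192.168.1.10", "aa:bb:cc:dd:ee:01"),
]
print(events)
```

Keeping only the current and the immediately preceding MAC per IP is enough to tell a one-off change from an address that bounces back and forth, which is the pattern usually associated with spoofing or a duplicate IP.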
Installation
Linux Systems
# Ubuntu/Debian
sudo apt update
sudo apt install arpwatch
# CentOS/RHEL/Fedora
sudo yum install arpwatch
# or
sudo dnf install arpwatch
# Arch Linux
sudo pacman -S arpwatch
# openSUSE
sudo zypper install arpwatch
# From source
wget https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure
make
sudo make install
# Verify installation
arpwatch -h
FreeBSD/OpenBSD
# FreeBSD
sudo pkg install arpwatch
# OpenBSD
sudo pkg_add arpwatch
# From ports (FreeBSD)
cd /usr/ports/net-mgmt/arpwatch
sudo make install clean
# Verify installation
which arpwatch
macOS
# Using Homebrew
brew install arpwatch
# Using MacPorts
sudo port install arpwatch
# Manual installation
curl -O https://ee.lbl.gov/downloads/arpwatch/arpwatch-2.1a15.tar.gz
tar -xzf arpwatch-2.1a15.tar.gz
cd arpwatch-2.1a15
./configure --prefix=/usr/local
make
sudo make install
# Verify installation
arpwatch -h
Basic Usage
Starting Arpwatch
# Basic startup (requires root privileges)
sudo arpwatch
# Specify interface
sudo arpwatch -i eth0
# Debug mode: stay in the foreground and print reports to stderr instead of emailing them
sudo arpwatch -d
# Specify data file location
sudo arpwatch -f /var/lib/arpwatch/arp.dat
# Enable email notifications
sudo arpwatch -m admin@example.com
# Declare an additional local network (avoids bogon warnings when several networks share the wire)
sudo arpwatch -n 192.168.1.0/24
# Combine options
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -m admin@example.com
Configuration Files
# Main configuration file locations
/etc/arpwatch.conf # Main configuration
/etc/default/arpwatch # Ubuntu/Debian defaults
/etc/sysconfig/arpwatch # RHEL/CentOS defaults
# Data file locations
/var/lib/arpwatch/arp.dat # Default database file
/var/lib/arpwatch/ethercodes.dat # Ethernet vendor codes
# Log file locations
/var/log/arpwatch.log # Main log file
/var/log/messages # System log (may contain arpwatch entries)
Basic Configuration
# Edit configuration file (Ubuntu/Debian)
sudo nano /etc/default/arpwatch
# Example configuration
INTERFACES="eth0 eth1"
ARGS="-m admin@example.com -s root"
OPTIONS="-N"
# Edit configuration file (RHEL/CentOS)
sudo nano /etc/sysconfig/arpwatch
# Example configuration
OPTIONS="-i eth0 -m admin@example.com -f /var/lib/arpwatch/arp.dat"
Advanced Configuration
Multi-Interface Monitoring
# Create separate instances for multiple interfaces
sudo arpwatch -i eth0 -f /var/lib/arpwatch/eth0.dat -P /var/run/arpwatch_eth0.pid &
sudo arpwatch -i eth1 -f /var/lib/arpwatch/eth1.dat -P /var/run/arpwatch_eth1.pid &
sudo arpwatch -i wlan0 -f /var/lib/arpwatch/wlan0.dat -P /var/run/arpwatch_wlan0.pid &
# Systemd template unit for multiple interfaces (locally created units belong in /etc/systemd/system)
sudo cp /lib/systemd/system/arpwatch.service /etc/systemd/system/arpwatch@.service
# Edit the template service file
sudo nano /etc/systemd/system/arpwatch@.service
# Example template content:
[Unit]
Description=Arpwatch daemon for %i
After=network.target
[Service]
Type=forking
ExecStart=/usr/sbin/arpwatch -i %i -f /var/lib/arpwatch/%i.dat -P /var/run/arpwatch_%i.pid
PIDFile=/var/run/arpwatch_%i.pid
User=arpwatch
Group=arpwatch
[Install]
WantedBy=multi-user.target
# Reload systemd so it picks up the new unit, then enable services for specific interfaces
sudo systemctl daemon-reload
sudo systemctl enable arpwatch@eth0.service
sudo systemctl enable arpwatch@eth1.service
sudo systemctl start arpwatch@eth0.service
sudo systemctl start arpwatch@eth1.service
Email Notification Configuration
# Configure email notifications
sudo nano /etc/arpwatch.conf
# Example email configuration
MAILTO="admin@example.com,security@example.com"
MAILSERVER="localhost"
MAILFROM="arpwatch@example.com"
# Test email functionality
echo "Test message" | mail -s "Arpwatch Test" admin@example.com
# Configure postfix for email delivery (if needed)
sudo apt install postfix
sudo systemctl enable postfix
sudo systemctl start postfix
# Configure email templates
sudo mkdir -p /etc/arpwatch/templates
sudo nano /etc/arpwatch/templates/new_station.txt
# Example template:
Subject: New station detected
From: arpwatch@example.com
New station detected on network:
IP Address: %ip%
MAC Address: %mac%
Interface: %interface%
Timestamp: %timestamp%
Vendor: %vendor%
Custom Alert Scripts
#!/bin/bash
# /usr/local/bin/arpwatch_alert.sh
# Custom alert script for arpwatch
LOG_FILE="/var/log/arpwatch_custom.log"
ALERT_EMAIL="security@example.com"
WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
# Parse arpwatch input
while IFS= read -r line; do
TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')
# Log the event
echo "[$TIMESTAMP] $line" >> "$LOG_FILE"
# Parse the alert type and details
if echo "$line" | grep -q "new station"; then
ALERT_TYPE="NEW_STATION"
IP=$(echo "$line" | grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}' | head -n 1)
# Accept unpadded octets too (some arpwatch versions print MACs such as 0:c:29:ab:1:2)
MAC=$(echo "$line" | grep -oiE '([0-9a-f]{1,2}:){5}[0-9a-f]{1,2}' | head -n 1)
# Send email alert
echo "New station detected: IP=$IP, MAC=$MAC" | \
mail -s "Arpwatch Alert: New Station" "$ALERT_EMAIL"
# Send Slack notification
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"🚨 Arpwatch Alert: New station detected\\nIP: $IP\\nMAC: $MAC\\nTime: $TIMESTAMP\"}" \
"$WEBHOOK_URL"
elif echo "$line" | grep -q "changed ethernet address"; then
ALERT_TYPE="MAC_CHANGE"
IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
# High priority alert for MAC changes
echo "SECURITY ALERT: MAC address change detected for IP $IP" | \
mail -s "URGENT: Arpwatch Security Alert" "$ALERT_EMAIL"
# Send Slack notification
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"🔥 URGENT: MAC address change detected\\nIP: $IP\\nTime: $TIMESTAMP\"}" \
"$WEBHOOK_URL"
elif echo "$line" | grep -q "flip flop"; then
ALERT_TYPE="FLIP_FLOP"
IP=$(echo "$line" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
# Potential ARP spoofing alert
echo "SECURITY ALERT: Potential ARP spoofing detected for IP $IP" | \
mail -s "CRITICAL: Potential ARP Spoofing" "$ALERT_EMAIL"
fi
done
# Make script executable
sudo chmod +x /usr/local/bin/arpwatch_alert.sh
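# Configure arpwatch to use the custom script
# Option letters differ between builds, so check `man arpwatch` first.
# On Debian-derived builds, -s names the program that receives reports instead of sendmail.
sudo arpwatch -i eth0 -s /usr/local/bin/arpwatch_alert.sh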
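When a handler like the script above receives a whole report rather than a single log line, it can be easier to split the report into fields before deciding how to alert. The sketch below assumes a `label: value` layout modelled on typical arpwatch report bodies; the sample text, the field labels, and the function name `parse_report` are illustrative assumptions, so compare them with a real report from your installation.

```python
from typing import Dict

# Hypothetical sample; real reports may carry different or additional fields.
SAMPLE_REPORT = """\
            hostname: <unknown>
          ip address: 192.168.1.50
    ethernet address: 0:c:29:ab:1:2
     ethernet vendor: VMware, Inc.
           timestamp: Friday, December 1, 2023 10:15:00 +0000
"""


def parse_report(body: str) -> Dict[str, str]:
    """Split 'label: value' lines into a dict, keyed by the stripped label."""
    fields: Dict[str, str] = {}
    for line in body.splitlines():
        # Split on the first colon only, so MACs and times stay intact.
        label, sep, value = line.partition(":")
        if sep and label.strip():
            fields[label.strip()] = value.strip()
    return fields


report = parse_report(SAMPLE_REPORT)
print(report["ip address"], report["ethernet address"])
```

Splitting on the first colon only matters here because both the MAC address and the timestamp contain further colons.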
Database Management
Database Operations
# View current ARP database
sudo cat /var/lib/arpwatch/arp.dat
# Show only the MAC, IP, and epoch-timestamp columns (arp.dat is already plain text)
sudo awk '{print $1, $2, $3}' /var/lib/arpwatch/arp.dat
# Backup database
sudo cp /var/lib/arpwatch/arp.dat /var/lib/arpwatch/arp.dat.backup.$(date +%Y%m%d)
# Restore database
sudo cp /var/lib/arpwatch/arp.dat.backup.20231201 /var/lib/arpwatch/arp.dat
# Clear database (start fresh)
sudo systemctl stop arpwatch
sudo rm /var/lib/arpwatch/arp.dat
sudo systemctl start arpwatch
# Merge multiple database files (write through tee: a plain > redirect would not run as root)
sudo cat /var/lib/arpwatch/eth0.dat /var/lib/arpwatch/eth1.dat | \
sort -u | sudo tee /var/lib/arpwatch/merged.dat > /dev/null
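Note that `sort -u` only removes byte-identical lines, so the same MAC/IP pair recorded at two different times survives the merge twice. A minimal Python sketch of a merge that keeps the newest timestamp per pair follows. It assumes the usual plain-text `arp.dat` layout (MAC, IP, epoch timestamp, then optional extra columns, separated by whitespace); the function name `merge_arp_dat` and the sample records are hypothetical.

```python
from typing import Dict, Iterable, Tuple


def merge_arp_dat(lines: Iterable[str]) -> Dict[Tuple[str, str], int]:
    """Return {(mac, ip): newest_timestamp} for the given arp.dat lines."""
    newest: Dict[Tuple[str, str], int] = {}
    for line in lines:
        parts = line.split()
        # Skip blank or malformed records.
        if len(parts) < 3 or not parts[2].isdigit():
            continue
        key = (parts[0].lower(), parts[1])
        timestamp = int(parts[2])
        if timestamp > newest.get(key, -1):
            newest[key] = timestamp
    return newest


# Illustrative records, as they might appear in two per-interface files.
sample = [
    "0:c:29:ab:1:2\t192.168.1.50\t1701425700\thost-a",
    "0:c:29:ab:1:2\t192.168.1.50\t1701512100\thost-a",
    "aa:bb:cc:dd:ee:ff\t192.168.1.1\t1701425000",
]
merged = merge_arp_dat(sample)
for (mac, ip), ts in sorted(merged.items()):
    print(f"{mac}\t{ip}\t{ts}")
```

In practice the input would be the concatenated lines of the per-interface files; hostname and interface columns are dropped here for brevity.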
Database Analysis Scripts
#!/usr/bin/env python3
# arpwatch_analyzer.py
import sys
import time
import socket
import struct
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import argparse
class ArpwatchAnalyzer:
    def __init__(self, database_file):
        self.database_file = database_file
        self.entries = []
        self.load_database()
    def load_database(self):
        """Load and parse the arpwatch database.

        arp.dat is a plain-text file with one whitespace-separated record per line:
        MAC address, IP address, epoch timestamp, then optional extra columns.
        """
        try:
            with open(self.database_file, 'r') as f:
                for line in f:
                    fields = line.split()
                    # Skip blank or malformed lines
                    if len(fields) < 3 or not fields[2].isdigit():
                        continue
                    # Zero-pad each octet (some versions write e.g. 0:c:29:ab:1:2)
                    mac = ':'.join(f'{int(b, 16):02x}' for b in fields[0].split(':'))
                    timestamp = int(fields[2])
                    self.entries.append({
                        'ip': fields[1],
                        'mac': mac,
                        'timestamp': timestamp,
                        'datetime': datetime.fromtimestamp(timestamp)
                    })
        except FileNotFoundError:
            print(f"Database file not found: {self.database_file}")
            sys.exit(1)
        except Exception as e:
            print(f"Error reading database: {e}")
            sys.exit(1)
    def get_statistics(self):
        """Generate database statistics"""
        if not self.entries:
            return "No entries in database"
        total_entries = len(self.entries)
        unique_ips = len(set(entry['ip'] for entry in self.entries))
        unique_macs = len(set(entry['mac'] for entry in self.entries))
        # Time range
        timestamps = [entry['timestamp'] for entry in self.entries]
        oldest = datetime.fromtimestamp(min(timestamps))
        newest = datetime.fromtimestamp(max(timestamps))
        # Most active IPs
        ip_counts = Counter(entry['ip'] for entry in self.entries)
        most_active_ips = ip_counts.most_common(5)
        # Most active MACs
        mac_counts = Counter(entry['mac'] for entry in self.entries)
        most_active_macs = mac_counts.most_common(5)
        stats = f"""
Arpwatch Database Statistics
============================
Total entries: {total_entries}
Unique IP addresses: {unique_ips}
Unique MAC addresses: {unique_macs}
Date range: {oldest} to {newest}
Most Active IP Addresses:
"""
        for ip, count in most_active_ips:
            stats += f" {ip}: {count} entries\n"
        stats += "\nMost Active MAC Addresses:\n"
        for mac, count in most_active_macs:
            vendor = self.get_vendor(mac)
            stats += f" {mac} ({vendor}): {count} entries\n"
        return stats
    def find_suspicious_activity(self):
        """Detect potentially suspicious ARP activity"""
        suspicious = []
        # Group by IP address
        ip_groups = defaultdict(list)
        for entry in self.entries:
            ip_groups[entry['ip']].append(entry)
        # Check for IP addresses with multiple MAC addresses
        for ip, entries in ip_groups.items():
            macs = set(entry['mac'] for entry in entries)
            if len(macs) > 1:
                suspicious.append({
                    'type': 'IP_MULTIPLE_MACS',
                    'ip': ip,
                    'macs': list(macs),
                    'count': len(entries),
                    'description': f"IP {ip} associated with {len(macs)} different MAC addresses"
                })
        # Group by MAC address
        mac_groups = defaultdict(list)
        for entry in self.entries:
            mac_groups[entry['mac']].append(entry)
        # Check for MAC addresses with multiple IP addresses
        for mac, entries in mac_groups.items():
            ips = set(entry['ip'] for entry in entries)
            if len(ips) > 3:  # Threshold for suspicious activity
                vendor = self.get_vendor(mac)
                suspicious.append({
                    'type': 'MAC_MULTIPLE_IPS',
                    'mac': mac,
                    'vendor': vendor,
                    'ips': list(ips),
                    'count': len(entries),
                    'description': f"MAC {mac} ({vendor}) associated with {len(ips)} different IP addresses"
                })
        # Check for rapid changes (potential ARP spoofing)
        for ip, entries in ip_groups.items():
            if len(entries) > 1:
                sorted_entries = sorted(entries, key=lambda x: x['timestamp'])
                for i in range(1, len(sorted_entries)):
                    time_diff = sorted_entries[i]['timestamp'] - sorted_entries[i-1]['timestamp']
                    if time_diff < 300 and sorted_entries[i]['mac'] != sorted_entries[i-1]['mac']:  # 5 minutes
                        suspicious.append({
                            'type': 'RAPID_MAC_CHANGE',
                            'ip': ip,
                            'old_mac': sorted_entries[i-1]['mac'],
                            'new_mac': sorted_entries[i]['mac'],
                            'time_diff': time_diff,
                            'description': f"Rapid MAC change for IP {ip} ({time_diff}s between changes)"
                        })
        return suspicious
    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simple vendor lookup (in practice, you'd use a full OUI database)
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }
        # The keys above are lowercase, so the prefix must be lowercased to match
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')
    def generate_report(self, output_file=None):
        """Generate comprehensive analysis report"""
        report = f"""
Arpwatch Database Analysis Report
=================================
Generated: {datetime.now()}
Database: {self.database_file}
{self.get_statistics()}
Suspicious Activity Analysis
============================
"""
        suspicious = self.find_suspicious_activity()
        if suspicious:
            for item in suspicious:
                report += f"\n⚠️ {item['type']}: {item['description']}\n"
                if 'macs' in item:
                    report += f" MAC addresses: {', '.join(item['macs'])}\n"
                if 'ips' in item:
                    report += f" IP addresses: {', '.join(item['ips'])}\n"
                if 'time_diff' in item:
                    report += f" Time difference: {item['time_diff']} seconds\n"
        else:
            report += "\nNo suspicious activity detected.\n"
        # Recent activity (last 24 hours)
        recent_threshold = time.time() - 86400  # 24 hours
        recent_entries = [e for e in self.entries if e['timestamp'] > recent_threshold]
        report += "\nRecent Activity (Last 24 Hours)\n"
        report += "===============================\n"
        report += f"Recent entries: {len(recent_entries)}\n"
        if recent_entries:
            report += "\nRecent IP/MAC pairs:\n"
            for entry in sorted(recent_entries, key=lambda x: x['timestamp'], reverse=True)[:10]:
                vendor = self.get_vendor(entry['mac'])
                report += f" {entry['datetime']}: {entry['ip']} -> {entry['mac']} ({vendor})\n"
        if output_file:
            with open(output_file, 'w') as f:
                f.write(report)
            print(f"Report saved to: {output_file}")
        else:
            print(report)
def main():
    parser = argparse.ArgumentParser(description='Arpwatch Database Analyzer')
    parser.add_argument('database', help='Path to arpwatch database file')
    parser.add_argument('--output', '-o', help='Output file for report')
    parser.add_argument('--stats-only', action='store_true', help='Show only statistics')
    args = parser.parse_args()
    analyzer = ArpwatchAnalyzer(args.database)
    if args.stats_only:
        print(analyzer.get_statistics())
    else:
        analyzer.generate_report(args.output)

if __name__ == "__main__":
    main()
Monitoring and Alerting
Real-Time Monitoring Script
#!/bin/bash
# arpwatch_monitor.sh
ARPWATCH_LOG="/var/log/arpwatch.log"
ALERT_EMAIL="admin@example.com"
WHITELIST_FILE="/etc/arpwatch/whitelist.txt"
ALERT_LOG="/var/log/arpwatch_alerts.log"
# Create whitelist if it doesn't exist
if [ ! -f "$WHITELIST_FILE" ]; then
sudo touch "$WHITELIST_FILE"
echo "# Arpwatch whitelist - one MAC address per line" | sudo tee "$WHITELIST_FILE"
fi
# Function to check if MAC is whitelisted
is_whitelisted() {
local mac="$1"
grep -q "^$mac$" "$WHITELIST_FILE"
}
# Function to send alert
send_alert() {
local alert_type="$1"
local message="$2"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
# Log alert
echo "[$timestamp] $alert_type: $message" >> "$ALERT_LOG"
# Send email
echo "$message" | mail -s "Arpwatch Alert: $alert_type" "$ALERT_EMAIL"
# Send to syslog
logger -t arpwatch_monitor "$alert_type: $message"
echo "Alert sent: $alert_type"
}
# Function to analyze log entry
analyze_entry() {
local entry="$1"
if echo "$entry" | grep -q "new station"; then
IP=$(echo "$entry" | grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}' | head -n 1)
# Accept unpadded octets too (some arpwatch versions print MACs such as 0:c:29:ab:1:2)
MAC=$(echo "$entry" | grep -oiE '([0-9a-f]{1,2}:){5}[0-9a-f]{1,2}' | head -n 1)
if ! is_whitelisted "$MAC"; then
send_alert "NEW_STATION" "New station detected: IP=$IP, MAC=$MAC"
fi
elif echo "$entry" | grep -q "changed ethernet address"; then
IP=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
OLD_MAC=$(echo "$entry" | sed -n 's/.*from \([0-9a-f:]\{17\}\).*/\1/p')
NEW_MAC=$(echo "$entry" | sed -n 's/.*to \([0-9a-f:]\{17\}\).*/\1/p')
send_alert "MAC_CHANGE" "MAC address changed: IP=$IP, Old MAC=$OLD_MAC, New MAC=$NEW_MAC"
elif echo "$entry" | grep -q "flip flop"; then
IP=$(echo "$entry" | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+')
MAC1=$(echo "$entry" | sed -n 's/.*between \([0-9a-f:]\{17\}\).*/\1/p')
MAC2=$(echo "$entry" | sed -n 's/.*and \([0-9a-f:]\{17\}\).*/\1/p')
send_alert "FLIP_FLOP" "Flip flop detected (potential ARP spoofing): IP=$IP, MAC1=$MAC1, MAC2=$MAC2"
elif echo "$entry" | grep -q "reused old ethernet address"; then
IP=$(echo "$entry" | grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}' | head -n 1)
MAC=$(echo "$entry" | grep -oiE '([0-9a-f]{1,2}:){5}[0-9a-f]{1,2}' | head -n 1)
send_alert "REUSED_ADDRESS" "Reused ethernet address: IP=$IP, MAC=$MAC"
fi
}
echo "Starting Arpwatch monitor..."
echo "Monitoring log: $ARPWATCH_LOG"
echo "Alert email: $ALERT_EMAIL"
echo "Whitelist file: $WHITELIST_FILE"
echo ""
# Monitor arpwatch log file
tail -F "$ARPWATCH_LOG" | while IFS= read -r line; do
# Skip empty lines
if [ -n "$line" ]; then
analyze_entry "$line"
fi
done
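Whitelist matching only works if MAC addresses are written the same way everywhere. Some arpwatch versions log octets without leading zeros, so a normalization step before comparison helps. The sketch below shows one way to do that in Python; the function names and the sample log line are assumptions made for illustration, not output captured from arpwatch.

```python
import re
from typing import List

# Six colon-separated groups of one or two hex digits.
MAC_RE = re.compile(r"\b(?:[0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}\b")


def normalize_mac(mac: str) -> str:
    """Lowercase a MAC and zero-pad every octet, e.g. 0:C:29 -> 00:0c:29."""
    return ":".join(f"{int(octet, 16):02x}" for octet in mac.split(":"))


def extract_macs(log_line: str) -> List[str]:
    """Return every MAC found in a log line, in normalized form."""
    return [normalize_mac(match) for match in MAC_RE.findall(log_line)]


# Hypothetical log line; the exact wording depends on the arpwatch version.
line = "arpwatch: changed ethernet address 192.168.1.50 0:c:29:ab:1:2 (0:50:56:aa:bb:cc) eth0"
print(extract_macs(line))
```

Storing whitelist entries in the same normalized form means a plain string comparison is enough, regardless of how the address was logged.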
Creating a Network Baseline
#!/usr/bin/env python3
# create_network_baseline.py
import subprocess
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
import argparse
class NetworkBaseline:
    def __init__(self, interface='eth0', duration_hours=24):
        self.interface = interface
        self.duration_hours = duration_hours
        self.baseline_data = {
            'created': datetime.now().isoformat(),
            'interface': interface,
            'duration_hours': duration_hours,
            'devices': {},
            'statistics': {}
        }
    def scan_network(self):
        """Perform network scan to discover devices"""
        print(f"Scanning network on interface {self.interface}...")
        # Get network range
        try:
            # Get IP and netmask of interface
            result = subprocess.run(['ip', 'addr', 'show', self.interface],
                                    capture_output=True, text=True)
            # Parse network range (simplified)
            for line in result.stdout.split('\n'):
                if 'inet ' in line and '127.0.0.1' not in line:
                    ip_info = line.strip().split()[1]
                    network = ip_info.split('/')[0]
                    # Assume /24 network for simplicity
                    network_base = '.'.join(network.split('.')[:-1])
                    network_range = f"{network_base}.0/24"
                    break
            else:
                print(f"Could not determine network range for {self.interface}")
                return
            print(f"Detected network range: {network_range}")
            # Use nmap to scan network (nmap reports MAC addresses only when run as root)
            nmap_cmd = ['nmap', '-sn', network_range]
            result = subprocess.run(nmap_cmd, capture_output=True, text=True)
            # Parse nmap output
            current_ip = None
            for line in result.stdout.split('\n'):
                if 'Nmap scan report for' in line:
                    current_ip = line.split()[-1].strip('()')
                elif 'MAC Address:' in line and current_ip:
                    mac_info = line.split('MAC Address: ')[1]
                    # nmap prints uppercase MACs; lowercase them to match tcpdump output
                    mac = mac_info.split()[0].lower()
                    vendor = mac_info.split('(')[1].rstrip(')') if '(' in mac_info else 'Unknown'
                    self.baseline_data['devices'][current_ip] = {
                        'mac': mac,
                        'vendor': vendor,
                        'first_seen': datetime.now().isoformat(),
                        'last_seen': datetime.now().isoformat(),
                        'status': 'active'
                    }
                    print(f"Found device: {current_ip} -> {mac} ({vendor})")
                    current_ip = None
        except Exception as e:
            print(f"Error scanning network: {e}")
    def monitor_arp_activity(self):
        """Monitor ARP activity for specified duration"""
        print(f"Monitoring ARP activity for {self.duration_hours} hours...")
        end_time = time.time() + (self.duration_hours * 3600)
        # Start tcpdump to capture ARP packets (-l line-buffers stdout so lines arrive promptly)
        tcpdump_cmd = ['tcpdump', '-l', '-i', self.interface, '-n', 'arp']
        process = None
        try:
            # stderr is discarded: an unread pipe could fill up and stall tcpdump
            process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.DEVNULL, text=True)
            while time.time() < end_time:
                line = process.stdout.readline()
                if line:
                    self.parse_arp_packet(line.strip())
                # Check if process is still running
                if process.poll() is not None:
                    break
            process.terminate()
        except KeyboardInterrupt:
            print("\nMonitoring interrupted by user")
            if process is not None:
                process.terminate()
        except Exception as e:
            print(f"Error monitoring ARP activity: {e}")
    def parse_arp_packet(self, packet_line):
        """Parse ARP packet from tcpdump output"""
        try:
            # Example: "12:34:56.789012 ARP, Request who-has 192.168.1.1 tell 192.168.1.100, length 28"
            # Example: "12:34:56.789012 ARP, Reply 192.168.1.1 is-at aa:bb:cc:dd:ee:ff, length 28"
            if 'Reply' in packet_line and 'is-at' in packet_line:
                parts = packet_line.split()
                ip = None
                mac = None
                for i, part in enumerate(parts):
                    if part == 'Reply':
                        ip = parts[i+1]
                    elif part == 'is-at':
                        mac = parts[i+1].rstrip(',')
                if ip and mac:
                    self.update_device_info(ip, mac)
        except Exception as e:
            print(f"Error parsing ARP packet: {e}")

    def update_device_info(self, ip, mac):
        """Update device information in baseline"""
        current_time = datetime.now().isoformat()
        if ip in self.baseline_data['devices']:
            # Update existing device
            device = self.baseline_data['devices'][ip]
            device['last_seen'] = current_time
            # Check for MAC address change
            if device['mac'] != mac:
                print(f"MAC change detected: {ip} changed from {device['mac']} to {mac}")
                device['mac_history'] = device.get('mac_history', [])
                device['mac_history'].append({
                    'old_mac': device['mac'],
                    'new_mac': mac,
                    'timestamp': current_time
                })
                device['mac'] = mac
        else:
            # New device discovered
            print(f"New device discovered: {ip} -> {mac}")
            self.baseline_data['devices'][ip] = {
                'mac': mac,
                'vendor': self.get_vendor(mac),
                'first_seen': current_time,
                'last_seen': current_time,
                'status': 'active'
            }
    def get_vendor(self, mac):
        """Get vendor information from MAC address"""
        # Simplified vendor lookup
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }
        # The keys above are lowercase, so the prefix must be lowercased to match
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')
    def generate_statistics(self):
        """Generate baseline statistics"""
        devices = self.baseline_data['devices']
        self.baseline_data['statistics'] = {
            'total_devices': len(devices),
            'active_devices': len([d for d in devices.values() if d['status'] == 'active']),
            'vendors': {},
            'mac_changes': 0
        }
        # Count vendors
        vendor_count = defaultdict(int)
        for device in devices.values():
            vendor_count[device['vendor']] += 1
            if 'mac_history' in device:
                self.baseline_data['statistics']['mac_changes'] += len(device['mac_history'])
        self.baseline_data['statistics']['vendors'] = dict(vendor_count)

    def save_baseline(self, filename):
        """Save baseline to file"""
        self.generate_statistics()
        with open(filename, 'w') as f:
            json.dump(self.baseline_data, f, indent=2)
        print(f"Baseline saved to: {filename}")

    def create_baseline(self, output_file):
        """Create complete network baseline"""
        print("Creating network baseline...")
        print("=" * 40)
        # Initial network scan
        self.scan_network()
        # Monitor ARP activity
        if self.duration_hours > 0:
            self.monitor_arp_activity()
        # Save baseline
        self.save_baseline(output_file)
        # Display summary
        self.display_summary()

    def display_summary(self):
        """Display baseline summary"""
        stats = self.baseline_data['statistics']
        print("\nBaseline Summary:")
        print("=" * 20)
        print(f"Total devices discovered: {stats['total_devices']}")
        print(f"Active devices: {stats['active_devices']}")
        print(f"MAC address changes detected: {stats['mac_changes']}")
        print("\nVendor distribution:")
        for vendor, count in stats['vendors'].items():
            print(f" {vendor}: {count} devices")
        print(f"\nBaseline creation completed for interface: {self.baseline_data['interface']}")
        print(f"Monitoring duration: {self.baseline_data['duration_hours']} hours")
def main():
    parser = argparse.ArgumentParser(description='Create network baseline for arpwatch')
    parser.add_argument('--interface', '-i', default='eth0', help='Network interface to monitor')
    parser.add_argument('--duration', '-d', type=int, default=1, help='Monitoring duration in hours')
    parser.add_argument('--output', '-o', default='network_baseline.json', help='Output file for baseline')
    parser.add_argument('--scan-only', action='store_true', help='Only perform initial scan, no monitoring')
    args = parser.parse_args()
    if args.scan_only:
        duration = 0
    else:
        duration = args.duration
    baseline = NetworkBaseline(args.interface, duration)
    baseline.create_baseline(args.output)

if __name__ == "__main__":
    main()
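The `scan_network` method above assumes a /24 network. If the interface uses a different prefix length, the standard-library `ipaddress` module can derive the real network from the `address/prefix` string that `ip addr show` prints. A minimal sketch follows; the helper name `network_for` is hypothetical and not part of the script above.

```python
import ipaddress


def network_for(cidr_address: str) -> str:
    """Return the enclosing network for an 'address/prefix' string."""
    # ip_interface keeps both the host address and its prefix length.
    return str(ipaddress.ip_interface(cidr_address).network)


print(network_for("192.168.1.37/26"))
print(network_for("10.20.30.40/16"))
```

The returned string can be handed to `nmap -sn` in place of the hard-coded `.0/24` range.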
Security Applications
ARP Spoofing Detection
#!/bin/bash
# arp_spoofing_detector.sh
INTERFACE="eth0"
THRESHOLD_SECONDS=60 # Alert if MAC changes within 60 seconds
LOG_FILE="/var/log/arp_spoofing.log"
ALERT_EMAIL="security@example.com"
# Private temporary directory for tracking (mktemp avoids a predictable path in /tmp)
TEMP_DIR=$(mktemp -d /tmp/arp_detector.XXXXXX)
# Function to log events
log_event() {
local level="$1"
local message="$2"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
echo "[$timestamp] [$level] $message" | tee -a "$LOG_FILE"
if [ "$level" = "ALERT" ]; then
echo "$message" | mail -s "ARP Spoofing Alert" "$ALERT_EMAIL"
logger -t arp_spoofing_detector "ALERT: $message"
fi
}
# Function to check for rapid MAC changes
check_rapid_changes() {
local ip="$1"
local mac="$2"
local current_time=$(date +%s)
local tracking_file="$TEMP_DIR/ip_$ip"
if [ -f "$tracking_file" ]; then
local last_entry=$(tail -1 "$tracking_file")
local last_time=$(echo "$last_entry" | cut -d: -f1)
# The MAC itself contains colons, so take every field after the timestamp
local last_mac=$(echo "$last_entry" | cut -d: -f2-)
local time_diff=$((current_time - last_time))
if [ "$time_diff" -lt "$THRESHOLD_SECONDS" ] && [ "$mac" != "$last_mac" ]; then
log_event "ALERT" "Rapid MAC change detected for IP $ip: $last_mac -> $mac (${time_diff}s)"
# Check if this looks like ARP spoofing
if [ "$time_diff" -lt 10 ]; then
log_event "ALERT" "POSSIBLE ARP SPOOFING: Very rapid MAC change for IP $ip"
# Additional checks
check_duplicate_mac "$mac" "$ip"
fi
fi
fi
# Update tracking file
echo "$current_time:$mac" >> "$tracking_file"
# Keep only last 10 entries
tail -10 "$tracking_file" > "$tracking_file.tmp"
mv "$tracking_file.tmp" "$tracking_file"
}
# Function to check for duplicate MAC addresses
check_duplicate_mac() {
local mac="$1"
local current_ip="$2"
# Check if this MAC is associated with other IPs
for tracking_file in "$TEMP_DIR"/ip_*; do
if [ -f "$tracking_file" ]; then
local file_ip=$(basename "$tracking_file" | sed 's/ip_//')
if [ "$file_ip" != "$current_ip" ]; then
local last_entry=$(tail -1 "$tracking_file")
local last_mac=$(echo "$last_entry" | cut -d: -f2-)
if [ "$last_mac" = "$mac" ]; then
log_event "ALERT" "MAC address conflict: $mac is associated with both $current_ip and $file_ip"
fi
fi
fi
done
}
# Function to analyze ARP table
analyze_arp_table() {
log_event "INFO" "Analyzing current ARP table"
# Get current ARP table
arp -a | while read line; do
if echo "$line" | grep -q "at"; then
local hostname=$(echo "$line" | cut -d' ' -f1)
local ip=$(echo "$line" | sed 's/.*(\([^)]*\)).*/\1/')
local mac=$(echo "$line" | awk '{print $4}')
if [ -n "$ip" ] && [ -n "$mac" ]; then
check_rapid_changes "$ip" "$mac"
fi
fi
done
}
# Function to monitor ARP traffic
monitor_arp_traffic() {
log_event "INFO" "Starting ARP traffic monitoring on interface $INTERFACE"
# Use tcpdump to monitor ARP traffic (-l line-buffers output so the pipe sees packets immediately)
tcpdump -l -i "$INTERFACE" -n arp | while read -r line; do
if echo "$line" | grep -q "Reply.*is-at"; then
local ip=$(echo "$line" | sed -n 's/.*Reply \([0-9.]*\) is-at.*/\1/p')
local mac=$(echo "$line" | sed -n 's/.*is-at \([0-9a-f:]*\).*/\1/p')
if [ -n "$ip" ] && [ -n "$mac" ]; then
log_event "DEBUG" "ARP Reply: $ip -> $mac"
check_rapid_changes "$ip" "$mac"
fi
fi
done
}
# Cleanup function
cleanup() {
log_event "INFO" "Shutting down ARP spoofing detector"
rm -rf "$TEMP_DIR"
exit 0
}
# Set up signal handlers
trap cleanup SIGINT SIGTERM
log_event "INFO" "Starting ARP spoofing detector"
log_event "INFO" "Interface: $INTERFACE"
log_event "INFO" "Threshold: $THRESHOLD_SECONDS seconds"
log_event "INFO" "Log file: $LOG_FILE"
# Initial ARP table analysis
analyze_arp_table
# Start monitoring
monitor_arp_traffic
Network Security Dashboard
#!/usr/bin/env python3
# arpwatch_dashboard.py
import json
import time
import subprocess
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify
import sqlite3
import threading
app = Flask(__name__)
class ArpwatchDashboard:
    def __init__(self, db_file='arpwatch_dashboard.db'):
        self.db_file = db_file
        self.init_database()
        self.running = True

    def init_database(self):
        """Initialize SQLite database for dashboard data"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        # Create tables
        # ip is UNIQUE so that INSERT OR REPLACE updates a device instead of adding a duplicate row
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS devices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL UNIQUE,
                mac TEXT NOT NULL,
                vendor TEXT,
                first_seen TIMESTAMP,
                last_seen TIMESTAMP,
                status TEXT DEFAULT 'active'
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                event_type TEXT,
                ip TEXT,
                mac TEXT,
                old_mac TEXT,
                description TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP,
                alert_type TEXT,
                severity TEXT,
                ip TEXT,
                mac TEXT,
                description TEXT,
                acknowledged BOOLEAN DEFAULT FALSE
            )
        ''')
        conn.commit()
        conn.close()
    def monitor_arpwatch_log(self, log_file='/var/log/arpwatch.log'):
        """Monitor arpwatch log file for new entries"""
        try:
            # Follow log file
            process = subprocess.Popen(['tail', '-F', log_file],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       text=True)
            while self.running:
                line = process.stdout.readline()
                if line:
                    self.process_log_entry(line.strip())
                if process.poll() is not None:
                    break
            process.terminate()
        except Exception as e:
            print(f"Error monitoring log: {e}")
    def process_log_entry(self, log_entry):
        """Process arpwatch log entry and update database"""
        import re  # imported once here; the module header does not import re
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        timestamp = datetime.now()
        try:
            if 'new station' in log_entry:
                # Extract IP and MAC
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                mac_match = re.search(r'([0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2})', log_entry)
                if ip_match and mac_match:
                    ip = ip_match.group(1)
                    mac = mac_match.group(1)
                    vendor = self.get_vendor(mac)
                    # Insert or update device
                    cursor.execute('''
                        INSERT OR REPLACE INTO devices
                        (ip, mac, vendor, first_seen, last_seen, status)
                        VALUES (?, ?, ?, ?, ?, 'active')
                    ''', (ip, mac, vendor, timestamp, timestamp))
                    # Add event
                    cursor.execute('''
                        INSERT INTO events
                        (timestamp, event_type, ip, mac, description)
                        VALUES (?, 'new_station', ?, ?, ?)
                    ''', (timestamp, ip, mac, f'New station: {ip} -> {mac}'))
                    # Add alert for new devices
                    cursor.execute('''
                        INSERT INTO alerts
                        (timestamp, alert_type, severity, ip, mac, description)
                        VALUES (?, 'new_device', 'info', ?, ?, ?)
                    ''', (timestamp, ip, mac, f'New device detected: {ip} ({vendor})'))
            elif 'changed ethernet address' in log_entry:
                # MAC address change
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                old_mac_match = re.search(r'from ([0-9a-f:]{17})', log_entry)
                new_mac_match = re.search(r'to ([0-9a-f:]{17})', log_entry)
                if ip_match and old_mac_match and new_mac_match:
                    ip = ip_match.group(1)
                    old_mac = old_mac_match.group(1)
                    new_mac = new_mac_match.group(1)
                    vendor = self.get_vendor(new_mac)
                    # Update device
                    cursor.execute('''
                        UPDATE devices
                        SET mac = ?, vendor = ?, last_seen = ?
                        WHERE ip = ?
                    ''', (new_mac, vendor, timestamp, ip))
                    # Add event
                    cursor.execute('''
                        INSERT INTO events
                        (timestamp, event_type, ip, mac, old_mac, description)
                        VALUES (?, 'mac_change', ?, ?, ?, ?)
                    ''', (timestamp, ip, new_mac, old_mac, f'MAC changed: {ip} from {old_mac} to {new_mac}'))
                    # Add high-priority alert
                    cursor.execute('''
                        INSERT INTO alerts
                        (timestamp, alert_type, severity, ip, mac, description)
                        VALUES (?, 'mac_change', 'warning', ?, ?, ?)
                    ''', (timestamp, ip, new_mac, f'MAC address changed for {ip}'))
            elif 'flip flop' in log_entry:
                # Potential ARP spoofing
                ip_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', log_entry)
                if ip_match:
                    ip = ip_match.group(1)
                    # Add critical alert
                    cursor.execute('''
                        INSERT INTO alerts
                        (timestamp, alert_type, severity, ip, description)
                        VALUES (?, 'flip_flop', 'critical', ?, ?)
                    ''', (timestamp, ip, f'Flip flop detected for {ip} - potential ARP spoofing'))
            conn.commit()
        except Exception as e:
            print(f"Error processing log entry: {e}")
        finally:
            conn.close()
    def get_vendor(self, mac):
        """Get vendor from MAC address"""
        oui_map = {
            '00:50:56': 'VMware',
            '08:00:27': 'VirtualBox',
            '52:54:00': 'QEMU',
            '00:0c:29': 'VMware',
            '00:1b:21': 'Intel',
            '00:1e:68': 'Cisco',
            '00:23:69': 'Cisco',
            '00:d0:c9': 'Intel',
            'b8:27:eb': 'Raspberry Pi',
            'dc:a6:32': 'Raspberry Pi'
        }
        # The keys above are lowercase, so the prefix must be lowercased to match
        oui = mac[:8].lower()
        return oui_map.get(oui, 'Unknown')
# Global dashboard instance
dashboard = ArpwatchDashboard()
@app.route('/')
def index():
    """Main dashboard page"""
    return render_template('dashboard.html')

@app.route('/api/devices')
def api_devices():
    """API endpoint for device list"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()
    cursor.execute('''
        SELECT ip, mac, vendor, first_seen, last_seen, status
        FROM devices
        ORDER BY last_seen DESC
    ''')
    devices = []
    for row in cursor.fetchall():
        devices.append({
            'ip': row[0],
            'mac': row[1],
            'vendor': row[2],
            'first_seen': row[3],
            'last_seen': row[4],
            'status': row[5]
        })
    conn.close()
    return jsonify(devices)

@app.route('/api/alerts')
def api_alerts():
    """API endpoint for alerts"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()
    cursor.execute('''
        SELECT timestamp, alert_type, severity, ip, mac, description, acknowledged
        FROM alerts
        ORDER BY timestamp DESC
        LIMIT 50
    ''')
    alerts = []
    for row in cursor.fetchall():
        alerts.append({
            'timestamp': row[0],
            'alert_type': row[1],
            'severity': row[2],
            'ip': row[3],
            'mac': row[4],
            'description': row[5],
            'acknowledged': row[6]
        })
    conn.close()
    return jsonify(alerts)
@app.route('/api/statistics')
def api_statistics():
    """API endpoint for statistics"""
    conn = sqlite3.connect(dashboard.db_file)
    cursor = conn.cursor()
    # Get device count (SQL string literals take single quotes)
    cursor.execute("SELECT COUNT(*) FROM devices WHERE status = 'active'")
    active_devices = cursor.fetchone()[0]
    # Get alert counts
    cursor.execute('SELECT COUNT(*) FROM alerts WHERE acknowledged = FALSE')
    unacknowledged_alerts = cursor.fetchone()[0]
    # Get recent events count
    yesterday = datetime.now() - timedelta(days=1)
    cursor.execute('SELECT COUNT(*) FROM events WHERE timestamp > ?', (yesterday,))
    recent_events = cursor.fetchone()[0]
    # Get vendor distribution
    cursor.execute('''
        SELECT vendor, COUNT(*) as count
        FROM devices
        WHERE status = 'active'
        GROUP BY vendor
        ORDER BY count DESC
    ''')
    vendor_distribution = []
    for row in cursor.fetchall():
        vendor_distribution.append({
            'vendor': row[0],
            'count': row[1]
        })
    conn.close()
    return jsonify({
        'active_devices': active_devices,
        'unacknowledged_alerts': unacknowledged_alerts,
        'recent_events': recent_events,
        'vendor_distribution': vendor_distribution
    })
def start_monitoring():
    """Start monitoring in background thread"""
    monitor_thread = threading.Thread(target=dashboard.monitor_arpwatch_log)
    monitor_thread.daemon = True
    monitor_thread.start()

if __name__ == '__main__':
    # Start monitoring
    start_monitoring()
    # Start web server
    # 0.0.0.0 listens on all interfaces and the API has no authentication;
    # use host='127.0.0.1' unless the dashboard must be reachable from other hosts
    app.run(host='0.0.0.0', port=5000, debug=False)
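The dashboard records devices with `INSERT OR REPLACE`, which discards the existing row and with it the original `first_seen` value. One alternative is an upsert that updates only the columns that should change. The sketch below assumes SQLite 3.24 or newer (needed for `ON CONFLICT ... DO UPDATE`), uses a simplified `devices` table, and a hypothetical helper named `record_sighting`; it is not the dashboard's own schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE devices (
        ip TEXT PRIMARY KEY,
        mac TEXT NOT NULL,
        first_seen TEXT,
        last_seen TEXT
    )
""")


def record_sighting(db: sqlite3.Connection, ip: str, mac: str, seen_at: str) -> None:
    """Insert a device, or refresh mac/last_seen while keeping first_seen."""
    db.execute(
        """
        INSERT INTO devices (ip, mac, first_seen, last_seen)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            mac = excluded.mac,
            last_seen = excluded.last_seen
        """,
        (ip, mac, seen_at, seen_at),
    )


record_sighting(conn, "192.168.1.50", "00:0c:29:ab:01:02", "2023-12-01T10:00:00")
record_sighting(conn, "192.168.1.50", "00:50:56:aa:bb:cc", "2023-12-01T11:00:00")
rows = conn.execute("SELECT ip, mac, first_seen, last_seen FROM devices").fetchall()
print(rows)
```

After the second call the row carries the new MAC and `last_seen`, while `first_seen` still holds the time of the first sighting.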
This Arpwatch cheatsheet covers network monitoring, security analysis, and intrusion detection, from basic ARP monitoring through to a security dashboard and automated threat detection.