Scapy
Scapy is a powerful Python library for packet manipulation, network scanning, and protocol analysis. Create custom packets, send them to network targets, and analyze responses.
Installation
Linux/Ubuntu
# Install scapy
pip3 install scapy
# Install with extra capabilities
pip3 install scapy[complete]
# For packet sniffing, may need libpcap
sudo apt install libpcap-dev
# Verify installation
python3 -c "import scapy; print(scapy.__version__)"
macOS
# Via pip
pip3 install scapy
# Via Homebrew
brew install scapy
# With optional dependencies
pip3 install scapy[complete]
Windows
# Via pip
pip install scapy
# For Windows, install Npcap
# Download from: https://nmap.org/npcap/
Basic Packet Operations
Create Packets
from scapy.all import *
# Create IP packet
ip = IP(dst="192.168.1.1")
# Create TCP packet
tcp = TCP(dport=80, flags="S")
# Create ICMP packet
icmp = ICMP()
# Stack layers (IP/TCP)
packet = IP(dst="192.168.1.1")/TCP(dport=80, flags="S")
# Show packet details
packet.show()
# Get raw bytes
raw_bytes = bytes(packet)
Send Packets
from scapy.all import *
# Send packet and wait for response (Layer 3)
response = sr1(IP(dst="192.168.1.1")/ICMP())
# Send multiple packets and collect responses
results = sr(IP(dst="192.168.1.1")/ICMP(), timeout=2)
# Send raw packet (Layer 2)
send(IP(dst="192.168.1.1")/TCP(dport=80))
# Send and display results
for sent, received in results:
print(f"Sent: {sent.summary()}")
print(f"Received: {received.summary()}")
Receive/Sniff Packets
from scapy.all import *
# Sniff packets from network
packets = sniff(iface="eth0", count=10)
# Sniff with filter (BPF)
packets = sniff(iface="eth0", filter="tcp port 80", count=100)
# Sniff with callback function
def packet_callback(pkt):
if IP in pkt:
print(f"IP Source: {pkt[IP].src} -> Dest: {pkt[IP].dst}")
if TCP in pkt:
print(f"TCP Port: {pkt[TCP].sport} -> {pkt[TCP].dport}")
sniff(prn=packet_callback, count=50, iface="eth0")
# Sniff and save to file
sniff(iface="eth0", prn=lambda x: x.summary(), offline="pcap_file.pcap")
Common Network Tasks
ARP Spoofing/Discovery
from scapy.all import *
# ARP request for IP
result, unanswered = arping("192.168.1.0/24", timeout=2)
# Display results
result.summary()
# ARP spoofing (requires root)
def arp_spoof(target_ip, spoof_ip):
packet = ARP(op="is-at", pdst=target_ip, hwdst="ff:ff:ff:ff:ff:ff", psrc=spoof_ip)
send(packet, verbose=False)
# Continuous ARP spoofing
import time
while True:
arp_spoof("192.168.1.100", "192.168.1.1")
time.sleep(1)
TCP/IP Scanning
from scapy.all import *
# SYN scan (stealth port scan)
def syn_scan(host, ports):
for port in ports:
packet = IP(dst=host)/TCP(dport=port, flags="S")
response = sr1(packet, timeout=1, verbose=0)
if response is None:
print(f"Port {port}: Filtered")
elif response[TCP].flags == "SA":
print(f"Port {port}: Open")
send(IP(dst=host)/TCP(dport=port, flags="R"), verbose=0)
else:
print(f"Port {port}: Closed")
syn_scan("192.168.1.1", [22, 80, 443, 3306, 5432])
# FIN scan
def fin_scan(host, port):
packet = IP(dst=host)/TCP(dport=port, flags="F")
response = sr1(packet, timeout=2, verbose=0)
if response is None or response[TCP].flags & 1:
print(f"Port {port}: Open|Filtered")
else:
print(f"Port {port}: Closed")
DNS Resolution
from scapy.all import *
# Create DNS query
dns_query = IP(dst="8.8.8.8")/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname="example.com"))
# Send and receive
response = sr1(dns_query, verbose=0)
# Parse response
if response and DNS in response:
response[DNS].show()
# Brute force DNS names
def dns_brute_force(domain, nameserver="8.8.8.8"):
wordlist = ["www", "mail", "ftp", "localhost", "webmail", "smtp", "webdisk"]
for word in wordlist:
query = IP(dst=nameserver)/UDP(dport=53)/DNS(
rd=1, qd=DNSQR(qname=f"{word}.{domain}")
)
response = sr1(query, timeout=1, verbose=0)
if response and DNS in response:
print(f"Found: {word}.{domain}")
Traceroute
from scapy.all import *
def traceroute(host, max_hops=30):
for ttl in range(1, max_hops + 1):
packet = IP(dst=host, ttl=ttl)/ICMP()
response = sr1(packet, timeout=2, verbose=0)
if response is None:
print(f"{ttl}: Request timed out")
elif ICMP in response and response[ICMP].type == 11:
print(f"{ttl}: {response[IP].src}")
elif ICMP in response and response[ICMP].type == 0:
print(f"{ttl}: {response[IP].src} (reached destination)")
break
traceroute("example.com")
Advanced Techniques
Custom Protocol Handling
from scapy.all import *
# Create custom layer
class CustomProto(Packet):
name = "CustomProto"
fields_desc = [
IntField("custom_field", 0),
StrField("data", "")
]
# Use custom layer
packet = IP(dst="192.168.1.1")/Custom Proto(custom_field=42, data="test")
Packet Crafting
from scapy.all import *
# Raw IP packet
ip_packet = IP(
src="192.168.1.100",
dst="192.168.1.1",
proto=6, # TCP
ttl=64
)
# TCP SYN packet with options
tcp_packet = TCP(
sport=RandShort(),
dport=80,
seq=RandInt(),
ack=0,
flags="S",
window=8192
)
# HTTP GET in TCP payload
raw_data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
full_packet = IP(dst="example.com")/TCP(dport=80, flags="PA")/Raw(load=raw_data)
send(full_packet)
Packet Analysis
from scapy.all import *
# Load pcap file
packets = rdpcap("capture.pcap")
# Filter and analyze
for packet in packets:
if IP in packet:
print(f"IP: {packet[IP].src} -> {packet[IP].dst}")
if TCP in packet:
print(f"TCP: {packet[TCP].sport} -> {packet[TCP].dport}")
if Raw in packet:
print(f"Data: {packet[Raw].load[:50]}")
# Extract specific data
http_packets = [pkt for pkt in packets if pkt.haslayer(Raw)]
for pkt in http_packets:
print(pkt[Raw].load.decode('utf-8', errors='ignore'))
Practical Examples
Network Discovery
from scapy.all import *
# Discover live hosts on network
def discover_hosts(network):
results, unanswered = arping(network)
print("Active hosts:")
for sent, received in results:
print(f"{received.psrc} ({received.hwsrc})")
discover_hosts("192.168.1.0/24")
Port Scanning
from scapy.all import *
import concurrent.futures
def scan_port(host, port):
packet = IP(dst=host)/TCP(dport=port, flags="S")
response = sr1(packet, timeout=1, verbose=0)
if response and TCP in response and response[TCP].flags == "SA":
return port, "Open"
return port, "Closed"
# Parallel scanning
host = "192.168.1.1"
ports = range(1, 65535)
open_ports = []
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(lambda p: scan_port(host, p), ports)
for port, status in results:
if status == "Open":
open_ports.append(port)
print(f"Port {port}: Open")
print(f"\nOpen ports: {open_ports}")
Packet Sniffer
from scapy.all import *
def packet_sniffer(iface, packet_count=0):
def pkt_handler(pkt):
if IP in pkt:
print(f"\n[IP] {pkt[IP].src} -> {pkt[IP].dst}")
if TCP in pkt:
print(f"[TCP] Port {pkt[TCP].sport} -> {pkt[TCP].dport}")
elif UDP in pkt:
print(f"[UDP] Port {pkt[UDP].sport} -> {pkt[UDP].dport}")
if Raw in pkt:
print(f"[Data] {pkt[Raw].load[:100]}")
sniff(iface=iface, prn=pkt_handler, count=packet_count)
# Run sniffer on eth0
packet_sniffer("eth0", packet_count=100)
Common Filters
from scapy.all import *
# Sniff HTTP traffic
sniff(filter="tcp port 80", prn=lambda x: x.show())
# Sniff DNS queries
sniff(filter="udp port 53", prn=lambda x: x.show())
# Sniff SSH traffic
sniff(filter="tcp port 22", prn=lambda x: x.show())
# Sniff all traffic
sniff(iface="eth0")
# Save to pcap file
sniff(filter="tcp", offline="output.pcap", count=100)
Best Practices
- Run with appropriate privileges (sudo for sniffing)
- Use timeouts to prevent hanging
- Test on authorized networks only
- Handle exceptions for robustness
- Close resources properly
- Document all scripts
- Follow ethical guidelines
Last updated: 2025-03-30