Skip to content

Scapy

Scapy is a powerful Python library for packet manipulation, network scanning, and protocol analysis. Create custom packets, send them to network targets, and analyze responses.

Installation

Linux/Ubuntu

# Install scapy
pip3 install scapy

# Install with extra capabilities
# (quoted so the shell does not treat the square brackets as a glob pattern)
# NOTE: the name of the extra may differ between Scapy releases - confirm against the install docs
pip3 install "scapy[complete]"

# For packet sniffing, may need libpcap
sudo apt install libpcap-dev

# Verify installation
python3 -c "import scapy; print(scapy.__version__)"

macOS

# Via pip
pip3 install scapy

# Via Homebrew
brew install scapy

# With optional dependencies
# (quoted: zsh, the default macOS shell, otherwise fails with "no matches found")
# NOTE: the name of the extra may differ between Scapy releases - confirm against the install docs
pip3 install "scapy[complete]"

Windows

# Via pip
pip install scapy

# For Windows, install Npcap
# Download from: https://nmap.org/npcap/

Basic Packet Operations

Create Packets

from scapy.all import *

# Create IP packet
ip = IP(dst="192.168.1.1")

# Create TCP packet
tcp = TCP(dport=80, flags="S")

# Create ICMP packet
icmp = ICMP()

# Stack layers (IP/TCP)
packet = IP(dst="192.168.1.1")/TCP(dport=80, flags="S")

# Show packet details
packet.show()

# Get raw bytes
raw_bytes = bytes(packet)

Send Packets

from scapy.all import *

# Send one packet and wait for the first response (Layer 3).
# The timeout keeps sr1() from blocking forever; it returns None when nothing answers.
response = sr1(IP(dst="192.168.1.1")/ICMP(), timeout=2)

# Send packets and collect responses.
# sr() returns a pair: (answered, unanswered).
answered, unanswered = sr(IP(dst="192.168.1.1")/ICMP(), timeout=2)

# Send a packet without waiting for a reply (Layer 3; use sendp() for Layer 2 frames)
send(IP(dst="192.168.1.1")/TCP(dport=80))

# Display results: each entry of the answered list is a (sent, received) pair
for sent, received in answered:
    print(f"Sent: {sent.summary()}")
    print(f"Received: {received.summary()}")

Receive/Sniff Packets

from scapy.all import *

# Sniff packets from network
packets = sniff(iface="eth0", count=10)

# Sniff with filter (BPF)
packets = sniff(iface="eth0", filter="tcp port 80", count=100)

# Sniff with callback function
def packet_callback(pkt):
    """Print the IP endpoints and the TCP ports of a packet, when those layers exist."""
    if pkt.haslayer(IP):
        ip_layer = pkt[IP]
        print(f"IP Source: {ip_layer.src} -> Dest: {ip_layer.dst}")
    if pkt.haslayer(TCP):
        tcp_layer = pkt[TCP]
        print(f"TCP Port: {tcp_layer.sport} -> {tcp_layer.dport}")

sniff(prn=packet_callback, count=50, iface="eth0")

# Sniff and save to file
# (sniff's offline= argument READS an existing pcap; wrpcap() is what writes one,
#  and a count is needed so the capture ends and the file actually gets written)
captured = sniff(iface="eth0", count=100, prn=lambda x: x.summary())
wrpcap("pcap_file.pcap", captured)

Common Network Tasks

ARP Spoofing/Discovery

from scapy.all import *

# ARP request for IP
result, unanswered = arping("192.168.1.0/24", timeout=2)

# Display results
result.summary()

# ARP spoofing (requires root)
def arp_spoof(target_ip, spoof_ip):
    """Send a single ARP "is-at" message to target_ip with spoof_ip as the sender IP.

    Only use this on networks you are authorized to test.
    """
    forged_reply = ARP(
        op="is-at",
        psrc=spoof_ip,
        pdst=target_ip,
        hwdst="ff:ff:ff:ff:ff:ff",
    )
    send(forged_reply, verbose=False)

# Continuous ARP spoofing
import time
while True:
    arp_spoof("192.168.1.100", "192.168.1.1")
    time.sleep(1)

TCP/IP Scanning

from scapy.all import *

# SYN scan (stealth port scan)
def syn_scan(host, ports):
    """Probe each port of host with a TCP SYN and print its state.

    Prints "Open" when a SYN/ACK comes back, "Filtered" when nothing
    answers within the timeout or the reply carries no TCP layer, and
    "Closed" for any other TCP reply.
    """
    for port in ports:
        packet = IP(dst=host)/TCP(dport=port, flags="S")
        response = sr1(packet, timeout=1, verbose=0)
        # A reply without a TCP layer (e.g. an ICMP error) must be checked
        # before indexing response[TCP], which would otherwise raise.
        if response is None or not response.haslayer(TCP):
            print(f"Port {port}: Filtered")
        elif response[TCP].flags == "SA":
            print(f"Port {port}: Open")
            # Reset the half-open connection instead of completing the handshake
            send(IP(dst=host)/TCP(dport=port, flags="R"), verbose=0)
        else:
            print(f"Port {port}: Closed")

syn_scan("192.168.1.1", [22, 80, 443, 3306, 5432])

# FIN scan
def fin_scan(host, port):
    """Probe one port of host with a TCP FIN and print its state.

    Prints "Closed" when the reply is TCP with the RST bit set,
    "Filtered" when an ICMP message comes back instead, and
    "Open|Filtered" when nothing answers or the reply is anything else.
    """
    packet = IP(dst=host)/TCP(dport=port, flags="F")
    response = sr1(packet, timeout=2, verbose=0)
    if response is None:
        print(f"Port {port}: Open|Filtered")
    elif response.haslayer(TCP) and response[TCP].flags & 0x04:  # 0x04 = RST
        print(f"Port {port}: Closed")
    elif response.haslayer(ICMP):
        # An ICMP reply has no TCP layer; indexing response[TCP] would raise
        print(f"Port {port}: Filtered")
    else:
        print(f"Port {port}: Open|Filtered")

DNS Resolution

from scapy.all import *

# Create DNS query (rd=1 asks the server to resolve recursively)
dns_query = IP(dst="8.8.8.8")/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname="example.com"))

# Send and receive; the timeout makes sr1() return None instead of waiting forever
response = sr1(dns_query, timeout=2, verbose=0)

# Parse response
if response and DNS in response:
    response[DNS].show()

# Brute force DNS names
def dns_brute_force(domain, nameserver="8.8.8.8"):
    """Query nameserver for common subdomains of domain and print those that resolve.

    A name is reported only when the reply has response code 0 (no error)
    and contains at least one answer record; a bare DNS reply is not
    enough, because failed lookups are answered with a DNS packet too.
    """
    wordlist = ["www", "mail", "ftp", "localhost", "webmail", "smtp", "webdisk"]
    for word in wordlist:
        query = IP(dst=nameserver)/UDP(dport=53)/DNS(
            rd=1, qd=DNSQR(qname=f"{word}.{domain}")
        )
        response = sr1(query, timeout=1, verbose=0)
        if not (response and DNS in response):
            continue
        dns_reply = response[DNS]
        if dns_reply.rcode == 0 and dns_reply.ancount > 0:
            print(f"Found: {word}.{domain}")

Traceroute

from scapy.all import *

# NOTE: this definition replaces the traceroute() helper that
# `from scapy.all import *` brings into scope.
def traceroute(host, max_hops=30):
    """Print the path to host by sending ICMP echo requests with increasing TTL.

    One line is printed per TTL from 1 to max_hops. The loop stops early
    once an ICMP echo reply (type 0) is received.
    """
    for ttl in range(1, max_hops + 1):
        packet = IP(dst=host, ttl=ttl)/ICMP()
        response = sr1(packet, timeout=2, verbose=0)

        if response is None:
            print(f"{ttl}: Request timed out")
        elif ICMP in response and response[ICMP].type == 11:  # time exceeded
            print(f"{ttl}: {response[IP].src}")
        elif ICMP in response and response[ICMP].type == 0:  # echo reply
            print(f"{ttl}: {response[IP].src} (reached destination)")
            break
        else:
            # Any other reply is reported rather than silently dropped
            print(f"{ttl}: unexpected reply: {response.summary()}")

traceroute("example.com")

Advanced Techniques

Custom Protocol Handling

from scapy.all import *

# Create custom layer
class CustomProto(Packet):
    """Example custom layer with one integer field and one string field."""

    name = "CustomProto"
    fields_desc = [
        IntField("custom_field", 0),  # integer field, default 0
        StrField("data", ""),         # string field, default empty
    ]

# Use custom layer
packet = IP(dst="192.168.1.1")/CustomProto(custom_field=42, data="test")

Packet Crafting

from scapy.all import *

# Raw IP packet
ip_packet = IP(
    src="192.168.1.100",
    dst="192.168.1.1",
    proto=6,  # TCP
    ttl=64
)

# TCP SYN packet with options
tcp_packet = TCP(
    sport=RandShort(),
    dport=80,
    seq=RandInt(),
    ack=0,
    flags="S",
    window=8192
)

# HTTP GET in TCP payload
raw_data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"

full_packet = IP(dst="example.com")/TCP(dport=80, flags="PA")/Raw(load=raw_data)
send(full_packet)

Packet Analysis

from scapy.all import *

# Load pcap file
packets = rdpcap("capture.pcap")

# Filter and analyze
for packet in packets:
    if IP in packet:
        print(f"IP: {packet[IP].src} -> {packet[IP].dst}")
    if TCP in packet:
        print(f"TCP: {packet[TCP].sport} -> {packet[TCP].dport}")
    if Raw in packet:
        print(f"Data: {packet[Raw].load[:50]}")

# Extract specific data
http_packets = [pkt for pkt in packets if pkt.haslayer(Raw)]
for pkt in http_packets:
    print(pkt[Raw].load.decode('utf-8', errors='ignore'))

Practical Examples

Network Discovery

from scapy.all import *

# Discover live hosts on network
def discover_hosts(network):
    """Run arping() over network and print the IP and MAC address of each responder."""
    answered, _unanswered = arping(network)
    print("Active hosts:")
    for _request, reply in answered:
        print(f"{reply.psrc} ({reply.hwsrc})")

discover_hosts("192.168.1.0/24")

Port Scanning

from scapy.all import *
import concurrent.futures

def scan_port(host, port):
    """Send one TCP SYN to host:port and return (port, "Open") or (port, "Closed").

    "Open" is returned only when a TCP reply with flags "SA" arrives
    within the timeout; every other outcome yields "Closed".
    """
    probe = IP(dst=host)/TCP(dport=port, flags="S")
    reply = sr1(probe, timeout=1, verbose=0)
    state = "Open" if (reply and TCP in reply and reply[TCP].flags == "SA") else "Closed"
    return port, state

# Parallel scanning
host = "192.168.1.1"
# range() excludes its stop value, so 65536 is required to include port 65535
ports = range(1, 65536)
open_ports = []

with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
    # executor.map yields results in the same order as `ports`
    results = executor.map(lambda p: scan_port(host, p), ports)
    for port, status in results:
        if status == "Open":
            open_ports.append(port)
            print(f"Port {port}: Open")

print(f"\nOpen ports: {open_ports}")

Packet Sniffer

from scapy.all import *

def packet_sniffer(iface, packet_count=0):
    """Sniff on iface and print a short description of every IP packet seen.

    packet_count is passed through to sniff() as its count argument.
    """
    def pkt_handler(pkt):
        # Packets without an IP layer are ignored
        if IP not in pkt:
            return
        ip_layer = pkt[IP]
        print(f"\n[IP] {ip_layer.src} -> {ip_layer.dst}")
        if TCP in pkt:
            segment = pkt[TCP]
            print(f"[TCP] Port {segment.sport} -> {segment.dport}")
        elif UDP in pkt:
            datagram = pkt[UDP]
            print(f"[UDP] Port {datagram.sport} -> {datagram.dport}")
        if Raw in pkt:
            # Only the first 100 payload bytes are shown
            print(f"[Data] {pkt[Raw].load[:100]}")

    sniff(iface=iface, prn=pkt_handler, count=packet_count)

# Run sniffer on eth0
packet_sniffer("eth0", packet_count=100)

Common Filters

from scapy.all import *

# Sniff HTTP traffic
sniff(filter="tcp port 80", prn=lambda x: x.show())

# Sniff DNS queries
sniff(filter="udp port 53", prn=lambda x: x.show())

# Sniff SSH traffic
sniff(filter="tcp port 22", prn=lambda x: x.show())

# Sniff all traffic
sniff(iface="eth0")

# Save to pcap file
# (sniff's offline= argument reads an existing capture; use wrpcap() to write one)
tcp_packets = sniff(filter="tcp", count=100)
wrpcap("output.pcap", tcp_packets)

Best Practices

  • Run with appropriate privileges (sudo for sniffing)
  • Use timeouts to prevent hanging
  • Test on authorized networks only
  • Handle exceptions for robustness
  • Close resources properly
  • Document all scripts
  • Follow ethical guidelines

Last updated: 2025-03-30