
# Zeek Cheat Sheet
## Overview
Zeek (formerly Bro) is an open-source network security monitoring platform that provides comprehensive traffic analysis and intrusion detection. Unlike traditional signature-based intrusion detection systems, Zeek relies on deep packet inspection, protocol analysis, and behavioral detection. It writes detailed logs of network activity and can detect sophisticated attacks through its flexible scripting language and extensive protocol analyzers.
## Installation and Setup
### Ubuntu/Debian Installation
```bash
# Install from official repositories
sudo apt update
sudo apt install -y zeek zeek-aux
# Install from Zeek Security repository (latest version)
echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_20.04/ /' | sudo tee /etc/apt/sources.list.d/security:zeek.list
curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_20.04/Release.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
sudo apt update
sudo apt install -y zeek
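# Set up environment (the Zeek repository packages install under /opt/zeek)
echo 'export PATH=/opt/zeek/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
# Verify installation (after PATH is updated, so the binary can be found)
zeek --version
which zeek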
# Create zeek user for security
sudo useradd -r -s /bin/false zeek
sudo mkdir -p /opt/zeek/logs /opt/zeek/spool
sudo chown -R zeek:zeek /opt/zeek/logs /opt/zeek/spool
```
### CentOS/RHEL Installation
```bash
# Install EPEL repository
sudo yum install -y epel-release
# Install dependencies
sudo yum install -y cmake make gcc gcc-c++ flex bison libpcap-devel openssl-devel python3-devel swig zlib-devel
# Install from source (recommended for latest features)
cd /tmp
wget https://download.zeek.org/zeek-4.2.1.tar.gz
tar -xzf zeek-4.2.1.tar.gz
cd zeek-4.2.1
# Configure and compile
./configure --prefix=/opt/zeek
make -j$(nproc)
sudo make install
# Add to PATH
echo 'export PATH=/opt/zeek/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
# Create systemd service
sudo tee /etc/systemd/system/zeek.service << EOF
[Unit]
Description=Zeek Network Security Monitor
After=network.target
[Service]
Type=forking
User=zeek
Group=zeek
ExecStart=/opt/zeek/bin/zeekctl deploy
ExecStop=/opt/zeek/bin/zeekctl stop
ExecReload=/opt/zeek/bin/zeekctl deploy
# No PIDFile= entry: zeekctl tracks per-node PIDs itself rather than writing one file at a fixed path
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable zeek
```
### Docker Installation
```bash
# Pull official Zeek image
docker pull zeek/zeek:latest
# Run Zeek container with network monitoring
docker run -d \
--name zeek-monitor \
--network host \
--cap-add NET_ADMIN \
--cap-add NET_RAW \
-v /opt/zeek/logs:/opt/zeek/logs \
-v /opt/zeek/etc:/opt/zeek/etc \
zeek/zeek:latest
# Run interactive Zeek session
docker run -it --rm \
--network host \
--cap-add NET_ADMIN \
--cap-add NET_RAW \
zeek/zeek:latest /bin/bash
# Custom Dockerfile for production
cat > Dockerfile << EOF
FROM zeek/zeek:latest
# Install additional tools
RUN apt-get update && apt-get install -y \\
python3-pip \\
jq \\
curl \\
vim
# Install Python packages for log analysis
RUN pip3 install pandas matplotlib seaborn
# Copy custom scripts
COPY scripts/ /opt/zeek/share/zeek/site/
# Set working directory
WORKDIR /opt/zeek
# Expose ports for management
EXPOSE 47760 47761
CMD ["/opt/zeek/bin/zeekctl", "deploy"]
EOF
docker build -t custom-zeek .
```
## Basic Configuration
### Network Configuration
```bash
# Edit network configuration
sudo vim /opt/zeek/etc/networks.cfg
# Define local networks
10.0.0.0/8 Private
172.16.0.0/12 Private
192.168.0.0/16 Private
127.0.0.0/8 Loopback
# Edit node configuration
sudo vim /opt/zeek/etc/node.cfg
# Single node configuration
[zeek]
type=standalone
host=localhost
interface=eth0
# Cluster configuration example
[manager]
type=manager
host=192.168.1.10
[proxy-1]
type=proxy
host=192.168.1.11
[worker-1]
type=worker
host=192.168.1.12
interface=eth0
[worker-2]
type=worker
host=192.168.1.13
interface=eth1
# Configure zeekctl
sudo vim /opt/zeek/etc/zeekctl.cfg
# Basic zeekctl configuration
LogRotationInterval = 3600
LogExpireInterval = 0
StatsLogEnable = 1
StatsLogExpireInterval = 0
StatusCmdShowAll = 0
CrashExpireInterval = 0
# Current ZeekControl releases use a single option for site policy scripts
SitePolicyScripts = local.zeek
```
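
The `networks.cfg` entries above decide which addresses Zeek treats as local. As a quick sanity check outside Zeek, the following sketch uses Python's standard `ipaddress` module to test whether an address falls inside those ranges. The names `LOCAL_NETS` and `is_local` are illustrative and not part of Zeek, and the list is a hand-made copy of the example ranges rather than something read from the file.

```python
import ipaddress

# Same ranges as the networks.cfg example (illustrative copy, not read from the file)
LOCAL_NETS = [ipaddress.ip_network(n) for n in
              ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.0/8")]

def is_local(addr: str) -> bool:
    """Return True if addr belongs to one of LOCAL_NETS."""
    ip = ipaddress.ip_address(addr)
    return any(ip in net for net in LOCAL_NETS)

if __name__ == "__main__":
    for a in ("192.168.1.12", "8.8.8.8"):
        print(a, is_local(a))
```

Running a few worker and manager addresses through such a check before `zeekctl deploy` can catch a mistyped prefix early.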
### Site Policy Configuration
```bash
# Edit site policy
sudo vim /opt/zeek/share/zeek/site/local.zeek
# Basic site configuration
@load base/frameworks/software
@load base/frameworks/files
@load base/frameworks/notice
@load base/frameworks/logging
# Load additional scripts
@load protocols/conn/known-hosts
@load protocols/conn/known-services
@load protocols/dhcp/software
@load protocols/dns/detect-external-names
@load protocols/ftp/detect
@load protocols/ftp/software
@load protocols/http/detect-sqli
@load protocols/http/detect-webapps
@load protocols/http/software
@load protocols/mysql/software
@load protocols/smtp/software
@load protocols/ssh/detect-bruteforcing
@load protocols/ssh/geo-data
@load protocols/ssh/interesting-hostnames
@load protocols/ssh/software
@load protocols/ssl/known-certs
@load protocols/ssl/validate-certs
# Custom configuration
redef ignore_checksums = T;
redef HTTP::default_capture_password = T;
redef FTP::default_capture_password = T;
# Define local subnets
redef Site::local_nets = {
10.0.0.0/8,
172.16.0.0/12,
192.168.0.0/16,
};
# Email configuration for notices
redef Notice::mail_dest = "admin@example.com";
redef Notice::mail_subject_prefix = "[Zeek Notice]";
# File extraction configuration
redef FileExtract::prefix = "/opt/zeek/extract/";
redef FileExtract::default_limit = 10485760; # 10MB
```
## Basic Operations
### ZeekControl Management
```bash
# Initialize Zeek
sudo zeekctl
# ZeekControl commands
zeekctl> install # Install configuration
zeekctl> check # Check configuration
zeekctl> start # Start Zeek
zeekctl> stop # Stop Zeek
zeekctl> restart # Restart Zeek
zeekctl> status # Check status
zeekctl> deploy # Deploy configuration changes
# Command line operations
zeekctl install
zeekctl check
zeekctl start
zeekctl status
# Check for errors
zeekctl diag
# Update and deploy
zeekctl install
zeekctl deploy
# Monitor processes
zeekctl top
zeekctl ps
```
### Running Zeek Manually
```bash
# Analyze live traffic
sudo zeek -i eth0
# Analyze PCAP file
zeek -r capture.pcap
# Run with specific scripts
zeek -r capture.pcap protocols/http/detect-sqli
# Run with custom script
zeek -r capture.pcap custom-script.zeek
# Write logs as JSON instead of tab-separated text
zeek -r capture.pcap LogAscii::use_json=T
# Print the Zeek version (-v means --version; it is not a verbose flag)
zeek -v
# Start the interactive script debugger
zeek -r capture.pcap -d
# Set log directory
zeek -r capture.pcap Log::default_logdir=/tmp/zeek-logs
```
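
With `LogAscii::use_json=T`, each log line is a standalone JSON object, which makes post-processing straightforward. Below is a minimal sketch, assuming newline-delimited JSON input; the helper name `read_json_log` and the sample records are hypothetical stand-ins for a real `conn.log`.

```python
import io
import json

def read_json_log(stream):
    """Yield one dict per non-empty line of a JSON-formatted Zeek log."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)

# Hypothetical sample standing in for a conn.log written with use_json=T
SAMPLE = io.StringIO(
    '{"ts": 1700000000.0, "id.orig_h": "192.168.1.100", "id.resp_p": 443, "proto": "tcp"}\n'
    '{"ts": 1700000001.5, "id.orig_h": "192.168.1.101", "id.resp_p": 53, "proto": "udp"}\n'
)

records = list(read_json_log(SAMPLE))
tcp_sources = [r["id.orig_h"] for r in records if r["proto"] == "tcp"]
print(tcp_sources)
```

To read a real file, pass an open file handle instead of `SAMPLE`.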
### Log Management
```bash
# View current logs
ls -la /opt/zeek/logs/current/
# Common log files
conn.log # Connection summaries
dns.log # DNS queries and responses
http.log # HTTP requests and responses
ssl.log # SSL/TLS handshake info
files.log # File transfers
notice.log # Security notices
weird.log # Unusual network activity
# Tail logs in real-time
tail -f /opt/zeek/logs/current/conn.log
tail -f /opt/zeek/logs/current/http.log
tail -f /opt/zeek/logs/current/notice.log
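# Run housekeeping tasks (restart crashed nodes, expire old logs); normally invoked from cron
zeekctl cron
# Re-enable housekeeping after it was paused with "zeekctl cron disable"
zeekctl cron enable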
```
## Log Analysis
### Connection Log Analysis
```bash
# View connection log
cat /opt/zeek/logs/current/conn.log
# Connection log fields:
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: originator IP
# id.orig_p: originator port
# id.resp_h: responder IP
# id.resp_p: responder port
# proto: protocol
# service: service type
# duration: connection duration
# orig_bytes: bytes from originator
# resp_bytes: bytes from responder
# conn_state: connection state
# local_orig: local originator
# local_resp: local responder
# Filter connections by IP
grep "192.168.1.100" /opt/zeek/logs/current/conn.log
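# Filter by responder port (ports are a separate tab-delimited column, field 6)
awk -F'\t' '$6 == 80' /opt/zeek/logs/current/conn.log
awk -F'\t' '$6 == 443' /opt/zeek/logs/current/conn.log
# Filter by protocol (field 7)
awk -F'\t' '$7 == "tcp"' /opt/zeek/logs/current/conn.log
awk -F'\t' '$7 == "udp"' /opt/zeek/logs/current/conn.log
# Find long connections (duration, field 9, over one hour)
awk -F'\t' '!/^#/ && $9 != "-" && $9 > 3600' /opt/zeek/logs/current/conn.log
# Find large data transfers (orig_bytes, field 10, over 1 MB)
awk -F'\t' '!/^#/ && $10 != "-" && $10 > 1000000' /opt/zeek/logs/current/conn.log
```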
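
Positional `awk` filters break when the column layout differs between Zeek versions. As an alternative, this sketch reads the `#fields` header and addresses columns by name. It assumes the default tab-separated log format; `parse_zeek_tsv` and the sample lines are hypothetical.

```python
import io

def parse_zeek_tsv(stream):
    """Yield one dict per record, keyed by the names in the #fields header."""
    fields = []
    for line in stream:
        line = line.rstrip("\n")
        if line.startswith("#fields"):
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#"):
            yield dict(zip(fields, line.split("\t")))

# Hypothetical, shortened conn.log excerpt
SAMPLE = io.StringIO(
    "#fields\tts\tuid\tid.orig_h\tid.resp_p\tproto\tduration\n"
    "1700000000.0\tC1\t192.168.1.100\t443\ttcp\t4000.5\n"
    "1700000001.0\tC2\t192.168.1.101\t53\tudp\t-\n"
)

# Connections longer than one hour; "-" marks an unset duration
long_conns = [r["uid"] for r in parse_zeek_tsv(SAMPLE)
              if r["duration"] != "-" and float(r["duration"]) > 3600]
print(long_conns)
```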
### HTTP Log Analysis
```bash
# View HTTP log
cat /opt/zeek/logs/current/http.log
# HTTP log fields:
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: client IP
# id.resp_h: server IP
# trans_depth: transaction depth
# method: HTTP method
# host: hostname
# uri: URI
# referrer: referrer
# version: HTTP version
# user_agent: user agent
# request_body_len: request body length
# response_body_len: response body length
# status_code: HTTP status code
# status_msg: status message
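# Filter by status code (zeek-cut selects columns by name, so positions cannot drift)
zeek-cut id.orig_h host uri status_code < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 == 404'
zeek-cut id.orig_h host uri status_code < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 == 500'
# Filter by method
zeek-cut id.orig_h method host uri < /opt/zeek/logs/current/http.log | awk -F'\t' '$2 == "POST"'
zeek-cut id.orig_h method host uri < /opt/zeek/logs/current/http.log | awk -F'\t' '$2 == "GET"'
# Find suspicious user agents
zeek-cut id.orig_h user_agent < /opt/zeek/logs/current/http.log | grep -i "bot\|crawler\|scanner"
# Find large requests/responses (over 1 MB)
zeek-cut id.orig_h host request_body_len < /opt/zeek/logs/current/http.log | awk -F'\t' '$3 > 1000000'
zeek-cut id.orig_h host response_body_len < /opt/zeek/logs/current/http.log | awk -F'\t' '$3 > 1000000'
# Count requests per host
zeek-cut host < /opt/zeek/logs/current/http.log | sort | uniq -c | sort -nr
```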
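
Grouping status codes into classes often gives a quicker overview than grepping for single codes. The sketch below summarizes a list of `status_code` values; the function name `status_class_counts` and the sample values are hypothetical, and unset values (logged by Zeek as `-`) are counted separately.

```python
from collections import Counter

def status_class_counts(status_codes):
    """Group HTTP status codes into classes such as '2xx' or '4xx'.

    Values that are not three-digit codes (for example '-') count as 'unset'.
    """
    counts = Counter()
    for code in status_codes:
        if code.isdigit() and len(code) == 3:
            counts[code[0] + "xx"] += 1
        else:
            counts["unset"] += 1
    return counts

# Hypothetical status_code column values
sample = ["200", "200", "404", "500", "-", "302"]
print(status_class_counts(sample))
```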
### DNS Log Analysis
```bash
# View DNS log
cat /opt/zeek/logs/current/dns.log
# DNS log fields:
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: client IP
# id.resp_h: DNS server IP
# proto: protocol (udp/tcp)
# trans_id: transaction ID
# rtt: round trip time
# query: DNS query
# qclass: query class
# qclass_name: query class name
# qtype: query type
# qtype_name: query type name
# rcode: response code
# rcode_name: response code name
# AA: authoritative answer
# TC: truncated
# RD: recursion desired
# RA: recursion available
# Z: reserved
# answers: DNS answers
# TTLs: time to live values
# Find DNS queries for specific domain
grep "example.com" /opt/zeek/logs/current/dns.log
# Find failed DNS queries
grep "NXDOMAIN" /opt/zeek/logs/current/dns.log
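# Find suspicious domains (anchor on the query column; "$" would otherwise match the end of the whole line)
zeek-cut query < /opt/zeek/logs/current/dns.log | grep -E "\.(tk|ml|ga|cf)$"
# Count queried domains
zeek-cut query < /opt/zeek/logs/current/dns.log | sort | uniq -c | sort -nr
# Find possible DNS tunneling (long TXT answers)
zeek-cut id.orig_h query qtype_name answers < /opt/zeek/logs/current/dns.log | awk -F'\t' '$3 == "TXT" && length($4) > 100'
```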
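
The TLD filter only works when the match is anchored to the end of the query name. The same check as a small Python sketch, assuming the query names have already been extracted; `has_suspicious_tld` is a hypothetical helper and the TLD list simply mirrors the `grep` example.

```python
SUSPICIOUS_TLDS = (".tk", ".ml", ".ga", ".cf")  # same list as the grep example

def has_suspicious_tld(query: str) -> bool:
    """True if the query name ends in one of SUSPICIOUS_TLDS (case-insensitive)."""
    name = query.rstrip(".").lower()
    return name.endswith(SUSPICIOUS_TLDS)

for q in ("login.example.tk", "tk.example.com", "EXAMPLE.ML."):
    print(q, has_suspicious_tld(q))
```

Matching on the suffix avoids false positives such as `tk.example.com`, where the label appears elsewhere in the name.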
### SSL/TLS Log Analysis
```bash
# View SSL log
cat /opt/zeek/logs/current/ssl.log
# SSL log fields:
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: client IP
# id.resp_h: server IP
# version: SSL/TLS version
# cipher: cipher suite
# curve: elliptic curve
# server_name: server name (SNI)
# resumed: session resumed
# last_alert: last alert
# next_protocol: next protocol
# established: connection established
# cert_chain_fuids: certificate chain file UIDs
# client_cert_chain_fuids: client certificate chain
# subject: certificate subject
# issuer: certificate issuer
# client_subject: client certificate subject
# client_issuer: client certificate issuer
# validation_status: certificate validation status
# Count SSL/TLS versions
zeek-cut version < /opt/zeek/logs/current/ssl.log | sort | uniq -c
# Find weak ciphers
zeek-cut id.orig_h id.resp_h cipher < /opt/zeek/logs/current/ssl.log | grep -E "(RC4|DES|MD5)"
# Find certificate validation failures (validation_status requires protocols/ssl/validate-certs)
grep "unable to get local issuer certificate" /opt/zeek/logs/current/ssl.log
# Count server names (SNI)
zeek-cut server_name < /opt/zeek/logs/current/ssl.log | sort | uniq -c | sort -nr
# Find self-signed certificates (the wording differs between OpenSSL versions)
grep -E "self[- ]signed certificate" /opt/zeek/logs/current/ssl.log
```
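
Counting deprecated protocol versions is a common follow-up to the version listing. A minimal sketch, assuming the `version` column has been extracted into a list; `WEAK_VERSIONS` reflects an assumed policy (everything below TLS 1.2) and `weak_version_counts` is a hypothetical helper.

```python
# Assumed policy: everything below TLS 1.2 counts as weak; adjust to local requirements
WEAK_VERSIONS = {"SSLv2", "SSLv3", "TLSv10", "TLSv11"}

def weak_version_counts(versions):
    """Count occurrences of each version string that appears in WEAK_VERSIONS."""
    counts = {}
    for v in versions:
        if v in WEAK_VERSIONS:
            counts[v] = counts.get(v, 0) + 1
    return counts

# Hypothetical version column values
sample = ["TLSv12", "TLSv10", "TLSv13", "SSLv3", "TLSv10", "-"]
print(weak_version_counts(sample))
```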
## Advanced Analysis
### Custom Zeek Scripts
```zeek
# Custom detection script: detect-port-scan.zeek
@load base/frameworks/notice

module PortScan;

export {
    redef enum Notice::Type += {
        Port_Scan,
    };

    # Configuration
    const scan_threshold = 20 &redef;
    const scan_window = 5min &redef;
}

# Track the distinct ports each source IP has attempted
global scan_tracker: table[addr] of set[port] &create_expire=scan_window;

event connection_attempt(c: connection)
    {
    local orig = c$id$orig_h;
    local resp_port = c$id$resp_p;

    # Skip local traffic
    if ( Site::is_local_addr(orig) )
        return;

    # Add port to tracker
    if ( orig !in scan_tracker )
        scan_tracker[orig] = set();
    add scan_tracker[orig][resp_port];

    # Check threshold
    if ( |scan_tracker[orig]| >= scan_threshold )
        {
        NOTICE([$note=Port_Scan,
                $msg=fmt("Port scan detected from %s (%d ports)", orig, |scan_tracker[orig]|),
                $src=orig,
                $identifier=cat(orig)]);
        # Reset tracker to avoid repeated notices
        delete scan_tracker[orig];
        }
    }

event zeek_init()
    {
    print "Port scan detection loaded";
    }
```
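
The script flags a source once it has attempted `scan_threshold` distinct ports. The same counting logic is restated below as a Python sketch for offline experimentation. It is an illustration only: it ignores the expiry window for brevity, and `PortScanTracker` is a hypothetical name.

```python
from collections import defaultdict

class PortScanTracker:
    """Flag a source once it has contacted `threshold` distinct ports."""

    def __init__(self, threshold=20):
        self.threshold = threshold
        self.ports = defaultdict(set)

    def observe(self, src, port):
        """Record one attempt; return True when the threshold is reached."""
        self.ports[src].add(port)
        if len(self.ports[src]) >= self.threshold:
            del self.ports[src]  # reset, mirroring the Zeek script
            return True
        return False

tracker = PortScanTracker(threshold=3)
alerts = [tracker.observe("203.0.113.5", p) for p in (22, 80, 80, 443)]
print(alerts)
```

Repeated attempts against the same port do not move a source closer to the threshold, because the ports are stored in a set.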
```zeek
# Custom detection script: detect-data-exfiltration.zeek
@load base/frameworks/notice
@load base/frameworks/sumstats

module DataExfiltration;

export {
    redef enum Notice::Type += {
        Large_Upload,
        Suspicious_Upload,
    };

    # Configuration
    # Zeek has no size literals such as "100MB"; SumStats thresholds are doubles
    const upload_threshold = 104857600.0 &redef;  # 100 MB in bytes
    const upload_window = 1hr &redef;
}

event zeek_init()
    {
    # Track upload volumes per source IP
    local r1: SumStats::Reducer = [$stream="upload.bytes", $apply=set(SumStats::SUM)];
    SumStats::create([$name="upload-volume",
                      $epoch=upload_window,
                      $reducers=set(r1),
                      $threshold_val(key: SumStats::Key, result: SumStats::Result) =
                          {
                          return result["upload.bytes"]$sum;
                          },
                      $threshold=upload_threshold,
                      $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
                          {
                          local src = key$host;
                          local bytes = result["upload.bytes"]$sum;
                          NOTICE([$note=Large_Upload,
                                  $msg=fmt("Large upload detected from %s (%.0f bytes)", src, bytes),
                                  $src=src]);
                          }]);
    }

event connection_state_remove(c: connection)
    {
    # Track outbound data transfers: local originator, external responder
    if ( Site::is_local_addr(c$id$orig_h) && ! Site::is_local_addr(c$id$resp_h) )
        {
        # c$orig$size is the number of payload bytes sent by the originator
        if ( c$orig$size > 0 )
            SumStats::observe("upload.bytes", [$host=c$id$orig_h], [$num=c$orig$size]);
        }
    }
```
```zeek
# Custom detection script: detect-dns-tunneling.zeek
@load base/frameworks/notice

module DNSTunneling;

export {
    redef enum Notice::Type += {
        DNS_Tunneling,
        Suspicious_DNS_Query,
    };

    # Configuration
    const max_query_length = 100 &redef;
    const max_queries_per_minute = 50 &redef;
}

# Track DNS queries per source
global dns_query_count: table[addr] of count &create_expire=1min &default=0;

event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
    {
    local orig = c$id$orig_h;

    # Check query length
    if ( |query| > max_query_length )
        {
        NOTICE([$note=Suspicious_DNS_Query,
                $msg=fmt("Unusually long DNS query from %s: %s", orig, query),
                $src=orig,
                $sub=query]);
        }

    # Track query frequency
    ++dns_query_count[orig];
    if ( dns_query_count[orig] > max_queries_per_minute )
        {
        NOTICE([$note=DNS_Tunneling,
                $msg=fmt("Possible DNS tunneling from %s (%d queries/min)", orig, dns_query_count[orig]),
                $src=orig]);
        }

    # Check for suspicious patterns (long hexadecimal runs)
    if ( /[0-9a-f]{32,}/ in query )
        {
        NOTICE([$note=Suspicious_DNS_Query,
                $msg=fmt("Possible encoded data in DNS query from %s: %s", orig, query),
                $src=orig,
                $sub=query]);
        }
    }
```
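
The length and hex-run heuristics can be tried against candidate names before the script is deployed. The following Python sketch mirrors those two checks only (it does not model the per-minute counter); `dns_query_flags` and the flag names are hypothetical.

```python
import re

HEX_RUN = re.compile(r"[0-9a-f]{32,}")  # same pattern as the Zeek script
MAX_QUERY_LENGTH = 100                  # mirrors max_query_length

def dns_query_flags(query: str) -> list:
    """Return the names of the heuristics that a query name triggers."""
    flags = []
    if len(query) > MAX_QUERY_LENGTH:
        flags.append("long_query")
    if HEX_RUN.search(query):
        flags.append("hex_run")
    return flags

print(dns_query_flags("www.example.com"))
print(dns_query_flags("a" * 40 + ".example.com"))
```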
### Log Analysis Scripts
```python
#!/usr/bin/env python3
"""
Zeek Log Analyzer
Comprehensive analysis of Zeek logs with statistics and visualizations
"""
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import argparse
import ipaddress
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import numpy as np


class ZeekLogAnalyzer:
    def __init__(self, log_directory="/opt/zeek/logs/current"):
        self.log_dir = log_directory
        self.dataframes = {}

    def load_logs(self, log_types=('conn', 'http', 'dns', 'ssl')):
        """Load Zeek logs into pandas DataFrames"""
        for log_type in log_types:
            try:
                log_file = f"{self.log_dir}/{log_type}.log"
                columns = None
                data = []
                # Single pass: column names come from the #fields header, other comments are skipped
                with open(log_file, 'r') as f:
                    for line in f:
                        if line.startswith('#fields'):
                            columns = line.rstrip('\n').split('\t')[1:]
                        elif line.strip() and not line.startswith('#'):
                            data.append(line.rstrip('\n').split('\t'))
                if columns and data:
                    # Create DataFrame
                    df = pd.DataFrame(data, columns=columns)
                    # Convert timestamp
                    if 'ts' in df.columns:
                        df['ts'] = pd.to_datetime(df['ts'].astype(float), unit='s')
                    self.dataframes[log_type] = df
                    print(f"Loaded {len(df)} records from {log_type}.log")
            except Exception as e:
                print(f"Error loading {log_type}.log: {e}")

    def analyze_connections(self):
        """Analyze connection patterns"""
        if 'conn' not in self.dataframes:
            print("Connection log not loaded")
            return
        df = self.dataframes['conn']
        print("\n=== Connection Analysis ===")
        # Basic statistics
        total_connections = len(df)
        unique_sources = df['id.orig_h'].nunique()
        unique_destinations = df['id.resp_h'].nunique()
        print(f"Total connections: {total_connections:,}")
        print(f"Unique source IPs: {unique_sources:,}")
        print(f"Unique destination IPs: {unique_destinations:,}")
        # Protocol distribution
        print("\nProtocol Distribution:")
        protocol_counts = df['proto'].value_counts()
        for proto, count in protocol_counts.head(10).items():
            percentage = (count / total_connections) * 100
            print(f" {proto}: {count:,} ({percentage:.1f}%)")
        # Service distribution
        print("\nTop Services:")
        service_counts = df['service'].value_counts()
        for service, count in service_counts.head(10).items():
            if service != '-':
                percentage = (count / total_connections) * 100
                print(f" {service}: {count:,} ({percentage:.1f}%)")
        # Connection states
        print("\nConnection States:")
        state_counts = df['conn_state'].value_counts()
        for state, count in state_counts.head(10).items():
            percentage = (count / total_connections) * 100
            print(f" {state}: {count:,} ({percentage:.1f}%)")
        # Top talkers (by connection count)
        print("\nTop Source IPs (by connection count):")
        top_sources = df['id.orig_h'].value_counts().head(10)
        for ip, count in top_sources.items():
            print(f" {ip}: {count:,} connections")
        # Top destinations
        print("\nTop Destination IPs (by connection count):")
        top_destinations = df['id.resp_h'].value_counts().head(10)
        for ip, count in top_destinations.items():
            print(f" {ip}: {count:,} connections")
        return {
            'total_connections': total_connections,
            'unique_sources': unique_sources,
            'unique_destinations': unique_destinations,
            'protocol_distribution': protocol_counts.to_dict(),
            'service_distribution': service_counts.to_dict(),
            'top_sources': top_sources.to_dict(),
            'top_destinations': top_destinations.to_dict()
        }

    def analyze_http_traffic(self):
        """Analyze HTTP traffic patterns"""
        if 'http' not in self.dataframes:
            print("HTTP log not loaded")
            return
        df = self.dataframes['http']
        print("\n=== HTTP Traffic Analysis ===")
        # Basic statistics
        total_requests = len(df)
        unique_hosts = df['host'].nunique()
        unique_user_agents = df['user_agent'].nunique()
        print(f"Total HTTP requests: {total_requests:,}")
        print(f"Unique hosts: {unique_hosts:,}")
        print(f"Unique user agents: {unique_user_agents:,}")
        # Method distribution
        print("\nHTTP Methods:")
        method_counts = df['method'].value_counts()
        for method, count in method_counts.items():
            percentage = (count / total_requests) * 100
            print(f" {method}: {count:,} ({percentage:.1f}%)")
        # Status code distribution
        print("\nHTTP Status Codes:")
        status_counts = df['status_code'].value_counts()
        for status, count in status_counts.head(10).items():
            percentage = (count / total_requests) * 100
            print(f" {status}: {count:,} ({percentage:.1f}%)")
        # Top hosts
        print("\nTop Requested Hosts:")
        top_hosts = df['host'].value_counts().head(10)
        for host, count in top_hosts.items():
            if host != '-':
                print(f" {host}: {count:,} requests")
        # Top user agents
        print("\nTop User Agents:")
        top_user_agents = df['user_agent'].value_counts().head(5)
        for ua, count in top_user_agents.items():
            if ua != '-' and len(ua) < 100:
                print(f" {ua}: {count:,} requests")
        # Suspicious patterns
        print("\nSuspicious Patterns:")
        # Large requests/responses
        if 'request_body_len' in df.columns:
            df['request_body_len'] = pd.to_numeric(df['request_body_len'], errors='coerce')
            large_requests = df[df['request_body_len'] > 1000000]
            print(f" Large requests (>1MB): {len(large_requests)}")
        if 'response_body_len' in df.columns:
            df['response_body_len'] = pd.to_numeric(df['response_body_len'], errors='coerce')
            large_responses = df[df['response_body_len'] > 10000000]
            print(f" Large responses (>10MB): {len(large_responses)}")
        # Error responses
        error_responses = df[df['status_code'].isin(['400', '401', '403', '404', '500', '502', '503'])]
        print(f" Error responses: {len(error_responses)} ({len(error_responses)/total_requests*100:.1f}%)")
        return {
            'total_requests': total_requests,
            'unique_hosts': unique_hosts,
            'method_distribution': method_counts.to_dict(),
            'status_distribution': status_counts.to_dict(),
            'top_hosts': top_hosts.to_dict()
        }

    def analyze_dns_traffic(self):
        """Analyze DNS traffic patterns"""
        if 'dns' not in self.dataframes:
            print("DNS log not loaded")
            return
        df = self.dataframes['dns']
        print("\n=== DNS Traffic Analysis ===")
        # Basic statistics
        total_queries = len(df)
        unique_queries = df['query'].nunique()
        print(f"Total DNS queries: {total_queries:,}")
        print(f"Unique queries: {unique_queries:,}")
        # Query type distribution
        print("\nDNS Query Types:")
        qtype_counts = df['qtype_name'].value_counts()
        for qtype, count in qtype_counts.head(10).items():
            percentage = (count / total_queries) * 100
            print(f" {qtype}: {count:,} ({percentage:.1f}%)")
        # Response code distribution
        print("\nDNS Response Codes:")
        rcode_counts = df['rcode_name'].value_counts()
        for rcode, count in rcode_counts.items():
            percentage = (count / total_queries) * 100
            print(f" {rcode}: {count:,} ({percentage:.1f}%)")
        # Top queried domains
        print("\nTop Queried Domains:")
        top_queries = df['query'].value_counts().head(10)
        for query, count in top_queries.items():
            if query != '-':
                print(f" {query}: {count:,} queries")
        # Suspicious patterns
        print("\nSuspicious DNS Patterns:")
        # Long queries (possible tunneling)
        long_queries = df[df['query'].str.len() > 50]
        print(f" Long queries (>50 chars): {len(long_queries)}")
        # Failed queries
        failed_queries = df[df['rcode_name'] == 'NXDOMAIN']
        print(f" Failed queries (NXDOMAIN): {len(failed_queries)} ({len(failed_queries)/total_queries*100:.1f}%)")
        # Suspicious TLDs: escape the dot and anchor at the end of the name,
        # otherwise '.tk' would also match names such as 'atk.example.com'
        suspicious_domains = df[df['query'].str.contains(r'\.(?:tk|ml|ga|cf)$', na=False)]
        print(f" Suspicious TLD queries: {len(suspicious_domains)}")
        return {
            'total_queries': total_queries,
            'unique_queries': unique_queries,
            'qtype_distribution': qtype_counts.to_dict(),
            'rcode_distribution': rcode_counts.to_dict(),
            'top_queries': top_queries.to_dict()
        }

    def analyze_ssl_traffic(self):
        """Analyze SSL/TLS traffic patterns"""
        if 'ssl' not in self.dataframes:
            print("SSL log not loaded")
            return
        df = self.dataframes['ssl']
        print("\n=== SSL/TLS Traffic Analysis ===")
        # Basic statistics
        total_connections = len(df)
        unique_servers = df['server_name'].nunique()
        print(f"Total SSL/TLS connections: {total_connections:,}")
        print(f"Unique server names: {unique_servers:,}")
        # Version distribution
        print("\nSSL/TLS Versions:")
        version_counts = df['version'].value_counts()
        for version, count in version_counts.items():
            percentage = (count / total_connections) * 100
            print(f" {version}: {count:,} ({percentage:.1f}%)")
        # Top server names
        print("\nTop Server Names (SNI):")
        top_servers = df['server_name'].value_counts().head(10)
        for server, count in top_servers.items():
            if server != '-':
                print(f" {server}: {count:,} connections")
        # Certificate validation status
        if 'validation_status' in df.columns:
            print("\nCertificate Validation Status:")
            validation_counts = df['validation_status'].value_counts()
            for status, count in validation_counts.items():
                if status != '-':
                    percentage = (count / total_connections) * 100
                    print(f" {status}: {count:,} ({percentage:.1f}%)")
        # Security analysis
        print("\nSecurity Analysis:")
        # Weak versions
        weak_versions = df[df['version'].isin(['SSLv2', 'SSLv3', 'TLSv10'])]
        print(f" Weak SSL/TLS versions: {len(weak_versions)} ({len(weak_versions)/total_connections*100:.1f}%)")
        # Self-signed certificates (the wording differs between OpenSSL versions)
        if 'validation_status' in df.columns:
            self_signed = df[df['validation_status'].str.contains('self[- ]signed', na=False)]
            print(f" Self-signed certificates: {len(self_signed)}")
        return {
            'total_connections': total_connections,
            'unique_servers': unique_servers,
            'version_distribution': version_counts.to_dict(),
            'top_servers': top_servers.to_dict()
        }

    def generate_visualizations(self, output_dir="zeek_analysis"):
        """Generate visualization charts"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        # Set style
        plt.style.use('seaborn-v0_8')
        # Connection analysis charts
        if 'conn' in self.dataframes:
            df = self.dataframes['conn']
            # Protocol distribution pie chart
            plt.figure(figsize=(10, 6))
            protocol_counts = df['proto'].value_counts().head(8)
            plt.pie(protocol_counts.values, labels=protocol_counts.index, autopct='%1.1f%%')
            plt.title('Protocol Distribution')
            plt.savefig(f'{output_dir}/protocol_distribution.png', dpi=300, bbox_inches='tight')
            plt.close()
            # Connection timeline
            if 'ts' in df.columns:
                plt.figure(figsize=(12, 6))
                df['hour'] = df['ts'].dt.hour
                hourly_counts = df['hour'].value_counts().sort_index()
                plt.bar(hourly_counts.index, hourly_counts.values)
                plt.xlabel('Hour of Day')
                plt.ylabel('Number of Connections')
                plt.title('Connection Activity by Hour')
                plt.savefig(f'{output_dir}/connection_timeline.png', dpi=300, bbox_inches='tight')
                plt.close()
        # HTTP analysis charts
        if 'http' in self.dataframes:
            df = self.dataframes['http']
            # Status code distribution
            plt.figure(figsize=(10, 6))
            status_counts = df['status_code'].value_counts().head(10)
            plt.bar(status_counts.index, status_counts.values)
            plt.xlabel('HTTP Status Code')
            plt.ylabel('Count')
            plt.title('HTTP Status Code Distribution')
            plt.xticks(rotation=45)
            plt.savefig(f'{output_dir}/http_status_codes.png', dpi=300, bbox_inches='tight')
            plt.close()
        print(f"Visualizations saved to {output_dir}/")

    def generate_report(self, output_file="zeek_analysis_report.json"):
        """Generate comprehensive analysis report"""
        report = {
            'analysis_time': datetime.now().isoformat(),
            'log_directory': self.log_dir,
            'summary': {}
        }
        # Run all analyses
        if 'conn' in self.dataframes:
            report['connection_analysis'] = self.analyze_connections()
        if 'http' in self.dataframes:
            report['http_analysis'] = self.analyze_http_traffic()
        if 'dns' in self.dataframes:
            report['dns_analysis'] = self.analyze_dns_traffic()
        if 'ssl' in self.dataframes:
            report['ssl_analysis'] = self.analyze_ssl_traffic()
        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        print(f"\nAnalysis report saved to {output_file}")
        return report


def main():
    parser = argparse.ArgumentParser(description='Zeek Log Analyzer')
    parser.add_argument('--log-dir', default='/opt/zeek/logs/current',
                        help='Zeek log directory')
    parser.add_argument('--output-dir', default='zeek_analysis',
                        help='Output directory for visualizations')
    parser.add_argument('--report', default='zeek_analysis_report.json',
                        help='Output file for analysis report')
    parser.add_argument('--visualizations', action='store_true',
                        help='Generate visualization charts')
    args = parser.parse_args()
    # Initialize analyzer
    analyzer = ZeekLogAnalyzer(args.log_dir)
    # Load logs
    analyzer.load_logs()
    # Generate report
    analyzer.generate_report(args.report)
    # Generate visualizations if requested
    if args.visualizations:
        analyzer.generate_visualizations(args.output_dir)


if __name__ == "__main__":
    main()
```
### Threat Hunting Scripts
```bash
#!/bin/bash
# Zeek Threat Hunting Script
# Automated threat detection using Zeek logs
LOG_DIR="/opt/zeek/logs/current"
OUTPUT_DIR="/tmp/zeek-hunting"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p "$OUTPUT_DIR"
echo "=== Zeek Threat Hunting Report - $DATE ===" > "$OUTPUT_DIR/threat_report_$DATE.txt"
# Function to log findings
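log_finding() {
    # printf '%b' expands the "\n" escapes used in the section headers below;
    # plain echo in bash would print them literally
    printf '%b\n' "$1" | tee -a "$OUTPUT_DIR/threat_report_$DATE.txt"
}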
log_finding "Starting threat hunting analysis..."
# 1. Port Scan Detection
log_finding "\n=== Port Scan Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
# Find IPs connecting to many different ports
awk '{print $3, $6}' "$LOG_DIR/conn.log" | \
grep -v "id.orig_h" | \
sort | uniq | \
awk '{count[$1]++} END {for (ip in count) if (count[ip] > 20) print ip, count[ip]}' | \
sort -k2 -nr > "$OUTPUT_DIR/potential_port_scans_$DATE.txt"
if [ -s "$OUTPUT_DIR/potential_port_scans_$DATE.txt" ]; then
log_finding "Potential port scans detected:"
head -10 "$OUTPUT_DIR/potential_port_scans_$DATE.txt" | while read line; do
log_finding " $line"
done
else
log_finding "No port scans detected"
fi
fi
# 2. DNS Tunneling Detection
log_finding "\n=== DNS Tunneling Detection ==="
if [ -f "$LOG_DIR/dns.log" ]; then
# Find unusually long DNS queries
# query is field 10 in the dns.log layout that includes rtt; adjust if your header differs
awk -F'\t' '!/^#/ && length($10) > 50 {print $3, $10, length($10)}' "$LOG_DIR/dns.log" \
> "$OUTPUT_DIR/long_dns_queries_$DATE.txt"
if [ -s "$OUTPUT_DIR/long_dns_queries_$DATE.txt" ]; then
log_finding "Suspicious long DNS queries detected:"
head -5 "$OUTPUT_DIR/long_dns_queries_$DATE.txt" | while read line; do
log_finding " $line"
done
else
log_finding "No suspicious DNS queries detected"
fi
# Find high-frequency DNS queries from single source
awk '{print $3}' "$LOG_DIR/dns.log" | \
grep -v "id.orig_h" | \
sort | uniq -c | sort -nr | \
awk '$1 > 100 {print $2, $1}' > "$OUTPUT_DIR/high_freq_dns_$DATE.txt"
if [ -s "$OUTPUT_DIR/high_freq_dns_$DATE.txt" ]; then
log_finding "High-frequency DNS queries detected:"
head -5 "$OUTPUT_DIR/high_freq_dns_$DATE.txt" | while read line; do
log_finding " $line"
done
fi
fi
# 3. Data Exfiltration Detection
log_finding "\n=== Data Exfiltration Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
# Find large outbound data transfers
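# Field 3 is the originator IP, field 5 the responder IP (field 4 is the originator port)
awk -F'\t' '$10 != "-" && $10 > 10000000 && $3 ~ /^192\.168\./ && $5 !~ /^192\.168\./ {print $3, $5, $10}' "$LOG_DIR/conn.log" | \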
sort -k3 -nr > "$OUTPUT_DIR/large_uploads_$DATE.txt"
if [ -s "$OUTPUT_DIR/large_uploads_$DATE.txt" ]; then
log_finding "Large outbound transfers detected:"
head -5 "$OUTPUT_DIR/large_uploads_$DATE.txt" | while read line; do
log_finding " $line"
done
else
log_finding "No large outbound transfers detected"
fi
fi
# 4. Suspicious HTTP Activity
log_finding "\n=== Suspicious HTTP Activity ==="
if [ -f "$LOG_DIR/http.log" ]; then
# Find SQL injection attempts
grep -i "union\|select\|insert\|delete\|drop\|exec" "$LOG_DIR/http.log" > "$OUTPUT_DIR/sqli_attempts_$DATE.txt"
if [ -s "$OUTPUT_DIR/sqli_attempts_$DATE.txt" ]; then
log_finding "Potential SQL injection attempts detected:"
wc -l "$OUTPUT_DIR/sqli_attempts_$DATE.txt" | awk '{print " " $1 " attempts"}'
else
log_finding "No SQL injection attempts detected"
fi
# Find XSS attempts
grep -i "script\|javascript\|onerror\|onload" "$LOG_DIR/http.log" > "$OUTPUT_DIR/xss_attempts_$DATE.txt"
if [ -s "$OUTPUT_DIR/xss_attempts_$DATE.txt" ]; then
log_finding "Potential XSS attempts detected:"
wc -l "$OUTPUT_DIR/xss_attempts_$DATE.txt" | awk '{print " " $1 " attempts"}'
else
log_finding "No XSS attempts detected"
fi
# Find directory traversal attempts
# Single quotes keep the shell from rewriting the escaped backslash in the pattern
grep -iE '\.\./|\.\.\\|%2e%2e' "$LOG_DIR/http.log" > "$OUTPUT_DIR/directory_traversal_$DATE.txt"
if [ -s "$OUTPUT_DIR/directory_traversal_$DATE.txt" ]; then
log_finding "Directory traversal attempts detected:"
wc -l "$OUTPUT_DIR/directory_traversal_$DATE.txt" | awk '{print " " $1 " attempts"}'
else
log_finding "No directory traversal attempts detected"
fi
fi
# 5. SSL/TLS Security Issues
log_finding "\n=== SSL/TLS Security Issues ==="
if [ -f "$LOG_DIR/ssl.log" ]; then
# Find weak SSL/TLS versions
grep -E "SSLv2|SSLv3|TLSv10" "$LOG_DIR/ssl.log" > "$OUTPUT_DIR/weak_ssl_$DATE.txt"
if [ -s "$OUTPUT_DIR/weak_ssl_$DATE.txt" ]; then
log_finding "Weak SSL/TLS versions detected:"
# version is field 7 in ssl.log (field 5 is the responder IP)
awk -F'\t' '{print $7}' "$OUTPUT_DIR/weak_ssl_$DATE.txt" | sort | uniq -c | while read line; do
log_finding " $line"
done
else
log_finding "No weak SSL/TLS versions detected"
fi
# Find certificate validation failures
grep -i "unable to get local issuer\|self signed\|certificate verify failed" "$LOG_DIR/ssl.log" > "$OUTPUT_DIR/cert_issues_$DATE.txt"
if [ -s "$OUTPUT_DIR/cert_issues_$DATE.txt" ]; then
log_finding "Certificate validation issues detected:"
wc -l "$OUTPUT_DIR/cert_issues_$DATE.txt" | awk '{print " " $1 " issues"}'
else
log_finding "No certificate validation issues detected"
fi
fi
# 6. Malware Communication Detection
log_finding "\n=== Malware Communication Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
# Find connections to ports commonly associated with IRC bots and backdoors (match the resp_p column, field 6)
awk -F'\t' '$6 ~ /^(6667|6668|6669|1337|31337|4444|5555)$/' "$LOG_DIR/conn.log" > "$OUTPUT_DIR/suspicious_ports_$DATE.txt"
if [ -s "$OUTPUT_DIR/suspicious_ports_$DATE.txt" ]; then
log_finding "Connections to suspicious ports detected:"
awk '{print $6}' "$OUTPUT_DIR/suspicious_ports_$DATE.txt" | sort | uniq -c | while read line; do
log_finding " $line"
done
else
log_finding "No connections to suspicious ports detected"
fi
fi
# 7. Brute Force Detection
log_finding "\n=== Brute Force Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
# Find multiple failed SSH connections
awk -F'\t' '$6 == "22" && $12 != "SF" {print $3}' "$LOG_DIR/conn.log" | \
sort | uniq -c | sort -nr | \
awk '$1 > 10 {print $2, $1}' > "$OUTPUT_DIR/ssh_brute_force_$DATE.txt"
if [ -s "$OUTPUT_DIR/ssh_brute_force_$DATE.txt" ]; then
log_finding "Potential SSH brute force attacks detected:"
head -5 "$OUTPUT_DIR/ssh_brute_force_$DATE.txt" | while read line; do
log_finding " $line"
done
else
log_finding "No SSH brute force attacks detected"
fi
fi
# Generate summary
log_finding "\n=== Summary ==="
log_finding "Threat hunting analysis completed at $(date)"
log_finding "Results saved to: $OUTPUT_DIR/"
# Create IOC list
{
echo "# Indicators of Compromise - $DATE"
echo "# Generated by Zeek Threat Hunting"
echo ""
if [ -s "$OUTPUT_DIR/potential_port_scans_$DATE.txt" ]; then
echo "# Port Scan Sources"
awk '{print $1}' "$OUTPUT_DIR/potential_port_scans_$DATE.txt"
echo ""
fi
if [ -s "$OUTPUT_DIR/high_freq_dns_$DATE.txt" ]; then
echo "# High-Frequency DNS Sources"
awk '{print $1}' "$OUTPUT_DIR/high_freq_dns_$DATE.txt"
echo ""
fi
if [ -s "$OUTPUT_DIR/large_uploads_$DATE.txt" ]; then
echo "# Large Upload Sources"
awk '{print $1}' "$OUTPUT_DIR/large_uploads_$DATE.txt"
echo ""
fi
} > "$OUTPUT_DIR/iocs_$DATE.txt"
log_finding "IOCs saved to: $OUTPUT_DIR/iocs_$DATE.txt"
echo "Threat hunting analysis complete. Check $OUTPUT_DIR/ for detailed results."
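The hunting script above addresses columns by position (for example `$3` and `$6`), which silently breaks if a log is written with a different column layout. The following is a minimal Python sketch of the same SSH brute-force count that reads the `#fields` header instead. The function names `iter_zeek_records` and `failed_ssh_sources` are hypothetical, and the sample rows are illustrative data, not real traffic.

```python
from collections import Counter

def iter_zeek_records(lines):
    """Yield one dict per data row, keyed by the names in the #fields header."""
    fields = []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("#fields"):
            # Header looks like: "#fields<TAB>ts<TAB>uid<TAB>id.orig_h..."
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#") and fields:
            yield dict(zip(fields, line.split("\t")))

def failed_ssh_sources(lines, threshold=10):
    """Count non-SF connections to port 22 per source; keep counts above threshold."""
    counts = Counter(
        rec["id.orig_h"]
        for rec in iter_zeek_records(lines)
        if rec.get("id.resp_p") == "22" and rec.get("conn_state") != "SF"
    )
    return {src: n for src, n in counts.items() if n > threshold}

# Illustrative rows only (addresses from documentation ranges, reduced column set)
sample = [
    "#fields\tts\tuid\tid.orig_h\tid.orig_p\tid.resp_h\tid.resp_p\tproto\tconn_state",
    "1700000000.0\tC1\t192.0.2.10\t40000\t198.51.100.5\t22\ttcp\tREJ",
    "1700000001.0\tC2\t192.0.2.10\t40001\t198.51.100.5\t22\ttcp\tS0",
    "1700000002.0\tC3\t192.0.2.11\t40002\t198.51.100.5\t22\ttcp\tSF",
]
print(failed_ssh_sources(sample, threshold=1))
```

With a real log, `failed_ssh_sources(open("/opt/zeek/logs/current/conn.log"))` would apply the same logic with the default threshold of 10.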
Integration and Automation
ELK Stack Integration
# Install Filebeat for log shipping
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.5.0-linux-x86_64.tar.gz
tar xzvf filebeat-8.5.0-linux-x86_64.tar.gz
cd filebeat-8.5.0-linux-x86_64/
# Configure Filebeat for Zeek logs
cat > filebeat.yml << 'EOF'
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/zeek/logs/current/*.log
  exclude_files: ['\.gz$']
  # Skip Zeek's "#" header and footer lines; every remaining line is one event
  exclude_lines: ['^#']
  fields:
    logtype: zeek
  fields_under_root: true
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "zeek-logs-%{+yyyy.MM.dd}"
setup.template.name: "zeek-logs"
setup.template.pattern: "zeek-logs-*"
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
EOF
# Start Filebeat
sudo ./filebeat -e -c filebeat.yml
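Filebeat as configured above ships each Zeek log line as an unparsed string. If structured documents are wanted in Elasticsearch, one option is to convert the tab-separated rows to typed JSON first. The sketch below assumes Zeek's default TSV conventions (`-` for unset, `(empty)` for empty values, `T`/`F` for booleans). The helper names `coerce` and `to_document` are hypothetical, and mapping both markers to `None` is a simplification.

```python
import json
from datetime import datetime, timezone

def coerce(value, zeek_type):
    """Convert one TSV cell to a Python value based on its Zeek type name."""
    if value in ("-", "(empty)"):
        return None  # simplification: unset and empty are both mapped to None
    if zeek_type == "time":
        # Zeek timestamps are epoch seconds with a fractional part
        return datetime.fromtimestamp(float(value), tz=timezone.utc).isoformat()
    if zeek_type in ("count", "int", "port"):
        return int(value)
    if zeek_type in ("double", "interval"):
        return float(value)
    if zeek_type == "bool":
        return value == "T"
    return value  # addr, string, enum and anything unrecognised stay as text

def to_document(fields, types, line):
    """Pair each cell with its field name and type (from #fields / #types)."""
    cells = line.rstrip("\n").split("\t")
    return {f: coerce(c, t) for f, t, c in zip(fields, types, cells)}

# Illustrative, reduced column set; real logs declare these in their header lines
fields = ["ts", "id.orig_h", "id.resp_p", "duration", "service"]
types = ["time", "addr", "port", "interval", "string"]
doc = to_document(fields, types, "0.0\t192.0.2.10\t443\t1.5\t-")
print(json.dumps(doc))
```

The resulting dictionaries can be serialised one per line and picked up by a JSON-aware input instead of the raw TSV files.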
Splunk Integration
# Configure Splunk Universal Forwarder for Zeek logs
# Install Splunk Universal Forwarder first
# Create inputs.conf
sudo tee /opt/splunkforwarder/etc/system/local/inputs.conf << EOF
[monitor:///opt/zeek/logs/current/*.log]
disabled = false
sourcetype = zeek
index = zeek
[monitor:///opt/zeek/logs/current/conn.log]
sourcetype = zeek:conn
[monitor:///opt/zeek/logs/current/http.log]
sourcetype = zeek:http
[monitor:///opt/zeek/logs/current/dns.log]
sourcetype = zeek:dns
[monitor:///opt/zeek/logs/current/ssl.log]
sourcetype = zeek:ssl
[monitor:///opt/zeek/logs/current/notice.log]
sourcetype = zeek:notice
EOF
# Create props.conf for field extraction
sudo tee /opt/splunkforwarder/etc/system/local/props.conf << EOF
[zeek]
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\r\n]+)
TIME_PREFIX = ^
TIME_FORMAT = %s.%6N
TRUNCATE = 0
[zeek:conn]
EXTRACT-zeek_conn = ^(?<ts>[^\t]+)\t(?<uid>[^\t]+)\t(?<orig_h>[^\t]+)\t(?<orig_p>[^\t]+)\t(?<resp_h>[^\t]+)\t(?<resp_p>[^\t]+)\t(?<proto>[^\t]+)\t(?<service>[^\t]+)\t(?<duration>[^\t]+)\t(?<orig_bytes>[^\t]+)\t(?<resp_bytes>[^\t]+)\t(?<conn_state>[^\t]+)
[zeek:http]
EXTRACT-zeek_http = ^(?<ts>[^\t]+)\t(?<uid>[^\t]+)\t(?<orig_h>[^\t]+)\t(?<orig_p>[^\t]+)\t(?<resp_h>[^\t]+)\t(?<resp_p>[^\t]+)\t(?<trans_depth>[^\t]+)\t(?<method>[^\t]+)\t(?<host>[^\t]+)\t(?<uri>[^\t]+)\t(?<referrer>[^\t]+)\t(?<version>[^\t]+)\t(?<user_agent>[^\t]+)\t(?<request_body_len>[^\t]+)\t(?<response_body_len>[^\t]+)\t(?<status_code>[^\t]+)
[zeek:dns]
EXTRACT-zeek_dns = ^(?<ts>[^\t]+)\t(?<uid>[^\t]+)\t(?<orig_h>[^\t]+)\t(?<orig_p>[^\t]+)\t(?<resp_h>[^\t]+)\t(?<resp_p>[^\t]+)\t(?<proto>[^\t]+)\t(?<trans_id>[^\t]+)\t(?<rtt>[^\t]+)\t(?<query>[^\t]+)\t(?<qclass>[^\t]+)\t(?<qclass_name>[^\t]+)\t(?<qtype>[^\t]+)\t(?<qtype_name>[^\t]+)\t(?<rcode>[^\t]+)\t(?<rcode_name>[^\t]+)
EOF
# Restart Splunk Universal Forwarder
sudo /opt/splunkforwarder/bin/splunk restart
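The `EXTRACT-` patterns above are one capture group per column, joined by tabs, so they have to follow the column order of the logs actually being indexed. As a rough sketch, such a line could be generated from a log's `#fields` header rather than typed by hand. The helpers `fields_from_header` and `splunk_extract` are hypothetical, and dots in Zeek field names are replaced with underscores here because regex group names are limited to word characters (so `id.orig_h` becomes `id_orig_h`, not `orig_h` as in the hand-written patterns).

```python
import re

def fields_from_header(header_line):
    """Return the column names from a '#fields<TAB>...' header line."""
    return header_line.rstrip("\n").split("\t")[1:]

def splunk_extract(stanza, fields):
    """Build an EXTRACT- line that captures tab-separated columns in order."""
    # Replace characters that are not valid in a capture-group name
    names = [re.sub(r"\W", "_", f) for f in fields]
    groups = [f"(?<{n}>[^\\t]+)" for n in names]
    return f"EXTRACT-{stanza} = ^" + "\\t".join(groups)

header = "#fields\tts\tuid\tid.orig_h"
print(splunk_extract("zeek_conn", fields_from_header(header)))
```

Running this against the header of each log type gives patterns that stay in step with the Zeek version in use.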
SIEM Integration Script
#!/usr/bin/env python3
"""
Zeek SIEM Integration
Real-time log parsing and alert generation for SIEM systems
"""
import json
import time
import socket
import syslog
import requests
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import re
class ZeekSIEMIntegrator:
    def __init__(self, config_file="siem_config.json"):
        self.config = self.load_config(config_file)
        self.alert_rules = self.load_alert_rules()
    def load_config(self, config_file):
        """Load SIEM integration configuration"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # Default configuration
            return {
                "siem_type": "syslog",
                "syslog_server": "localhost",
                "syslog_port": 514,
                "api_endpoint": None,
                "api_key": None,
                "log_directory": "/opt/zeek/logs/current",
                "alert_threshold": {
                    "port_scan": 20,
                    "dns_queries": 100,
                    "large_transfer": 100000000
                }
            }
    def load_alert_rules(self):
        """Load alert detection rules"""
        return {
            "port_scan": {
                "pattern": r"multiple_ports",
                "severity": "medium",
                "description": "Port scan detected"
            },
            "sql_injection": {
                "pattern": r"(union|select|insert|delete|drop|exec)",
                "severity": "high",
                "description": "SQL injection attempt detected"
            },
            "xss_attempt": {
                "pattern": r"(script|javascript|onerror|onload)",
                "severity": "medium",
                "description": "XSS attempt detected"
            },
            "dns_tunneling": {
                "pattern": r"long_dns_query",
                "severity": "high",
                "description": "Possible DNS tunneling detected"
            },
            "malware_communication": {
                "pattern": r"(6667|6668|6669|1337|31337|4444|5555)",
                "severity": "high",
                "description": "Malware communication detected"
            }
        }
    def send_syslog_alert(self, alert):
        """Send alert via syslog"""
        try:
            # Create syslog message
            message = f"ZEEK_ALERT: {alert['description']} - Source: {alert.get('source_ip', 'unknown')} - Severity: {alert['severity']}"
            # Send to syslog server over UDP
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message.encode(), (self.config['syslog_server'], self.config['syslog_port']))
            sock.close()
            print(f"Syslog alert sent: {message}")
        except Exception as e:
            print(f"Error sending syslog alert: {e}")
    def send_api_alert(self, alert):
        """Send alert via REST API"""
        try:
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f"Bearer {self.config['api_key']}"
            }
            payload = {
                'timestamp': alert['timestamp'],
                'severity': alert['severity'],
                'description': alert['description'],
                'source_ip': alert.get('source_ip'),
                'destination_ip': alert.get('destination_ip'),
                'details': alert.get('details', {})
            }
            response = requests.post(
                self.config['api_endpoint'],
                headers=headers,
                json=payload,
                timeout=10
            )
            if response.status_code == 200:
                print(f"API alert sent successfully: {alert['description']}")
            else:
                print(f"API alert failed: {response.status_code} - {response.text}")
        except Exception as e:
            print(f"Error sending API alert: {e}")
    def send_alert(self, alert):
        """Send alert to configured SIEM system"""
        if self.config['siem_type'] == 'syslog':
            self.send_syslog_alert(alert)
        elif self.config['siem_type'] == 'api' and self.config['api_endpoint']:
            self.send_api_alert(alert)
        else:
            # No usable SIEM target configured; fall back to stdout
            print(f"Alert: {alert}")
    def analyze_connection_log(self, line):
        """Analyze connection log for suspicious activity"""
        fields = line.strip().split('\t')
        if len(fields) < 12 or fields[0].startswith('#'):
            return
        try:
            ts, uid, orig_h, orig_p, resp_h, resp_p, proto, service, duration, orig_bytes, resp_bytes, conn_state = fields[:12]
            # Check for large data transfers ("-" means the byte count is unset)
            if orig_bytes.isdigit() and int(orig_bytes) > self.config['alert_threshold']['large_transfer']:
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'medium',
                    'description': 'Large outbound data transfer detected',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'bytes_transferred': orig_bytes,
                        'protocol': proto,
                        'service': service
                    }
                }
                self.send_alert(alert)
        except (ValueError, IndexError) as e:
            print(f"Error parsing connection log: {e}")
    def analyze_http_log(self, line):
        """Analyze HTTP log for web attacks"""
        fields = line.strip().split('\t')
        if len(fields) < 16 or fields[0].startswith('#'):
            return
        try:
            # Column positions are assumed; verify them against the #fields header of your http.log
            ts, uid, orig_h, orig_p, resp_h, resp_p, trans_depth, method, host, uri, referrer, version, user_agent, request_body_len, response_body_len, status_code = fields[:16]
            # Check for SQL injection
            if re.search(self.alert_rules['sql_injection']['pattern'], uri, re.IGNORECASE):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': self.alert_rules['sql_injection']['severity'],
                    'description': self.alert_rules['sql_injection']['description'],
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'method': method,
                        'host': host,
                        'uri': uri,
                        'user_agent': user_agent
                    }
                }
                self.send_alert(alert)
            # Check for XSS attempts
            if re.search(self.alert_rules['xss_attempt']['pattern'], uri, re.IGNORECASE):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': self.alert_rules['xss_attempt']['severity'],
                    'description': self.alert_rules['xss_attempt']['description'],
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'method': method,
                        'host': host,
                        'uri': uri,
                        'user_agent': user_agent
                    }
                }
                self.send_alert(alert)
        except (ValueError, IndexError) as e:
            print(f"Error parsing HTTP log: {e}")
    def analyze_dns_log(self, line):
        """Analyze DNS log for tunneling and suspicious queries"""
        fields = line.strip().split('\t')
        if len(fields) < 16 or fields[0].startswith('#'):
            return
        try:
            ts, uid, orig_h, orig_p, resp_h, resp_p, proto, trans_id, rtt, query, qclass, qclass_name, qtype, qtype_name, rcode, rcode_name = fields[:16]
            # Check for long DNS queries (possible tunneling)
            if len(query) > 50:
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'high',
                    'description': 'Possible DNS tunneling detected (long query)',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'query': query,
                        'query_length': len(query),
                        'query_type': qtype_name
                    }
                }
                self.send_alert(alert)
            # Check for suspicious TLDs
            suspicious_tlds = ('.tk', '.ml', '.ga', '.cf')
            # Match the TLD at the end of the name only, not anywhere inside it
            if query.lower().rstrip('.').endswith(suspicious_tlds):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'medium',
                    'description': 'Query to suspicious TLD detected',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'query': query,
                        'query_type': qtype_name
                    }
                }
                self.send_alert(alert)
        except (ValueError, IndexError) as e:
            print(f"Error parsing DNS log: {e}")
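class ZeekLogHandler(FileSystemEventHandler):
    def __init__(self, integrator):
        self.integrator = integrator
        self.log_processors = {
            'conn.log': integrator.analyze_connection_log,
            'http.log': integrator.analyze_http_log,
            'dns.log': integrator.analyze_dns_log
        }
        # Byte offset already processed, per log file path
        self.offsets = {}
    def on_modified(self, event):
        if event.is_directory:
            return
        filename = event.src_path.split('/')[-1]
        if filename not in self.log_processors:
            return
        try:
            with open(event.src_path, 'rb') as f:
                f.seek(0, 2)  # Go to end to learn the current size
                file_size = f.tell()
                # The first event for a file processes it from the start
                offset = self.offsets.get(event.src_path, 0)
                if file_size < offset:
                    offset = 0  # File was rotated or truncated; start over
                f.seek(offset)
                # Process every complete line written since the last event
                while True:
                    raw = f.readline()
                    if not raw.endswith(b'\n'):
                        break  # End of file or a partially written line
                    offset = f.tell()
                    self.log_processors[filename](raw.decode('utf-8', errors='replace'))
                self.offsets[event.src_path] = offset
        except Exception as e:
            print(f"Error processing {filename}: {e}")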
def main():
    # Initialize SIEM integrator
    integrator = ZeekSIEMIntegrator()
    # Set up file monitoring
    event_handler = ZeekLogHandler(integrator)
    observer = Observer()
    observer.schedule(event_handler, integrator.config['log_directory'], recursive=False)
    print("Starting Zeek SIEM integration...")
    print(f"Monitoring: {integrator.config['log_directory']}")
    print(f"SIEM Type: {integrator.config['siem_type']}")
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        print("Stopping Zeek SIEM integration...")
    observer.join()
if __name__ == "__main__":
    main()
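The `send_syslog_alert` method above sends the bare message text. Many syslog receivers expect a leading priority value, computed as facility × 8 + severity, and will otherwise file the message under a default priority. Below is a minimal sketch of building such a prefix. The `format_syslog` helper, the `local0` facility, the `zeek` tag and the mapping from alert severities to syslog severities are all assumptions made for illustration.

```python
# Assumed mapping from the script's alert severities to syslog severities
SYSLOG_SEVERITY = {"high": 3, "medium": 4, "low": 5}  # err, warning, notice
LOCAL0 = 16  # syslog facility code for local0

def format_syslog(alert, facility=LOCAL0, tag="zeek"):
    """Prefix the alert text with a <PRI> value (facility * 8 + severity)."""
    severity = SYSLOG_SEVERITY.get(alert["severity"], 6)  # default: informational
    pri = facility * 8 + severity
    return (
        f"<{pri}>{tag}: ZEEK_ALERT: {alert['description']} - "
        f"Source: {alert.get('source_ip', 'unknown')} - "
        f"Severity: {alert['severity']}"
    )

# Illustrative alert, shaped like the dictionaries built by the script above
msg = format_syslog({
    "severity": "high",
    "description": "SQL injection attempt detected",
    "source_ip": "192.0.2.10",
})
print(msg)
```

The returned string can be encoded and passed to `sock.sendto` in place of the plain message.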
This Zeek cheat sheet covers network security monitoring, traffic analysis, log analysis, custom script development, threat hunting, and SIEM integration. The included scripts and examples support professional network security monitoring and incident response with the Zeek platform.