
Zeek Cheatsheet
Overview
Zeek (formerly known as Bro) is a powerful open-source network security monitoring platform that provides comprehensive network traffic analysis and intrusion detection. Unlike traditional signature-based intrusion detection systems, Zeek focuses on network security monitoring through deep packet inspection, protocol analysis, and behavioral detection. It produces detailed logs of network activity and can detect sophisticated attacks through its flexible scripting language and extensive set of protocol analyzers.
Installation and Setup
Ubuntu/Debian Installation
```bash
# Install from official repositories
sudo apt update
sudo apt install -y zeek zeek-aux

# Install from Zeek Security repository (latest version)
echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_20.04/ /' | sudo tee /etc/apt/sources.list.d/security:zeek.list
curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_20.04/Release.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
sudo apt update
sudo apt install -y zeek

# Verify installation
zeek --version
which zeek

# Set up environment
echo 'export PATH=/opt/zeek/bin:$PATH' >> ~/.bashrc
source ~/.bashrc

# Create zeek user for security
sudo useradd -r -s /bin/false zeek
sudo mkdir -p /opt/zeek/logs /opt/zeek/spool
sudo chown -R zeek:zeek /opt/zeek/logs /opt/zeek/spool
```
CentOS/RHEL Installation
```bash
# Install EPEL repository
sudo yum install -y epel-release

# Install dependencies
sudo yum install -y cmake make gcc gcc-c++ flex bison libpcap-devel openssl-devel python3-devel swig zlib-devel

# Install from source (recommended for latest features)
cd /tmp
wget https://download.zeek.org/zeek-4.2.1.tar.gz
tar -xzf zeek-4.2.1.tar.gz
cd zeek-4.2.1

# Configure and compile
./configure --prefix=/opt/zeek
make -j$(nproc)
sudo make install

# Add to PATH
echo 'export PATH=/opt/zeek/bin:$PATH' >> ~/.bashrc
source ~/.bashrc

# Create systemd service
sudo tee /etc/systemd/system/zeek.service << EOF
[Unit]
Description=Zeek Network Security Monitor
After=network.target

[Service]
Type=forking
User=zeek
Group=zeek
ExecStart=/opt/zeek/bin/zeekctl start
ExecStop=/opt/zeek/bin/zeekctl stop
ExecReload=/opt/zeek/bin/zeekctl restart
PIDFile=/opt/zeek/spool/zeek.pid

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable zeek
```
Docker Installation
```bash
# Pull official Zeek image
docker pull zeek/zeek:latest

# Run Zeek container with network monitoring
docker run -d \
  --name zeek-monitor \
  --network host \
  --cap-add NET_ADMIN \
  --cap-add NET_RAW \
  -v /opt/zeek/logs:/opt/zeek/logs \
  -v /opt/zeek/etc:/opt/zeek/etc \
  zeek/zeek:latest

# Run interactive Zeek session
docker run -it --rm \
  --network host \
  --cap-add NET_ADMIN \
  --cap-add NET_RAW \
  zeek/zeek:latest /bin/bash

# Custom Dockerfile for production
cat > Dockerfile << EOF
FROM zeek/zeek:latest

# Install additional tools
RUN apt-get update && apt-get install -y \
    python3-pip \
    jq \
    curl \
    vim

# Install Python packages for log analysis
RUN pip3 install pandas matplotlib seaborn

# Copy custom scripts
COPY scripts/ /opt/zeek/share/zeek/site/

# Set working directory
WORKDIR /opt/zeek

# Expose ports for management
EXPOSE 47760 47761

CMD ["/opt/zeek/bin/zeekctl", "deploy"]
EOF

docker build -t custom-zeek .
```
Basic Configuration
Network Configuration
```bash
# Edit network configuration
sudo vim /opt/zeek/etc/networks.cfg

# Define local networks (contents of networks.cfg)
10.0.0.0/8          Private
172.16.0.0/12       Private
192.168.0.0/16      Private
127.0.0.0/8         Loopback

# Edit node configuration
sudo vim /opt/zeek/etc/node.cfg

# Single node configuration (contents of node.cfg)
[zeek]
type=standalone
host=localhost
interface=eth0

# Cluster configuration example (alternative contents of node.cfg)
[manager]
type=manager
host=192.168.1.10

[proxy-1]
type=proxy
host=192.168.1.11

[worker-1]
type=worker
host=192.168.1.12
interface=eth0

[worker-2]
type=worker
host=192.168.1.13
interface=eth1

# Configure zeekctl
sudo vim /opt/zeek/etc/zeekctl.cfg

# Basic zeekctl configuration (contents of zeekctl.cfg)
LogRotationInterval = 3600
LogExpireInterval = 0
StatsLogEnable = 1
StatsLogExpireInterval = 0
StatusCmdShowAll = 0
CrashExpireInterval = 0
SitePolicyManager = manager
SitePolicyWorker = worker
```
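When reviewing logs offline it can help to apply the same local/remote distinction that networks.cfg expresses. The following is a minimal sketch using only Python's standard `ipaddress` module; `LOCAL_NETS` and `is_local` are illustrative names, not part of Zeek, and the ranges are copied by hand from the example above rather than read from the file.

```python
import ipaddress

# Same ranges as the networks.cfg example (hand-copied, assumption: not parsed from the file)
LOCAL_NETS = [
    ipaddress.ip_network(net)
    for net in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.0/8")
]


def is_local(ip: str) -> bool:
    """Return True if the address falls inside any of the listed local networks."""
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in LOCAL_NETS)
```

For example, `is_local("192.168.1.12")` is true for the worker address used in the cluster example, while a public address such as `8.8.8.8` is not local.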
Site Policy Configuration
```bash
# Edit site policy
sudo vim /opt/zeek/share/zeek/site/local.zeek

# Basic site configuration (contents of local.zeek)
@load base/frameworks/software
@load base/frameworks/files
@load base/frameworks/notice
@load base/frameworks/logging

# Load additional scripts
@load protocols/conn/known-hosts
@load protocols/conn/known-services
@load protocols/dhcp/software
@load protocols/dns/detect-external-names
@load protocols/ftp/detect
@load protocols/ftp/software
@load protocols/http/detect-sqli
@load protocols/http/detect-webapps
@load protocols/http/software
@load protocols/mysql/software
@load protocols/smtp/software
@load protocols/ssh/detect-bruteforcing
@load protocols/ssh/geo-data
@load protocols/ssh/interesting-hostnames
@load protocols/ssh/software
@load protocols/ssl/known-certs
@load protocols/ssl/validate-certs

# Custom configuration
redef ignore_checksums = T;
redef HTTP::default_capture_password = T;
redef FTP::default_capture_password = T;

# Define local subnets
redef Site::local_nets += {
    10.0.0.0/8,
    172.16.0.0/12,
    192.168.0.0/16,
};

# Email configuration for notices
redef Notice::mail_dest = "admin@example.com";
redef Notice::mail_subject_prefix = "[Zeek Notice]";

# File extraction configuration
redef FileExtract::prefix = "/opt/zeek/extract/";
redef FileExtract::default_limit = 10485760;  # 10MB
```
Basic Operations
ZeekControl Management
```bash
# Start the interactive ZeekControl shell
sudo zeekctl

# ZeekControl commands
zeekctl> install    # Install configuration
zeekctl> check      # Check configuration
zeekctl> start      # Start Zeek
zeekctl> stop       # Stop Zeek
zeekctl> restart    # Restart Zeek
zeekctl> status     # Check status
zeekctl> deploy     # Deploy configuration changes

# Command line operations
zeekctl install
zeekctl check
zeekctl start
zeekctl status

# Check for errors
zeekctl diag

# Update and deploy
zeekctl install
zeekctl deploy

# Monitor processes
zeekctl top
zeekctl ps
```
Running Zeek Manually
```bash
# Analyze live traffic
sudo zeek -i eth0

# Analyze PCAP file
zeek -r capture.pcap

# Run with specific scripts
zeek -r capture.pcap protocols/http/detect-sqli

# Run with custom script
zeek -r capture.pcap custom-script.zeek

# Write logs as JSON instead of tab-separated text
zeek -r capture.pcap LogAscii::use_json=T

# Print the Zeek version (-v means --version; it is not a verbosity flag)
zeek -v

# Debug mode (activates the script debugger)
zeek -r capture.pcap -d

# Set log directory
zeek -r capture.pcap Log::default_logdir=/tmp/zeek-logs
```
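With `LogAscii::use_json=T`, each log line is a standalone JSON object whose keys are the flattened field names (for example `id.resp_h`). The sketch below shows one way such lines could be consumed; `read_json_log`, `top_responders`, and the `SAMPLE` records are made up for illustration and are not part of Zeek.

```python
import json
from collections import Counter


def read_json_log(lines):
    """Yield one dict per non-empty line of a Zeek JSON log."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def top_responders(records, n=3):
    """Count records per responder address and return the n most common."""
    counts = Counter(r["id.resp_h"] for r in records if "id.resp_h" in r)
    return counts.most_common(n)


# Hypothetical conn.log lines in JSON form (illustrative values only)
SAMPLE = [
    '{"ts": 1700000000.0, "uid": "C1", "id.orig_h": "10.0.0.5", "id.resp_h": "93.184.216.34", "proto": "tcp"}',
    '{"ts": 1700000001.0, "uid": "C2", "id.orig_h": "10.0.0.6", "id.resp_h": "93.184.216.34", "proto": "tcp"}',
    '{"ts": 1700000002.0, "uid": "C3", "id.orig_h": "10.0.0.5", "id.resp_h": "1.1.1.1", "proto": "udp"}',
]
```

In practice the `lines` argument would be an open file handle on `conn.log`; the sample list stands in for that here.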
Log Management
```bash
# View current logs
ls -la /opt/zeek/logs/current/

# Common log files
# conn.log    Connection summaries
# dns.log     DNS queries and responses
# http.log    HTTP requests and responses
# ssl.log     SSL/TLS handshake info
# files.log   File transfers
# notice.log  Security notices
# weird.log   Unusual network activity

# Tail logs in real-time
tail -f /opt/zeek/logs/current/conn.log
tail -f /opt/zeek/logs/current/http.log
tail -f /opt/zeek/logs/current/notice.log

# Run housekeeping tasks once (expire old logs, check for crashed nodes)
zeekctl cron

# Enable the periodic cron tasks
zeekctl cron enable
```
Log Analysis
Connection Log Analysis
```bash
# View connection log
cat /opt/zeek/logs/current/conn.log

# Connection log fields:
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: originator IP
# id.orig_p: originator port
# id.resp_h: responder IP
# id.resp_p: responder port
# proto: protocol
# service: service type
# duration: connection duration
# orig_bytes: bytes from originator
# resp_bytes: bytes from responder
# conn_state: connection state
# local_orig: local originator
# local_resp: local responder

# Filter connections by IP
grep -F "192.168.1.100" /opt/zeek/logs/current/conn.log

# Filter by responder port (column 6); ports are separate tab-delimited
# columns, so a pattern such as ":80" never matches
awk -F'\t' '$6 == 80' /opt/zeek/logs/current/conn.log
awk -F'\t' '$6 == 443' /opt/zeek/logs/current/conn.log

# Filter by protocol (column 7)
awk -F'\t' '$7 == "tcp"' /opt/zeek/logs/current/conn.log
awk -F'\t' '$7 == "udp"' /opt/zeek/logs/current/conn.log

# Find long connections (duration, column 9, over one hour)
awk -F'\t' '!/^#/ && $9+0 > 3600' /opt/zeek/logs/current/conn.log

# Find large data transfers (orig_bytes, column 10, over 1 MB)
awk -F'\t' '!/^#/ && $10+0 > 1000000' /opt/zeek/logs/current/conn.log
```
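Column numbers in awk one-liners depend on the exact field order of the log. A more robust approach is to read the `#fields` header line that Zeek writes at the top of each tab-separated log and address values by name. The following is a minimal sketch of that idea; `parse_zeek_tsv`, `long_connections`, and the sample lines are illustrative and not part of Zeek.

```python
def parse_zeek_tsv(lines):
    """Yield one dict per record, keyed by the names found in the #fields header."""
    fields = None
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("#fields"):
            # Header format: "#fields<TAB>ts<TAB>uid<TAB>..."
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#") and fields:
            yield dict(zip(fields, line.split("\t")))


def long_connections(records, min_seconds=3600.0):
    """Return records whose duration exceeds min_seconds ('-' marks an unset value)."""
    return [
        r for r in records
        if r.get("duration", "-") != "-" and float(r["duration"]) > min_seconds
    ]


# Hypothetical, shortened conn.log content (illustrative values only)
SAMPLE_CONN = [
    "#separator \\x09",
    "\t".join(["#fields", "ts", "uid", "id.orig_h", "id.resp_h", "duration"]),
    "\t".join(["1700000000.0", "C1", "10.0.0.5", "93.184.216.34", "7200.5"]),
    "\t".join(["1700000001.0", "C2", "10.0.0.6", "1.1.1.1", "-"]),
    "\t".join(["1700000002.0", "C3", "10.0.0.7", "8.8.8.8", "12.0"]),
    "#close 2023-11-14-22-13-20",
]
```

Passing an open file handle instead of `SAMPLE_CONN` works the same way, since the parser only iterates over lines.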
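HTTP Log Analysis
```bash
# View HTTP log
cat /opt/zeek/logs/current/http.log

# HTTP log fields (abridged; the #fields header line gives the exact column order):
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: client IP
# id.resp_h: server IP
# trans_depth: transaction depth
# method: HTTP method
# host: hostname
# uri: URI
# referrer: referrer
# version: HTTP version
# user_agent: user agent
# request_body_len: request body length
# response_body_len: response body length
# status_code: HTTP status code
# status_msg: status message

# The examples below use zeek-cut (shipped with Zeek) to select columns by
# name, which avoids hard-coding column numbers

# Filter by status code
zeek-cut id.orig_h host uri status_code < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 == 404'
zeek-cut id.orig_h host uri status_code < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 == 500'

# Filter by method
zeek-cut id.orig_h method host uri < /opt/zeek/logs/current/http.log | awk -F'\t' '$2 == "POST"'
zeek-cut id.orig_h method host uri < /opt/zeek/logs/current/http.log | awk -F'\t' '$2 == "GET"'

# Find suspicious user agents
zeek-cut id.orig_h user_agent < /opt/zeek/logs/current/http.log | grep -iE "bot|crawler|scanner"

# Find large requests/responses
zeek-cut id.orig_h host uri request_body_len < /opt/zeek/logs/current/http.log | awk -F'\t' '$4+0 > 1000000'
zeek-cut id.orig_h host uri response_body_len < /opt/zeek/logs/current/http.log | awk -F'\t' '$4+0 > 1000000'

# Extract unique hosts
zeek-cut host < /opt/zeek/logs/current/http.log | sort | uniq -c | sort -nr
```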
DNS Log Analysis
```bash
# View DNS log
cat /opt/zeek/logs/current/dns.log

# DNS log fields (abridged; the #fields header line gives the exact column order):
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: client IP
# id.resp_h: DNS server IP
# proto: protocol (udp/tcp)
# trans_id: transaction ID
# rtt: round trip time
# query: DNS query
# qclass: query class
# qclass_name: query class name
# qtype: query type
# qtype_name: query type name
# rcode: response code
# rcode_name: response code name
# AA: authoritative answer
# TC: truncated
# RD: recursion desired
# RA: recursion available
# Z: reserved
# answers: DNS answers
# TTLs: time to live values

# Find DNS queries for specific domain
grep -F "example.com" /opt/zeek/logs/current/dns.log

# Find failed DNS queries
grep "NXDOMAIN" /opt/zeek/logs/current/dns.log

# Find suspicious domains (anchor on the query column, not the whole line)
zeek-cut id.orig_h query < /opt/zeek/logs/current/dns.log | grep -E '\.(tk|ml|ga|cf)$'

# Extract unique queried domains
zeek-cut query < /opt/zeek/logs/current/dns.log | sort | uniq -c | sort -nr

# Find DNS tunneling (large TXT records)
zeek-cut id.orig_h query qtype_name answers < /opt/zeek/logs/current/dns.log | awk -F'\t' '$3 == "TXT" && length($4) > 100'
```
SSL/TLS Log Analysis
```bash
# View SSL log
cat /opt/zeek/logs/current/ssl.log

# SSL log fields (abridged; the #fields header line gives the exact column order):
# ts: timestamp
# uid: unique connection identifier
# id.orig_h: client IP
# id.resp_h: server IP
# version: SSL/TLS version
# cipher: cipher suite
# curve: elliptic curve
# server_name: server name (SNI)
# resumed: session resumed
# last_alert: last alert
# next_protocol: next protocol
# established: connection established
# cert_chain_fuids: certificate chain file UIDs
# client_cert_chain_fuids: client certificate chain
# subject: certificate subject
# issuer: certificate issuer
# client_subject: client certificate subject
# client_issuer: client certificate issuer
# validation_status: certificate validation status

# Find SSL/TLS versions
zeek-cut version < /opt/zeek/logs/current/ssl.log | sort | uniq -c

# Find weak ciphers
grep -E "(RC4|DES|MD5)" /opt/zeek/logs/current/ssl.log

# Find certificate validation failures
grep "unable to get local issuer certificate" /opt/zeek/logs/current/ssl.log

# Extract server names (SNI)
zeek-cut server_name < /opt/zeek/logs/current/ssl.log | sort | uniq -c | sort -nr

# Find self-signed certificates
grep "self signed certificate" /opt/zeek/logs/current/ssl.log
```
Advanced Analysis
Custom Zeek Scripts
```zeek
# Custom detection script: detect-port-scan.zeek
@load base/frameworks/notice

module PortScan;

export {
    redef enum Notice::Type += {
        Port_Scan,
    };

    # Configuration
    const scan_threshold = 20 &redef;
    const scan_window = 5min &redef;
}

# Track connection attempts per source IP
global scan_tracker: table[addr] of set[port] &create_expire=scan_window;

event connection_attempt(c: connection)
    {
    local orig = c$id$orig_h;
    local resp_port = c$id$resp_p;

    # Skip local traffic
    if ( Site::is_local_addr(orig) )
        return;

    # Add port to tracker
    if ( orig !in scan_tracker )
        scan_tracker[orig] = set();
    add scan_tracker[orig][resp_port];

    # Check threshold
    if ( |scan_tracker[orig]| >= scan_threshold )
        {
        NOTICE([$note=Port_Scan,
                $msg=fmt("Port scan detected from %s (%d ports)", orig, |scan_tracker[orig]|),
                $src=orig,
                $identifier=cat(orig)]);

        # Reset tracker to avoid repeated notices
        delete scan_tracker[orig];
        }
    }

event zeek_init()
    {
    print "Port scan detection loaded";
    }
```
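The core of the port scan script is a "distinct destination ports per source" counter compared against a threshold. To try out threshold values against exported connection data before deploying the script, the same idea can be sketched offline as follows. This is an illustration only: `find_scanners` is a hypothetical helper, and unlike the Zeek script it ignores the expiry window.

```python
from collections import defaultdict


def find_scanners(attempts, threshold=20):
    """Return {source: distinct_port_count} for sources at or above the threshold.

    attempts is an iterable of (source_ip, destination_port) pairs.
    """
    ports_by_source = defaultdict(set)
    for src, port in attempts:
        ports_by_source[src].add(port)
    return {
        src: len(ports)
        for src, ports in ports_by_source.items()
        if len(ports) >= threshold
    }


# Illustrative data: one source probing 25 ports, one source hitting port 80 repeatedly
SAMPLE_ATTEMPTS = [("203.0.113.7", p) for p in range(1, 26)] + [("198.51.100.2", 80)] * 50
```

Note that the second source makes more connections overall but touches a single port, so it stays below the threshold; that is the distinction the set-based tracker is meant to capture.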
```zeek
# Custom detection script: detect-data-exfiltration.zeek
@load base/frameworks/notice
@load base/frameworks/sumstats

module DataExfiltration;

export {
    redef enum Notice::Type += {
        Large_Upload,
        Suspicious_Upload,
    };

    # Configuration
    # 100 MB, written in bytes as a double because SumStats thresholds are doubles
    const upload_threshold = 104857600.0 &redef;
    const upload_window = 1hr &redef;
}

event zeek_init()
    {
    # Track upload volumes per source IP
    local r1: SumStats::Reducer = [$stream="upload.bytes", $apply=set(SumStats::SUM)];

    SumStats::create([$name="upload-volume",
                      $epoch=upload_window,
                      $reducers=set(r1),
                      $threshold_val(key: SumStats::Key, result: SumStats::Result) =
                          {
                          return result["upload.bytes"]$sum;
                          },
                      $threshold=upload_threshold,
                      $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
                          {
                          local src = key$host;
                          local uploaded = result["upload.bytes"]$sum;

                          NOTICE([$note=Large_Upload,
                                  $msg=fmt("Large upload detected from %s (%.0f bytes)", src, uploaded),
                                  $src=src]);
                          }]);
    }

event connection_state_remove(c: connection)
    {
    # Track outbound data transfers (local originator, remote responder)
    if ( Site::is_local_addr(c$id$orig_h) && ! Site::is_local_addr(c$id$resp_h) )
        {
        # c$orig$size is the number of payload bytes sent by the originator
        if ( c$orig$size > 0 )
            SumStats::observe("upload.bytes", [$host=c$id$orig_h], [$num=c$orig$size]);
        }
    }
```
```zeek
# Custom detection script: detect-dns-tunneling.zeek
@load base/frameworks/notice

module DNSTunneling;

export {
    redef enum Notice::Type += {
        DNS_Tunneling,
        Suspicious_DNS_Query,
    };

    # Configuration
    const max_query_length = 100 &redef;
    const max_queries_per_minute = 50 &redef;
}

# Track DNS queries per source
global dns_query_count: table[addr] of count &create_expire=1min &default=0;

event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
    {
    local orig = c$id$orig_h;

    # Check query length
    if ( |query| > max_query_length )
        {
        NOTICE([$note=Suspicious_DNS_Query,
                $msg=fmt("Unusually long DNS query from %s: %s", orig, query),
                $src=orig,
                $sub=query]);
        }

    # Track query frequency
    ++dns_query_count[orig];

    if ( dns_query_count[orig] > max_queries_per_minute )
        {
        NOTICE([$note=DNS_Tunneling,
                $msg=fmt("Possible DNS tunneling from %s (%d queries/min)", orig, dns_query_count[orig]),
                $src=orig]);
        }

    # Check for suspicious patterns (long runs of hex characters)
    if ( /[0-9a-f]{32,}/ in query )
        {
        NOTICE([$note=Suspicious_DNS_Query,
                $msg=fmt("Possible encoded data in DNS query from %s: %s", orig, query),
                $src=orig,
                $sub=query]);
        }
    }
```
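The per-query heuristics in the script above (query length and long hexadecimal runs) are simple enough to replay against queries taken from an existing dns.log, which can help when tuning `max_query_length`. The sketch below mirrors those two checks only; `classify_query` and its reason labels are made up for illustration, and the rate-based check is left out.

```python
import re

# Same pattern as the Zeek script: 32 or more consecutive lowercase hex characters
HEX_RUN = re.compile(r"[0-9a-f]{32,}")


def classify_query(query, max_len=100):
    """Return a list of reasons a DNS query looks suspicious (empty if none)."""
    reasons = []
    if len(query) > max_len:
        reasons.append("long")
    if HEX_RUN.search(query):
        reasons.append("hex")
    return reasons
```

A query such as `www.example.com` yields an empty list, while a label made of 40 hex characters followed by `.example.com` is flagged as `hex` without being `long`.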
Log Analysis Scripts
```python
#!/usr/bin/env python3
"""
Zeek Log Analyzer
Comprehensive analysis of Zeek logs with statistics and visualizations
"""

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import argparse
import ipaddress
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import numpy as np


class ZeekLogAnalyzer:
    def __init__(self, log_directory="/opt/zeek/logs/current"):
        self.log_dir = log_directory
        self.dataframes = {}

    def load_logs(self, log_types=('conn', 'http', 'dns', 'ssl')):
        """Load Zeek logs into pandas DataFrames"""
        for log_type in log_types:
            try:
                log_file = f"{self.log_dir}/{log_type}.log"

                # Read the file once: the '#fields' line holds the column
                # names, every other non-comment line is a record
                columns = None
                data = []
                with open(log_file, 'r') as f:
                    for line in f:
                        if line.startswith('#fields'):
                            columns = line.rstrip('\n').split('\t')[1:]
                        elif line.strip() and not line.startswith('#'):
                            data.append(line.rstrip('\n').split('\t'))

                if data and columns:
                    # Create DataFrame
                    df = pd.DataFrame(data, columns=columns)

                    # Convert timestamp
                    if 'ts' in df.columns:
                        df['ts'] = pd.to_datetime(df['ts'].astype(float), unit='s')

                    self.dataframes[log_type] = df
                    print(f"Loaded {len(df)} records from {log_type}.log")

            except Exception as e:
                print(f"Error loading {log_type}.log: {e}")

    def analyze_connections(self):
        """Analyze connection patterns"""
        if 'conn' not in self.dataframes:
            print("Connection log not loaded")
            return

        df = self.dataframes['conn']
        print("\n=== Connection Analysis ===")

        # Basic statistics
        total_connections = len(df)
        unique_sources = df['id.orig_h'].nunique()
        unique_destinations = df['id.resp_h'].nunique()
        print(f"Total connections: {total_connections:,}")
        print(f"Unique source IPs: {unique_sources:,}")
        print(f"Unique destination IPs: {unique_destinations:,}")

        # Protocol distribution
        print("\nProtocol Distribution:")
        protocol_counts = df['proto'].value_counts()
        for proto, count in protocol_counts.head(10).items():
            percentage = (count / total_connections) * 100
            print(f"  {proto}: {count:,} ({percentage:.1f}%)")

        # Service distribution
        print("\nTop Services:")
        service_counts = df['service'].value_counts()
        for service, count in service_counts.head(10).items():
            if service != '-':
                percentage = (count / total_connections) * 100
                print(f"  {service}: {count:,} ({percentage:.1f}%)")

        # Connection states
        print("\nConnection States:")
        state_counts = df['conn_state'].value_counts()
        for state, count in state_counts.head(10).items():
            percentage = (count / total_connections) * 100
            print(f"  {state}: {count:,} ({percentage:.1f}%)")

        # Top talkers (by connection count)
        print("\nTop Source IPs (by connection count):")
        top_sources = df['id.orig_h'].value_counts().head(10)
        for ip, count in top_sources.items():
            print(f"  {ip}: {count:,} connections")

        # Top destinations
        print("\nTop Destination IPs (by connection count):")
        top_destinations = df['id.resp_h'].value_counts().head(10)
        for ip, count in top_destinations.items():
            print(f"  {ip}: {count:,} connections")

        return {
            'total_connections': total_connections,
            'unique_sources': unique_sources,
            'unique_destinations': unique_destinations,
            'protocol_distribution': protocol_counts.to_dict(),
            'service_distribution': service_counts.to_dict(),
            'top_sources': top_sources.to_dict(),
            'top_destinations': top_destinations.to_dict()
        }

    def analyze_http_traffic(self):
        """Analyze HTTP traffic patterns"""
        if 'http' not in self.dataframes:
            print("HTTP log not loaded")
            return

        df = self.dataframes['http']
        print("\n=== HTTP Traffic Analysis ===")

        # Basic statistics
        total_requests = len(df)
        unique_hosts = df['host'].nunique()
        unique_user_agents = df['user_agent'].nunique()
        print(f"Total HTTP requests: {total_requests:,}")
        print(f"Unique hosts: {unique_hosts:,}")
        print(f"Unique user agents: {unique_user_agents:,}")

        # Method distribution
        print("\nHTTP Methods:")
        method_counts = df['method'].value_counts()
        for method, count in method_counts.items():
            percentage = (count / total_requests) * 100
            print(f"  {method}: {count:,} ({percentage:.1f}%)")

        # Status code distribution
        print("\nHTTP Status Codes:")
        status_counts = df['status_code'].value_counts()
        for status, count in status_counts.head(10).items():
            percentage = (count / total_requests) * 100
            print(f"  {status}: {count:,} ({percentage:.1f}%)")

        # Top hosts
        print("\nTop Requested Hosts:")
        top_hosts = df['host'].value_counts().head(10)
        for host, count in top_hosts.items():
            if host != '-':
                print(f"  {host}: {count:,} requests")

        # Top user agents
        print("\nTop User Agents:")
        top_user_agents = df['user_agent'].value_counts().head(5)
        for ua, count in top_user_agents.items():
            if ua != '-' and len(ua) < 100:
                print(f"  {ua}: {count:,} requests")

        # Suspicious patterns
        print("\nSuspicious Patterns:")

        # Large requests/responses
        if 'request_body_len' in df.columns:
            df['request_body_len'] = pd.to_numeric(df['request_body_len'], errors='coerce')
            large_requests = df[df['request_body_len'] > 1000000]
            print(f"  Large requests (>1MB): {len(large_requests)}")

        if 'response_body_len' in df.columns:
            df['response_body_len'] = pd.to_numeric(df['response_body_len'], errors='coerce')
            large_responses = df[df['response_body_len'] > 10000000]
            print(f"  Large responses (>10MB): {len(large_responses)}")

        # Error responses
        error_responses = df[df['status_code'].isin(['400', '401', '403', '404', '500', '502', '503'])]
        print(f"  Error responses: {len(error_responses)} ({len(error_responses)/total_requests*100:.1f}%)")

        return {
            'total_requests': total_requests,
            'unique_hosts': unique_hosts,
            'method_distribution': method_counts.to_dict(),
            'status_distribution': status_counts.to_dict(),
            'top_hosts': top_hosts.to_dict()
        }

    def analyze_dns_traffic(self):
        """Analyze DNS traffic patterns"""
        if 'dns' not in self.dataframes:
            print("DNS log not loaded")
            return

        df = self.dataframes['dns']
        print("\n=== DNS Traffic Analysis ===")

        # Basic statistics
        total_queries = len(df)
        unique_queries = df['query'].nunique()
        print(f"Total DNS queries: {total_queries:,}")
        print(f"Unique queries: {unique_queries:,}")

        # Query type distribution
        print("\nDNS Query Types:")
        qtype_counts = df['qtype_name'].value_counts()
        for qtype, count in qtype_counts.head(10).items():
            percentage = (count / total_queries) * 100
            print(f"  {qtype}: {count:,} ({percentage:.1f}%)")

        # Response code distribution
        print("\nDNS Response Codes:")
        rcode_counts = df['rcode_name'].value_counts()
        for rcode, count in rcode_counts.items():
            percentage = (count / total_queries) * 100
            print(f"  {rcode}: {count:,} ({percentage:.1f}%)")

        # Top queried domains
        print("\nTop Queried Domains:")
        top_queries = df['query'].value_counts().head(10)
        for query, count in top_queries.items():
            if query != '-':
                print(f"  {query}: {count:,} queries")

        # Suspicious patterns
        print("\nSuspicious DNS Patterns:")

        # Long queries (possible tunneling)
        long_queries = df[df['query'].str.len() > 50]
        print(f"  Long queries (>50 chars): {len(long_queries)}")

        # Failed queries
        failed_queries = df[df['rcode_name'] == 'NXDOMAIN']
        print(f"  Failed queries (NXDOMAIN): {len(failed_queries)} ({len(failed_queries)/total_queries*100:.1f}%)")

        # Suspicious TLDs: escape the dot and anchor at the end of the name,
        # otherwise '.tk' would match any character followed by 'tk' anywhere
        suspicious_tld_pattern = r'\.(?:tk|ml|ga|cf)$'
        suspicious_domains = df[df['query'].str.contains(suspicious_tld_pattern, na=False)]
        print(f"  Suspicious TLD queries: {len(suspicious_domains)}")

        return {
            'total_queries': total_queries,
            'unique_queries': unique_queries,
            'qtype_distribution': qtype_counts.to_dict(),
            'rcode_distribution': rcode_counts.to_dict(),
            'top_queries': top_queries.to_dict()
        }

    def analyze_ssl_traffic(self):
        """Analyze SSL/TLS traffic patterns"""
        if 'ssl' not in self.dataframes:
            print("SSL log not loaded")
            return

        df = self.dataframes['ssl']
        print("\n=== SSL/TLS Traffic Analysis ===")

        # Basic statistics
        total_connections = len(df)
        unique_servers = df['server_name'].nunique()
        print(f"Total SSL/TLS connections: {total_connections:,}")
        print(f"Unique server names: {unique_servers:,}")

        # Version distribution
        print("\nSSL/TLS Versions:")
        version_counts = df['version'].value_counts()
        for version, count in version_counts.items():
            percentage = (count / total_connections) * 100
            print(f"  {version}: {count:,} ({percentage:.1f}%)")

        # Top server names
        print("\nTop Server Names (SNI):")
        top_servers = df['server_name'].value_counts().head(10)
        for server, count in top_servers.items():
            if server != '-':
                print(f"  {server}: {count:,} connections")

        # Certificate validation status
        if 'validation_status' in df.columns:
            print("\nCertificate Validation Status:")
            validation_counts = df['validation_status'].value_counts()
            for status, count in validation_counts.items():
                if status != '-':
                    percentage = (count / total_connections) * 100
                    print(f"  {status}: {count:,} ({percentage:.1f}%)")

        # Security analysis
        print("\nSecurity Analysis:")

        # Weak versions
        weak_versions = df[df['version'].isin(['SSLv2', 'SSLv3', 'TLSv10'])]
        print(f"  Weak SSL/TLS versions: {len(weak_versions)} ({len(weak_versions)/total_connections*100:.1f}%)")

        # Self-signed certificates
        if 'validation_status' in df.columns:
            self_signed = df[df['validation_status'].str.contains('self signed', na=False)]
            print(f"  Self-signed certificates: {len(self_signed)}")

        return {
            'total_connections': total_connections,
            'unique_servers': unique_servers,
            'version_distribution': version_counts.to_dict(),
            'top_servers': top_servers.to_dict()
        }

    def generate_visualizations(self, output_dir="zeek_analysis"):
        """Generate visualization charts"""
        import os
        os.makedirs(output_dir, exist_ok=True)

        # Set style
        plt.style.use('seaborn-v0_8')

        # Connection analysis charts
        if 'conn' in self.dataframes:
            df = self.dataframes['conn']

            # Protocol distribution pie chart
            plt.figure(figsize=(10, 6))
            protocol_counts = df['proto'].value_counts().head(8)
            plt.pie(protocol_counts.values, labels=protocol_counts.index, autopct='%1.1f%%')
            plt.title('Protocol Distribution')
            plt.savefig(f'{output_dir}/protocol_distribution.png', dpi=300, bbox_inches='tight')
            plt.close()

            # Connection timeline
            if 'ts' in df.columns:
                plt.figure(figsize=(12, 6))
                df['hour'] = df['ts'].dt.hour
                hourly_counts = df['hour'].value_counts().sort_index()
                plt.bar(hourly_counts.index, hourly_counts.values)
                plt.xlabel('Hour of Day')
                plt.ylabel('Number of Connections')
                plt.title('Connection Activity by Hour')
                plt.savefig(f'{output_dir}/connection_timeline.png', dpi=300, bbox_inches='tight')
                plt.close()

        # HTTP analysis charts
        if 'http' in self.dataframes:
            df = self.dataframes['http']

            # Status code distribution
            plt.figure(figsize=(10, 6))
            status_counts = df['status_code'].value_counts().head(10)
            plt.bar(status_counts.index, status_counts.values)
            plt.xlabel('HTTP Status Code')
            plt.ylabel('Count')
            plt.title('HTTP Status Code Distribution')
            plt.xticks(rotation=45)
            plt.savefig(f'{output_dir}/http_status_codes.png', dpi=300, bbox_inches='tight')
            plt.close()

        print(f"Visualizations saved to {output_dir}/")

    def generate_report(self, output_file="zeek_analysis_report.json"):
        """Generate comprehensive analysis report"""
        report = {
            'analysis_time': datetime.now().isoformat(),
            'log_directory': self.log_dir,
            'summary': {}
        }

        # Run all analyses
        if 'conn' in self.dataframes:
            report['connection_analysis'] = self.analyze_connections()
        if 'http' in self.dataframes:
            report['http_analysis'] = self.analyze_http_traffic()
        if 'dns' in self.dataframes:
            report['dns_analysis'] = self.analyze_dns_traffic()
        if 'ssl' in self.dataframes:
            report['ssl_analysis'] = self.analyze_ssl_traffic()

        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print(f"\nAnalysis report saved to {output_file}")
        return report


def main():
    parser = argparse.ArgumentParser(description='Zeek Log Analyzer')
    parser.add_argument('--log-dir', default='/opt/zeek/logs/current',
                        help='Zeek log directory')
    parser.add_argument('--output-dir', default='zeek_analysis',
                        help='Output directory for visualizations')
    parser.add_argument('--report', default='zeek_analysis_report.json',
                        help='Output file for analysis report')
    parser.add_argument('--visualizations', action='store_true',
                        help='Generate visualization charts')
    args = parser.parse_args()

    # Initialize analyzer
    analyzer = ZeekLogAnalyzer(args.log_dir)

    # Load logs
    analyzer.load_logs()

    # Generate report
    analyzer.generate_report(args.report)

    # Generate visualizations if requested
    if args.visualizations:
        analyzer.generate_visualizations(args.output_dir)


if __name__ == "__main__":
    main()
```
Threat Hunting Scripts
```bash
#!/bin/bash
# Zeek Threat Hunting Script
# Automated threat detection using Zeek logs

LOG_DIR="/opt/zeek/logs/current"
OUTPUT_DIR="/tmp/zeek-hunting"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p "$OUTPUT_DIR"

echo "=== Zeek Threat Hunting Report - $DATE ===" > "$OUTPUT_DIR/threat_report_$DATE.txt"

# Function to log findings (echo -e so that "\n" in messages becomes a newline)
log_finding() {
    echo -e "$1" | tee -a "$OUTPUT_DIR/threat_report_$DATE.txt"
}

log_finding "Starting threat hunting analysis..."

# 1. Port Scan Detection
log_finding "\n=== Port Scan Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find IPs connecting to many different ports
    # (column 3 = id.orig_h, column 6 = id.resp_p)
    awk -F'\t' '!/^#/ {print $3, $6}' "$LOG_DIR/conn.log" | \
        sort | uniq | \
        awk '{count[$1]++} END {for (ip in count) if (count[ip] > 20) print ip, count[ip]}' | \
        sort -k2 -nr > "$OUTPUT_DIR/potential_port_scans_$DATE.txt"

    if [ -s "$OUTPUT_DIR/potential_port_scans_$DATE.txt" ]; then
        log_finding "Potential port scans detected:"
        head -10 "$OUTPUT_DIR/potential_port_scans_$DATE.txt" | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No port scans detected"
    fi
fi

# 2. DNS Tunneling Detection
log_finding "\n=== DNS Tunneling Detection ==="
if [ -f "$LOG_DIR/dns.log" ]; then
    # Find unusually long DNS queries; zeek-cut selects columns by name,
    # so this does not depend on the column order of dns.log
    zeek-cut id.orig_h query < "$LOG_DIR/dns.log" | \
        awk -F'\t' 'length($2) > 50 {print $1, $2, length($2)}' > "$OUTPUT_DIR/long_dns_queries_$DATE.txt"

    if [ -s "$OUTPUT_DIR/long_dns_queries_$DATE.txt" ]; then
        log_finding "Suspicious long DNS queries detected:"
        head -5 "$OUTPUT_DIR/long_dns_queries_$DATE.txt" | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No suspicious DNS queries detected"
    fi

    # Find high-frequency DNS queries from single source
    awk -F'\t' '!/^#/ {print $3}' "$LOG_DIR/dns.log" | \
        sort | uniq -c | sort -nr | \
        awk '$1 > 100 {print $2, $1}' > "$OUTPUT_DIR/high_freq_dns_$DATE.txt"

    if [ -s "$OUTPUT_DIR/high_freq_dns_$DATE.txt" ]; then
        log_finding "High-frequency DNS queries detected:"
        head -5 "$OUTPUT_DIR/high_freq_dns_$DATE.txt" | while read -r line; do
            log_finding "  $line"
        done
    fi
fi

# 3. Data Exfiltration Detection
log_finding "\n=== Data Exfiltration Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find large outbound data transfers
    # (column 3 = id.orig_h, column 5 = id.resp_h, column 10 = orig_bytes)
    awk -F'\t' '!/^#/ && $10+0 > 10000000 && $3 ~ /^192\.168\./ && $5 !~ /^192\.168\./ {print $3, $5, $10}' "$LOG_DIR/conn.log" | \
        sort -k3 -nr > "$OUTPUT_DIR/large_uploads_$DATE.txt"

    if [ -s "$OUTPUT_DIR/large_uploads_$DATE.txt" ]; then
        log_finding "Large outbound transfers detected:"
        head -5 "$OUTPUT_DIR/large_uploads_$DATE.txt" | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No large outbound transfers detected"
    fi
fi

# 4. Suspicious HTTP Activity
log_finding "\n=== Suspicious HTTP Activity ==="
if [ -f "$LOG_DIR/http.log" ]; then
    # Find SQL injection attempts
    grep -iE "union|select|insert|delete|drop|exec" "$LOG_DIR/http.log" > "$OUTPUT_DIR/sqli_attempts_$DATE.txt"

    if [ -s "$OUTPUT_DIR/sqli_attempts_$DATE.txt" ]; then
        log_finding "Potential SQL injection attempts detected:"
        wc -l "$OUTPUT_DIR/sqli_attempts_$DATE.txt" | awk '{print "  " $1 " attempts"}'
    else
        log_finding "No SQL injection attempts detected"
    fi

    # Find XSS attempts
    grep -iE "script|javascript|onerror|onload" "$LOG_DIR/http.log" > "$OUTPUT_DIR/xss_attempts_$DATE.txt"

    if [ -s "$OUTPUT_DIR/xss_attempts_$DATE.txt" ]; then
        log_finding "Potential XSS attempts detected:"
        wc -l "$OUTPUT_DIR/xss_attempts_$DATE.txt" | awk '{print "  " $1 " attempts"}'
    else
        log_finding "No XSS attempts detected"
    fi

    # Find directory traversal attempts
    grep -iE '\.\./|\.\.\\|%2e%2e' "$LOG_DIR/http.log" > "$OUTPUT_DIR/directory_traversal_$DATE.txt"

    if [ -s "$OUTPUT_DIR/directory_traversal_$DATE.txt" ]; then
        log_finding "Directory traversal attempts detected:"
        wc -l "$OUTPUT_DIR/directory_traversal_$DATE.txt" | awk '{print "  " $1 " attempts"}'
    else
        log_finding "No directory traversal attempts detected"
    fi
fi

# 5. SSL/TLS Security Issues
log_finding "\n=== SSL/TLS Security Issues ==="
if [ -f "$LOG_DIR/ssl.log" ]; then
    # Find weak SSL/TLS versions (column 7 = version)
    awk -F'\t' '!/^#/ && $7 ~ /^(SSLv2|SSLv3|TLSv10)$/' "$LOG_DIR/ssl.log" > "$OUTPUT_DIR/weak_ssl_$DATE.txt"

    if [ -s "$OUTPUT_DIR/weak_ssl_$DATE.txt" ]; then
        log_finding "Weak SSL/TLS versions detected:"
        awk -F'\t' '{print $7}' "$OUTPUT_DIR/weak_ssl_$DATE.txt" | sort | uniq -c | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No weak SSL/TLS versions detected"
    fi

    # Find certificate validation failures
    grep -iE "unable to get local issuer|self signed|certificate verify failed" "$LOG_DIR/ssl.log" > "$OUTPUT_DIR/cert_issues_$DATE.txt"

    if [ -s "$OUTPUT_DIR/cert_issues_$DATE.txt" ]; then
        log_finding "Certificate validation issues detected:"
        wc -l "$OUTPUT_DIR/cert_issues_$DATE.txt" | awk '{print "  " $1 " issues"}'
    else
        log_finding "No certificate validation issues detected"
    fi
fi


# 6. Malware Communication Detection
log_finding "\n=== Malware Communication Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find connections to known malicious ports.
    # conn.log is tab-separated, so match the port column (field 6, id.resp_p
    # in the default layout) instead of searching for ":port" in the raw text.
    awk -F'\t' '$6 ~ /^(6667|6668|6669|1337|31337|4444|5555)$/' "$LOG_DIR/conn.log" > "$OUTPUT_DIR/suspicious_ports_$DATE.txt"

    if [ -s "$OUTPUT_DIR/suspicious_ports_$DATE.txt" ]; then
        log_finding "Connections to suspicious ports detected:"
        awk '{print $6}' "$OUTPUT_DIR/suspicious_ports_$DATE.txt" | sort | uniq -c | while read line; do
            log_finding "  $line"
        done
    else
        log_finding "No connections to suspicious ports detected"
    fi
fi

# 7. Brute Force Detection
log_finding "\n=== Brute Force Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find multiple failed SSH connections.
    # Default conn.log layout: field 3 = id.orig_h, field 6 = id.resp_p,
    # field 12 = conn_state ("SF" means normally established and closed).
    awk '$6 == "22" && $12 != "SF" {print $3}' "$LOG_DIR/conn.log" | \
        sort | uniq -c | sort -nr | \
        awk '$1 > 10 {print $2, $1}' > "$OUTPUT_DIR/ssh_brute_force_$DATE.txt"

    if [ -s "$OUTPUT_DIR/ssh_brute_force_$DATE.txt" ]; then
        log_finding "Potential SSH brute force attacks detected:"
        head -5 "$OUTPUT_DIR/ssh_brute_force_$DATE.txt" | while read line; do
            log_finding "  $line"
        done
    else
        log_finding "No SSH brute force attacks detected"
    fi
fi

# Generate summary
log_finding "\n=== Summary ==="
log_finding "Threat hunting analysis completed at $(date)"
log_finding "Results saved to: $OUTPUT_DIR/"

# Create IOC list
{
    echo "# Indicators of Compromise - $DATE"
    echo "# Generated by Zeek Threat Hunting"
    echo ""

    if [ -s "$OUTPUT_DIR/potential_port_scans_$DATE.txt" ]; then
        echo "# Port Scan Sources"
        awk '{print $1}' "$OUTPUT_DIR/potential_port_scans_$DATE.txt"
        echo ""
    fi

    if [ -s "$OUTPUT_DIR/high_freq_dns_$DATE.txt" ]; then
        echo "# High-Frequency DNS Sources"
        awk '{print $1}' "$OUTPUT_DIR/high_freq_dns_$DATE.txt"
        echo ""
    fi

    if [ -s "$OUTPUT_DIR/large_uploads_$DATE.txt" ]; then
        echo "# Large Upload Sources"
        awk '{print $1}' "$OUTPUT_DIR/large_uploads_$DATE.txt"
        echo ""
    fi
} > "$OUTPUT_DIR/iocs_$DATE.txt"

log_finding "IOCs saved to: $OUTPUT_DIR/iocs_$DATE.txt"

echo "Threat hunting analysis complete. Check $OUTPUT_DIR/ for detailed results."
```
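
The awk commands in the script above address columns by position, which only holds for Zeek's default TSV layout. As a minimal sketch (the function names `parse_zeek_tsv` and `failed_ssh_sources` are hypothetical and not part of Zeek), the following reads the `#fields` header so that columns can be looked up by name, then repeats the failed-SSH count from step 7. The sample header is abbreviated for illustration; a real `conn.log` has more columns.

```python
from collections import Counter


def parse_zeek_tsv(lines):
    """Yield one dict per record, keyed by the names in the '#fields' header."""
    fields = []
    for raw in lines:
        line = raw.rstrip("\n")
        if line.startswith("#fields"):
            # Header looks like: "#fields<TAB>ts<TAB>uid<TAB>id.orig_h..."
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#") and fields:
            yield dict(zip(fields, line.split("\t")))


def failed_ssh_sources(records, threshold=10):
    """Return {source_ip: count} for sources with more than `threshold`
    connections to port 22 whose conn_state is not 'SF'."""
    counts = Counter(
        r["id.orig_h"]
        for r in records
        if r.get("id.resp_p") == "22" and r.get("conn_state") != "SF"
    )
    return {ip: n for ip, n in counts.items() if n > threshold}


if __name__ == "__main__":
    # Abbreviated, made-up sample data (not real Zeek output)
    sample = [
        "#fields\tts\tuid\tid.orig_h\tid.orig_p\tid.resp_h\tid.resp_p\tproto\tconn_state",
        "1.0\tC1\t10.0.0.5\t40000\t10.0.0.9\t22\ttcp\tREJ",
        "2.0\tC2\t10.0.0.5\t40001\t10.0.0.9\t22\ttcp\tSF",
        "3.0\tC3\t10.0.0.5\t40002\t10.0.0.9\t22\ttcp\tS0",
    ]
    print(failed_ssh_sources(parse_zeek_tsv(sample), threshold=1))
```

Looking fields up by name keeps the check working if the log is written with a non-default column set, at the cost of requiring the header line to be present.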
Integration and Automation
ELK Stack Integration
```bash
# Install Filebeat for log shipping
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.5.0-linux-x86_64.tar.gz
tar xzvf filebeat-8.5.0-linux-x86_64.tar.gz
cd filebeat-8.5.0-linux-x86_64/

# Configure Filebeat for Zeek logs
cat > filebeat.yml << 'EOF'
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/zeek/logs/current/*.log
  exclude_files: ['.gz$']
  # Skip Zeek's '#'-prefixed header and footer lines. A multiline rule keyed
  # on '^#' would instead merge every record into the preceding header line.
  exclude_lines: ['^#']
  fields:
    logtype: zeek
  fields_under_root: true

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "zeek-logs-%{+yyyy.MM.dd}"

setup.template.name: "zeek-logs"
setup.template.pattern: "zeek-logs-*"

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
EOF

# Start Filebeat
sudo ./filebeat -e -c filebeat.yml
```
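
The Filebeat input above ships each TSV record as a raw line, so structured fields still have to be derived somewhere downstream. As a rough sketch of that step (the helpers `convert` and `to_document` are hypothetical, and the field and type lists stand in for the `#fields` and `#types` header lines of a real log), one record can be turned into a typed JSON document like this:

```python
import json

UNSET = "-"  # Zeek's default marker for an unset field


def convert(value, zeek_type):
    """Convert one TSV cell to a JSON-friendly Python value."""
    if value == UNSET:
        return None
    if zeek_type in ("count", "int", "port"):
        return int(value)
    if zeek_type in ("time", "interval", "double"):
        return float(value)
    if zeek_type == "bool":
        return value == "T"
    # addr, string, enum and container types stay as raw text in this sketch
    return value


def to_document(line, fields, types):
    """Build a dict from one record, given the column names and Zeek types."""
    cells = line.rstrip("\n").split("\t")
    doc = {name: convert(cell, t) for name, cell, t in zip(fields, cells, types)}
    doc["logtype"] = "zeek"  # mirrors the custom field set in filebeat.yml
    return doc


if __name__ == "__main__":
    # Made-up sample record with an abbreviated column set
    fields = ["ts", "id.orig_h", "id.resp_p", "duration", "local_orig"]
    types = ["time", "addr", "port", "interval", "bool"]
    line = "1700000000.5\t10.0.0.5\t443\t-\tT"
    print(json.dumps(to_document(line, fields, types)))
```

In an actual ELK deployment this conversion would more likely live in an ingest pipeline or Logstash filter; the Python version is only meant to show the mapping.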
Splunk Integration
```bash
# Configure Splunk Universal Forwarder for Zeek logs
# Install Splunk Universal Forwarder first

# Create inputs.conf
sudo tee /opt/splunkforwarder/etc/system/local/inputs.conf << 'EOF'
[monitor:///opt/zeek/logs/current/*.log]
disabled = false
sourcetype = zeek
index = zeek
host_segment = 4

[monitor:///opt/zeek/logs/current/conn.log]
sourcetype = zeek:conn

[monitor:///opt/zeek/logs/current/http.log]
sourcetype = zeek:http

[monitor:///opt/zeek/logs/current/dns.log]
sourcetype = zeek:dns

[monitor:///opt/zeek/logs/current/ssl.log]
sourcetype = zeek:ssl

[monitor:///opt/zeek/logs/current/notice.log]
sourcetype = zeek:notice
EOF

# Create props.conf for field extraction
sudo tee /opt/splunkforwarder/etc/system/local/props.conf << 'EOF'
[zeek]
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\r\n]+)
TIME_PREFIX = ^
TIME_FORMAT = %s.%6N
TRUNCATE = 0

[zeek:conn]
EXTRACT-zeek_conn = (?
[zeek:http]
EXTRACT-zeek_http = (?
[zeek:dns]
EXTRACT-zeek_dns = (?
EOF

# Restart Splunk Universal Forwarder
sudo /opt/splunkforwarder/bin/splunk restart
```
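
The `TIME_PREFIX = ^` and `TIME_FORMAT = %s.%6N` settings tell Splunk to read the leading `ts` column as epoch seconds followed by six fractional digits. To illustrate the same interpretation outside Splunk, here is a small sketch (the helper name `zeek_ts_to_utc` is hypothetical, and the sample line is made up):

```python
from datetime import datetime, timezone
from decimal import Decimal


def zeek_ts_to_utc(ts):
    """Interpret a Zeek 'ts' value (epoch seconds with a microsecond
    fraction, given as a string) as a timezone-aware UTC datetime."""
    seconds = Decimal(ts)  # Decimal avoids float rounding of the fraction
    whole = int(seconds)
    micros = int((seconds - whole) * 1_000_000)
    return datetime.fromtimestamp(whole, tz=timezone.utc).replace(microsecond=micros)


if __name__ == "__main__":
    line = "1700000000.123456\tCabc123\t10.0.0.5\t40000\t10.0.0.9\t22"
    print(zeek_ts_to_utc(line.split("\t")[0]).isoformat())
```

Checking a few timestamps this way can help confirm that events land at the expected time after the forwarder is restarted.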
SIEM Integration Script
```python
#!/usr/bin/env python3
"""
Zeek SIEM Integration
Real-time log parsing and alert generation for SIEM systems
"""

import json
import time
import socket
import syslog
import requests
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import re


class ZeekSIEMIntegrator:
    def __init__(self, config_file="siem_config.json"):
        self.config = self.load_config(config_file)
        self.alert_rules = self.load_alert_rules()

    def load_config(self, config_file):
        """Load SIEM integration configuration"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # Default configuration
            return {
                "siem_type": "syslog",
                "syslog_server": "localhost",
                "syslog_port": 514,
                "api_endpoint": None,
                "api_key": None,
                "log_directory": "/opt/zeek/logs/current",
                "alert_threshold": {
                    "port_scan": 20,
                    "dns_queries": 100,
                    "large_transfer": 100000000
                }
            }

    def load_alert_rules(self):
        """Load alert detection rules"""
        return {
            "port_scan": {
                "pattern": r"multiple_ports",
                "severity": "medium",
                "description": "Port scan detected"
            },
            "sql_injection": {
                "pattern": r"(union|select|insert|delete|drop|exec)",
                "severity": "high",
                "description": "SQL injection attempt detected"
            },
            "xss_attempt": {
                "pattern": r"(script|javascript|onerror|onload)",
                "severity": "medium",
                "description": "XSS attempt detected"
            },
            "dns_tunneling": {
                "pattern": r"long_dns_query",
                "severity": "high",
                "description": "Possible DNS tunneling detected"
            },
            "malware_communication": {
                "pattern": r"(6667|6668|6669|1337|31337|4444|5555)",
                "severity": "high",
                "description": "Malware communication detected"
            }
        }

    def send_syslog_alert(self, alert):
        """Send alert via syslog"""
        try:
            # Create syslog message
            message = f"ZEEK_ALERT: {alert['description']} - Source: {alert.get('source_ip', 'unknown')} - Severity: {alert['severity']}"

            # Send to syslog server over UDP
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message.encode(), (self.config['syslog_server'], self.config['syslog_port']))
            sock.close()

            print(f"Syslog alert sent: {message}")

        except Exception as e:
            print(f"Error sending syslog alert: {e}")

    def send_api_alert(self, alert):
        """Send alert via REST API"""
        try:
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f"Bearer {self.config['api_key']}"
            }

            payload = {
                'timestamp': alert['timestamp'],
                'severity': alert['severity'],
                'description': alert['description'],
                'source_ip': alert.get('source_ip'),
                'destination_ip': alert.get('destination_ip'),
                'details': alert.get('details', {})
            }

            response = requests.post(
                self.config['api_endpoint'],
                headers=headers,
                json=payload,
                timeout=10
            )

            if response.status_code == 200:
                print(f"API alert sent successfully: {alert['description']}")
            else:
                print(f"API alert failed: {response.status_code} - {response.text}")

        except Exception as e:
            print(f"Error sending API alert: {e}")

    def send_alert(self, alert):
        """Send alert to configured SIEM system"""
        if self.config['siem_type'] == 'syslog':
            self.send_syslog_alert(alert)
        elif self.config['siem_type'] == 'api' and self.config['api_endpoint']:
            self.send_api_alert(alert)
        else:
            # Fall back to printing when no SIEM target is configured
            print(f"Alert: {alert}")

    def analyze_connection_log(self, line):
        """Analyze connection log for suspicious activity"""
        fields = line.strip().split('\t')
        if len(fields) < 12 or fields[0].startswith('#'):
            return

        try:
            # Column order follows Zeek's default conn.log layout
            ts, uid, orig_h, orig_p, resp_h, resp_p, proto, service, duration, orig_bytes, resp_bytes, conn_state = fields[:12]

            # Check for large data transfers
            if orig_bytes.isdigit() and int(orig_bytes) > self.config['alert_threshold']['large_transfer']:
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'medium',
                    'description': 'Large outbound data transfer detected',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'bytes_transferred': orig_bytes,
                        'protocol': proto,
                        'service': service
                    }
                }
                self.send_alert(alert)

        except (ValueError, IndexError) as e:
            print(f"Error parsing connection log: {e}")

    def analyze_http_log(self, line):
        """Analyze HTTP log for web attacks"""
        fields = line.strip().split('\t')
        if len(fields) < 16 or fields[0].startswith('#'):
            return

        try:
            ts, uid, orig_h, orig_p, resp_h, resp_p, trans_depth, method, host, uri, referrer, version, user_agent, request_body_len, response_body_len, status_code = fields[:16]

            # Check for SQL injection
            if re.search(self.alert_rules['sql_injection']['pattern'], uri, re.IGNORECASE):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': self.alert_rules['sql_injection']['severity'],
                    'description': self.alert_rules['sql_injection']['description'],
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'method': method,
                        'host': host,
                        'uri': uri,
                        'user_agent': user_agent
                    }
                }
                self.send_alert(alert)

            # Check for XSS attempts
            if re.search(self.alert_rules['xss_attempt']['pattern'], uri, re.IGNORECASE):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': self.alert_rules['xss_attempt']['severity'],
                    'description': self.alert_rules['xss_attempt']['description'],
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'method': method,
                        'host': host,
                        'uri': uri,
                        'user_agent': user_agent
                    }
                }
                self.send_alert(alert)

        except (ValueError, IndexError) as e:
            print(f"Error parsing HTTP log: {e}")

    def analyze_dns_log(self, line):
        """Analyze DNS log for tunneling and suspicious queries"""
        fields = line.strip().split('\t')
        if len(fields) < 16 or fields[0].startswith('#'):
            return

        try:
            ts, uid, orig_h, orig_p, resp_h, resp_p, proto, trans_id, rtt, query, qclass, qclass_name, qtype, qtype_name, rcode, rcode_name = fields[:16]

            # Check for long DNS queries (possible tunneling)
            if len(query) > 50:
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'high',
                    'description': 'Possible DNS tunneling detected (long query)',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'query': query,
                        'query_length': len(query),
                        'query_type': qtype_name
                    }
                }
                self.send_alert(alert)

            # Check for suspicious TLDs. Compare against the end of the name
            # so that ".cf" does not also match a label such as "a.cfoo.com".
            suspicious_tlds = ['.tk', '.ml', '.ga', '.cf']
            if any(query.lower().endswith(tld) for tld in suspicious_tlds):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'medium',
                    'description': 'Query to suspicious TLD detected',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'query': query,
                        'query_type': qtype_name
                    }
                }
                self.send_alert(alert)

        except (ValueError, IndexError) as e:
            print(f"Error parsing DNS log: {e}")


class ZeekLogHandler(FileSystemEventHandler):
    def __init__(self, integrator):
        self.integrator = integrator
        # Map log file names to the method that analyzes them
        self.log_processors = {
            'conn.log': integrator.analyze_connection_log,
            'http.log': integrator.analyze_http_log,
            'dns.log': integrator.analyze_dns_log
        }

    def on_modified(self, event):
        if event.is_directory:
            return

        filename = event.src_path.split('/')[-1]
        if filename in self.log_processors:
            try:
                with open(event.src_path, 'r') as f:
                    # Simple approach: look only at the tail of the file
                    f.seek(0, 2)  # Go to end
                    file_size = f.tell()

                    # Read the last kilobyte
                    f.seek(max(0, file_size - 1024))
                    lines = f.readlines()

                    # Process last line (most recent)
                    if lines:
                        self.log_processors[filename](lines[-1])

            except Exception as e:
                print(f"Error processing {filename}: {e}")


def main():
    # Initialize SIEM integrator
    integrator = ZeekSIEMIntegrator()

    # Set up file monitoring
    event_handler = ZeekLogHandler(integrator)
    observer = Observer()
    observer.schedule(event_handler, integrator.config['log_directory'], recursive=False)

    print("Starting Zeek SIEM integration...")
    print(f"Monitoring: {integrator.config['log_directory']}")
    print(f"SIEM Type: {integrator.config['siem_type']}")

    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        print("Stopping Zeek SIEM integration...")

    observer.join()


if __name__ == "__main__":
    main()
```
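
The handler above re-reads the tail of the file on every change and processes only the final line, so records written in a burst between two events can be missed. One possible alternative, sketched under the assumption that log files are only appended to or replaced on rotation (the class name `OffsetTracker` is hypothetical), remembers how far each file has already been read:

```python
import os


class OffsetTracker:
    """Remember per-file read positions so each call returns only new lines."""

    def __init__(self):
        self.offsets = {}  # path -> byte offset of the next unread byte

    def read_new_lines(self, path):
        start = self.offsets.get(path, 0)
        if os.path.getsize(path) < start:
            # File shrank: assume it was rotated or truncated, start over
            start = 0
        with open(path, "rb") as f:
            f.seek(start)
            data = f.read()
        # Split off a trailing partial line and leave it for the next call
        head, sep, _tail = data.rpartition(b"\n")
        if not sep:
            self.offsets[path] = start
            return []  # no complete line available yet
        self.offsets[path] = start + len(head) + 1
        return head.decode("utf-8", errors="replace").split("\n")


if __name__ == "__main__":
    import tempfile

    tracker = OffsetTracker()
    with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as tmp:
        tmp.write("line1\nline2\npart")
        path = tmp.name
    print(tracker.read_new_lines(path))  # only the two complete lines
    with open(path, "a") as f:
        f.write("ial\n")
    print(tracker.read_new_lines(path))  # the completed third line
    os.remove(path)
```

Inside `on_modified`, each returned line could then be passed to the matching analyzer instead of only `lines[-1]`.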
This Zeek cheatsheet covers network security monitoring, traffic analysis, log analysis, custom script development, threat hunting, and SIEM integration. The included scripts and examples are intended as starting points for network security monitoring and incident response with Zeek.