Zeek Cheatsheet

Overview

Zeek (formerly known as Bro) is a powerful open-source network security monitoring platform that provides comprehensive network traffic analysis and intrusion detection capabilities. Unlike traditional signature-based intrusion detection systems, Zeek focuses on network security monitoring through deep packet inspection, protocol analysis, and behavioral detection. It generates detailed logs of network activity and can detect sophisticated attacks through its flexible scripting language and extensive protocol analyzers.

Installation and Setup

Ubuntu/Debian Installation

```bash
# Install from official repositories
sudo apt update
sudo apt install -y zeek zeek-aux

# Install from the Zeek security repository (latest version)
echo 'deb http://download.opensuse.org/repositories/security:/zeek/xUbuntu_20.04/ /' | sudo tee /etc/apt/sources.list.d/security:zeek.list
curl -fsSL https://download.opensuse.org/repositories/security:zeek/xUbuntu_20.04/Release.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
sudo apt update
sudo apt install -y zeek

# Verify installation
zeek --version
which zeek

# Set up environment
echo 'export PATH=/opt/zeek/bin:$PATH' >> ~/.bashrc
source ~/.bashrc

# Create zeek user for security
sudo useradd -r -s /bin/false zeek
sudo mkdir -p /opt/zeek/logs /opt/zeek/spool
sudo chown -R zeek:zeek /opt/zeek/logs /opt/zeek/spool
```

CentOS/RHEL Installation

```bash
# Install EPEL repository
sudo yum install -y epel-release

# Install dependencies
sudo yum install -y cmake make gcc gcc-c++ flex bison libpcap-devel openssl-devel python3-devel swig zlib-devel

# Install from source (recommended for latest features)
cd /tmp
wget https://download.zeek.org/zeek-4.2.1.tar.gz
tar -xzf zeek-4.2.1.tar.gz
cd zeek-4.2.1

# Configure and compile
./configure --prefix=/opt/zeek
make -j$(nproc)
sudo make install

# Add to PATH
echo 'export PATH=/opt/zeek/bin:$PATH' >> ~/.bashrc
source ~/.bashrc

# Create systemd service
sudo tee /etc/systemd/system/zeek.service << EOF
[Unit]
Description=Zeek Network Security Monitor
After=network.target

[Service]
Type=forking
User=zeek
Group=zeek
ExecStart=/opt/zeek/bin/zeekctl start
ExecStop=/opt/zeek/bin/zeekctl stop
ExecReload=/opt/zeek/bin/zeekctl restart
PIDFile=/opt/zeek/spool/zeek.pid

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable zeek
```

Docker Installation

```bash
# Pull official Zeek image
docker pull zeek/zeek:latest

# Run Zeek container with network monitoring
docker run -d \
  --name zeek-monitor \
  --network host \
  --cap-add NET_ADMIN \
  --cap-add NET_RAW \
  -v /opt/zeek/logs:/opt/zeek/logs \
  -v /opt/zeek/etc:/opt/zeek/etc \
  zeek/zeek:latest

# Run interactive Zeek session
docker run -it --rm \
  --network host \
  --cap-add NET_ADMIN \
  --cap-add NET_RAW \
  zeek/zeek:latest /bin/bash

# Custom Dockerfile for production
# (quoted 'EOF' keeps the backslash line continuations intact in the file)
cat > Dockerfile << 'EOF'
FROM zeek/zeek:latest

# Install additional tools
RUN apt-get update && apt-get install -y \
    python3-pip \
    jq \
    curl \
    vim

# Install Python packages for log analysis
RUN pip3 install pandas matplotlib seaborn

# Copy custom scripts
COPY scripts/ /opt/zeek/share/zeek/site/

# Set working directory
WORKDIR /opt/zeek

# Expose ports for management
EXPOSE 47760 47761

CMD ["/opt/zeek/bin/zeekctl", "deploy"]
EOF

docker build -t custom-zeek .
```

Basic Configuration

Network Configuration

```bash
# Edit network configuration
sudo vim /opt/zeek/etc/networks.cfg

# Define local networks (contents of networks.cfg)
10.0.0.0/8          Private
172.16.0.0/12       Private
192.168.0.0/16      Private
127.0.0.0/8         Loopback

# Edit node configuration
sudo vim /opt/zeek/etc/node.cfg

# Single node configuration (contents of node.cfg)
[zeek]
type=standalone
host=localhost
interface=eth0

# Cluster configuration example (contents of node.cfg)
[manager]
type=manager
host=192.168.1.10

[proxy-1]
type=proxy
host=192.168.1.11

[worker-1]
type=worker
host=192.168.1.12
interface=eth0

[worker-2]
type=worker
host=192.168.1.13
interface=eth1

# Configure zeekctl
sudo vim /opt/zeek/etc/zeekctl.cfg

# Basic zeekctl configuration (contents of zeekctl.cfg)
LogRotationInterval = 3600
LogExpireInterval = 0
StatsLogEnable = 1
StatsLogExpireInterval = 0
StatusCmdShowAll = 0
CrashExpireInterval = 0
SitePolicyManager = manager
SitePolicyWorker = worker
```
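To sanity-check which label a given address would fall under, the networks.cfg entries above can be matched with Python's standard `ipaddress` module. This is only a sketch: the helper names `parse_networks` and `classify` are hypothetical, and the sample text simply mirrors the four entries listed above rather than reading the real file.

```python
import ipaddress

# Sample networks.cfg content mirroring the entries listed above.
NETWORKS_CFG = """\
10.0.0.0/8          Private
172.16.0.0/12       Private
192.168.0.0/16      Private
127.0.0.0/8         Loopback
"""

def parse_networks(text):
    """Return (network, label) pairs, skipping blank lines and # comments."""
    entries = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        parts = line.split(None, 1)
        label = parts[1].strip() if len(parts) > 1 else ""
        entries.append((ipaddress.ip_network(parts[0]), label))
    return entries

def classify(ip, entries):
    """Return the label of the first network containing ip, or None."""
    addr = ipaddress.ip_address(ip)
    for network, label in entries:
        if addr.version == network.version and addr in network:
            return label
    return None

entries = parse_networks(NETWORKS_CFG)
print(classify("192.168.1.12", entries))  # one of the worker hosts above
print(classify("8.8.8.8", entries))       # not covered by any entry
```

An address that matches no entry returns `None`, which corresponds to traffic Zeek would treat as non-local.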

Site Policy Configuration

```bash
# Edit site policy
sudo vim /opt/zeek/share/zeek/site/local.zeek

# Basic site configuration (contents of local.zeek)
@load base/frameworks/software
@load base/frameworks/files
@load base/frameworks/notice
@load base/frameworks/logging

# Load additional scripts
@load protocols/conn/known-hosts
@load protocols/conn/known-services
@load protocols/dhcp/software
@load protocols/dns/detect-external-names
@load protocols/ftp/detect
@load protocols/ftp/software
@load protocols/http/detect-sqli
@load protocols/http/detect-webapps
@load protocols/http/software
@load protocols/mysql/software
@load protocols/smtp/software
@load protocols/ssh/detect-bruteforcing
@load protocols/ssh/geo-data
@load protocols/ssh/interesting-hostnames
@load protocols/ssh/software
@load protocols/ssl/known-certs
@load protocols/ssl/validate-certs

# Custom configuration
redef ignore_checksums = T;
redef HTTP::default_capture_password = T;
redef FTP::default_capture_password = T;

# Define local subnets
redef Site::local_nets = {
    10.0.0.0/8,
    172.16.0.0/12,
    192.168.0.0/16,
};

# Email configuration for notices
redef Notice::mail_dest = "admin@example.com";
redef Notice::mail_subject_prefix = "[Zeek Notice]";

# File extraction configuration
redef FileExtract::prefix = "/opt/zeek/extract/";
redef FileExtract::default_limit = 10485760;  # 10MB
```

Basic Operations

ZeekControl Management

```bash
# Start the interactive ZeekControl shell
sudo zeekctl

# Commands available at the ZeekControl prompt:
#   install    Install configuration
#   check      Check configuration
#   start      Start Zeek
#   stop       Stop Zeek
#   restart    Restart Zeek
#   status     Check status
#   deploy     Deploy configuration changes (check, install, restart)

# Command line operations
zeekctl install
zeekctl check
zeekctl start
zeekctl status

# Check for errors
zeekctl diag

# Update and deploy
zeekctl install
zeekctl deploy

# Monitor processes
zeekctl top
zeekctl ps.zeek   # process listing is provided by the ps.zeek command
```

Running Zeek Manually

```bash
# Analyze live traffic
sudo zeek -i eth0

# Analyze PCAP file
zeek -r capture.pcap

# Run with specific scripts
zeek -r capture.pcap protocols/http/detect-sqli

# Run with custom script
zeek -r capture.pcap custom-script.zeek

# Write logs as JSON instead of tab-separated values
zeek -r capture.pcap LogAscii::use_json=T

# Print the Zeek version (-v is --version, not a verbose flag)
zeek -v

# Activate the script debugger (-d is --debug-script)
zeek -r capture.pcap -d

# Set log directory
zeek -r capture.pcap Log::default_logdir=/tmp/zeek-logs
```
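With `LogAscii::use_json=T`, each log line is a standalone JSON object, which is easier to consume from scripts than the tab-separated format. The sketch below shows one way to read such a log in Python. The two records are made-up sample data, `read_json_log` is a hypothetical helper, and the code assumes that unset fields are simply absent from a record (hence the `.get()` with a default).

```python
import json

# Two made-up conn.log records in JSON-lines form (sample data only).
SAMPLE = """\
{"ts": 1700000000.0, "uid": "C1", "id.orig_h": "192.168.1.100", "id.resp_h": "93.184.216.34", "id.resp_p": 443, "proto": "tcp", "duration": 4000.5}
{"ts": 1700000010.0, "uid": "C2", "id.orig_h": "192.168.1.101", "id.resp_h": "10.0.0.5", "id.resp_p": 53, "proto": "udp"}
"""

def read_json_log(lines):
    """Yield one dict per non-empty JSON line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)

records = list(read_json_log(SAMPLE.splitlines()))

# Connections that lasted longer than one hour; records without a
# duration field are treated as zero-length.
long_lived = [r["uid"] for r in records if r.get("duration", 0) > 3600]
print(long_lived)
```

For a real log, pass an open file object (for example `open("conn.log")`) instead of the sample lines.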

Log Management

```bash
# View current logs
ls -la /opt/zeek/logs/current/

# Common log files:
#   conn.log     Connection summaries
#   dns.log      DNS queries and responses
#   http.log     HTTP requests and responses
#   ssl.log      SSL/TLS handshake info
#   files.log    File transfers
#   notice.log   Security notices
#   weird.log    Unusual network activity

# Tail logs in real-time
tail -f /opt/zeek/logs/current/conn.log
tail -f /opt/zeek/logs/current/http.log
tail -f /opt/zeek/logs/current/notice.log

# Run housekeeping tasks (checks for crashed nodes, expires old logs)
zeekctl cron

# Enable the housekeeping tasks run by "zeekctl cron"
zeekctl cron enable
```

Log Analysis

Connection Log Analysis

```bash
# View connection log
cat /opt/zeek/logs/current/conn.log

# Connection log fields:
#   ts: timestamp
#   uid: unique connection identifier
#   id.orig_h: originator IP
#   id.orig_p: originator port
#   id.resp_h: responder IP
#   id.resp_p: responder port
#   proto: protocol
#   service: service type
#   duration: connection duration
#   orig_bytes: bytes from originator
#   resp_bytes: bytes from responder
#   conn_state: connection state
#   local_orig: local originator
#   local_resp: local responder

# Filter connections by IP
grep -F "192.168.1.100" /opt/zeek/logs/current/conn.log

# Filter by responder port (field 6; fields are tab-separated, '#' lines are headers)
awk -F'\t' '!/^#/ && $6 == 80' /opt/zeek/logs/current/conn.log
awk -F'\t' '!/^#/ && $6 == 443' /opt/zeek/logs/current/conn.log

# Filter by protocol (field 7)
awk -F'\t' '!/^#/ && $7 == "tcp"' /opt/zeek/logs/current/conn.log
awk -F'\t' '!/^#/ && $7 == "udp"' /opt/zeek/logs/current/conn.log

# Find long connections (duration, field 9, over one hour)
awk -F'\t' '!/^#/ && $9 != "-" && $9 > 3600' /opt/zeek/logs/current/conn.log

# Find large data transfers (orig_bytes, field 10, over 1 MB)
awk -F'\t' '!/^#/ && $10 != "-" && $10 > 1000000' /opt/zeek/logs/current/conn.log
```
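Positional field numbers are fragile because the column layout can differ between Zeek versions and configurations. A more robust approach is to read the column names from the `#fields` header line and address values by name. The following is a minimal sketch of that idea; `read_zeek_tsv` and `as_float` are hypothetical helpers and the log excerpt is made-up sample data.

```python
# Made-up conn.log excerpt in Zeek's tab-separated format (sample data only).
SAMPLE_CONN = "\n".join([
    "#separator \\x09",
    "#fields\tts\tuid\tid.orig_h\tid.orig_p\tid.resp_h\tid.resp_p\tproto\tservice\tduration\torig_bytes\tresp_bytes\tconn_state",
    "1700000000.0\tC1\t192.168.1.100\t50000\t93.184.216.34\t443\ttcp\tssl\t4000.5\t2000000\t512\tSF",
    "1700000010.0\tC2\t192.168.1.101\t50001\t10.0.0.5\t53\tudp\tdns\t-\t-\t-\tS0",
    "#close\t2023-11-14-22-13-20",
])

def read_zeek_tsv(lines):
    """Yield one dict per record, keyed by the names in the #fields header."""
    fields = []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("#fields"):
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#"):
            yield dict(zip(fields, line.split("\t")))

def as_float(value):
    """Convert a field to float; '-' marks an unset value in Zeek TSV logs."""
    return None if value in ("-", "(empty)") else float(value)

rows = list(read_zeek_tsv(SAMPLE_CONN.splitlines()))

# Same filters as the awk one-liners above, but by field name.
long_conns = [r["uid"] for r in rows if (as_float(r["duration"]) or 0) > 3600]
on_443 = [r["uid"] for r in rows if r["id.resp_p"] == "443"]
print(long_conns, on_443)
```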

HTTP Log Analysis

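```bash
# View HTTP log
cat /opt/zeek/logs/current/http.log

# HTTP log fields:
#   ts: timestamp
#   uid: unique connection identifier
#   id.orig_h: client IP
#   id.resp_h: server IP
#   trans_depth: transaction depth
#   method: HTTP method
#   host: hostname
#   uri: URI
#   referrer: referrer
#   version: HTTP version
#   user_agent: user agent
#   request_body_len: request body length
#   response_body_len: response body length
#   status_code: HTTP status code
#   status_msg: status message

# The examples below use zeek-cut (shipped with zeek-aux) to select fields
# by name, so they do not depend on column positions.

# Filter by status code
zeek-cut id.orig_h host uri status_code < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 == 404'
zeek-cut id.orig_h host uri status_code < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 == 500'

# Filter by method
zeek-cut id.orig_h method host uri < /opt/zeek/logs/current/http.log | awk -F'\t' '$2 == "POST"'
zeek-cut id.orig_h method host uri < /opt/zeek/logs/current/http.log | awk -F'\t' '$2 == "GET"'

# Find suspicious user agents
grep -iE "bot|crawler|scanner" /opt/zeek/logs/current/http.log

# Find large requests/responses (over 1 MB)
zeek-cut id.orig_h host uri request_body_len < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 > 1000000'
zeek-cut id.orig_h host uri response_body_len < /opt/zeek/logs/current/http.log | awk -F'\t' '$4 > 1000000'

# Extract unique hosts
zeek-cut host < /opt/zeek/logs/current/http.log | sort | uniq -c | sort -nr
```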

DNS Log Analysis

```bash
# View DNS log
cat /opt/zeek/logs/current/dns.log

# DNS log fields:
#   ts: timestamp
#   uid: unique connection identifier
#   id.orig_h: client IP
#   id.resp_h: DNS server IP
#   proto: protocol (udp/tcp)
#   trans_id: transaction ID
#   rtt: round trip time
#   query: DNS query
#   qclass: query class
#   qclass_name: query class name
#   qtype: query type
#   qtype_name: query type name
#   rcode: response code
#   rcode_name: response code name
#   AA: authoritative answer
#   TC: truncated
#   RD: recursion desired
#   RA: recursion available
#   Z: reserved
#   answers: DNS answers
#   TTLs: time to live values

# Find DNS queries for specific domain
grep -F "example.com" /opt/zeek/logs/current/dns.log

# Find failed DNS queries
grep "NXDOMAIN" /opt/zeek/logs/current/dns.log

# Find suspicious domains (match the TLD at the end of the query field)
zeek-cut id.orig_h query < /opt/zeek/logs/current/dns.log | grep -E '\.(tk|ml|ga|cf)$'

# Extract unique queried domains
zeek-cut query < /opt/zeek/logs/current/dns.log | sort | uniq -c | sort -nr

# Find DNS tunneling (large TXT records)
zeek-cut id.orig_h query qtype_name answers < /opt/zeek/logs/current/dns.log | awk -F'\t' '$3 == "TXT" && length($4) > 100'
```
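The same two heuristics (overly long query names and the listed TLDs) can be combined in a small Python check. This is an illustrative sketch only: `flag_query` is a hypothetical helper, the 50-character cut-off is an assumed threshold, and neither signal is proof of malicious activity on its own.

```python
import re

# TLDs taken from the grep example above; the pattern is anchored so that
# only the final label of the query name is matched.
SUSPICIOUS_TLD = re.compile(r"\.(tk|ml|ga|cf)$", re.IGNORECASE)
MAX_QUERY_LENGTH = 50  # assumed threshold for a "long" query

def flag_query(query):
    """Return the list of heuristics a DNS query name trips."""
    reasons = []
    if len(query) > MAX_QUERY_LENGTH:
        reasons.append("long")
    if SUSPICIOUS_TLD.search(query.rstrip(".")):
        reasons.append("tld")
    return reasons

queries = [
    "example.com",
    "login.example.tk",
    "a" * 60 + ".example.com",
    "tk.example.com",  # "tk" is not the TLD here, so it is not flagged
]
for q in queries:
    print(q, flag_query(q))
```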

SSL/TLS Log Analysis

```bash
# View SSL log
cat /opt/zeek/logs/current/ssl.log

# SSL log fields:
#   ts: timestamp
#   uid: unique connection identifier
#   id.orig_h: client IP
#   id.resp_h: server IP
#   version: SSL/TLS version
#   cipher: cipher suite
#   curve: elliptic curve
#   server_name: server name (SNI)
#   resumed: session resumed
#   last_alert: last alert
#   next_protocol: next protocol
#   established: connection established
#   cert_chain_fuids: certificate chain file UIDs
#   client_cert_chain_fuids: client certificate chain
#   subject: certificate subject
#   issuer: certificate issuer
#   client_subject: client certificate subject
#   client_issuer: client certificate issuer
#   validation_status: certificate validation status

# Find SSL/TLS versions
zeek-cut version < /opt/zeek/logs/current/ssl.log | sort | uniq -c

# Find weak ciphers
grep -E "(RC4|DES|MD5)" /opt/zeek/logs/current/ssl.log

# Find certificate validation failures
grep "unable to get local issuer certificate" /opt/zeek/logs/current/ssl.log

# Extract server names (SNI)
zeek-cut server_name < /opt/zeek/logs/current/ssl.log | sort | uniq -c | sort -nr

# Find self-signed certificates
grep "self signed certificate" /opt/zeek/logs/current/ssl.log
```
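The version tally can also be done in Python, which makes it easy to list the servers still negotiating outdated protocol versions. The sketch below uses made-up `(version, server_name)` pairs and treats `SSLv2`, `SSLv3` and `TLSv10` as weak; that set is an assumption and can be adjusted to local policy.

```python
from collections import Counter

# Version strings as Zeek writes them; which ones count as weak is a
# policy choice, not something Zeek decides.
WEAK_VERSIONS = {"SSLv2", "SSLv3", "TLSv10"}

# Made-up (version, server_name) pairs as they might appear in ssl.log.
SAMPLE = [
    ("TLSv12", "example.com"),
    ("TLSv13", "example.org"),
    ("TLSv10", "legacy.example.net"),
    ("TLSv12", "example.com"),
]

version_counts = Counter(version for version, _ in SAMPLE)
weak_servers = sorted({name for version, name in SAMPLE if version in WEAK_VERSIONS})

for version, count in version_counts.most_common():
    print(f"{version}: {count}")
print("Servers using weak versions:", weak_servers)
```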

Advanced Analysis

Custom Zeek Scripts

```zeek
# Custom detection script: detect-port-scan.zeek

@load base/frameworks/notice

module PortScan;

export {
    redef enum Notice::Type += {
        Port_Scan,
    };

    # Configuration
    const scan_threshold = 20 &redef;
    const scan_window = 5min &redef;
}

# Track connection attempts per source IP
global scan_tracker: table[addr] of set[port] &create_expire=scan_window;

event connection_attempt(c: connection)
    {
    local orig = c$id$orig_h;
    local resp_port = c$id$resp_p;

    # Skip local traffic
    if ( Site::is_local_addr(orig) )
        return;

    # Add port to tracker
    if ( orig !in scan_tracker )
        scan_tracker[orig] = set();

    add scan_tracker[orig][resp_port];

    # Check threshold
    if ( |scan_tracker[orig]| >= scan_threshold )
        {
        NOTICE([$note=Port_Scan,
                $msg=fmt("Port scan detected from %s (%d ports)", orig, |scan_tracker[orig]|),
                $src=orig,
                $identifier=cat(orig)]);

        # Reset tracker to avoid repeated notices
        delete scan_tracker[orig];
        }
    }

event zeek_init()
    {
    print "Port scan detection loaded";
    }
```
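The threshold logic of the port scan script can be prototyped offline before deploying it. The Python sketch below mirrors the set-per-source tracking and the reset after a notice; it deliberately does not model the `scan_window` expiry, and `detect_port_scans` with its sample input is hypothetical.

```python
from collections import defaultdict

def detect_port_scans(attempts, threshold=20):
    """Return (source, distinct_port_count) for each source reaching the threshold.

    attempts is an iterable of (source_ip, responder_port) pairs in arrival order.
    """
    ports_seen = defaultdict(set)
    notices = []
    for src, port in attempts:
        ports_seen[src].add(port)
        if len(ports_seen[src]) >= threshold:
            notices.append((src, len(ports_seen[src])))
            # Reset the tracker, as the Zeek script does, to avoid repeats.
            del ports_seen[src]
    return notices

# One source probing 25 distinct ports, another hitting port 80 thirty times.
attempts = [("203.0.113.5", p) for p in range(1, 26)] + [("198.51.100.7", 80)] * 30
print(detect_port_scans(attempts))
```

Only the first source is reported: repeated connections to a single port never grow the set, which is why the tracker stores ports rather than counting attempts.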

```zeek
# Custom detection script: detect-data-exfiltration.zeek

@load base/frameworks/notice
@load base/frameworks/sumstats

module DataExfiltration;

export {
    redef enum Notice::Type += {
        Large_Upload,
        Suspicious_Upload,
    };

    # Configuration
    # Zeek has no byte-size literals; 104857600 bytes = 100 MB
    const upload_threshold = 104857600.0 &redef;
    const upload_window = 1hr &redef;
}

event zeek_init()
    {
    # Track upload volumes per source IP
    local r1: SumStats::Reducer = [$stream="upload.bytes", $apply=set(SumStats::SUM)];

    SumStats::create([$name="upload-volume",
                      $epoch=upload_window,
                      $reducers=set(r1),
                      $threshold_val(key: SumStats::Key, result: SumStats::Result) =
                          {
                          return result["upload.bytes"]$sum;
                          },
                      $threshold=upload_threshold,
                      $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
                          {
                          local src = key$host;
                          local bytes = result["upload.bytes"]$sum;

                          NOTICE([$note=Large_Upload,
                                  $msg=fmt("Large upload detected from %s (%.0f bytes)", src, bytes),
                                  $src=src]);
                          }]);
    }

event connection_state_remove(c: connection)
    {
    # Track outbound data transfers; c$orig$size holds the originator's byte count
    if ( Site::is_local_addr(c$id$orig_h) && ! Site::is_local_addr(c$id$resp_h) )
        {
        if ( c$orig$size > 0 )
            SumStats::observe("upload.bytes", [$host=c$id$orig_h], [$num=c$orig$size]);
        }
    }
```
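The aggregation performed by the exfiltration script (sum outbound bytes per local source, report sources above a threshold) can be approximated offline as follows. This is a rough sketch under stated assumptions: it ignores the one-hour epoch, hard-codes the three private ranges as "local", and `large_uploaders` plus its sample connections are hypothetical.

```python
import ipaddress
from collections import defaultdict

# Assumed local networks, matching the private ranges used in this cheatsheet.
LOCAL_NETS = [ipaddress.ip_network(n) for n in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")]

def is_local(ip):
    """True if ip falls inside one of the assumed local networks."""
    addr = ipaddress.ip_address(ip)
    return any(addr.version == net.version and addr in net for net in LOCAL_NETS)

def large_uploaders(conns, threshold=100 * 1024 * 1024):
    """Sum originator bytes for local-to-external connections per source."""
    totals = defaultdict(int)
    for orig, resp, orig_bytes in conns:
        if is_local(orig) and not is_local(resp):
            totals[orig] += orig_bytes
    return {ip: total for ip, total in totals.items() if total >= threshold}

MB = 1024 * 1024
conns = [
    ("192.168.1.10", "8.8.8.8", 60 * MB),
    ("192.168.1.10", "8.8.8.8", 60 * MB),   # 120 MB total: reported
    ("192.168.1.11", "10.0.0.5", 500 * MB), # internal transfer: ignored
    ("192.168.1.12", "1.1.1.1", 1024),      # far below the threshold
]
print(large_uploaders(conns))
```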

```zeek
# Custom detection script: detect-dns-tunneling.zeek

@load base/frameworks/notice

module DNSTunneling;

export {
    redef enum Notice::Type += {
        DNS_Tunneling,
        Suspicious_DNS_Query,
    };

    # Configuration
    const max_query_length = 100 &redef;
    const max_queries_per_minute = 50 &redef;
}

# Track DNS queries per source
global dns_query_count: table[addr] of count &create_expire=1min &default=0;

event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
    {
    local orig = c$id$orig_h;

    # Check query length
    if ( |query| > max_query_length )
        {
        NOTICE([$note=Suspicious_DNS_Query,
                $msg=fmt("Unusually long DNS query from %s: %s", orig, query),
                $src=orig,
                $sub=query]);
        }

    # Track query frequency
    ++dns_query_count[orig];

    if ( dns_query_count[orig] > max_queries_per_minute )
        {
        NOTICE([$note=DNS_Tunneling,
                $msg=fmt("Possible DNS tunneling from %s (%d queries/min)", orig, dns_query_count[orig]),
                $src=orig]);
        }

    # Check for suspicious patterns
    if ( /[0-9a-f]{32,}/ in query )
        {
        NOTICE([$note=Suspicious_DNS_Query,
                $msg=fmt("Possible encoded data in DNS query from %s: %s", orig, query),
                $src=orig,
                $sub=query]);
        }
    }
```

Log Analysis Scripts

```python
#!/usr/bin/env python3
"""
Zeek Log Analyzer
Comprehensive analysis of Zeek logs with statistics and visualizations
"""

import argparse
import json
import os
from datetime import datetime

import matplotlib.pyplot as plt
import pandas as pd


class ZeekLogAnalyzer:
    def __init__(self, log_directory="/opt/zeek/logs/current"):
        self.log_dir = log_directory
        self.dataframes = {}

    def load_logs(self, log_types=('conn', 'http', 'dns', 'ssl')):
        """Load Zeek logs (tab-separated format) into pandas DataFrames"""
        for log_type in log_types:
            try:
                log_file = f"{self.log_dir}/{log_type}.log"

                # Column names come from the '#fields' header line; every
                # other line starting with '#' is metadata and is skipped
                columns = None
                data = []
                with open(log_file, 'r') as f:
                    for line in f:
                        if line.startswith('#fields'):
                            columns = line.rstrip('\n').split('\t')[1:]
                        elif not line.startswith('#'):
                            data.append(line.rstrip('\n').split('\t'))

                if data and columns:
                    df = pd.DataFrame(data, columns=columns)

                    # Convert timestamp
                    if 'ts' in df.columns:
                        df['ts'] = pd.to_datetime(df['ts'].astype(float), unit='s')

                    self.dataframes[log_type] = df
                    print(f"Loaded {len(df)} records from {log_type}.log")

            except Exception as e:
                print(f"Error loading {log_type}.log: {e}")

    def analyze_connections(self):
        """Analyze connection patterns"""
        if 'conn' not in self.dataframes:
            print("Connection log not loaded")
            return

        df = self.dataframes['conn']

        print("\n=== Connection Analysis ===")

        # Basic statistics
        total_connections = len(df)
        unique_sources = df['id.orig_h'].nunique()
        unique_destinations = df['id.resp_h'].nunique()

        print(f"Total connections: {total_connections:,}")
        print(f"Unique source IPs: {unique_sources:,}")
        print(f"Unique destination IPs: {unique_destinations:,}")

        # Protocol distribution
        print("\nProtocol Distribution:")
        protocol_counts = df['proto'].value_counts()
        for proto, count in protocol_counts.head(10).items():
            percentage = (count / total_connections) * 100
            print(f"  {proto}: {count:,} ({percentage:.1f}%)")

        # Service distribution
        print("\nTop Services:")
        service_counts = df['service'].value_counts()
        for service, count in service_counts.head(10).items():
            if service != '-':
                percentage = (count / total_connections) * 100
                print(f"  {service}: {count:,} ({percentage:.1f}%)")

        # Connection states
        print("\nConnection States:")
        state_counts = df['conn_state'].value_counts()
        for state, count in state_counts.head(10).items():
            percentage = (count / total_connections) * 100
            print(f"  {state}: {count:,} ({percentage:.1f}%)")

        # Top talkers (by connection count)
        print("\nTop Source IPs (by connection count):")
        top_sources = df['id.orig_h'].value_counts().head(10)
        for ip, count in top_sources.items():
            print(f"  {ip}: {count:,} connections")

        # Top destinations
        print("\nTop Destination IPs (by connection count):")
        top_destinations = df['id.resp_h'].value_counts().head(10)
        for ip, count in top_destinations.items():
            print(f"  {ip}: {count:,} connections")

        return {
            'total_connections': total_connections,
            'unique_sources': unique_sources,
            'unique_destinations': unique_destinations,
            'protocol_distribution': protocol_counts.to_dict(),
            'service_distribution': service_counts.to_dict(),
            'top_sources': top_sources.to_dict(),
            'top_destinations': top_destinations.to_dict()
        }

    def analyze_http_traffic(self):
        """Analyze HTTP traffic patterns"""
        if 'http' not in self.dataframes:
            print("HTTP log not loaded")
            return

        df = self.dataframes['http']

        print("\n=== HTTP Traffic Analysis ===")

        # Basic statistics
        total_requests = len(df)
        unique_hosts = df['host'].nunique()
        unique_user_agents = df['user_agent'].nunique()

        print(f"Total HTTP requests: {total_requests:,}")
        print(f"Unique hosts: {unique_hosts:,}")
        print(f"Unique user agents: {unique_user_agents:,}")

        # Method distribution
        print("\nHTTP Methods:")
        method_counts = df['method'].value_counts()
        for method, count in method_counts.items():
            percentage = (count / total_requests) * 100
            print(f"  {method}: {count:,} ({percentage:.1f}%)")

        # Status code distribution
        print("\nHTTP Status Codes:")
        status_counts = df['status_code'].value_counts()
        for status, count in status_counts.head(10).items():
            percentage = (count / total_requests) * 100
            print(f"  {status}: {count:,} ({percentage:.1f}%)")

        # Top hosts
        print("\nTop Requested Hosts:")
        top_hosts = df['host'].value_counts().head(10)
        for host, count in top_hosts.items():
            if host != '-':
                print(f"  {host}: {count:,} requests")

        # Top user agents
        print("\nTop User Agents:")
        top_user_agents = df['user_agent'].value_counts().head(5)
        for ua, count in top_user_agents.items():
            if ua != '-' and len(ua) < 100:
                print(f"  {ua}: {count:,} requests")

        # Suspicious patterns
        print("\nSuspicious Patterns:")

        # Large requests/responses
        if 'request_body_len' in df.columns:
            df['request_body_len'] = pd.to_numeric(df['request_body_len'], errors='coerce')
            large_requests = df[df['request_body_len'] > 1000000]
            print(f"  Large requests (>1MB): {len(large_requests)}")

        if 'response_body_len' in df.columns:
            df['response_body_len'] = pd.to_numeric(df['response_body_len'], errors='coerce')
            large_responses = df[df['response_body_len'] > 10000000]
            print(f"  Large responses (>10MB): {len(large_responses)}")

        # Error responses (status codes are still strings at this point)
        error_responses = df[df['status_code'].isin(['400', '401', '403', '404', '500', '502', '503'])]
        print(f"  Error responses: {len(error_responses)} ({len(error_responses)/total_requests*100:.1f}%)")

        return {
            'total_requests': total_requests,
            'unique_hosts': unique_hosts,
            'method_distribution': method_counts.to_dict(),
            'status_distribution': status_counts.to_dict(),
            'top_hosts': top_hosts.to_dict()
        }

    def analyze_dns_traffic(self):
        """Analyze DNS traffic patterns"""
        if 'dns' not in self.dataframes:
            print("DNS log not loaded")
            return

        df = self.dataframes['dns']

        print("\n=== DNS Traffic Analysis ===")

        # Basic statistics
        total_queries = len(df)
        unique_queries = df['query'].nunique()

        print(f"Total DNS queries: {total_queries:,}")
        print(f"Unique queries: {unique_queries:,}")

        # Query type distribution
        print("\nDNS Query Types:")
        qtype_counts = df['qtype_name'].value_counts()
        for qtype, count in qtype_counts.head(10).items():
            percentage = (count / total_queries) * 100
            print(f"  {qtype}: {count:,} ({percentage:.1f}%)")

        # Response code distribution
        print("\nDNS Response Codes:")
        rcode_counts = df['rcode_name'].value_counts()
        for rcode, count in rcode_counts.items():
            percentage = (count / total_queries) * 100
            print(f"  {rcode}: {count:,} ({percentage:.1f}%)")

        # Top queried domains
        print("\nTop Queried Domains:")
        top_queries = df['query'].value_counts().head(10)
        for query, count in top_queries.items():
            if query != '-':
                print(f"  {query}: {count:,} queries")

        # Suspicious patterns
        print("\nSuspicious DNS Patterns:")

        # Long queries (possible tunneling)
        long_queries = df[df['query'].str.len() > 50]
        print(f"  Long queries (>50 chars): {len(long_queries)}")

        # Failed queries
        failed_queries = df[df['rcode_name'] == 'NXDOMAIN']
        print(f"  Failed queries (NXDOMAIN): {len(failed_queries)} ({len(failed_queries)/total_queries*100:.1f}%)")

        # Suspicious TLDs: escape the dot and anchor at the end of the name,
        # otherwise names such as "mlb.example.com" would also match
        suspicious_tld_pattern = r'\.(?:tk|ml|ga|cf)$'
        suspicious_domains = df[df['query'].str.contains(suspicious_tld_pattern, na=False)]
        print(f"  Suspicious TLD queries: {len(suspicious_domains)}")

        return {
            'total_queries': total_queries,
            'unique_queries': unique_queries,
            'qtype_distribution': qtype_counts.to_dict(),
            'rcode_distribution': rcode_counts.to_dict(),
            'top_queries': top_queries.to_dict()
        }

    def analyze_ssl_traffic(self):
        """Analyze SSL/TLS traffic patterns"""
        if 'ssl' not in self.dataframes:
            print("SSL log not loaded")
            return

        df = self.dataframes['ssl']

        print("\n=== SSL/TLS Traffic Analysis ===")

        # Basic statistics
        total_connections = len(df)
        unique_servers = df['server_name'].nunique()

        print(f"Total SSL/TLS connections: {total_connections:,}")
        print(f"Unique server names: {unique_servers:,}")

        # Version distribution
        print("\nSSL/TLS Versions:")
        version_counts = df['version'].value_counts()
        for version, count in version_counts.items():
            percentage = (count / total_connections) * 100
            print(f"  {version}: {count:,} ({percentage:.1f}%)")

        # Top server names
        print("\nTop Server Names (SNI):")
        top_servers = df['server_name'].value_counts().head(10)
        for server, count in top_servers.items():
            if server != '-':
                print(f"  {server}: {count:,} connections")

        # Certificate validation status
        if 'validation_status' in df.columns:
            print("\nCertificate Validation Status:")
            validation_counts = df['validation_status'].value_counts()
            for status, count in validation_counts.items():
                if status != '-':
                    percentage = (count / total_connections) * 100
                    print(f"  {status}: {count:,} ({percentage:.1f}%)")

        # Security analysis
        print("\nSecurity Analysis:")

        # Weak versions
        weak_versions = df[df['version'].isin(['SSLv2', 'SSLv3', 'TLSv10'])]
        print(f"  Weak SSL/TLS versions: {len(weak_versions)} ({len(weak_versions)/total_connections*100:.1f}%)")

        # Self-signed certificates
        if 'validation_status' in df.columns:
            self_signed = df[df['validation_status'].str.contains('self signed', na=False)]
            print(f"  Self-signed certificates: {len(self_signed)}")

        return {
            'total_connections': total_connections,
            'unique_servers': unique_servers,
            'version_distribution': version_counts.to_dict(),
            'top_servers': top_servers.to_dict()
        }

    def generate_visualizations(self, output_dir="zeek_analysis"):
        """Generate visualization charts"""
        os.makedirs(output_dir, exist_ok=True)

        # Set style
        plt.style.use('seaborn-v0_8')

        # Connection analysis charts
        if 'conn' in self.dataframes:
            df = self.dataframes['conn']

            # Protocol distribution pie chart
            plt.figure(figsize=(10, 6))
            protocol_counts = df['proto'].value_counts().head(8)
            plt.pie(protocol_counts.values, labels=protocol_counts.index, autopct='%1.1f%%')
            plt.title('Protocol Distribution')
            plt.savefig(f'{output_dir}/protocol_distribution.png', dpi=300, bbox_inches='tight')
            plt.close()

            # Connection timeline
            if 'ts' in df.columns:
                plt.figure(figsize=(12, 6))
                df['hour'] = df['ts'].dt.hour
                hourly_counts = df['hour'].value_counts().sort_index()
                plt.bar(hourly_counts.index, hourly_counts.values)
                plt.xlabel('Hour of Day')
                plt.ylabel('Number of Connections')
                plt.title('Connection Activity by Hour')
                plt.savefig(f'{output_dir}/connection_timeline.png', dpi=300, bbox_inches='tight')
                plt.close()

        # HTTP analysis charts
        if 'http' in self.dataframes:
            df = self.dataframes['http']

            # Status code distribution
            plt.figure(figsize=(10, 6))
            status_counts = df['status_code'].value_counts().head(10)
            plt.bar(status_counts.index, status_counts.values)
            plt.xlabel('HTTP Status Code')
            plt.ylabel('Count')
            plt.title('HTTP Status Code Distribution')
            plt.xticks(rotation=45)
            plt.savefig(f'{output_dir}/http_status_codes.png', dpi=300, bbox_inches='tight')
            plt.close()

        print(f"Visualizations saved to {output_dir}/")

    def generate_report(self, output_file="zeek_analysis_report.json"):
        """Generate comprehensive analysis report"""
        report = {
            'analysis_time': datetime.now().isoformat(),
            'log_directory': self.log_dir,
            'summary': {}
        }

        # Run all analyses
        if 'conn' in self.dataframes:
            report['connection_analysis'] = self.analyze_connections()

        if 'http' in self.dataframes:
            report['http_analysis'] = self.analyze_http_traffic()

        if 'dns' in self.dataframes:
            report['dns_analysis'] = self.analyze_dns_traffic()

        if 'ssl' in self.dataframes:
            report['ssl_analysis'] = self.analyze_ssl_traffic()

        # Save report
        with open(output_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        print(f"\nAnalysis report saved to {output_file}")
        return report


def main():
    parser = argparse.ArgumentParser(description='Zeek Log Analyzer')
    parser.add_argument('--log-dir', default='/opt/zeek/logs/current', help='Zeek log directory')
    parser.add_argument('--output-dir', default='zeek_analysis', help='Output directory for visualizations')
    parser.add_argument('--report', default='zeek_analysis_report.json', help='Output file for analysis report')
    parser.add_argument('--visualizations', action='store_true', help='Generate visualization charts')

    args = parser.parse_args()

    # Initialize analyzer
    analyzer = ZeekLogAnalyzer(args.log_dir)

    # Load logs
    analyzer.load_logs()

    # Generate report
    analyzer.generate_report(args.report)

    # Generate visualizations if requested
    if args.visualizations:
        analyzer.generate_visualizations(args.output_dir)


if __name__ == "__main__":
    main()
```

Threat Hunting Scripts

```bash

#!/bin/bash
# Zeek Threat Hunting Script
# Automated threat detection using Zeek logs

LOG_DIR="/opt/zeek/logs/current"
OUTPUT_DIR="/tmp/zeek-hunting"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p "$OUTPUT_DIR"

echo "=== Zeek Threat Hunting Report - $DATE ===" > "$OUTPUT_DIR/threat_report_$DATE.txt"

# Function to log findings (prints to the terminal and appends to the report)
log_finding() {
    echo "$1" | tee -a "$OUTPUT_DIR/threat_report_$DATE.txt"
}

log_finding "Starting threat hunting analysis..."

# 1. Port Scan Detection
log_finding ""
log_finding "=== Port Scan Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find IPs connecting to many different ports
    # (field 3 = id.orig_h, field 6 = id.resp_p; '#' lines are headers)
    awk -F'\t' '!/^#/ {print $3, $6}' "$LOG_DIR/conn.log" | \
        sort -u | \
        awk '{count[$1]++} END {for (ip in count) if (count[ip] > 20) print ip, count[ip]}' | \
        sort -k2 -nr > "$OUTPUT_DIR/potential_port_scans_$DATE.txt"

if [ -s "$OUTPUT_DIR/potential_port_scans_$DATE.txt" ]; then
    log_finding "Potential port scans detected:"
    head -10 "$OUTPUT_DIR/potential_port_scans_$DATE.txt" | while read line; do
        log_finding "  $line"
    done
else
    log_finding "No port scans detected"
fi

fi

# 2. DNS Tunneling Detection
log_finding ""
log_finding "=== DNS Tunneling Detection ==="
if [ -f "$LOG_DIR/dns.log" ]; then
    # Find unusually long DNS queries
    # (zeek-cut selects fields by name, so column positions do not matter)
    zeek-cut id.orig_h query < "$LOG_DIR/dns.log" | \
        awk -F'\t' 'length($2) > 50 {print $1, $2, length($2)}' > "$OUTPUT_DIR/long_dns_queries_$DATE.txt"

if [ -s "$OUTPUT_DIR/long_dns_queries_$DATE.txt" ]; then
    log_finding "Suspicious long DNS queries detected:"
    head -5 "$OUTPUT_DIR/long_dns_queries_$DATE.txt" | while read line; do
        log_finding "  $line"
    done
else
    log_finding "No suspicious DNS queries detected"
fi

# Find high-frequency DNS queries from single source
awk '{print $3}' "$LOG_DIR/dns.log" | \
grep -v "id.orig_h" | \

| sort | uniq -c | sort -nr | \ | awk '$1 > 100 {print $2, $1}' > "$OUTPUT_DIR/high_freq_dns_$DATE.txt"

if [ -s "$OUTPUT_DIR/high_freq_dns_$DATE.txt" ]; then
    log_finding "High-frequency DNS queries detected:"
    head -5 "$OUTPUT_DIR/high_freq_dns_$DATE.txt" | while read line; do
        log_finding "  $line"
    done
fi

fi

# 3. Data Exfiltration Detection

log_finding ""
log_finding "=== Data Exfiltration Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find large outbound data transfers: internal originator (field 3, id.orig_h),
    # external responder (field 5, id.resp_h), more than 10 MB sent (field 10, orig_bytes)
    awk -F'\t' '!/^#/ && $10+0 > 10000000 && $3 ~ /^192\.168\./ && $5 !~ /^192\.168\./ {print $3, $5, $10}' "$LOG_DIR/conn.log" | \
        sort -k3 -nr > "$OUTPUT_DIR/large_uploads_$DATE.txt"

    if [ -s "$OUTPUT_DIR/large_uploads_$DATE.txt" ]; then
        log_finding "Large outbound transfers detected:"
        head -5 "$OUTPUT_DIR/large_uploads_$DATE.txt" | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No large outbound transfers detected"
    fi
fi

# 4. Suspicious HTTP Activity

log_finding ""
log_finding "=== Suspicious HTTP Activity ==="
if [ -f "$LOG_DIR/http.log" ]; then
    # Find SQL injection attempts
    grep -i "union\|select\|insert\|delete\|drop\|exec" "$LOG_DIR/http.log" > "$OUTPUT_DIR/sqli_attempts_$DATE.txt"

    if [ -s "$OUTPUT_DIR/sqli_attempts_$DATE.txt" ]; then
        log_finding "Potential SQL injection attempts detected:"
        log_finding "  $(wc -l < "$OUTPUT_DIR/sqli_attempts_$DATE.txt") attempts"
    else
        log_finding "No SQL injection attempts detected"
    fi

    # Find XSS attempts
    grep -i "script\|javascript\|onerror\|onload" "$LOG_DIR/http.log" > "$OUTPUT_DIR/xss_attempts_$DATE.txt"

    if [ -s "$OUTPUT_DIR/xss_attempts_$DATE.txt" ]; then
        log_finding "Potential XSS attempts detected:"
        log_finding "  $(wc -l < "$OUTPUT_DIR/xss_attempts_$DATE.txt") attempts"
    else
        log_finding "No XSS attempts detected"
    fi

    # Find directory traversal attempts ("../", "..\" or URL-encoded "%2e%2e")
    grep -Ei '\.\./|\.\.\\|%2e%2e' "$LOG_DIR/http.log" > "$OUTPUT_DIR/directory_traversal_$DATE.txt"

    if [ -s "$OUTPUT_DIR/directory_traversal_$DATE.txt" ]; then
        log_finding "Directory traversal attempts detected:"
        log_finding "  $(wc -l < "$OUTPUT_DIR/directory_traversal_$DATE.txt") attempts"
    else
        log_finding "No directory traversal attempts detected"
    fi
fi

# 5. SSL/TLS Security Issues

log_finding ""
log_finding "=== SSL/TLS Security Issues ==="
if [ -f "$LOG_DIR/ssl.log" ]; then
    # Find weak SSL/TLS versions
    grep -E "SSLv2|SSLv3|TLSv10" "$LOG_DIR/ssl.log" > "$OUTPUT_DIR/weak_ssl_$DATE.txt"

    if [ -s "$OUTPUT_DIR/weak_ssl_$DATE.txt" ]; then
        log_finding "Weak SSL/TLS versions detected:"
        # Count matches per server (field 5 is id.resp_h in the default ssl.log layout)
        awk -F'\t' '{print $5}' "$OUTPUT_DIR/weak_ssl_$DATE.txt" | sort | uniq -c | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No weak SSL/TLS versions detected"
    fi

    # Find certificate validation failures
    grep -i "unable to get local issuer\|self signed\|certificate verify failed" "$LOG_DIR/ssl.log" > "$OUTPUT_DIR/cert_issues_$DATE.txt"

    if [ -s "$OUTPUT_DIR/cert_issues_$DATE.txt" ]; then
        log_finding "Certificate validation issues detected:"
        log_finding "  $(wc -l < "$OUTPUT_DIR/cert_issues_$DATE.txt") issues"
    else
        log_finding "No certificate validation issues detected"
    fi
fi

# 6. Malware Communication Detection

log_finding ""
log_finding "=== Malware Communication Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find connections to ports often associated with malware.
    # Match on the responder port (field 6, id.resp_p, in the default conn.log layout).
    awk -F'\t' '!/^#/ && $6 ~ /^(6667|6668|6669|1337|31337|4444|5555)$/' "$LOG_DIR/conn.log" \
        > "$OUTPUT_DIR/suspicious_ports_$DATE.txt"

    if [ -s "$OUTPUT_DIR/suspicious_ports_$DATE.txt" ]; then
        log_finding "Connections to suspicious ports detected:"
        awk -F'\t' '{print $6}' "$OUTPUT_DIR/suspicious_ports_$DATE.txt" | sort | uniq -c | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No connections to suspicious ports detected"
    fi
fi

# 7. Brute Force Detection

log_finding ""
log_finding "=== Brute Force Detection ==="
if [ -f "$LOG_DIR/conn.log" ]; then
    # Find sources with many SSH connections that did not complete normally.
    # conn_state is field 12 in the default conn.log layout; "SF" means normal
    # establishment and termination.
    awk -F'\t' '!/^#/ && $6 == "22" && $12 != "SF" {print $3}' "$LOG_DIR/conn.log" | \
        sort | uniq -c | sort -nr | \
        awk '$1 > 10 {print $2, $1}' > "$OUTPUT_DIR/ssh_brute_force_$DATE.txt"

    if [ -s "$OUTPUT_DIR/ssh_brute_force_$DATE.txt" ]; then
        log_finding "Potential SSH brute force attacks detected:"
        head -5 "$OUTPUT_DIR/ssh_brute_force_$DATE.txt" | while read -r line; do
            log_finding "  $line"
        done
    else
        log_finding "No SSH brute force attacks detected"
    fi
fi

# Generate summary

log_finding ""
log_finding "=== Summary ==="
log_finding "Threat hunting analysis completed at $(date)"
log_finding "Results saved to: $OUTPUT_DIR/"

# Create IOC list

{
    echo "# Indicators of Compromise - $DATE"
    echo "# Generated by Zeek Threat Hunting"
    echo ""

    if [ -s "$OUTPUT_DIR/potential_port_scans_$DATE.txt" ]; then
        echo "# Port Scan Sources"
        awk '{print $1}' "$OUTPUT_DIR/potential_port_scans_$DATE.txt"
        echo ""
    fi

    if [ -s "$OUTPUT_DIR/high_freq_dns_$DATE.txt" ]; then
        echo "# High-Frequency DNS Sources"
        awk '{print $1}' "$OUTPUT_DIR/high_freq_dns_$DATE.txt"
        echo ""
    fi

    if [ -s "$OUTPUT_DIR/large_uploads_$DATE.txt" ]; then
        echo "# Large Upload Sources"
        awk '{print $1}' "$OUTPUT_DIR/large_uploads_$DATE.txt"
        echo ""
    fi
} > "$OUTPUT_DIR/iocs_$DATE.txt"

log_finding "IOCs saved to: $OUTPUT_DIR/iocs_$DATE.txt"

echo "Threat hunting analysis complete. Check $OUTPUT_DIR/ for detailed results."
```
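The hunting script addresses log columns by position, which only holds while the logs keep Zeek's default field order. As a hedged alternative, the Python sketch below reads the `#fields` header line that Zeek writes at the top of its TSV logs and looks columns up by name. The helper names `read_zeek_tsv` and `count_distinct_ports` and the sample records are made up for illustration.

```python
import io


def read_zeek_tsv(stream):
    """Yield one dict per record from a Zeek TSV log, keyed by the #fields header."""
    fields = None
    for raw in stream:
        line = raw.rstrip("\n")
        if line.startswith("#fields"):
            # Header looks like: #fields<TAB>ts<TAB>uid<TAB>id.orig_h<TAB>...
            fields = line.split("\t")[1:]
        elif line and not line.startswith("#") and fields:
            yield dict(zip(fields, line.split("\t")))


def count_distinct_ports(records):
    """Map each originator IP to the number of distinct responder ports it contacted."""
    ports = {}
    for rec in records:
        ports.setdefault(rec["id.orig_h"], set()).add(rec["id.resp_p"])
    return {ip: len(seen) for ip, seen in ports.items()}


# Made-up sample data in the shape of a (shortened) conn.log
SAMPLE = (
    "#separator \\x09\n"
    "#fields\tts\tuid\tid.orig_h\tid.orig_p\tid.resp_h\tid.resp_p\n"
    "1700000000.0\tC1\t10.0.0.5\t40000\t10.0.0.9\t22\n"
    "1700000001.0\tC2\t10.0.0.5\t40001\t10.0.0.9\t80\n"
    "1700000002.0\tC3\t10.0.0.7\t40002\t10.0.0.9\t80\n"
)

if __name__ == "__main__":
    print(count_distinct_ports(read_zeek_tsv(io.StringIO(SAMPLE))))
```

To run it against a real log, pass an open file instead of the `io.StringIO` sample, for example `open("/opt/zeek/logs/current/conn.log")`.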

Integration and Automation

ELK Stack Integration

```bash

# Install Filebeat for log shipping

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.5.0-linux-x86_64.tar.gz
tar xzvf filebeat-8.5.0-linux-x86_64.tar.gz
cd filebeat-8.5.0-linux-x86_64/

# Configure Filebeat for Zeek logs

cat > filebeat.yml << 'EOF'
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/zeek/logs/current/*.log
  exclude_files: ['.gz$']
  # Skip Zeek's '#' header lines so that every remaining line is one event
  exclude_lines: ['^#']
  fields:
    logtype: zeek
  fields_under_root: true

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "zeek-logs-%{+yyyy.MM.dd}"

setup.template.name: "zeek-logs"
setup.template.pattern: "zeek-logs-*"

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
EOF

# Start Filebeat

sudo ./filebeat -e -c filebeat.yml
```
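The `index` setting above produces one index per day, and `setup.template.pattern` has to match every name it generates. The Python sketch below shows what such a name looks like for a given timestamp. It assumes the date part is rendered from the event timestamp in UTC; `daily_index_name` is a hypothetical helper, not part of Filebeat.

```python
from datetime import datetime, timezone
from fnmatch import fnmatch


def daily_index_name(epoch_seconds, prefix="zeek-logs"):
    """Return the daily index name for a Unix timestamp, using the UTC date."""
    day = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return f"{prefix}-{day:%Y.%m.%d}"


if __name__ == "__main__":
    name = daily_index_name(1700000000)  # arbitrary example timestamp
    # The template pattern from the configuration should match the generated name
    print(name, fnmatch(name, "zeek-logs-*"))
```

With the input configuration shown, the event timestamp is the time Filebeat read the line, not the Zeek `ts` field, so records read after midnight UTC can land in the next day's index.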

Splunk Integration

```bash

# Configure Splunk Universal Forwarder for Zeek logs
# Install Splunk Universal Forwarder first

# Create inputs.conf

sudo tee /opt/splunkforwarder/etc/system/local/inputs.conf << 'EOF'
[monitor:///opt/zeek/logs/current/*.log]
disabled = false
sourcetype = zeek
index = zeek
host_segment = 4

[monitor:///opt/zeek/logs/current/conn.log]
sourcetype = zeek:conn

[monitor:///opt/zeek/logs/current/http.log]
sourcetype = zeek:http

[monitor:///opt/zeek/logs/current/dns.log]
sourcetype = zeek:dns

[monitor:///opt/zeek/logs/current/ssl.log]
sourcetype = zeek:ssl

[monitor:///opt/zeek/logs/current/notice.log]
sourcetype = zeek:notice
EOF

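# Create props.conf for field extraction

sudo tee /opt/splunkforwarder/etc/system/local/props.conf << 'EOF'
[zeek]
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\r\n]+)
TIME_PREFIX = ^
TIME_FORMAT = %s.%6N
TRUNCATE = 0

# The capture-group names below are assumed. They follow the field order used
# in the SIEM integration script; http_host avoids Splunk's built-in host field.
[zeek:conn]
EXTRACT-zeek_conn = ^(?<ts>[^\t]+)\t(?<uid>[^\t]+)\t(?<orig_h>[^\t]+)\t(?<orig_p>[^\t]+)\t(?<resp_h>[^\t]+)\t(?<resp_p>[^\t]+)\t(?<proto>[^\t]+)\t(?<service>[^\t]+)\t(?<duration>[^\t]+)\t(?<orig_bytes>[^\t]+)\t(?<resp_bytes>[^\t]+)\t(?<conn_state>[^\t]+)

[zeek:http]
EXTRACT-zeek_http = ^(?<ts>[^\t]+)\t(?<uid>[^\t]+)\t(?<orig_h>[^\t]+)\t(?<orig_p>[^\t]+)\t(?<resp_h>[^\t]+)\t(?<resp_p>[^\t]+)\t(?<trans_depth>[^\t]+)\t(?<method>[^\t]+)\t(?<http_host>[^\t]+)\t(?<uri>[^\t]+)\t(?<referrer>[^\t]+)\t(?<version>[^\t]+)\t(?<user_agent>[^\t]+)\t(?<request_body_len>[^\t]+)\t(?<response_body_len>[^\t]+)\t(?<status_code>[^\t]+)

[zeek:dns]
EXTRACT-zeek_dns = ^(?<ts>[^\t]+)\t(?<uid>[^\t]+)\t(?<orig_h>[^\t]+)\t(?<orig_p>[^\t]+)\t(?<resp_h>[^\t]+)\t(?<resp_p>[^\t]+)\t(?<proto>[^\t]+)\t(?<trans_id>[^\t]+)\t(?<rtt>[^\t]+)\t(?<query>[^\t]+)\t(?<qclass>[^\t]+)\t(?<qclass_name>[^\t]+)\t(?<qtype>[^\t]+)\t(?<qtype_name>[^\t]+)\t(?<rcode>[^\t]+)\t(?<rcode_name>[^\t]+)
EOF

# Restart Splunk Universal Forwarder

sudo /opt/splunkforwarder/bin/splunk restart
```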
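Each `EXTRACT-` rule above is one named capture group per tab-separated column. Long patterns like these are easy to get wrong by hand, so the Python sketch below builds the same shape of pattern from a list of field names and checks it against a sample line. Python's `re` module writes named groups as `(?P<name>...)`, whereas the PCRE syntax used in `props.conf` is `(?<name>...)`. The field names and the sample record are assumptions for illustration.

```python
import re

# Assumed names for the first 12 conn.log columns
CONN_FIELDS = [
    "ts", "uid", "orig_h", "orig_p", "resp_h", "resp_p",
    "proto", "service", "duration", "orig_bytes", "resp_bytes", "conn_state",
]


def build_tsv_regex(field_names):
    """Compile a pattern with one named group per tab-separated column."""
    groups = [f"(?P<{name}>[^\\t]+)" for name in field_names]
    return re.compile("^" + "\\t".join(groups))


# Made-up conn.log record
SAMPLE = "\t".join([
    "1700000000.000000", "CabcDE1", "192.168.1.10", "51234",
    "93.184.216.34", "443", "tcp", "ssl", "1.5", "1200", "5400", "SF",
])

if __name__ == "__main__":
    match = build_tsv_regex(CONN_FIELDS).match(SAMPLE)
    print(match.groupdict() if match else "no match")
```

A line with fewer columns than the pattern expects does not match at all, which is worth keeping in mind when Zeek is configured with a non-default set of log fields.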

SIEM Integration Script

```python

#!/usr/bin/env python3

"""
Zeek SIEM Integration
Real-time log parsing and alert generation for SIEM systems
"""

import json
import time
import socket
import syslog
import requests
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import re


class ZeekSIEMIntegrator:
    def __init__(self, config_file="siem_config.json"):
        self.config = self.load_config(config_file)
        self.alert_rules = self.load_alert_rules()

    def load_config(self, config_file):
        """Load SIEM integration configuration"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # Default configuration
            return {
                "siem_type": "syslog",
                "syslog_server": "localhost",
                "syslog_port": 514,
                "api_endpoint": None,
                "api_key": None,
                "log_directory": "/opt/zeek/logs/current",
                "alert_threshold": {
                    "port_scan": 20,
                    "dns_queries": 100,
                    "large_transfer": 100000000
                }
            }

    def load_alert_rules(self):
        """Load alert detection rules"""
        return {
            "port_scan": {
                "pattern": r"multiple_ports",
                "severity": "medium",
                "description": "Port scan detected"
            },
            "sql_injection": {
                "pattern": r"(union|select|insert|delete|drop|exec)",
                "severity": "high",
                "description": "SQL injection attempt detected"
            },
            "xss_attempt": {
                "pattern": r"(script|javascript|onerror|onload)",
                "severity": "medium",
                "description": "XSS attempt detected"
            },
            "dns_tunneling": {
                "pattern": r"long_dns_query",
                "severity": "high",
                "description": "Possible DNS tunneling detected"
            },
            "malware_communication": {
                "pattern": r"(6667|6668|6669|1337|31337|4444|5555)",
                "severity": "high",
                "description": "Malware communication detected"
            }
        }

    def send_syslog_alert(self, alert):
        """Send alert via syslog"""
        try:
            # Create syslog message
            message = f"ZEEK_ALERT: {alert['description']} - Source: {alert.get('source_ip', 'unknown')} - Severity: {alert['severity']}"

            # Send to syslog server over UDP
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message.encode(), (self.config['syslog_server'], self.config['syslog_port']))
            sock.close()

            print(f"Syslog alert sent: {message}")

        except Exception as e:
            print(f"Error sending syslog alert: {e}")

    def send_api_alert(self, alert):
        """Send alert via REST API"""
        try:
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f"Bearer {self.config['api_key']}"
            }

            payload = {
                'timestamp': alert['timestamp'],
                'severity': alert['severity'],
                'description': alert['description'],
                'source_ip': alert.get('source_ip'),
                'destination_ip': alert.get('destination_ip'),
                'details': alert.get('details', {})
            }

            response = requests.post(
                self.config['api_endpoint'],
                headers=headers,
                json=payload,
                timeout=10
            )

            if response.status_code == 200:
                print(f"API alert sent successfully: {alert['description']}")
            else:
                print(f"API alert failed: {response.status_code} - {response.text}")

        except Exception as e:
            print(f"Error sending API alert: {e}")

    def send_alert(self, alert):
        """Send alert to configured SIEM system"""
        if self.config['siem_type'] == 'syslog':
            self.send_syslog_alert(alert)
        elif self.config['siem_type'] == 'api' and self.config['api_endpoint']:
            self.send_api_alert(alert)
        else:
            print(f"Alert: {alert}")

    def analyze_connection_log(self, line):
        """Analyze connection log for suspicious activity"""
        fields = line.strip().split('\t')
        if len(fields) < 12 or fields[0].startswith('#'):
            return

        try:
            # Positions follow the default conn.log field order
            (ts, uid, orig_h, orig_p, resp_h, resp_p, proto, service,
             duration, orig_bytes, resp_bytes, conn_state) = fields[:12]

            # Check for large data transfers
            if orig_bytes.isdigit() and int(orig_bytes) > self.config['alert_threshold']['large_transfer']:
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'medium',
                    'description': 'Large outbound data transfer detected',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'bytes_transferred': orig_bytes,
                        'protocol': proto,
                        'service': service
                    }
                }
                self.send_alert(alert)

        except (ValueError, IndexError) as e:
            print(f"Error parsing connection log: {e}")

    def analyze_http_log(self, line):
        """Analyze HTTP log for web attacks"""
        fields = line.strip().split('\t')
        if len(fields) < 16 or fields[0].startswith('#'):
            return

        try:
            # Positions assume this field order; adjust if your http.log differs
            (ts, uid, orig_h, orig_p, resp_h, resp_p, trans_depth, method,
             host, uri, referrer, version, user_agent, request_body_len,
             response_body_len, status_code) = fields[:16]

            # Check for SQL injection
            if re.search(self.alert_rules['sql_injection']['pattern'], uri, re.IGNORECASE):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': self.alert_rules['sql_injection']['severity'],
                    'description': self.alert_rules['sql_injection']['description'],
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'method': method,
                        'host': host,
                        'uri': uri,
                        'user_agent': user_agent
                    }
                }
                self.send_alert(alert)

            # Check for XSS attempts
            if re.search(self.alert_rules['xss_attempt']['pattern'], uri, re.IGNORECASE):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': self.alert_rules['xss_attempt']['severity'],
                    'description': self.alert_rules['xss_attempt']['description'],
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'method': method,
                        'host': host,
                        'uri': uri,
                        'user_agent': user_agent
                    }
                }
                self.send_alert(alert)

        except (ValueError, IndexError) as e:
            print(f"Error parsing HTTP log: {e}")

    def analyze_dns_log(self, line):
        """Analyze DNS log for tunneling and suspicious queries"""
        fields = line.strip().split('\t')
        if len(fields) < 16 or fields[0].startswith('#'):
            return

        try:
            (ts, uid, orig_h, orig_p, resp_h, resp_p, proto, trans_id, rtt,
             query, qclass, qclass_name, qtype, qtype_name,
             rcode, rcode_name) = fields[:16]

            # Check for long DNS queries (possible tunneling)
            if len(query) > 50:
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'high',
                    'description': 'Possible DNS tunneling detected (long query)',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'query': query,
                        'query_length': len(query),
                        'query_type': qtype_name
                    }
                }
                self.send_alert(alert)

            # Check for suspicious TLDs (match the end of the name, not any
            # substring, so that e.g. "www.tkexample.com" does not trigger)
            suspicious_tlds = ('.tk', '.ml', '.ga', '.cf')
            if query.lower().rstrip('.').endswith(suspicious_tlds):
                alert = {
                    'timestamp': datetime.now().isoformat(),
                    'severity': 'medium',
                    'description': 'Query to suspicious TLD detected',
                    'source_ip': orig_h,
                    'destination_ip': resp_h,
                    'details': {
                        'query': query,
                        'query_type': qtype_name
                    }
                }
                self.send_alert(alert)

        except (ValueError, IndexError) as e:
            print(f"Error parsing DNS log: {e}")

class ZeekLogHandler(FileSystemEventHandler):
    def __init__(self, integrator):
        self.integrator = integrator
        self.log_processors = {
            'conn.log': integrator.analyze_connection_log,
            'http.log': integrator.analyze_http_log,
            'dns.log': integrator.analyze_dns_log
        }

    def on_modified(self, event):
        if event.is_directory:
            return

        filename = event.src_path.split('/')[-1]
        if filename in self.log_processors:
            try:
                with open(event.src_path, 'r') as f:
                    # Simple approach: inspect only the most recent line.
                    # Lines written in a burst between two events are skipped.
                    f.seek(0, 2)  # Go to end
                    file_size = f.tell()

                    # Read the last 1024 bytes at most
                    f.seek(max(0, file_size - 1024))
                    lines = f.readlines()

                    # Process last line (most recent)
                    if lines:
                        self.log_processors[filename](lines[-1])

            except Exception as e:
                print(f"Error processing {filename}: {e}")


def main():
    # Initialize SIEM integrator
    integrator = ZeekSIEMIntegrator()

    # Set up file monitoring
    event_handler = ZeekLogHandler(integrator)
    observer = Observer()
    observer.schedule(event_handler, integrator.config['log_directory'], recursive=False)

    print("Starting Zeek SIEM integration...")
    print(f"Monitoring: {integrator.config['log_directory']}")
    print(f"SIEM Type: {integrator.config['siem_type']}")

    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        print("Stopping Zeek SIEM integration...")

    observer.join()


if __name__ == "__main__":
    main()
```
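The `on_modified` handler in the script looks only at the last line of the file, so records written in quick succession can be missed. One possible refinement, sketched below under that assumption, is to remember a byte offset per file and return every complete line appended since the previous call. `LogTail` is a hypothetical helper, not part of watchdog or Zeek.

```python
import os
import tempfile


class LogTail:
    """Track how far each file has been read and return only newly appended lines."""

    def __init__(self):
        self.offsets = {}  # path -> byte offset of the next unread byte

    def read_new_lines(self, path):
        offset = self.offsets.get(path, 0)
        if os.path.getsize(path) < offset:
            # The file shrank, e.g. after log rotation: start again from the top
            offset = 0
        with open(path, "rb") as f:
            f.seek(offset)
            data = f.read()
        # Consume up to the last newline; a trailing partial line waits for the next call
        end = data.rfind(b"\n") + 1
        self.offsets[path] = offset + end
        text = data[:end].decode("utf-8", errors="replace")
        return text.split("\n")[:-1]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "conn.log")
        tail = LogTail()
        with open(path, "w") as f:
            f.write("first\nsecond\n")
        print(tail.read_new_lines(path))
        with open(path, "a") as f:
            f.write("third\n")
        print(tail.read_new_lines(path))
```

Inside the handler, each returned line could be passed to the matching `analyze_*` method. Note that the first call for a file returns its whole existing content, which may or may not be wanted at startup.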

This Zeek cheat sheet covers network security monitoring, traffic analysis, log analysis, custom script development, threat hunting, and SIEM integration. The included scripts and examples support professional network security monitoring and incident response on the Zeek platform.