Skip to content

Python ICAP YARA

Build and deploy an ICAP (Internet Content Adaptation Protocol) server that integrates YARA rules for real-time malware detection in proxied HTTP content. Detect threats, scan network traffic, and protect against malicious files.

Quick Start

# Install dependencies (the server code imports `pyicap`)
pip install pyicap yara-python requests

# Create YARA rules directory
mkdir -p ./yara_rules

# Start basic ICAP server with YARA scanning
# (icap_server.py is the "Full ICAP Server with YARA" script; it listens on port 1344)
python3 icap_server.py

# Test ICAP connectivity
# (curl has no ICAP support, so send the OPTIONS request by hand)
printf 'OPTIONS icap://localhost:1344/respmod ICAP/1.0\r\nHost: localhost\r\n\r\n' | nc -w 2 localhost 1344

Installation

Linux/macOS

# Install system dependencies
sudo apt install libyara-dev yara python3-dev  # Ubuntu/Debian
brew install yara python3  # macOS

# Create virtual environment
python3 -m venv icap-env
source icap-env/bin/activate

# Install Python packages (the server code imports `pyicap`)
pip install --upgrade pip
pip install pyicap yara-python
pip install requests urllib3

# Verify installation
python3 -c "import yara; print(yara.__version__)"
python3 -c "import pyicap; print('pyicap import OK')"

YARA Rules

Create Rule Files

# ./yara_rules/malware.yar
rule Eicar_Test_File {
    meta:
        description = "EICAR test malware"
        date = "2025-01-01"
    strings:
        $eicar = "X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    condition:
        $eicar
}

rule Suspicious_Shell_Script {
    meta:
        description = "Detects suspicious shell commands"
    strings:
        $bash = "#!/bin/bash"
        $cmd1 = "nc -l"
        $cmd2 = "bash -i"
    condition:
        $bash and any of ($cmd*)
}

rule Detect_Base64_Encoded_Payload {
    meta:
        description = "Detects high entropy base64 blocks"
    strings:
        $pattern = /[A-Za-z0-9+\/=]{100,}/
    condition:
        $pattern
}

Compile Rules

# Verify syntax
yara -S -d test_rule=1 ./yara_rules/malware.yar test_file.bin

# Compile to binary
# (yarac is the rule compiler; `yara -c` only prints the number of matches)
yarac ./yara_rules/malware.yar malware.yarc

# Test rules against sample
yara ./yara_rules/malware.yar /path/to/suspicious_file

ICAP Server Implementation

Basic ICAP Server (Python)

Configuration

ICAP Server Config

# icap_config.yaml
server:
  host: 0.0.0.0
  port: 1344
  max_connections: 100
  request_timeout: 30
  max_file_size: 52428800  # 50MB

yara:
  rules_directory: ./yara_rules
  compile_rules: true
  cache_compiled: true
  timeout: 5
  enable_logging: true

scanning:
  scan_request_body: true
  scan_response_body: true
  scan_file_types: [exe, dll, scr, jar, zip, rar, pdf, doc, docx]
  min_file_size: 100
  quarantine_dir: ./quarantine

logging:
  level: DEBUG
  file: ./icap_server.log
  max_size: 10485760
  backup_count: 5

performance:
  threads: 4
  chunk_size: 8192
  enable_caching: true

Proxy Integration

# Configure Squid proxy to use ICAP
# /etc/squid/squid.conf
icap_enable on
icap_service service_req reqmod_precache bypass=0 icap://127.0.0.1:1344/reqmod
icap_service service_resp respmod_precache bypass=0 icap://127.0.0.1:1344/respmod

adaptation_service_set req_side service_req
adaptation_service_set resp_side service_resp

# Without adaptation_access rules Squid never sends traffic to the services
adaptation_access req_side allow all
adaptation_access resp_side allow all

Advanced Server Implementation

Full ICAP Server with YARA

#!/usr/bin/env python3
"""
ICAP Server with YARA Malware Detection
"""

import yara
import logging
from pyicap import ICAPServer, BaseICAPRequestHandler
import io

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class YARAScanner:
    """Match byte buffers against one compiled YARA rule file.

    Every detection is also recorded in ``self.quarantine`` as a dict with
    the supplied file name and the names of the matching rules.
    """

    def __init__(self, rules_path):
        self.quarantine = []
        self.rules = yara.compile(filepath=rules_path)

    def scan_data(self, data, filename="unknown"):
        """Scan ``data`` and return ``(is_clean, matches)``.

        Returns ``(True, None)`` when nothing matched and ``(False, matches)``
        on a detection. A scan error is logged and reported as clean, i.e.
        the scanner fails open.
        """
        try:
            hits = self.rules.match(data=data)
            if not hits:
                return True, None

            logger.warning(f"YARA match in {filename}: {hits}")
            record = {
                'filename': filename,
                'matches': [hit.rule for hit in hits]
            }
            self.quarantine.append(record)
            return False, hits
        except Exception as err:
            logger.error(f"Scan error: {err}")
            return True, None

class YARAICAPHandler(BaseICAPRequestHandler):
    """ICAP handler that answers 204 for clean bodies and 403 for YARA hits.

    NOTE(review): this class relies on ``encapsulated_res_body`` /
    ``encapsulated_req_body`` attributes and on ``respmod`` / ``reqmod``
    being invoked by the base class. Confirm both against the installed
    pyicap version before relying on this handler. TODO verify.
    """

    # Shared scanner; must be assigned on the class before the server starts.
    scanner = None

    def _scan_and_respond(self, body, label, alert):
        """Scan ``body`` and set the ICAP status: 204 if clean or empty, 403 on a hit.

        ``label`` is the name passed to the scanner; ``alert`` is the log
        message prefix used when content is blocked.
        """
        if not body:
            # Nothing to scan
            self.set_icap_response(204)
            return

        is_clean, matches = self.scanner.scan_data(body, label)
        if is_clean:
            self.set_icap_response(204)
            return

        logger.warning(f"{alert}: {matches}")
        # Block malicious content
        self.set_icap_response(403)

    def respmod(self):
        """Response modification - scan response body"""
        self._scan_and_respond(
            self.encapsulated_res_body,
            'response_body',
            "Malicious content detected",
        )

    def reqmod(self):
        """Request modification - scan uploaded files"""
        self._scan_and_respond(
            self.encapsulated_req_body,
            'upload',
            "Upload blocked",
        )

if __name__ == '__main__':
    # Compile the rules once and share the scanner through the handler class
    scanner = YARAScanner('./yara_rules/malware.yar')
    YARAICAPHandler.scanner = scanner

    # Listen on all interfaces on port 1344 and serve until interrupted
    bind_address = ('0.0.0.0', 1344)
    server = ICAPServer(bind_address, YARAICAPHandler)
    logger.info("ICAP Server with YARA started on port 1344")
    server.serve_forever()

Multi-Rule Scanning

import os
import yara

class AdvancedYARAScanner:
    """Scan data against every YARA rule file found in a directory.

    Each ``.yar`` / ``.yara`` file is compiled into its own ruleset, keyed by
    file name. ``stats['scanned']`` counts calls to ``scan_all_rulesets`` and
    ``stats['detected']`` counts how many of those calls produced at least
    one match, so ``detected`` never exceeds ``scanned``.
    """

    def __init__(self, rules_dir):
        self.rules_dir = rules_dir
        self.rules_dict = self._compile_rules()
        self.stats = {'scanned': 0, 'detected': 0}

    def _compile_rules(self):
        """Compile all YARA rules from directory"""
        rules = {}
        # Sorted so rulesets are compiled and reported in a stable order
        for filename in sorted(os.listdir(self.rules_dir)):
            if filename.endswith(('.yar', '.yara')):
                path = os.path.join(self.rules_dir, filename)
                rules[filename] = yara.compile(filepath=path)
        return rules

    def scan_all_rulesets(self, data):
        """Scan data against all rule sets.

        Returns a dict mapping rule file name to the list of matching rule
        names; rule files without a match are omitted.
        """
        all_matches = {}
        for name, rule in self.rules_dict.items():
            matches = rule.match(data=data)
            if matches:
                all_matches[name] = [m.rule for m in matches]

        self.stats['scanned'] += 1
        # Count the scan once, however many rulesets matched
        if all_matches:
            self.stats['detected'] += 1
        return all_matches

    def get_stats(self):
        """Return scanning statistics"""
        return self.stats

Scanning & Detection

Scan Files

# Scan single file
yara ./yara_rules/malware.yar /path/to/file

# Scan directory recursively
yara -r ./yara_rules/malware.yar /path/to/directory

# Scan with multiple rule files
yara ./yara_rules/malware.yar ./yara_rules/suspicious.yar /file

# Output JSON results
yara -j ./yara_rules/malware.yar /file

# Show matching strings
yara -s ./yara_rules/malware.yar /file

# Scan with custom identifier
yara -d identifier=value ./yara_rules/malware.yar /file

# Abort scanning after a timeout (seconds)
yara --timeout=10 ./yara_rules/malware.yar /file

Python Scanning Script

#!/usr/bin/env python3
"""
Scan files/directories with YARA rules
"""

import yara
import os
import json
from pathlib import Path

def scan_directory(yara_rule_file, target_dir, extensions=None):
    """Recursively scan a directory with one YARA rule file.

    Args:
        yara_rule_file: Path of the YARA source file to compile.
        target_dir: Directory to walk.
        extensions: Optional iterable of file suffixes (e.g. ``['.exe']``).
            When given and non-empty, only files whose name ends with one of
            them are scanned; the comparison ignores case.

    Returns:
        Dict mapping each matching file path to the list of rule names that
        matched it. Files that fail to scan are reported on stdout and skipped.
    """
    rules = yara.compile(filepath=yara_rule_file)
    # str.endswith accepts a tuple; lower-cased so '.EXE' also matches '.exe'
    suffixes = tuple(ext.lower() for ext in extensions) if extensions else None
    results = {}

    for root, _dirs, files in os.walk(target_dir):
        for filename in files:
            if suffixes and not filename.lower().endswith(suffixes):
                continue

            filepath = os.path.join(root, filename)
            try:
                matches = rules.match(filepath=filepath)
            except Exception as e:
                # Best effort: one unreadable file must not stop the walk
                print(f"Error scanning {filepath}: {e}")
                continue

            if matches:
                results[filepath] = [m.rule for m in matches]

    return results

# Example usage
results = scan_directory('./yara_rules/malware.yar', '/downloads',
                        extensions=['.exe', '.dll', '.pdf', '.zip'])

# Output findings
for file, detections in results.items():
    print(f"[DETECTED] {file}: {', '.join(detections)}")

Network Traffic Scanning

# Capture and scan HTTP traffic with Squid ICAP
# Configure Squid to send traffic to ICAP server

# Monitor ICAP traffic
tcpdump -i any -n 'tcp port 1344' -A

# Test ICAP service
curl -i -X OPTIONS icap://localhost:1344/avscan

# Make ICAP request with file body
curl -i -X POST icap://localhost:1344/avscan \
  -H "Encapsulation: req-body=0, res-body=null" \
  --data-binary "@suspicious_file.exe"

Rule Development & Management

Create Custom Rules

# Interactive rule editor
yara-editor

# Test rule before deployment
yara --print-stats ./new_rule.yar /test_samples/

# Validate rule syntax
yara -d test=1 ./new_rule.yar > /dev/null && echo "Valid" || echo "Invalid"

Rule Examples

# Detect suspicious PowerShell scripts
rule Suspicious_PowerShell {
    meta:
        description = "Detects obfuscated PowerShell code"
        severity = "high"
    strings:
        $ps_header = "powershell" nocase
        $encodedcmd = "System.Text.Encoding" nocase
        $base64 = /FromBase64String|DownloadString|Invoke-Expression/ nocase
    condition:
        // `any of ($*)` would also count $ps_header and so always be true here
        $ps_header and any of ($encodedcmd, $base64)
}

# Detect potential ransomware
rule Potential_Ransomware {
    meta:
        description = "Checks for file encryption patterns"
    strings:
        $aes = { 6A 40 [0-5] FF 15 }
        $wcry = "wcry"
        $bitcoin = "bitcoin" nocase
    condition:
        any of them
}

# Detect cryptocurrency miners
rule Crypto_Miner {
    meta:
        description = "Detects mining pool connections"
    strings:
        $pool1 = "stratum.mining.pool.com"
        $pool2 = "minergate.com"
        $pool3 = "nanopool.org"
        $getwork = "getwork"
    condition:
        any of ($pool*) or $getwork
}

Update Rules

# Download latest signatures
wget https://github.com/Yara-Rules/rules/archive/master.zip
unzip -o master.zip -d ./yara_rules

# Merge rule files
# (written outside ./yara_rules so the output is never part of its own input)
cat ./yara_rules/*.yar > ./combined.yar

# Check for duplicates
# (rules span several lines, so a line-level `sort -u` would scramble them;
#  compiling the merged file reports any duplicated rule identifier instead)
yarac ./combined.yar ./combined.yarc

Monitoring & Logging

Server Monitoring

# Check ICAP service status
systemctl status icap-yara

# View real-time logs
tail -f ./icap_server.log

# Monitor process
top -p $(pgrep -f icap)

# Check port availability
netstat -tlnp | grep 1344
lsof -i :1344

# Monitor ICAP connections
tcpdump -i any -n 'tcp port 1344'

# Get server stats
curl icap://localhost:1344/stats

Log Analysis

# Count detections
grep -c "YARA match" ./icap_server.log

# List detected rules
grep "YARA match" ./icap_server.log | awk -F'rule=' '{print $2}' | sort | uniq -c

# Show latest threats
tail -20 ./icap_server.log | grep -i "malicious\|detected"

# Parse JSON logs
grep "^{" ./icap_server.log | jq '.detection_type, .filename'

# Export statistics
grep "Detection:" ./icap_server.log | wc -l > detection_stats.txt

Python Logging

import logging
import json
from datetime import datetime

class ScanLogger:
    """Write scan results and detections to a log file as JSON messages.

    All instances share the module logger. A file handler is attached only
    if the logger does not already have one for the same file, so creating
    several ``ScanLogger`` objects for one log file does not duplicate lines.
    """

    def __init__(self, logfile):
        import os  # local import: only needed to normalise the path below

        self.logger = logging.getLogger(__name__)

        # FileHandler stores its target as an absolute path in baseFilename
        target = os.path.abspath(os.fspath(logfile))
        attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in self.logger.handlers
        )
        if not attached:
            handler = logging.FileHandler(logfile)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_detection(self, filename, matches, severity="HIGH"):
        """Log malware detection as a WARNING record with a JSON payload."""
        entry = {
            'timestamp': datetime.now().isoformat(),
            'filename': filename,
            'severity': severity,
            'detections': matches
        }
        self.logger.warning(json.dumps(entry))

    def log_scan(self, filename, result, duration):
        """Log scan completion as an INFO record with a JSON payload."""
        entry = {
            'timestamp': datetime.now().isoformat(),
            'filename': filename,
            'result': result,
            'duration_ms': duration
        }
        self.logger.info(json.dumps(entry))

Troubleshooting

Common Issues

YARA rules not loading

# Validate rule syntax
yara -d test=1 ./yara_rules/malware.yar

# Check file permissions
ls -la ./yara_rules/

# Test with simple rule
echo 'rule test { strings: $a = "test" condition: $a }' > test.yar
yara test.yar test.txt

# Recompile rules
python3 -c "import yara; rules = yara.compile(filepath='./yara_rules/malware.yar')"

ICAP server not responding

# Check if server is running
pgrep -f icap
ps aux | grep icap

# Check port availability
lsof -i :1344
netstat -tlnp | grep 1344

# Start in debug mode
python3 icap_server.py --debug

# Check for port conflicts
sudo netstat -tulpn | grep 1344

High false positives

# Review detected files
ls -la ./quarantine/

# Create whitelist rules
cat > whitelist.yar << 'EOF'
rule Whitelist {
    strings:
        $safe = "c:\\windows\\system32" nocase
    condition:
        $safe
}
EOF

# Exclude rule during scan
yara --exclude-rules Whitelist ./yara_rules/malware.yar /file

Performance issues

# Reduce file size limit
# Edit config: max_file_size: 26214400  # 25MB

# Increase threads
# Edit config: threads: 8

# Monitor resource usage
watch -n 1 'top -bn1 | grep icap'

# Profile scan times
python3 -m cProfile -s cumulative icap_server.py

Debug Script

#!/usr/bin/env python3
"""Debug and validate ICAP+YARA setup"""

import yara
import os
import socket
import sys

def check_dependencies():
    """Report whether the yara and pyicap modules can be imported.

    Prints one status line per module and returns False at the first module
    that is missing, True when both import.
    """
    required = (("yara", "YARA"), ("pyicap", "pyICAPServer"))
    for module_name, label in required:
        try:
            __import__(module_name)
        except ImportError:
            print(f"✗ {label} not installed")
            return False
        print(f"✓ {label} installed")
    return True

def validate_rules(rules_dir):
    """Compile every YARA rule file in ``rules_dir`` and report the outcome.

    Both ``.yar`` and ``.yara`` files are checked. Every file is compiled,
    even after a failure, so one run lists all broken rule files.

    Returns:
        False if ``rules_dir`` is not a directory or any rule file fails to
        compile; True otherwise, including when the directory has no rule files.
    """
    if not os.path.isdir(rules_dir):
        print(f"✗ Rules directory not found: {rules_dir}")
        return False

    all_valid = True
    for f in sorted(os.listdir(rules_dir)):
        if not f.endswith(('.yar', '.yara')):
            continue
        try:
            yara.compile(filepath=os.path.join(rules_dir, f))
        except Exception as e:
            print(f"✗ Invalid rule {f}: {e}")
            all_valid = False
        else:
            print(f"✓ Valid rule: {f}")
    return all_valid

def check_port(port=1344):
    """Check whether something is listening on 127.0.0.1:``port``.

    ``connect_ex`` returns 0 when the TCP connection succeeds, which means a
    service is listening on the port, not that the port is free.

    Returns:
        True if the connection was accepted, False otherwise.
    """
    # The context manager closes the socket even if connect_ex raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(2)
        result = sock.connect_ex(('127.0.0.1', port))

    if result == 0:
        print(f"✓ Port {port} is accepting connections")
        return True

    print(f"✗ Nothing is listening on port {port}")
    return False

if __name__ == '__main__':
    print("=== ICAP+YARA Diagnostic ===\n")

    print("1. Checking dependencies...")
    if not check_dependencies():
        sys.exit(1)

    print("\n2. Validating YARA rules...")
    if not validate_rules('./yara_rules'):
        sys.exit(1)

    print("\n3. Checking port...")
    # A failed port check must fail the run too; otherwise the summary below
    # would claim success right after an error line
    if not check_port():
        sys.exit(1)

    print("\n✓ All checks passed!")

Performance Tuning

Optimize Scanning

# Skip large files (size in bytes; applies when scanning a directory)
yara --skip-larger=52428800 ./rules.yar /path

# Parallel scanning (Python)
python3 << 'EOF'
from concurrent.futures import ThreadPoolExecutor
import yara
import os

TARGET_DIR = '/target'

rules = yara.compile(filepath='./rules.yar')

def scan_file(filepath):
    """Return the YARA matches for one file."""
    return rules.match(filepath=filepath)

# Use full paths: bare names from os.listdir() would be resolved against the
# current working directory instead of TARGET_DIR
files = [entry.path for entry in os.scandir(TARGET_DIR) if entry.is_file()]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(scan_file, files))
EOF

# Cache compiled rules (copying the source file does not compile it)
yarac ./rules.yar ./rules.yarc
# Load cached rules in code: yara.load('./rules.yarc')

Reduce False Positives

# Create exceptions.yar for safe files
cat > exceptions.yar << 'EOF'
rule Whitelist_System_Files {
    strings:
        $win = "c:\\windows" nocase
        $prog = "c:\\program files" nocase
    condition:
        any of them
}
EOF

# Exclude during scan
yara --exclude-rules Whitelist_System_Files rules.yar /target

Batch Processing

#!/usr/bin/env python3
"""Batch scan with performance tracking"""

import yara
import time
import os
from pathlib import Path

class BatchScanner:
    """Scan a directory tree with one YARA rule file and track totals.

    ``stats`` holds the number of files scanned ('total'), files with at
    least one match ('detections'), files that could not be scanned
    ('errors') and the duration of the last ``scan_directory`` call in
    seconds ('time'). The counters accumulate across calls.
    """

    def __init__(self, rules_file, batch_size=100):
        if batch_size < 1:
            # range() with a zero step would otherwise fail only at scan time
            raise ValueError("batch_size must be at least 1")
        self.rules = yara.compile(filepath=rules_file)
        self.batch_size = batch_size
        self.stats = {'total': 0, 'detections': 0, 'errors': 0, 'time': 0}

    def scan_directory(self, directory):
        """Scan in batches and return the updated ``stats`` dict."""
        files = list(Path(directory).rglob('*'))
        start = time.time()

        for i in range(0, len(files), self.batch_size):
            for f in files[i:i + self.batch_size]:
                if not f.is_file():
                    continue
                try:
                    matches = self.rules.match(filepath=str(f))
                except Exception as e:
                    # Best effort: one unreadable file must not abort the batch
                    print(f"Error scanning {f}: {e}")
                    self.stats['errors'] += 1
                    continue
                self.stats['total'] += 1
                if matches:
                    self.stats['detections'] += 1

        self.stats['time'] = time.time() - start
        return self.stats

# Usage
scanner = BatchScanner('./rules.yar', batch_size=50)
stats = scanner.scan_directory('/target_dir')
print(f"Scanned: {stats['total']}, Detections: {stats['detections']}, Time: {stats['time']:.2f}s")

Integration & Deployment

Systemd Service

# /etc/systemd/system/icap-yara.service
[Unit]
Description=ICAP Server with YARA Malware Detection
After=network.target

[Service]
Type=simple
User=icap
WorkingDirectory=/opt/icap-yara
ExecStart=/usr/bin/python3 /opt/icap-yara/icap_server.py
Restart=always
RestartSec=10

# Resource limits
MemoryMax=2G
CPUQuota=50%

[Install]
WantedBy=multi-user.target

# Install service
sudo cp icap-yara.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable icap-yara
sudo systemctl start icap-yara

Docker Container

FROM python:3.11-slim

RUN apt-get update && apt-get install -y \
    libyara-dev yara && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /icap

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 1344

CMD ["python3", "icap_server.py"]

# Build and run
docker build -t icap-yara .
docker run -d -p 1344:1344 \
  -v $(pwd)/yara_rules:/icap/yara_rules \
  -v $(pwd)/quarantine:/icap/quarantine \
  --name icap-yara icap-yara

Squid Integration

# Configure Squid to use ICAP
# (squid.conf is root-owned, so append through sudo tee)
sudo tee -a /etc/squid/squid.conf > /dev/null << 'EOF'
icap_enable on
icap_service respmod_service respmod_precache bypass=0 icap://127.0.0.1:1344/respmod
icap_service reqmod_service reqmod_precache bypass=0 icap://127.0.0.1:1344/reqmod

adaptation_service_set req_side reqmod_service
adaptation_service_set resp_side respmod_service

# Without adaptation_access rules Squid never sends traffic to the services
adaptation_access req_side allow all
adaptation_access resp_side allow all
EOF

# Reload Squid
sudo systemctl reload squid

Real-World Workflows

Deploy Enterprise Scanning

#!/bin/bash
# Deploy ICAP+YARA across Squid proxies

set -e

# Configuration
ICAP_PORT=1344
YARA_RULES_GIT="https://github.com/Yara-Rules/rules.git"
DEPLOY_DIR="/opt/icap-yara"
SERVICE_USER="icap"

# 1. Setup
echo "[*] Installing dependencies..."
sudo apt install -y git python3 python3-venv libyara-dev yara squid

# Dedicated unprivileged account: port 1344 does not require root
id -u "$SERVICE_USER" > /dev/null 2>&1 || sudo useradd -r -s /bin/false "$SERVICE_USER"

# 2. Clone repository
echo "[*] Setting up YARA rules..."
sudo mkdir -p "$DEPLOY_DIR"
sudo chown "$(id -un)" "$DEPLOY_DIR"
cd "$DEPLOY_DIR"
# Update an existing checkout and clone only when there is none, so a real
# clone failure aborts the script instead of being masked
if [ -d yara-rules-repo/.git ]; then
    git -C yara-rules-repo pull --ff-only
else
    git clone "$YARA_RULES_GIT" yara-rules-repo
fi

# 3. Install Python packages (the server code imports `pyicap`)
python3 -m venv venv
venv/bin/pip install pyicap yara-python requests

# Hand the installation over to the service account
sudo chown -R "$SERVICE_USER:$SERVICE_USER" "$DEPLOY_DIR"

# 4. Deploy systemd service
sudo tee /etc/systemd/system/icap-yara.service > /dev/null << 'EOF'
[Unit]
Description=ICAP YARA Server
After=network.target

[Service]
Type=simple
User=icap
WorkingDirectory=/opt/icap-yara
ExecStart=/opt/icap-yara/venv/bin/python3 /opt/icap-yara/icap_server.py
Restart=always

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable icap-yara
sudo systemctl start icap-yara

echo "[+] Deployment complete. ICAP running on port $ICAP_PORT"

Log Analysis & Reporting

#!/usr/bin/env python3
"""Generate detection reports"""

import json
import re
from datetime import datetime, timedelta
from collections import Counter

def parse_logs(logfile):
    """Return the log lines that mention a detection.

    A line is kept, trailing newline included, when it contains either
    'YARA match' or 'DETECTED'.
    """
    markers = ('YARA match', 'DETECTED')
    with open(logfile, 'r') as handle:
        return [
            entry for entry in handle
            if any(marker in entry for marker in markers)
        ]

def generate_report(logfile, output_path='report.html'):
    """Create an HTML report of detections found in ``logfile``.

    Rule names are taken from ``rule=<name>`` fields and, for lines without
    such a field, from the bracketed list in
    ``YARA match in <name>: [RuleA, RuleB]`` lines.

    Args:
        logfile: Path of the log file to summarise.
        output_path: Where the HTML report is written; defaults to
            ``report.html`` in the current directory.
    """
    detections = parse_logs(logfile)
    rule_counts = Counter()

    for d in detections:
        names = re.findall(r'rule=(\w+)', d)
        if not names:
            # NOTE(review): assumes each logged match renders as its bare
            # rule name inside the list; confirm against yara-python
            listed = re.search(r'YARA match in .*: \[([^\]]*)\]', d)
            if listed:
                names = re.findall(r'\w+', listed.group(1))
        rule_counts.update(names)

    html = f"""
    <html>
    <body>
    <h1>ICAP-YARA Detection Report</h1>
    <p>Generated: {datetime.now()}</p>
    <h2>Summary</h2>
    <p>Total detections: {len(detections)}</p>
    <h2>Top Rules</h2>
    <ul>
    """

    html += "".join(
        f"<li>{rule}: {count}</li>\n"
        for rule, count in rule_counts.most_common(10)
    )

    html += """
    </ul>
    </body>
    </html>
    """

    with open(output_path, 'w') as f:
        f.write(html)

if __name__ == '__main__':
    generate_report('./icap_server.log')
    print("Report generated: report.html")

Best Practices

Security Hardening

# Run with restricted permissions
sudo useradd -r -s /bin/false icap
sudo chown -R icap:icap /opt/icap-yara

# Use read-only rule files (444 removes the owner's write bit that 644 leaves set)
sudo chmod 444 ./yara_rules/*

# Isolate in container
# (publish only the ICAP port; --net=host would share the host's network stack)
docker run --read-only --cap-drop=ALL -p 1344:1344 icap-yara

# Encrypt ICAP traffic
# Use ICAP over TLS (ICAPS)

Rule Management

# Keep rules updated (the rules checkout lives in yara-rules-repo, not in /opt/icap-yara itself)
cron: 0 2 * * * git -C /opt/icap-yara/yara-rules-repo pull --ff-only

# Validate before deployment
yara -d test=1 ./yara_rules/* test_samples/*

# Archive old detections
find ./quarantine -type f -mtime +30 -exec gzip {} \;
tar -czf quarantine_backup_$(date +%Y%m%d).tar.gz ./quarantine/

# Test new rules in isolated environment
docker run -v $(pwd)/test_rules:/rules icap-yara:test

Performance Best Practices

# Monitor resource usage
systemctl status icap-yara --no-pager
journalctl -u icap-yara -f

# Tune thread pool
# config: threads: $(nproc) / 2

# Use SSD for quarantine directory
mount -o defaults,relatime /dev/nvme0n1 /quarantine

# Implement rate limiting in ICAP handler
# Limit to 100 requests/sec per source IP

Resources

ICAP & YARA Documentation

Python Libraries


Last updated: 2025-03-30