Python ICAP YARA
Build and deploy an ICAP (Internet Content Adaptation Protocol) server that integrates YARA rules for real-time malware detection in proxied HTTP content. Detect threats, scan network traffic, and protect against malicious files.
Quick Start
# Install dependencies (the server code below imports the `pyicap` module)
pip install pyicap yara-python requests
# Create YARA rules directory
mkdir -p ./yara_rules
# Start basic ICAP server with YARA scanning
# (icap_server.py is the script from "Full ICAP Server with YARA"; it listens on port 1344)
python3 icap_server.py
# Test ICAP connectivity (curl cannot speak ICAP, so send a raw OPTIONS request)
printf 'OPTIONS icap://localhost:1344/respmod ICAP/1.0\r\nHost: localhost\r\n\r\n' | nc -w 3 localhost 1344
Installation
Linux/macOS
# Install system dependencies
sudo apt install libyara-dev yara python3-dev # Ubuntu/Debian
brew install yara python3 # macOS
# Create virtual environment
python3 -m venv icap-env
source icap-env/bin/activate
# Install Python packages (pyicap provides ICAPServer / BaseICAPRequestHandler)
pip install --upgrade pip
pip install pyicap yara-python
pip install requests urllib3
# Verify installation
python3 -c "import yara; print(yara.__version__)"
python3 -c "import pyicap; print('pyicap import OK')"
YARA Rules
Create Rule Files
// ./yara_rules/malware.yar
// (YARA comments use // or /* */; a leading '#' is a syntax error in a rule file)
rule Eicar_Test_File {
    meta:
        description = "EICAR test malware"
        date = "2025-01-01"
    strings:
        $eicar = "X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    condition:
        $eicar
}

rule Suspicious_Shell_Script {
    meta:
        description = "Detects suspicious shell commands"
    strings:
        $bash = "#!/bin/bash"
        $cmd1 = "nc -l"
        $cmd2 = "bash -i"
    condition:
        $bash and any of ($cmd*)
}

rule Detect_Base64_Encoded_Payload {
    meta:
        description = "Detects high entropy base64 blocks"
    strings:
        // Matches any run of 100+ base64-alphabet characters; it does not
        // measure entropy, so expect hits on long plain alphanumeric runs too.
        $pattern = /[A-Za-z0-9+\/=]{100,}/
    condition:
        $pattern
}
Compile Rules
# Verify syntax (yarac exits non-zero on a compile error; the output is discarded)
yarac ./yara_rules/malware.yar /dev/null
# Compile to binary (yarac is the compiler; `yara -c` only prints a match count)
yarac ./yara_rules/malware.yar malware.yarc
# Test rules against sample
yara ./yara_rules/malware.yar /path/to/suspicious_file
# Scan using the precompiled rules
yara -C malware.yarc /path/to/suspicious_file
ICAP Server Implementation
Basic ICAP Server (Python)
Configuration
ICAP Server Config
# icap_config.yaml
server:
host: 0.0.0.0
port: 1344
max_connections: 100
request_timeout: 30
max_file_size: 52428800 # 50MB
yara:
rules_directory: ./yara_rules
compile_rules: true
cache_compiled: true
timeout: 5
enable_logging: true
scanning:
scan_request_body: true
scan_response_body: true
scan_file_types: [exe, dll, scr, jar, zip, rar, pdf, doc, docx]
min_file_size: 100
quarantine_dir: ./quarantine
logging:
level: DEBUG
file: ./icap_server.log
max_size: 10485760
backup_count: 5
performance:
threads: 4
chunk_size: 8192
enable_caching: true
Proxy Integration
# Configure Squid proxy to use ICAP
# /etc/squid/squid.conf
icap_enable on
icap_service service_req reqmod_precache bypass=0 icap://127.0.0.1:1344/reqmod
icap_service service_resp respmod_precache bypass=0 icap://127.0.0.1:1344/respmod
# Send all traffic through each service; with bypass=0 Squid fails the
# transaction instead of skipping the scan when the ICAP server is unavailable
adaptation_access service_req allow all
adaptation_access service_resp allow all
Advanced Server Implementation
Full ICAP Server with YARA
#!/usr/bin/env python3
"""
ICAP Server with YARA Malware Detection
"""
import yara
import logging
from pyicap import ICAPServer, BaseICAPRequestHandler
import io
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class YARAScanner:
    """Scan in-memory payloads against one compiled YARA rule file.

    Args:
        rules_path: Path of the YARA source file handed to ``yara.compile``.
        fail_open: Verdict to use when the scan itself raises. ``True``
            (the default, and the historical behaviour) treats the payload
            as clean; ``False`` treats it as a detection so that content
            which breaks the scanner cannot slip through unscanned.
    """

    def __init__(self, rules_path, fail_open=True):
        self.rules = yara.compile(filepath=rules_path)
        # One {'filename', 'matches'} record per detection. The list is never
        # trimmed here, so a long-running server should drain it periodically.
        self.quarantine = []
        self.fail_open = fail_open

    def scan_data(self, data, filename="unknown"):
        """Scan data with YARA rules.

        Args:
            data: Payload passed to ``rules.match(data=...)``.
            filename: Label used in log lines and quarantine records.

        Returns:
            ``(True, None)`` when nothing matched, ``(False, matches)`` when
            at least one rule matched. If the scan raises, the error is
            logged with its traceback and the result is ``(True, None)``
            when ``fail_open`` is set, otherwise ``(False, None)``.
        """
        try:
            matches = self.rules.match(data=data)
            if matches:
                logger.warning(f"YARA match in {filename}: {matches}")
                self.quarantine.append({
                    'filename': filename,
                    'matches': [m.rule for m in matches]
                })
                return False, matches
            return True, None
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Scan error: %s", e)
            return (True, None) if self.fail_open else (False, None)
class YARAICAPHandler(BaseICAPRequestHandler):
    """ICAP handler that runs encapsulated HTTP bodies through the YARA scanner.

    Both entry points share one verdict routine: an empty or clean body gets
    ICAP 204, a body with YARA matches gets ICAP 403 plus a warning log line.

    NOTE(review): pyicap appears to dispatch on ``<service>_<METHOD>`` method
    names (for example ``respmod_RESPMOD`` / ``respmod_OPTIONS``) and to expose
    the body through ``has_body`` / ``read_chunk()`` rather than
    ``encapsulated_*_body`` attributes — confirm against the installed pyicap
    version before relying on ``respmod`` / ``reqmod`` being called.
    NOTE(review): blocking is normally signalled with ICAP 200 carrying an
    encapsulated HTTP 403 response, not an ICAP-level 403 — verify with the
    proxy in use.
    """

    # Shared scanner instance; the startup code assigns it before serving.
    scanner = None

    def _scan_and_respond(self, body, label, alert):
        """Scan *body* under *label* and set the ICAP status accordingly.

        Args:
            body: Encapsulated body, or a falsy value when there is none.
            label: Name passed to the scanner for logs/quarantine records.
            alert: Prefix of the warning logged when rules match.
        """
        if not body:
            self.set_icap_response(204)
            return
        is_clean, matches = self.scanner.scan_data(body, label)
        if is_clean:
            self.set_icap_response(204)
            return
        logger.warning(f"{alert}: {matches}")
        # Block malicious content
        self.set_icap_response(403)

    def respmod(self):
        """Response modification - scan response body"""
        self._scan_and_respond(
            self.encapsulated_res_body, 'response_body',
            "Malicious content detected")

    def reqmod(self):
        """Request modification - scan uploaded files"""
        self._scan_and_respond(
            self.encapsulated_req_body, 'upload', "Upload blocked")
if __name__ == '__main__':
    # Initialize scanner
    scanner = YARAScanner('./yara_rules/malware.yar')
    YARAICAPHandler.scanner = scanner
    # Start server
    server = ICAPServer(('0.0.0.0', 1344), YARAICAPHandler)
    logger.info("ICAP Server with YARA started on port 1344")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the foreground server.
        logger.info("ICAP Server shutting down")
    finally:
        # Release the listening socket so an immediate restart can rebind.
        # presumably ICAPServer is a socketserver.TCPServer subclass — verify
        server.server_close()
Multi-Rule Scanning
import os
import yara
class AdvancedYARAScanner:
    """Scan payloads against every YARA rule file found in one directory.

    Each ``.yar`` / ``.yara`` file is compiled into its own ruleset so that
    results can be reported per file.
    """

    def __init__(self, rules_dir):
        self.rules_dir = rules_dir
        self.rules_dict = self._compile_rules()
        # 'scanned' counts calls to scan_all_rulesets; 'detected' counts the
        # calls in which at least one ruleset matched.
        self.stats = {'scanned': 0, 'detected': 0}

    def _compile_rules(self):
        """Compile all YARA rules from directory.

        Returns:
            Dict mapping each rule file name to its compiled ruleset.
        """
        rules = {}
        # Sorted so rulesets are compiled and reported in a stable order.
        for filename in sorted(os.listdir(self.rules_dir)):
            if filename.endswith(('.yar', '.yara')):
                path = os.path.join(self.rules_dir, filename)
                rules[filename] = yara.compile(filepath=path)
        return rules

    def scan_all_rulesets(self, data):
        """Scan data against all rule sets.

        Returns:
            Dict mapping rule file name to the list of matching rule names;
            rule files without matches are omitted.
        """
        all_matches = {}
        for name, rule in self.rules_dict.items():
            matches = rule.match(data=data)
            if matches:
                all_matches[name] = [m.rule for m in matches]
        self.stats['scanned'] += 1
        # Count the payload once, however many rulesets flagged it, so that
        # 'detected' can never exceed 'scanned'.
        if all_matches:
            self.stats['detected'] += 1
        return all_matches

    def get_stats(self):
        """Return scanning statistics"""
        return self.stats
Scanning & Detection
Scan Files
# Scan single file
yara ./yara_rules/malware.yar /path/to/file
# Scan directory recursively
yara -r ./yara_rules/malware.yar /path/to/directory
# Scan with multiple rule files
yara ./yara_rules/malware.yar ./yara_rules/suspicious.yar /file
# Output JSON results (the yara CLI has no JSON mode; emit it through yara-python)
python3 -c "import json, sys, yara; print(json.dumps([m.rule for m in yara.compile(filepath=sys.argv[1]).match(sys.argv[2])]))" ./yara_rules/malware.yar /file
# Show matching strings
yara -s ./yara_rules/malware.yar /file
# Scan with custom identifier
yara -d identifier=value ./yara_rules/malware.yar /file
# Abort scanning after the given number of seconds
yara --timeout=10 ./yara_rules/malware.yar /file
Python Scanning Script
#!/usr/bin/env python3
"""
Scan files/directories with YARA rules
"""
import yara
import os
import json
from pathlib import Path
def scan_directory(yara_rule_file, target_dir, extensions=None):
    """Recursively scan directory.

    Args:
        yara_rule_file: Path of the YARA source file to compile.
        target_dir: Root of the tree walked with ``os.walk``.
        extensions: Optional iterable of file-name suffixes (e.g. ``'.exe'``).
            When given and non-empty, only files ending in one of them are
            scanned; the comparison ignores case so ``SETUP.EXE`` is not
            skipped by an ``'.exe'`` filter.

    Returns:
        Dict mapping each matching file path to the list of rule names that
        matched it. Files that could not be scanned are reported on stdout
        and left out.
    """
    rules = yara.compile(filepath=yara_rule_file)
    # str.endswith accepts a tuple, so normalise the filter once up front.
    suffixes = tuple(ext.lower() for ext in extensions) if extensions else None
    results = {}
    for root, _dirs, files in os.walk(target_dir):
        for filename in files:
            if suffixes and not filename.lower().endswith(suffixes):
                continue
            filepath = os.path.join(root, filename)
            try:
                matches = rules.match(filepath=filepath)
                if matches:
                    results[filepath] = [m.rule for m in matches]
            except Exception as e:
                # Best effort: one unreadable file must not stop the walk.
                print(f"Error scanning {filepath}: {e}")
    return results


if __name__ == '__main__':
    # Example usage (guarded so importing this module does not start a scan)
    results = scan_directory('./yara_rules/malware.yar', '/downloads',
                             extensions=['.exe', '.dll', '.pdf', '.zip'])
    # Output findings
    for file, detections in results.items():
        print(f"[DETECTED] {file}: {', '.join(detections)}")
Network Traffic Scanning
# Capture and scan HTTP traffic with Squid ICAP
# Configure Squid to send traffic to ICAP server
# Monitor ICAP traffic
tcpdump -i any -n 'tcp port 1344' -A
# Test ICAP service (curl cannot speak ICAP; send a raw OPTIONS request instead)
printf 'OPTIONS icap://localhost:1344/respmod ICAP/1.0\r\nHost: localhost\r\n\r\n' | nc -w 3 localhost 1344
# Make ICAP request with file body
# (c-icap-client builds the required "Encapsulated" header and chunked body itself)
c-icap-client -i localhost -p 1344 -s respmod -f suspicious_file.exe -o /dev/null -v
Rule Development & Management
Create Custom Rules
# Interactive rule editor (third-party tool, not part of the YARA distribution)
yara-editor
# Test rule before deployment
yara --print-stats ./new_rule.yar /test_samples/
# Validate rule syntax (yara needs a scan target; yarac only compiles)
yarac ./new_rule.yar /dev/null && echo "Valid" || echo "Invalid"
Rule Examples
// Detect suspicious PowerShell scripts
rule Suspicious_PowerShell {
    meta:
        description = "Detects obfuscated PowerShell code"
        severity = "high"
    strings:
        $ps_header = "powershell" nocase
        $encodedcmd = "System.Text.Encoding" nocase
        $base64 = /FromBase64String|DownloadString|Invoke-Expression/ nocase
    condition:
        // `any of ($*)` also counts $ps_header itself, which made the second
        // clause always true; require one of the other indicators instead.
        $ps_header and ($encodedcmd or $base64)
}

// Detect potential ransomware
rule Potential_Ransomware {
    meta:
        description = "Checks for file encryption patterns"
    strings:
        $aes = { 6A 40 [0-5] FF 15 }
        $wcry = "wcry"
        $bitcoin = "bitcoin" nocase
    condition:
        any of them
}

// Detect cryptocurrency miners
rule Crypto_Miner {
    meta:
        description = "Detects mining pool connections"
    strings:
        $pool1 = "stratum.mining.pool.com"
        $pool2 = "minergate.com"
        $pool3 = "nanopool.org"
        $getwork = "getwork"
    condition:
        any of ($pool*) or $getwork
}
Update Rules
# Download latest signatures (the archive unpacks into ./yara_rules/rules-master/)
wget https://github.com/Yara-Rules/rules/archive/master.zip
unzip -o master.zip -d ./yara_rules
# Merge rule files: build an index of include statements rather than
# concatenating, so every rule file stays intact and can be updated in place
find ./yara_rules -name '*.yar' | sort | sed 's/.*/include "&"/' > ./index_all.yar
# Detect duplicate rule names and syntax errors by compiling the index
# (never line-sort a rule file to "deduplicate" it: that scrambles the rules)
yarac ./index_all.yar /dev/null
Monitoring & Logging
Server Monitoring
# Check ICAP service status
systemctl status icap-yara
# View real-time logs
tail -f ./icap_server.log
# Monitor process (-d, joins several PIDs with commas, as top -p expects)
top -p "$(pgrep -d, -f icap)"
# Check port availability
netstat -tlnp | grep 1344
lsof -i :1344
# Monitor ICAP connections
tcpdump -i any -n 'tcp port 1344'
# Probe the server (curl cannot speak ICAP, and the server shown here has no
# /stats service, so send a raw OPTIONS request)
printf 'OPTIONS icap://localhost:1344/respmod ICAP/1.0\r\nHost: localhost\r\n\r\n' | nc -w 3 localhost 1344
Log Analysis
# Count detections
grep -c "YARA match" ./icap_server.log
# List detected rules (logged as "YARA match in <name>: [RuleA, RuleB]")
grep "YARA match" ./icap_server.log | grep -o '\[[^]]*\]$' | tr -d '[] ' | tr ',' '\n' | sort | uniq -c
# Show latest threats
tail -20 ./icap_server.log | grep -i "malicious\|detected"
# Parse JSON logs (ScanLogger writes "<time> - <level> - <json>", so cut out the JSON part)
grep -o '{.*}$' ./icap_server.log | jq '.detections, .filename'
# Export statistics
grep -c "YARA match" ./icap_server.log > detection_stats.txt
Python Logging
import logging
import json
from datetime import datetime
class ScanLogger:
    """Write scan and detection events to a log file as JSON messages.

    Each line has the form ``<asctime> - <levelname> - <json>``.

    Args:
        logfile: Path of the log file to append to.
    """

    def __init__(self, logfile):
        import os  # local import: this snippet's import block has no os

        self.logger = logging.getLogger(__name__)
        # All instances share the module logger, so attach a file handler
        # only once per target file; otherwise every extra ScanLogger would
        # make each event appear one more time in the log.
        target = os.path.abspath(os.fspath(logfile))
        attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in self.logger.handlers
        )
        if not attached:
            handler = logging.FileHandler(logfile)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_detection(self, filename, matches, severity="HIGH"):
        """Log malware detection.

        Args:
            filename: Name of the scanned object.
            matches: JSON-serialisable detections (e.g. a list of rule names).
            severity: Severity label stored with the entry.
        """
        entry = {
            'timestamp': datetime.now().isoformat(),
            'filename': filename,
            'severity': severity,
            'detections': matches
        }
        self.logger.warning(json.dumps(entry))

    def log_scan(self, filename, result, duration):
        """Log scan completion.

        Args:
            filename: Name of the scanned object.
            result: JSON-serialisable scan outcome.
            duration: Scan duration, stored under ``duration_ms``.
        """
        entry = {
            'timestamp': datetime.now().isoformat(),
            'filename': filename,
            'result': result,
            'duration_ms': duration
        }
        self.logger.info(json.dumps(entry))
Troubleshooting
Common Issues
YARA rules not loading
# Validate rule syntax (yara requires a scan target; yarac only compiles)
yarac ./yara_rules/malware.yar /dev/null
# Check file permissions
ls -la ./yara_rules/
# Test with simple rule
echo 'rule test { strings: $a = "test" condition: $a }' > test.yar
echo 'this is a test' > test.txt
yara test.yar test.txt
# Recompile rules
python3 -c "import yara; rules = yara.compile(filepath='./yara_rules/malware.yar')"
ICAP server not responding
# Check if server is running
pgrep -f icap
ps aux | grep icap
# Check port availability
lsof -i :1344
netstat -tlnp | grep 1344
# Start in debug mode
python3 icap_server.py --debug
# Check for port conflicts
sudo netstat -tulpn | grep 1344
High false positives
# Review detected files
ls -la ./quarantine/
# Create whitelist rules
cat > whitelist.yar << 'EOF'
rule Whitelist {
    strings:
        $safe = "c:\\windows\\system32" nocase
    condition:
        $safe
}
EOF
# The yara CLI has no option to exclude a rule. Either compare whitelist hits
# with the detections, or add `and not Whitelist` to the condition of a noisy
# rule (the Whitelist rule must be defined earlier in the same compilation).
yara whitelist.yar /file
yara ./yara_rules/malware.yar /file
Performance issues
# Reduce file size limit
# Edit config: max_file_size: 26214400 # 25MB
# Increase threads
# Edit config: threads: 8
# Monitor resource usage
watch -n 1 'top -bn1 | grep icap'
# Profile scan times
python3 -m cProfile -s cumulative icap_server.py
Debug Script
#!/usr/bin/env python3
"""Debug and validate ICAP+YARA setup"""
import yara
import os
import socket
import sys
def check_dependencies():
    """Verify required packages.

    Tries to import every required module and prints one status line per
    package, so a single run reports everything that is missing instead of
    stopping at the first failure.

    Returns:
        True when all packages import successfully, False otherwise.
    """
    import importlib  # local import: the script's import block has no importlib

    required = (("yara", "YARA"), ("pyicap", "pyICAPServer"))
    all_present = True
    for module_name, label in required:
        try:
            importlib.import_module(module_name)
        except ImportError:
            print(f"✗ {label} not installed")
            all_present = False
        else:
            print(f"✓ {label} installed")
    return all_present
def validate_rules(rules_dir):
    """Check YARA rules.

    Compiles every ``.yar`` / ``.yara`` file directly inside *rules_dir*
    (the same suffixes the multi-rule scanner loads) and prints one status
    line per file.

    Returns:
        False when *rules_dir* is not an existing directory or as soon as one
        rule file fails to compile; True otherwise, including when the
        directory holds no rule files.
    """
    # isdir rather than exists: a plain file here would make listdir raise.
    if not os.path.isdir(rules_dir):
        print(f"✗ Rules directory not found: {rules_dir}")
        return False
    for f in sorted(os.listdir(rules_dir)):
        if not f.endswith(('.yar', '.yara')):
            continue
        try:
            yara.compile(filepath=os.path.join(rules_dir, f))
            print(f"✓ Valid rule: {f}")
        except Exception as e:
            print(f"✗ Invalid rule {f}: {e}")
            return False
    return True
def check_port(port=1344):
    """Test port availability.

    Attempts a TCP connection to 127.0.0.1:*port*. A successful connection
    (``connect_ex`` returning 0) means something is already listening there,
    i.e. the port is in use; any error code means nothing accepted the
    connection.

    Returns:
        True when the port is free, False when it is in use.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Keep the diagnostic from hanging if the connection attempt stalls.
        sock.settimeout(2)
        in_use = sock.connect_ex(('127.0.0.1', port)) == 0
    if in_use:
        print(f"✗ Port {port} in use")
        return False
    print(f"✓ Port {port} available")
    return True
if __name__ == '__main__':
    # Run the diagnostics in order. The first two steps abort the run with
    # exit status 1 on failure; the port check is informational only.
    print("=== ICAP+YARA Diagnostic ===\n")
    blocking_steps = (
        ("1. Checking dependencies...", check_dependencies),
        ("\n2. Validating YARA rules...", lambda: validate_rules('./yara_rules')),
    )
    for banner, step in blocking_steps:
        print(banner)
        passed = step()
        if not passed:
            sys.exit(1)
    print("\n3. Checking port...")
    check_port()
    print("\n✓ All checks passed!")
Performance Tuning
Optimize Scanning
# Skip large files (applies when scanning a directory)
yara --skip-larger=52428800 -r ./rules.yar /path
# Parallel scanning (Python)
python3 << 'EOF'
from concurrent.futures import ThreadPoolExecutor
import yara
import os
rules = yara.compile(filepath='./rules.yar')
def scan_file(filepath):
    return rules.match(filepath=filepath)
# scandir entries carry the full path; bare names from os.listdir() would be
# checked against the current working directory instead of /target
files = [entry.path for entry in os.scandir('/target') if entry.is_file()]
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(scan_file, files))
EOF
# Cache compiled rules (yarac compiles; copying the source file does not)
yarac ./rules.yar ./rules.yarc
# Load cached rules in code: yara.load('./rules.yarc')
Reduce False Positives
# Create exceptions.yar for safe files
cat > exceptions.yar << 'EOF'
rule Whitelist_System_Files {
    strings:
        $win = "c:\\windows" nocase
        $prog = "c:\\program files" nocase
    condition:
        any of them
}
EOF
# The yara CLI cannot exclude a rule by name. Compile the exception rule
# together with the detection rules and add `and not Whitelist_System_Files`
# to the condition of each noisy rule (exceptions.yar must come first).
yara exceptions.yar rules.yar /target
Batch Processing
#!/usr/bin/env python3
"""Batch scan with performance tracking"""
import yara
import time
import os
from pathlib import Path
class BatchScanner:
    """Scan a directory tree with one compiled rule file, in batches.

    Args:
        rules_file: Path of the YARA source file to compile.
        batch_size: Number of directory entries pulled per batch; must be
            at least 1.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """

    def __init__(self, rules_file, batch_size=100):
        # Reject a bad batch size up front instead of failing inside the scan.
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.rules = yara.compile(filepath=rules_file)
        self.batch_size = batch_size
        # 'total' and 'detections' accumulate across scan_directory calls;
        # 'time' holds the duration in seconds of the most recent call.
        self.stats = {'total': 0, 'detections': 0, 'time': 0}

    def scan_directory(self, directory):
        """Scan in batches.

        Walks *directory* recursively and scans every regular file. A file
        that cannot be scanned is reported on stdout and skipped, so one bad
        file does not abort the run.

        Returns:
            The statistics dict with keys 'total', 'detections' and 'time'.
        """
        from itertools import islice  # local: the script does not import itertools

        start = time.perf_counter()  # monotonic, unaffected by clock changes
        # Pull entries lazily, batch_size at a time, instead of materialising
        # the whole tree; iter() stops at the first empty batch.
        paths = Path(directory).rglob('*')
        for batch in iter(lambda: list(islice(paths, self.batch_size)), []):
            for f in batch:
                if not f.is_file():
                    continue
                try:
                    matches = self.rules.match(filepath=str(f))
                except (yara.Error, OSError) as e:
                    print(f"Error scanning {f}: {e}")
                    continue
                self.stats['total'] += 1
                if matches:
                    self.stats['detections'] += 1
        self.stats['time'] = time.perf_counter() - start
        return self.stats


if __name__ == '__main__':
    # Usage (guarded so importing this module does not start a scan)
    scanner = BatchScanner('./rules.yar', batch_size=50)
    stats = scanner.scan_directory('/target_dir')
    print(f"Scanned: {stats['total']}, Detections: {stats['detections']}, Time: {stats['time']:.2f}s")
Integration & Deployment
Systemd Service
# /etc/systemd/system/icap-yara.service
[Unit]
Description=ICAP Server with YARA Malware Detection
After=network.target
[Service]
Type=simple
User=icap
WorkingDirectory=/opt/icap-yara
ExecStart=/usr/bin/python3 /opt/icap-yara/icap_server.py
Restart=always
RestartSec=10
# Resource limits
MemoryMax=2G
CPUQuota=50%
[Install]
WantedBy=multi-user.target
# Install service
sudo cp icap-yara.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable icap-yara
sudo systemctl start icap-yara
Docker Container
FROM python:3.11-slim
RUN apt-get update && apt-get install -y \
libyara-dev yara && \
rm -rf /var/lib/apt/lists/*
WORKDIR /icap
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 1344
CMD ["python3", "icap_server.py"]
# Build and run
docker build -t icap-yara .
docker run -d -p 1344:1344 \
-v $(pwd)/yara_rules:/icap/yara_rules \
-v $(pwd)/quarantine:/icap/quarantine \
--name icap-yara icap-yara
Squid Integration
# Configure Squid to use ICAP (squid.conf is root-owned, so append via sudo tee)
sudo tee -a /etc/squid/squid.conf > /dev/null << 'EOF'
icap_enable on
icap_service respmod_service respmod_precache bypass=0 icap://127.0.0.1:1344/respmod
icap_service reqmod_service reqmod_precache bypass=0 icap://127.0.0.1:1344/reqmod
adaptation_access reqmod_service allow all
adaptation_access respmod_service allow all
EOF
# Check the configuration, then reload Squid
sudo squid -k parse
sudo systemctl reload squid
Real-World Workflows
Deploy Enterprise Scanning
#!/bin/bash
# Deploy ICAP+YARA across Squid proxies
# Stop on errors, on unset variables, and on failures inside pipelines
set -euo pipefail
# Configuration
ICAP_PORT=1344
YARA_RULES_GIT="https://github.com/Yara-Rules/rules.git"
DEPLOY_DIR="/opt/icap-yara"
SERVICE_USER="icap"
# 1. Setup
echo "[*] Installing dependencies..."
sudo apt install -y python3 python3-venv libyara-dev yara squid git
# Dedicated unprivileged account: the scanner parses untrusted content and
# must not run as root
id -u "$SERVICE_USER" > /dev/null 2>&1 || sudo useradd -r -s /bin/false "$SERVICE_USER"
# 2. Clone repository
echo "[*] Setting up YARA rules..."
sudo mkdir -p "$DEPLOY_DIR"
sudo chown "$(id -un)" "$DEPLOY_DIR"
cd "$DEPLOY_DIR"
# Update an existing checkout; otherwise clone (a failed clone now aborts the script)
if [ -d yara-rules-repo/.git ]; then
    git -C yara-rules-repo pull --ff-only
else
    git clone "$YARA_RULES_GIT" yara-rules-repo
fi
# 3. Install Python packages (pyicap is the module the server imports)
python3 -m venv venv
venv/bin/pip install pyicap yara-python requests
# NOTE: copy icap_server.py and the yara_rules directory into $DEPLOY_DIR
# before starting the service; this script does not install them.
# 4. Deploy systemd service
sudo tee /etc/systemd/system/icap-yara.service > /dev/null << 'EOF'
[Unit]
Description=ICAP YARA Server
After=network.target
[Service]
Type=simple
User=icap
WorkingDirectory=/opt/icap-yara
ExecStart=/opt/icap-yara/venv/bin/python3 /opt/icap-yara/icap_server.py
Restart=always
[Install]
WantedBy=multi-user.target
EOF
sudo chown -R "$SERVICE_USER":"$SERVICE_USER" "$DEPLOY_DIR"
sudo systemctl daemon-reload
sudo systemctl enable icap-yara
sudo systemctl start icap-yara
echo "[+] Deployment complete. ICAP running on port $ICAP_PORT"
Log Analysis & Reporting
#!/usr/bin/env python3
"""Generate detection reports"""
import json
import re
from datetime import datetime, timedelta
from collections import Counter
# "rule=<name>" fields, as used by key/value style log lines.
_RULE_FIELD_RE = re.compile(r'rule=(\w+)')
# The ICAP server logs matches as "YARA match in <label>: [RuleA, RuleB]".
_MATCH_LIST_RE = re.compile(r'YARA match in .*: \[([^\]]*)\]\s*$')


def _rules_in_line(line):
    """Return the rule names mentioned in one detection log line."""
    field = _RULE_FIELD_RE.search(line)
    if field:
        return [field.group(1)]
    listed = _MATCH_LIST_RE.search(line)
    if listed:
        return [name.strip() for name in listed.group(1).split(',') if name.strip()]
    return []


def parse_logs(logfile):
    """Parse ICAP logs.

    Returns:
        The lines of *logfile* (newline included) that contain 'YARA match'
        or 'DETECTED', in file order.
    """
    # Log lines can embed names taken from scanned traffic, so tolerate
    # undecodable bytes instead of aborting the report.
    with open(logfile, 'r', encoding='utf-8', errors='replace') as f:
        return [line for line in f if 'YARA match' in line or 'DETECTED' in line]


def generate_report(logfile, output='report.html'):
    """Create HTML report.

    Args:
        logfile: Path of the ICAP server log to summarise.
        output: Path of the HTML file to write (default 'report.html').

    The report shows the total number of detection lines and the ten most
    frequently matched rules. Rule names are taken from ``rule=<name>``
    fields and from the ``[RuleA, RuleB]`` lists written by the server.
    """
    from html import escape  # local import: keeps the file's import block as-is

    detections = parse_logs(logfile)
    rule_counts = Counter()
    for d in detections:
        rule_counts.update(_rules_in_line(d))
    page = f"""
<html>
<body>
<h1>ICAP-YARA Detection Report</h1>
<p>Generated: {datetime.now()}</p>
<h2>Summary</h2>
<p>Total detections: {len(detections)}</p>
<h2>Top Rules</h2>
<ul>
"""
    # Escape names: they originate from log text, not from trusted markup.
    page += "".join(
        f"<li>{escape(rule)}: {count}</li>\n"
        for rule, count in rule_counts.most_common(10)
    )
    page += """
</ul>
</body>
</html>
"""
    with open(output, 'w', encoding='utf-8') as f:
        f.write(page)


if __name__ == '__main__':
    generate_report('./icap_server.log')
    print("Report generated: report.html")
Best Practices
Security Hardening
# Run with restricted permissions
sudo useradd -r -s /bin/false icap
sudo chown -R icap:icap /opt/icap-yara
# Use read-only rule files
sudo chmod 644 ./yara_rules/*
# Isolate in container
docker run --read-only --cap-drop=ALL --net=host icap-yara
# Encrypt ICAP traffic
# Use ICAP over TLS (ICAPS)
Rule Management
# Keep rules updated (crontab entry; the git checkout is the yara-rules-repo
# directory, whose default branch is "master")
0 2 * * * cd /opt/icap-yara/yara-rules-repo && git pull --ff-only origin master
# Validate before deployment (compile each rule file on its own; a bare glob
# would make yara treat all but the last argument as rule files)
for f in ./yara_rules/*.yar; do yarac "$f" /dev/null || echo "Invalid: $f"; done
# Archive old detections
find ./quarantine -type f -mtime +30 -exec gzip {} \;
tar -czf quarantine_backup_$(date +%Y%m%d).tar.gz ./quarantine/
# Test new rules in isolated environment
docker run -v $(pwd)/test_rules:/rules icap-yara:test
Performance Best Practices
# Monitor resource usage
systemctl status icap-yara --no-pager
journalctl -u icap-yara -f
# Tune thread pool
# config: threads: $(nproc) / 2
# Use SSD for quarantine directory
mount -o defaults,relatime /dev/nvme0n1 /quarantine
# Implement rate limiting in ICAP handler
# Limit to 100 requests/sec per source IP
Resources
ICAP & YARA Documentation
Python Libraries
Related Tools
- Squid Proxy - HTTP caching proxy
- ClamAV - Antivirus engine
- Suricata - Network IDS/IPS
- OSSEC - Host-based IDS
Last updated: 2025-03-30