# Logstash Cheatsheet
Logstash is a powerful data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash" such as Elasticsearch. It is part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization.
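A minimal end-to-end pipeline makes the three stages concrete. The sketch below (the file name `minimal.conf` is just an example) reads lines from stdin, parses them with grok, and prints structured events to stdout; run it with `/usr/share/logstash/bin/logstash -f minimal.conf`:
```ruby
# minimal.conf - smallest useful pipeline: stdin -> grok filter -> stdout
input {
  stdin { }
}
filter {
  grok {
    # Parse lines such as "2024-01-01T12:00:00 INFO service started"
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
  }
}
output {
  stdout { codec => rubydebug }
}
```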
## Installation and Setup
### Package Installation
**Ubuntu/Debian:**
```bash
# Import Elasticsearch GPG key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
# Add Elastic repository
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
# Update and install Logstash
sudo apt-get update
sudo apt-get install logstash
# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
```
**CentOS/RHEL:**
```bash
# Import GPG key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# Create repository file
cat << EOF | sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# Install Logstash
sudo yum install logstash
# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
```
### Docker Installation
**Docker Compose Setup:**
```yaml
version: '3.8'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic
networks:
  elastic:
    driver: bridge
```
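The compose file mounts a pipeline definition from `./logstash.conf`. A minimal sketch of such a file, assuming Filebeat ships to the published port 5044, might be:
```ruby
# ./logstash.conf - pipeline mounted into the container (illustrative)
input {
  beats {
    port => 5044
  }
}
output {
  stdout { codec => rubydebug }
}
```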
## Configuration Basics
### Pipeline Configuration Structure
**Basic Pipeline (logstash.conf):**
```ruby
input {
  # Input plugins
}
filter {
  # Filter plugins
}
output {
  # Output plugins
}
```
**Main configuration (logstash.yml):**
```yaml
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb
```
## Input Plugins
### File Input
**Basic file input:**
```ruby
input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "plain"
  }
}
```
**Advanced file input:**
```ruby
input {
  file {
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    add_field => { "log_source" => "application" }
    tags => ["application", "production"]
  }
}
```
### Beats Input
**Filebeat input:**
```ruby
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
}
```
### Syslog Input
**Syslog UDP input:**
```ruby
input {
  syslog {
    port => 514
    type => "syslog"
    codec => cef
  }
}
```
**Syslog TCP input:**
```ruby
input {
  tcp {
    port => 514
    type => "syslog"
    codec => line { format => "%{message}" }
  }
}
```
### HTTP Input
**HTTP Webhook:**
```ruby
input {
  http {
    port => 8080
    codec => json
    additional_codecs => {
      "application/json" => "json"
      "text/plain" => "plain"
    }
    ssl => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
  }
}
```
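To verify the webhook, post a test event with curl. Since the example above enables SSL (often with a self-signed certificate during testing), `-k` skips certificate verification; the payload fields here are arbitrary:
```bash
# Send a test event to the HTTP input configured above
curl -k -X POST "https://localhost:8080" \
  -H "Content-Type: application/json" \
  -d '{"service": "demo", "message": "hello from curl"}'
```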
## Filter Plugins
### Grok Filter
**Basic Grok Patterns:**
```ruby
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
```
**Custom Grok Patterns:**
```ruby
filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    }
    add_field => { "parsed" => "true" }
    tag_on_failure => ["_grokparsefailure"]
  }
}
```
**Multiple Grok patterns:**
```ruby
filter {
  grok {
    match => {
      "message" => [
        "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{PROG:program}: %{GREEDYDATA:message}",
        "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}",
        "%{GREEDYDATA:message}"
      ]
    }
    break_on_match => true
  }
}
```
### Date Filter
**Parse Timestamps:**
```ruby
filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
    timezone => "UTC"
  }
}
```
**Multiple date formats:**
```ruby
filter {
  date {
    match => [
      "timestamp",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
    target => "@timestamp"
    locale => "en"
  }
}
```
### Mutate Filter
**Field Manipulation:**
```ruby
filter {
  mutate {
    # Add fields
    add_field => {
      "environment" => "production"
      "processed_by" => "logstash"
    }
    # Remove fields
    remove_field => [ "host", "agent" ]
    # Rename fields
    rename => { "old_field" => "new_field" }
    # Convert field types
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }
    # String operations
    lowercase => [ "method" ]
    uppercase => [ "level" ]
    strip => [ "message" ]
    gsub => [ "message", "/", "_" ]
  }
}
```
### JSON Filter
**Parse JSON:**
```ruby
filter {
  json {
    source => "message"
    target => "parsed_json"
    skip_on_invalid_json => true
  }
}
```
### CSV Filter
**Parsing CSV data:**
```ruby
filter {
  csv {
    separator => ","
    columns => [ "timestamp", "level", "component", "message" ]
    skip_header => true
    convert => { "timestamp" => "date" }
  }
}
```
### Conditional Processing
**Conditional filters:**
```ruby
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
  if [status] >= 400 {
    mutate {
      add_tag => [ "error" ]
    }
  }
  if "error" in [tags] {
    mutate {
      add_field => { "alert_level" => "high" }
    }
  }
}
```
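For these conditionals to match, the `type` field must be set somewhere upstream. One common approach, sketched here with illustrative paths, is to assign it at the input:
```ruby
# Sketch: tag events at the input so the filter conditionals above can route on [type]
input {
  file {
    path => "/var/log/apache2/access.log"
    type => "apache"
  }
  file {
    path => "/var/log/nginx/access.log"
    type => "nginx"
  }
}
```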
## Output Plugins
### Elasticsearch Output
**Basic Elasticsearch output:**
```ruby
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
**Advanced Elasticsearch output:**
```ruby
output {
  elasticsearch {
    hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    template_name => "logstash"
    template_pattern => "logstash-*"
    template_overwrite => true
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
    user => "logstash_writer"
    password => "password"
    retry_on_conflict => 3
    action => "index"
  }
}
```
### File Output
**File output:**
```ruby
output {
  file {
    path => "/var/log/logstash/output.log"
    codec => line { format => "%{timestamp} %{level} %{message}" }
    flush_interval => 10
  }
}
```
### Kafka Output
**Kafka output:**
```ruby
output {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "logstash-logs"
    codec => json
    compression_type => "gzip"
    batch_size => 100
    linger_ms => 10
  }
}
```
### Conditional Outputs
**Multiple outputs:**
```ruby
output {
  if [type] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
    email {
      to => "admin@company.com"
      subject => "Error Alert: %{message}"
      body => "Error occurred at %{@timestamp}: %{message}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
```
## Pipeline Management
### Multiple Pipelines
**pipelines.yml configuration:**
```yaml
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125
- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50
- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200
```
### Pipeline-to-Pipeline Communication
**Sending Pipeline:**
```ruby
output {
  pipeline {
    send_to => ["processing-pipeline"]
  }
}
```
**Receiving Pipeline:**
```ruby
input {
  pipeline {
    address => "processing-pipeline"
  }
}
```
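Both halves only connect once they are registered in pipelines.yml, where the `address` above becomes routable. A minimal sketch (pipeline IDs and config paths are illustrative):
```yaml
# Sketch: register sender and receiver so "processing-pipeline" resolves
- pipeline.id: ingest
  path.config: "/etc/logstash/conf.d/ingest.conf"
- pipeline.id: processing
  path.config: "/etc/logstash/conf.d/processing.conf"
```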
## Command Line
### Service Management
**Service control:**
```bash
# Start Logstash
sudo systemctl start logstash
# Stop Logstash
sudo systemctl stop logstash
# Restart Logstash
sudo systemctl restart logstash
# Check status
sudo systemctl status logstash
# View logs
sudo journalctl -u logstash -f
```
### Configuration Testing
**Test configuration:**
```bash
# Test configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
# Test with specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t
# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug
```
### Manual Execution
**Running Logstash manually:**
```bash
# Run with specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
# Run with inline config
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
# Run with additional JVM options
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf
```
## Monitoring and Debugging
### API Monitoring
**Node stats:**
```bash
# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"
# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"
# Get process stats
curl -X GET "localhost:9600/_node/stats/process?pretty"
```
**Pipeline management:**
```bash
# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"
# Get specific pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"
# Reload pipelines (the monitoring API is read-only; send SIGHUP or run with --config.reload.automatic)
kill -SIGHUP $(pgrep -f logstash)
```
### Log Analysis
**Debug logging:**
```bash
# Enable debug logging
echo 'logger.logstash.level = debug' >> /etc/logstash/log4j2.properties
# Monitor specific logger
echo 'logger.slowlog.name = slowlog' >> /etc/logstash/log4j2.properties
echo 'logger.slowlog.level = trace' >> /etc/logstash/log4j2.properties
```
**Performance monitoring:**
```bash
# Monitor pipeline performance
tail -f /var/log/logstash/logstash-plain.log | grep "pipeline.stats"
# Check for slow filters
grep "slowlog" /var/log/logstash/logstash-slow.log
# Monitor memory usage
ps aux | grep logstash
jstat -gc $(pgrep -f logstash)
```
## Performance Tuning
### JVM Tuning
**JVM settings (jvm.options):**
```bash
# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g
# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
# Memory settings
-XX:+UseLargePages
# Container awareness (replaces the removed UseCGroupMemoryLimitForHeap flag on JDK 11+)
-XX:+UseContainerSupport
# GC logging (unified logging syntax, JDK 9+)
-Xlog:gc*:file=/var/log/logstash/gc.log
```
### Pipeline Optimization
**Performance settings:**
```yaml
# Pipeline workers (number of CPU cores)
pipeline.workers: 8
# Batch processing
pipeline.batch.size: 1000
pipeline.batch.delay: 50
# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
```
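Events that overflow into the dead letter queue can be read back for inspection or reprocessing with the `dead_letter_queue` input plugin. A minimal sketch, assuming the default data path and the `main` pipeline:
```ruby
# Sketch: re-ingest dead-lettered events for inspection
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "main"
    commit_offsets => true
  }
}
output {
  stdout { codec => rubydebug }
}
```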
### Filter Optimization
**Efficient filters:**
```ruby
filter {
  # Use conditionals to avoid unnecessary processing
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  # Use break_on_match for multiple patterns
  grok {
    match => {
      "message" => [
        "%{PATTERN1}",
        "%{PATTERN2}"
      ]
    }
    break_on_match => true
  }
  # Remove unnecessary fields early
  mutate {
    remove_field => [ "host", "agent", "@version" ]
  }
}
```
## Security Configuration
### SSL/TLS Setup
**Input SSL configuration:**
```ruby
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  }
}
```
**Output SSL configuration:**
```ruby
output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/etc/logstash/certs/client.crt"
    ssl_key => "/etc/logstash/certs/client.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
  }
}
```
### Authentication
**Elasticsearch Authentication:**
```ruby
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
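The `${LOGSTASH_PASSWORD}` reference is resolved from the environment or, better, from the Logstash keystore, which avoids plain-text secrets. A sketch, assuming the default package paths:
```bash
# Create the keystore once, then add the secret referenced as ${LOGSTASH_PASSWORD}
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add LOGSTASH_PASSWORD
```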
## Troubleshooting
### Common Issues
**Pipeline won't start:**
```bash
# Check configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash -t
# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/
# Check Java version
java -version
```
**Performance problems:**
```bash
# Monitor resource usage
top -p $(pgrep -f logstash)
iostat -x 1
# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# Analyze slow logs
grep "WARN" /var/log/logstash/logstash-plain.log
```
**Memory Issues:**
```bash
# Check heap usage
jstat -gc $(pgrep -f logstash)
# Monitor garbage collection
tail -f /var/log/logstash/gc.log
# Adjust heap size
echo "-Xmx4g" >> /etc/logstash/jvm.options
echo "-Xms4g" >> /etc/logstash/jvm.options
```
## Integration Examples
### ELK Stack Integration
**Complete ELK pipeline:**
```ruby
input {
  beats {
    port => 5044
  }
}
filter {
  if [@metadata][beat] == "filebeat" {
    if [fields][log_type] == "apache" {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
```
### Kafka Integration
**Kafka to Elasticsearch:**
```ruby
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["logs"]
    group_id => "logstash"
    consumer_threads => 4
    codec => json
  }
}
filter {
  date {
    match => [ "timestamp", "ISO8601" ]
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "kafka-logs-%{+YYYY.MM.dd}"
  }
}
```
This comprehensive Logstash cheatsheet covers installation, configuration, pipeline management, and advanced features for effective log processing and data transformation.