# Logstash Cheatsheet
Logstash is a powerful data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash" like Elasticsearch. It's part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization.
## Installation and Setup

### Package Installation

**Ubuntu/Debian:**

```bash
# Import the Elasticsearch GPG key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

# Add the Elastic repository
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list

# Update the package index and install Logstash
sudo apt-get update
sudo apt-get install logstash

# Enable and start the service
sudo systemctl enable logstash
sudo systemctl start logstash
```
**CentOS/RHEL:**

```bash
# Import the GPG key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# Create the repository file
cat << EOF | sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# Install Logstash
sudo yum install logstash

# Enable and start the service
sudo systemctl enable logstash
sudo systemctl start logstash
```
### Docker Installation

**Docker Compose Setup:**

```yaml
version: '3.8'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic

# Define the network referenced above; Compose fails on an undefined network
networks:
  elastic:
    driver: bridge
```
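**Docker Run:** For one-off testing without Compose, a minimal `docker run` sketch using the same image, ports, and pipeline mount (the local `logstash.conf` path is illustrative):

```bash
# Pull the image used in the compose file above
docker pull docker.elastic.co/logstash/logstash:8.11.0

# Run interactively with a local pipeline config mounted in
docker run --rm -it \
  -p 5044:5044 -p 9600:9600 \
  -v "$PWD/logstash.conf":/usr/share/logstash/pipeline/logstash.conf:ro \
  docker.elastic.co/logstash/logstash:8.11.0
```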
## Configuration Basics

### Pipeline Configuration Structure

**Basic Pipeline (logstash.conf):**

```conf
input {
  # Input plugins
}

filter {
  # Filter plugins
}

output {
  # Output plugins
}
```
**Main Configuration (logstash.yml):**

```yaml
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb
```
## Input Plugins

### File Input

**Basic File Input:**

```conf
input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"   # don't persist read position (re-reads on restart)
    codec => "plain"
  }
}
```
**Advanced File Input:**

```conf
input {
  file {
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    add_field => { "log_source" => "application" }
    tags => ["application", "production"]
  }
}
```
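For reference, a hypothetical log fragment that the multiline codec above would collapse into a single event: the continuation lines do not start with an ISO8601 timestamp, so they attach to the previous line.

```
2024-01-15T10:00:00,123 ERROR Unhandled exception
java.lang.NullPointerException
        at com.example.App.run(App.java:42)
```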
### Beats Input

**Filebeat Input:**

```conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
}
```
### Syslog Input

**Syslog TCP Input:**

```conf
input {
  tcp {
    port => 514   # ports below 1024 require elevated privileges
    type => "syslog"
    codec => line { format => "%{message}" }
  }
}
```
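**Syslog UDP Input:** Syslog is just as commonly shipped over UDP; a minimal sketch using the `udp` input with the same port and type conventions as the TCP example:

```conf
input {
  udp {
    port => 514
    type => "syslog"
  }
}
```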
### HTTP Input

**HTTP Webhook:**

```conf
input {
  http {
    port => 8080
    codec => json
    additional_codecs => {
      "application/json" => "json"
      "text/plain" => "plain"
    }
    ssl => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
  }
}
```
## Filter Plugins

### Grok Filter

**Basic Grok Patterns:**

```conf
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
```
**Custom Grok Patterns:**

```conf
filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    }
    add_field => { "parsed" => "true" }
    tag_on_failure => ["_grokparsefailure"]
  }
}
```
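The `patterns_dir` above points at a directory of plain-text pattern files, one `NAME regex` definition per line (definitions may reference built-in patterns); a hypothetical `/etc/logstash/patterns/custom` file might look like:

```
# /etc/logstash/patterns/custom (hypothetical)
SESSIONID [A-Fa-f0-9]{32}
APPLOG %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{GREEDYDATA:msg}
```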
**Multiple Grok Patterns:**

```conf
filter {
  grok {
    match => {
      "message" => [
        "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{PROG:program}: %{GREEDYDATA:message}",
        "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}",
        "%{GREEDYDATA:message}"
      ]
    }
    break_on_match => true
  }
}
```
### Date Filter

**Parse Timestamps:**

```conf
filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
    timezone => "UTC"
  }
}
```
**Multiple Date Formats:**

```conf
filter {
  date {
    match => [
      "timestamp",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
    target => "@timestamp"
    locale => "en"
  }
}
```
### Mutate Filter

**Field Manipulation:**

```conf
filter {
  mutate {
    # Add fields
    add_field => {
      "environment" => "production"
      "processed_by" => "logstash"
    }

    # Remove fields
    remove_field => [ "host", "agent" ]

    # Rename fields
    rename => { "old_field" => "new_field" }

    # Convert field types
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }

    # String operations
    lowercase => [ "method" ]
    uppercase => [ "level" ]
    strip => [ "message" ]
    gsub => [ "message", "/", "_" ]
  }
}
```
### JSON Filter

**Parse JSON:**

```conf
filter {
  json {
    source => "message"
    target => "parsed_json"
    skip_on_invalid_json => true
  }
}
```
### CSV Filter

**Parse CSV Data:**

```conf
filter {
  csv {
    separator => ","
    columns => [ "timestamp", "level", "component", "message" ]
    skip_header => true
    convert => { "timestamp" => "date" }
  }
}
```
### Conditional Processing

**Conditional Filters:**

```conf
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }

  if [status] >= 400 {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if "error" in [tags] {
    mutate {
      add_field => { "alert_level" => "high" }
    }
  }
}
```
## Output Plugins

### Elasticsearch Output

**Basic Elasticsearch Output:**

```conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
**Advanced Elasticsearch Output:**

```conf
output {
  elasticsearch {
    hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    template_name => "logstash"
    template_overwrite => true
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
    user => "logstash_writer"
    password => "password"
    retry_on_conflict => 3
    action => "index"
  }
}
```
### File Output

**File Output:**

```conf
output {
  file {
    path => "/var/log/logstash/output.log"
    codec => line { format => "%{timestamp} %{level} %{message}" }
    flush_interval => 10
  }
}
```
### Kafka Output

**Kafka Output:**

```conf
output {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "logstash-logs"
    codec => json
    compression_type => "gzip"
    batch_size => 100
    linger_ms => 10
  }
}
```
### Conditional Outputs

**Multiple Outputs:**

```conf
output {
  if [type] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
    email {
      to => "admin@company.com"
      subject => "Error Alert: %{message}"
      body => "Error occurred at %{@timestamp}: %{message}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
```
## Pipeline Management

### Multiple Pipelines

**pipelines.yml Configuration:**

```yaml
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200
```
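### Pipeline-to-Pipeline Communication

Pipelines defined in `pipelines.yml` can forward events to one another with the built-in `pipeline` output and input plugins; the virtual address name below is arbitrary and only has to match on both sides. A minimal sketch:

**Sending Pipeline:**

```conf
output {
  pipeline {
    send_to => ["enrich-logs"]   # virtual address, not a network port
  }
}
```

**Receiving Pipeline:**

```conf
input {
  pipeline {
    address => "enrich-logs"
  }
}
```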
## Command Line Operations

### Service Management

**Service Control:**

```bash
# Start Logstash
sudo systemctl start logstash

# Stop Logstash
sudo systemctl stop logstash

# Restart Logstash
sudo systemctl restart logstash

# Check status
sudo systemctl status logstash

# Follow the service logs
sudo journalctl -u logstash -f
```
### Configuration Testing

**Test Configuration:**

```bash
# Test configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t

# Test a specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t

# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug
```
### Manual Execution

**Run Logstash Manually:**

```bash
# Run with a specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

# Run with an inline config
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'

# Run with additional JVM options
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf
```
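Inline configs are handy for testing grok patterns before deploying them; a sketch that pipes a sample Apache access-log line through a grok filter and prints the parsed event (the log line is illustrative):

```bash
echo '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326' | \
  /usr/share/logstash/bin/logstash -e '
    input { stdin {} }
    filter { grok { match => { "message" => "%{COMMONAPACHELOG}" } } }
    output { stdout { codec => rubydebug } }'
```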
## Monitoring and Debugging

### API Monitoring

**Node Stats:**

```bash
# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"

# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"

# Get process stats
curl -X GET "localhost:9600/_node/stats/process?pretty"
```
**Pipeline Management:**

```bash
# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"

# Get stats for a specific pipeline
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"

# Force a config reload (the monitoring API is read-only; use SIGHUP,
# or set config.reload.automatic: true in logstash.yml)
kill -SIGHUP "$(pgrep -f logstash)"
```
### Log Analysis

**Debug Logging:**

```bash
# Enable debug logging in log4j2.properties (or set log.level: debug in
# logstash.yml); log4j2 loggers need both a name and a level entry
echo 'logger.logstash.name = logstash' | sudo tee -a /etc/logstash/log4j2.properties
echo 'logger.logstash.level = debug' | sudo tee -a /etc/logstash/log4j2.properties

# Enable the slow log logger
echo 'logger.slowlog.name = slowlog' | sudo tee -a /etc/logstash/log4j2.properties
echo 'logger.slowlog.level = trace' | sudo tee -a /etc/logstash/log4j2.properties
```
**Performance Monitoring:**

```bash
# Monitor pipeline activity in the main log
tail -f /var/log/logstash/logstash-plain.log | grep "pipeline"

# Check for slow filters (requires slowlog.threshold.* settings in logstash.yml)
grep "slowlog" /var/log/logstash/logstash-slowlog-plain.log

# Monitor memory usage
ps aux | grep logstash
jstat -gc "$(pgrep -f logstash)"
```
## Performance Tuning

### JVM Tuning

**JVM Settings (jvm.options):**

```
# Heap size (adjust to available memory; keep Xms and Xmx equal)
-Xms2g
-Xmx2g

# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m

# Memory settings
-XX:+UseLargePages

# GC logging (unified logging syntax; Logstash 8.x bundles a modern JDK,
# where the old -XX:+PrintGCDetails and -Xloggc flags no longer exist)
-Xlog:gc*:file=/var/log/logstash/gc.log:time,uptime:filecount=5,filesize=10m
```
### Pipeline Optimization

**Performance Settings:**

```yaml
# Pipeline workers (defaults to the number of CPU cores)
pipeline.workers: 8

# Batch processing
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024

# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
```
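Events that land in the dead letter queue can be replayed with the `dead_letter_queue` input plugin; a minimal sketch, assuming the default queue location under `path.data` (`/var/lib/logstash` in the logstash.yml above):

```conf
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    commit_offsets => true   # remember position across restarts
  }
}
```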
### Filter Optimization

**Efficient Filtering:**

```conf
filter {
  # Use conditionals to avoid unnecessary processing
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  # Use break_on_match when trying multiple patterns
  grok {
    match => {
      "message" => [
        "%{PATTERN1}",
        "%{PATTERN2}"
      ]
    }
    break_on_match => true
  }

  # Remove unnecessary fields early
  mutate {
    remove_field => [ "host", "agent", "@version" ]
  }
}
```
## Security Configuration

### SSL/TLS Setup

**Input SSL Configuration:**

```conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  }
}
```
**Output SSL Configuration:**

```conf
output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/etc/logstash/certs/client.crt"
    ssl_key => "/etc/logstash/certs/client.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
  }
}
```
### Authentication

**Elasticsearch Authentication:**

```conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
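The `${LOGSTASH_PASSWORD}` reference resolves from the environment or, preferably, the Logstash keystore; a sketch of storing the secret there for a package install:

```bash
# Create the keystore once, then add the secret
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add LOGSTASH_PASSWORD
```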
## Troubleshooting

### Common Issues

**Pipeline Not Starting:**

```bash
# Check configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t

# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/

# Check the Java version
java -version
```
**Performance Issues:**

```bash
# Monitor resource usage
top -p "$(pgrep -f logstash)"
iostat -x 1

# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Scan the logs for warnings
grep "WARN" /var/log/logstash/logstash-plain.log
```
**Memory Issues:**

```bash
# Check heap usage
jstat -gc "$(pgrep -f logstash)"

# Monitor garbage collection (path matches the -Xlog:gc* setting above)
tail -f /var/log/logstash/gc.log

# Adjust the heap by editing the existing -Xms/-Xmx lines in
# /etc/logstash/jvm.options (e.g. -Xms4g and -Xmx4g); appending
# duplicate flags leaves conflicting entries
sudoedit /etc/logstash/jvm.options
```
## Integration Examples

### ELK Stack Integration

**Complete ELK Pipeline:**

```conf
input {
  beats {
    port => 5044
  }
}

filter {
  if [@metadata][beat] == "filebeat" {
    if [fields][log_type] == "apache" {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
```
### Kafka Integration

**Kafka to Elasticsearch:**

```conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["logs"]
    group_id => "logstash"
    consumer_threads => 4
    codec => json
  }
}

filter {
  date {
    match => [ "timestamp", "ISO8601" ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "kafka-logs-%{+YYYY.MM.dd}"
  }
}
```
This comprehensive Logstash cheatsheet covers installation, configuration, pipeline management, and advanced features for effective log processing and data transformation.