Logstash Cheatsheet
Logstash is a powerful data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash" like Elasticsearch. It's part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization.
Installation and Setup
Package Installation
Ubuntu/Debian:
bash
# Import the Elasticsearch GPG key into a dedicated keyring (apt-key is deprecated)
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
# Add Elastic repository
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
# Update and install Logstash
sudo apt-get update
sudo apt-get install logstash
# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
CentOS/RHEL:
bash
# Import GPG key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# Create repository file
cat << EOF | sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# Install Logstash
sudo yum install logstash
# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
Docker Installation
Docker Compose Setup:
yaml
version: '3.8'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic

networks:
  elastic:
    driver: bridge
Configuration Basics
Pipeline Configuration Structure
Basic Pipeline (logstash.conf):
ruby
input {
  # Input plugins
}

filter {
  # Filter plugins
}

output {
  # Output plugins
}
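A minimal working pipeline ties the three stages together. The sketch below (the added field name is purely illustrative) reads lines from stdin, tags each event, and pretty-prints the result:
ruby
input {
  stdin { }
}

filter {
  mutate {
    add_field => { "pipeline_stage" => "demo" }   # illustrative field
  }
}

output {
  stdout { codec => rubydebug }
}
Run it with /usr/share/logstash/bin/logstash -f minimal.conf and type a line; the event is printed with the added field.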
Main Configuration (logstash.yml):
yaml
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb
Input Plugins
File Input
Basic File Input:
ruby
input {
file {
path => "/var/log/apache2/access.log"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => "plain"
}
}
Advanced File Input:
ruby
input {
  file {
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    add_field => { "log_source" => "application" }
    tags => ["application", "production"]
  }
}
Beats Input
Filebeat Input:
ruby
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
ssl_verify_mode => "force_peer"
ssl_peer_metadata => true
}
}
Syslog Input
Syslog Input (the syslog plugin listens on both UDP and TCP; binding to port 514 requires root privileges):
ruby
input {
  syslog {
    port => 514
    type => "syslog"
    # codec => cef   # only if the source sends CEF-formatted messages
  }
}
Raw TCP Input (an alternative to the syslog plugin; the payload usually still needs a grok filter such as %{SYSLOGLINE} to be parsed):
ruby
input {
  tcp {
    port => 514
    type => "syslog"
  }
}
HTTP Input
HTTP Webhook:
ruby
input {
http {
port => 8080
codec => json
additional_codecs => {
"application/json" => "json"
"text/plain" => "plain"
}
ssl => true
ssl_certificate => "/path/to/cert.pem"
ssl_key => "/path/to/key.pem"
}
}
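The http input can also require HTTP basic authentication. A minimal sketch (the credentials are placeholders; user and password are options of the plugin):
ruby
input {
  http {
    port => 8080
    user => "webhook_user"                    # placeholder username
    password => "${HTTP_INPUT_PASSWORD}"      # resolved from the keystore or environment
    codec => json
  }
}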
Filter Plugins
Grok Filter
Basic Grok Patterns:
ruby
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
Custom Grok Patterns:
ruby
filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    }
    overwrite => [ "message" ]          # replace the original message instead of appending to it
    add_field => { "parsed" => "true" }
    tag_on_failure => ["_grokparsefailure"]
  }
}
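The patterns_dir option points at a directory of plain-text pattern files with one NAME REGEX definition per line. A hypothetical /etc/logstash/patterns/custom file might contain:
APP_LOGLEVEL (TRACE|DEBUG|INFO|WARN|ERROR|FATAL)
QUEUE_ID [0-9A-F]{10,11}
These names can then be referenced in match strings, for example %{APP_LOGLEVEL:level}.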
Multiple Grok Patterns:
ruby
filter {
grok {
match => {
"message" => [
"%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{PROG:program}: %{GREEDYDATA:message}",
"%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}",
"%{GREEDYDATA:message}"
]
}
break_on_match => true
}
}
Date Filter
Parse Timestamps:
ruby
filter {
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
timezone => "UTC"
}
}
Multiple Date Formats:
ruby
filter {
date {
match => [
"timestamp",
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"MMM dd HH:mm:ss"
]
target => "@timestamp"
locale => "en"
}
}
Mutate Filter
Field Manipulation:
ruby
filter {
mutate {
# Add fields
add_field => {
"environment" => "production"
"processed_by" => "logstash"
}
# Remove fields
remove_field => [ "host", "agent" ]
# Rename fields
rename => { "old_field" => "new_field" }
# Convert field types
convert => {
"response_time" => "float"
"status_code" => "integer"
}
# String operations
lowercase => [ "method" ]
uppercase => [ "level" ]
strip => [ "message" ]
gsub => [ "message", "/", "_" ]
}
}
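Mutate also offers array and copy operations. A brief sketch (field names are illustrative):
ruby
filter {
  mutate {
    split => { "tags_csv" => "," }         # turn "a,b,c" into ["a", "b", "c"]
    join => { "path_parts" => "/" }        # collapse an array back into a single string
    copy => { "client_ip" => "source_ip" } # duplicate a field under a new name
  }
}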
JSON Filter
Parse JSON:
ruby
filter {
json {
source => "message"
target => "parsed_json"
skip_on_invalid_json => true
}
}
CSV Filter
Parse CSV Data:
ruby
filter {
csv {
separator => ","
columns => [ "timestamp", "level", "component", "message" ]
skip_header => true
convert => { "timestamp" => "date" }
}
}
Conditional Processing
Conditional Filters:
ruby
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      # NGINXACCESS is not a built-in pattern; define it in patterns_dir
      match => { "message" => "%{NGINXACCESS}" }
    }
  }

  if [status] >= 400 {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if "error" in [tags] {
    mutate {
      add_field => { "alert_level" => "high" }
    }
  }
}
Output Plugins
Elasticsearch Output
Basic Elasticsearch Output:
ruby
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
Advanced Elasticsearch Output:
ruby
output {
elasticsearch {
hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
template => "/etc/logstash/templates/logstash.json"   # custom index template (path is illustrative)
template_name => "logstash"
template_overwrite => true
ssl => true
ssl_certificate_verification => true
ssl_certificate => "/path/to/cert.pem"
ssl_key => "/path/to/key.pem"
user => "logstash_writer"
password => "password"
retry_on_conflict => 3
action => "index"
}
}
File Output
File Output:
ruby
output {
file {
path => "/var/log/logstash/output.log"
codec => line { format => "%{timestamp} %{level} %{message}" }
flush_interval => 10
}
}
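Stdout Output (useful while developing a pipeline; rubydebug pretty-prints each event):
ruby
output {
  stdout { codec => rubydebug }
}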
Kafka Output
Kafka Output:
ruby
output {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topic_id => "logstash-logs"
codec => json
compression_type => "gzip"
batch_size => 16384   # producer batch.size, in bytes
linger_ms => 10
}
}
Conditional Outputs
Multiple Outputs:
ruby
output {
  if [type] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
    email {
      to => "admin@company.com"
      subject => "Error Alert: %{message}"
      body => "Error occurred at %{@timestamp}: %{message}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
Pipeline Management
Multiple Pipelines
pipelines.yml Configuration:
yaml
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200
Pipeline-to-Pipeline Communication
Sending Pipeline:
ruby
output {
pipeline {
send_to => ["processing-pipeline"]
}
}
Receiving Pipeline (both pipelines must be defined in pipelines.yml on the same Logstash instance for the virtual address to resolve):
ruby
input {
pipeline {
address => "processing-pipeline"
}
}
Command Line Operations
Service Management
Service Control:
bash
# Start Logstash
sudo systemctl start logstash
# Stop Logstash
sudo systemctl stop logstash
# Restart Logstash
sudo systemctl restart logstash
# Check status
sudo systemctl status logstash
# View logs
sudo journalctl -u logstash -f
Configuration Testing
Test Configuration:
bash
# Test configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
# Test with specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t
# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug
Manual Execution
Run Logstash Manually:
bash
# Run with specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
# Run with inline config
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
# Run with additional JVM options
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf
Monitoring and Debugging
API Monitoring
Node Stats:
bash
# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"
# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"
# Get process stats
curl -X GET "localhost:9600/_node/stats/process?pretty"
Pipeline Management:
bash
# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"
# Get specific pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"
# There is no reload API endpoint; enable config.reload.automatic in logstash.yml or send SIGHUP to reload pipelines
kill -SIGHUP $(pgrep -f logstash)
Log Analysis
Debug Logging:
bash
# Raise the overall log level (or run with --log.level debug)
echo 'log.level: debug' | sudo tee -a /etc/logstash/logstash.yml
# Change a single logger at runtime via the monitoring API
curl -X PUT "localhost:9600/_node/logging?pretty" -H 'Content-Type: application/json' \
  -d '{ "logger.logstash.outputs.elasticsearch" : "DEBUG" }'
# Enable the filter slowlog by setting thresholds
echo 'slowlog.threshold.warn: 2s' | sudo tee -a /etc/logstash/logstash.yml
Performance Monitoring:
bash
# Watch the main log for warnings and errors
tail -f /var/log/logstash/logstash-plain.log | grep -E "WARN|ERROR"
# Check for slow filters (requires slowlog thresholds to be configured)
tail -f /var/log/logstash/logstash-slowlog-plain.log
# Monitor memory usage
ps aux | grep logstash
jstat -gc $(pgrep -f logstash)
Performance Tuning
JVM Tuning
JVM Settings (jvm.options):
bash
# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g
# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
# Memory settings
-XX:+UseLargePages
# GC logging (the bundled JDK 11+/17 uses unified logging; the old PrintGC* flags were removed)
-Xlog:gc*:file=/var/log/logstash/gc.log:time,uptime,level,tags:filecount=8,filesize=64m
Pipeline Optimization
Performance Settings:
yaml
# Pipeline workers (number of CPU cores)
pipeline.workers: 8
# Batch processing
pipeline.batch.size: 1000
pipeline.batch.delay: 50
# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
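With the dead letter queue enabled, failed events can be re-read and reprocessed through the dead_letter_queue input plugin. A minimal sketch (the path shown is the default location for package installs and may differ):
ruby
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"   # under path.data by default
    pipeline_id => "main"
    commit_offsets => true
  }
}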
Filter Optimization
Efficient Filtering:
ruby
filter {
  # Use conditionals to avoid unnecessary processing
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  # Use break_on_match for multiple patterns
  grok {
    match => {
      "message" => [
        "%{PATTERN1}",
        "%{PATTERN2}"
      ]
    }
    break_on_match => true
  }

  # Remove unnecessary fields early
  mutate {
    remove_field => [ "host", "agent", "@version" ]
  }
}
Security Configuration
SSL/TLS Setup
Input SSL Configuration:
ruby
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
ssl_verify_mode => "force_peer"
}
}
Output SSL Configuration:
ruby
output {
elasticsearch {
hosts => ["https://elasticsearch:9200"]
ssl => true
ssl_certificate_verification => true
ssl_certificate => "/etc/logstash/certs/client.crt"
ssl_key => "/etc/logstash/certs/client.key"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
}
}
Authentication
Elasticsearch Authentication (the ${LOGSTASH_PASSWORD} reference is resolved from the Logstash keystore or an environment variable):
ruby
output {
elasticsearch {
hosts => ["localhost:9200"]
user => "logstash_writer"
password => "${LOGSTASH_PASSWORD}"
index => "logstash-%{+YYYY.MM.dd}"
}
}
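The elasticsearch output also accepts an Elasticsearch API key instead of a username and password. A sketch, assuming an API key has been created in Elasticsearch (the api_key option requires SSL):
ruby
output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    api_key => "${ES_API_KEY}"   # an "id:api_key" pair, e.g. stored in the keystore
    ssl => true
    index => "logstash-%{+YYYY.MM.dd}"
  }
}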
Troubleshooting
Common Issues
Pipeline Not Starting:
bash
# Check configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/
# Check Java version
java -version
Performance Issues:
bash
# Monitor resource usage
top -p $(pgrep -f logstash)
iostat -x 1
# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# Scan the main log for warnings
grep "WARN" /var/log/logstash/logstash-plain.log
Memory Issues:
bash
# Check heap usage
jstat -gc $(pgrep -f logstash)
# Monitor garbage collection
tail -f /var/log/logstash/gc.log
# Adjust heap size (edit the existing -Xms/-Xmx lines rather than appending duplicates)
sudo sed -i 's/^-Xms.*/-Xms4g/; s/^-Xmx.*/-Xmx4g/' /etc/logstash/jvm.options
Integration Examples
ELK Stack Integration
Complete ELK Pipeline:
ruby
input {
  beats {
    port => 5044
  }
}

filter {
  if [@metadata][beat] == "filebeat" {
    if [fields][log_type] == "apache" {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
Kafka Integration
Kafka to Elasticsearch:
ruby
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["logs"]
group_id => "logstash"
consumer_threads => 4
codec => json
}
}
filter {
date {
match => [ "timestamp", "ISO8601" ]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "kafka-logs-%{+YYYY.MM.dd}"
}
}
This comprehensive Logstash cheatsheet covers installation, configuration, pipeline management, and advanced features for effective log processing and data transformation.