コンテンツにスキップ

Logstash Cheatsheet

Logstash is a powerful data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite “stash” like Elasticsearch. It’s part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization.

Installation and Setup

Package Installation

Ubuntu/Debian:

# Import Elasticsearch GPG key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch|sudo apt-key add -

# Add Elastic repository
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main"|sudo tee /etc/apt/sources.list.d/elastic-8.x.list

# Update and install Logstash
sudo apt-get update
sudo apt-get install logstash

# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash

CentOS/RHEL:

# Import GPG key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# Create repository file
cat << EOF|sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# Install Logstash
sudo yum install logstash

# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash

Docker Installation

Docker Compose Setup:

version: '3.8'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic

Configuration Basics

Pipeline Configuration Structure

Basic Pipeline (logstash.conf):

input \\\\{
  # Input plugins
\\\\}

filter \\\\{
  # Filter plugins
\\\\}

output \\\\{
  # Output plugins
\\\\}

Main Configuration (logstash.yml):

node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb

Input Plugins

File Input

Basic File Input:

input \\\\{
  file \\\\{
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "plain"
  \\\\}
\\\\}

Advanced File Input:

input \\\\{
  file \\\\{
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline \\\\{
      pattern => "^%\\\\{TIMESTAMP_ISO8601\\\\}"
      negate => true
      what => "previous"
    \\\\}
    add_field => \\\\{ "log_source" => "application" \\\\}
    tags => ["application", "production"]
  \\\\}
\\\\}

Beats Input

Filebeat Input:

input \\\\{
  beats \\\\{
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  \\\\}
\\\\}

Syslog Input

Syslog UDP Input:

input \\\\{
  syslog \\\\{
    port => 514
    type => "syslog"
    codec => cef
  \\\\}
\\\\}

Syslog TCP Input:

input \\\\{
  tcp \\\\{
    port => 514
    type => "syslog"
    codec => line \\\\{ format => "%\\\\{message\\\\}" \\\\}
  \\\\}
\\\\}

HTTP Input

HTTP Webhook:

input \\\\{
  http \\\\{
    port => 8080
    codec => json
    additional_codecs => \\\\{
      "application/json" => "json"
      "text/plain" => "plain"
    \\\\}
    ssl => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
  \\\\}
\\\\}

Filter Plugins

Grok Filter

Basic Grok Patterns:

filter \\\\{
  grok \\\\{
    match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
  \\\\}
\\\\}

Custom Grok Patterns:

filter \\\\{
  grok \\\\{
    patterns_dir => ["/etc/logstash/patterns"]
    match => \\\\{
      "message" => "%\\\\{TIMESTAMP_ISO8601:timestamp\\\\} %\\\\{LOGLEVEL:level\\\\} %\\\\{GREEDYDATA:message\\\\}"
    \\\\}
    add_field => \\\\{ "parsed" => "true" \\\\}
    tag_on_failure => ["_grokparsefailure"]
  \\\\}
\\\\}

Multiple Grok Patterns:

filter \\\\{
  grok \\\\{
    match => \\\\{
      "message" => [
        "%\\\\{SYSLOGTIMESTAMP:timestamp\\\\} %\\\\{IPORHOST:server\\\\} %\\\\{PROG:program\\\\}: %\\\\{GREEDYDATA:message\\\\}",
        "%\\\\{TIMESTAMP_ISO8601:timestamp\\\\} \[%\\\\{LOGLEVEL:level\\\\}\] %\\\\{GREEDYDATA:message\\\\}",
        "%\\\\{GREEDYDATA:message\\\\}"
      ]
    \\\\}
    break_on_match => true
  \\\\}
\\\\}

Date Filter

Parse Timestamps:

filter \\\\{
  date \\\\{
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
    timezone => "UTC"
  \\\\}
\\\\}

Multiple Date Formats:

filter \\\\{
  date \\\\{
    match => [
      "timestamp",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
    target => "@timestamp"
    locale => "en"
  \\\\}
\\\\}

Mutate Filter

Field Manipulation:

filter \\\\{
  mutate \\\\{
    # Add fields
    add_field => \\\\{
      "environment" => "production"
      "processed_by" => "logstash"
    \\\\}

    # Remove fields
    remove_field => [ "host", "agent" ]

    # Rename fields
    rename => \\\\{ "old_field" => "new_field" \\\\}

    # Convert field types
    convert => \\\\{
      "response_time" => "float"
      "status_code" => "integer"
    \\\\}

    # String operations
    lowercase => [ "method" ]
    uppercase => [ "level" ]
    strip => [ "message" ]
    gsub => [ "message", "/", "_" ]
  \\\\}
\\\\}

JSON Filter

Parse JSON:

filter \\\\{
  json \\\\{
    source => "message"
    target => "parsed_json"
    skip_on_invalid_json => true
  \\\\}
\\\\}

CSV Filter

Parse CSV Data:

filter \\\\{
  csv \\\\{
    separator => ","
    columns => [ "timestamp", "level", "component", "message" ]
    skip_header => true
    convert => \\\\{ "timestamp" => "date" \\\\}
  \\\\}
\\\\}

Conditional Processing

Conditional Filters:

filter \\\\{
  if [type] == "apache" \\\\{
    grok \\\\{
      match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
    \\\\}
  \\\\} else if [type] == "nginx" \\\\{
    grok \\\\{
      match => \\\\{ "message" => "%\\\\{NGINXACCESS\\\\}" \\\\}
    \\\\}
  \\\\}

  if [status] >= 400 \\\\{
    mutate \\\\{
      add_tag => [ "error" ]
    \\\\}
  \\\\}

  if "error" in [tags] \\\\{
    mutate \\\\{
      add_field => \\\\{ "alert_level" => "high" \\\\}
    \\\\}
  \\\\}
\\\\}

Output Plugins

Elasticsearch Output

Basic Elasticsearch Output:

output \\\\{
  elasticsearch \\\\{
    hosts => ["localhost:9200"]
    index => "logstash-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}

Advanced Elasticsearch Output:

output \\\\{
  elasticsearch \\\\{
    hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
    index => "%\\\\{[@metadata][beat]\\\\}-%\\\\{[@metadata][version]\\\\}-%\\\\{+YYYY.MM.dd\\\\}"
    template_name => "logstash"
    template_pattern => "logstash-*"
    template_overwrite => true
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
    user => "logstash_writer"
    password => "password"
    retry_on_conflict => 3
    action => "index"
  \\\\}
\\\\}

File Output

File Output:

output \\\\{
  file \\\\{
    path => "/var/log/logstash/output.log"
    codec => line \\\\{ format => "%\\\\{timestamp\\\\} %\\\\{level\\\\} %\\\\{message\\\\}" \\\\}
    flush_interval => 10
  \\\\}
\\\\}

Kafka Output

Kafka Output:

output \\\\{
  kafka \\\\{
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "logstash-logs"
    codec => json
    compression_type => "gzip"
    batch_size => 100
    linger_ms => 10
  \\\\}
\\\\}

Conditional Outputs

Multiple Outputs:

output \\\\{
  if [type] == "error" \\\\{
    elasticsearch \\\\{
      hosts => ["localhost:9200"]
      index => "errors-%\\\\{+YYYY.MM.dd\\\\}"
    \\\\}
    email \\\\{
      to => "admin@company.com"
      subject => "Error Alert: %\\\\{message\\\\}"
      body => "Error occurred at %\\\\{@timestamp\\\\}: %\\\\{message\\\\}"
    \\\\}
  \\\\} else \\\\{
    elasticsearch \\\\{
      hosts => ["localhost:9200"]
      index => "logs-%\\\\{+YYYY.MM.dd\\\\}"
    \\\\}
  \\\\}
\\\\}

Pipeline Management

Multiple Pipelines

pipelines.yml Configuration:

- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200

Pipeline-to-Pipeline Communication

Sending Pipeline:

output \\\\{
  pipeline \\\\{
    send_to => ["processing-pipeline"]
  \\\\}
\\\\}

Receiving Pipeline:

input \\\\{
  pipeline \\\\{
    address => "processing-pipeline"
  \\\\}
\\\\}

Command Line Operations

Service Management

Service Control:

# Start Logstash
sudo systemctl start logstash

# Stop Logstash
sudo systemctl stop logstash

# Restart Logstash
sudo systemctl restart logstash

# Check status
sudo systemctl status logstash

# View logs
sudo journalctl -u logstash -f

Configuration Testing

Test Configuration:

# Test configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t

# Test with specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t

# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug

Manual Execution

Run Logstash Manually:

# Run with specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

# Run with inline config
/usr/share/logstash/bin/logstash -e 'input \\\\{ stdin \\\\{ \\\\} \\\\} output \\\\{ stdout \\\\{\\\\} \\\\}'

# Run with additional JVM options
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf

Monitoring and Debugging

API Monitoring

Node Stats:

# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"

# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"

# Get process stats
curl -X GET "localhost:9600/_node/stats/process?pretty"

Pipeline Management:

# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"

# Get specific pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"

# Hot reload pipeline
curl -X POST "localhost:9600/_node/pipelines/main/reload"

Log Analysis

Debug Logging:

# Enable debug logging
echo 'logger.logstash.level = debug' >> /etc/logstash/log4j2.properties

# Monitor specific logger
echo 'logger.slowlog.name = slowlog' >> /etc/logstash/log4j2.properties
echo 'logger.slowlog.level = trace' >> /etc/logstash/log4j2.properties

Performance Monitoring:

# Monitor pipeline performance
tail -f /var/log/logstash/logstash-plain.log|grep "pipeline.stats"

# Check for slow filters
grep "slowlog" /var/log/logstash/logstash-slow.log

# Monitor memory usage
ps aux|grep logstash
jstat -gc $(pgrep -f logstash)

Performance Tuning

JVM Tuning

JVM Settings (jvm.options):

# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g

# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m

# Memory settings
-XX:+UseLargePages
-XX:+UnlockExperimentalVMOptions
-XX:+UseCGroupMemoryLimitForHeap

# Debugging options
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Xloggc:/var/log/logstash/gc.log

Pipeline Optimization

Performance Settings:

# Pipeline workers (number of CPU cores)
pipeline.workers: 8

# Batch processing
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024

# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb

Filter Optimization

Efficient Filtering:

filter \\\\{
  # Use conditionals to avoid unnecessary processing
  if [type] == "apache" \\\\{
    grok \\\\{
      match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
    \\\\}
  \\\\}

  # Use break_on_match for multiple patterns
  grok \\\\{
    match => \\\\{
      "message" => [
        "%\\\\{PATTERN1\\\\}",
        "%\\\\{PATTERN2\\\\}"
      ]
    \\\\}
    break_on_match => true
  \\\\}

  # Remove unnecessary fields early
  mutate \\\\{
    remove_field => [ "host", "agent", "@version" ]
  \\\\}
\\\\}

Security Configuration

SSL/TLS Setup

Input SSL Configuration:

input \\\\{
  beats \\\\{
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  \\\\}
\\\\}

Output SSL Configuration:

output \\\\{
  elasticsearch \\\\{
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/etc/logstash/certs/client.crt"
    ssl_key => "/etc/logstash/certs/client.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
  \\\\}
\\\\}

Authentication

Elasticsearch Authentication:

output \\\\{
  elasticsearch \\\\{
    hosts => ["localhost:9200"]
    user => "logstash_writer"
    password => "$\\\\{LOGSTASH_PASSWORD\\\\}"
    index => "logstash-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}

Troubleshooting

Common Issues

Pipeline Not Starting:

# Check configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash -t

# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/

# Check Java version
java -version

Performance Issues:

# Monitor resource usage
top -p $(pgrep -f logstash)
iostat -x 1

# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Analyze slow logs
grep "WARN" /var/log/logstash/logstash-plain.log

Memory Issues:

# Check heap usage
jstat -gc $(pgrep -f logstash)

# Monitor garbage collection
tail -f /var/log/logstash/gc.log

# Adjust heap size
echo "-Xmx4g" >> /etc/logstash/jvm.options
echo "-Xms4g" >> /etc/logstash/jvm.options

Integration Examples

ELK Stack Integration

Complete ELK Pipeline:

input \\\\{
  beats \\\\{
    port => 5044
  \\\\}
\\\\}

filter \\\\{
  if [@metadata][beat] == "filebeat" \\\\{
    if [fields][log_type] == "apache" \\\\{
      grok \\\\{
        match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
      \\\\}
      date \\\\{
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      \\\\}
    \\\\}
  \\\\}
\\\\}

output \\\\{
  elasticsearch \\\\{
    hosts => ["elasticsearch:9200"]
    index => "%\\\\{[@metadata][beat]\\\\}-%\\\\{[@metadata][version]\\\\}-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}

Kafka Integration

Kafka to Elasticsearch:

input \\\\{
  kafka \\\\{
    bootstrap_servers => "kafka:9092"
    topics => ["logs"]
    group_id => "logstash"
    consumer_threads => 4
    codec => json
  \\\\}
\\\\}

filter \\\\{
  date \\\\{
    match => [ "timestamp", "ISO8601" ]
  \\\\}
\\\\}

output \\\\{
  elasticsearch \\\\{
    hosts => ["elasticsearch:9200"]
    index => "kafka-logs-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}

This comprehensive Logstash cheatsheet covers installation, configuration, pipeline management, and advanced features for effective log processing and data transformation.