Saltar a contenido

Logstash Cheatsheet

Logstash is a powerful data procesoing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash" like Elasticsearch. It's part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization. ## instalación and Setup ### Package instalación **Ubuntu/Debian:**
# Impuerto Elasticsearch GPG clave
wget -qO - https://artifacts.elastic.co/GPG-clave-elasticsearch|sudo apt-clave add -

# Add Elastic repository
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main"|sudo tee /etc/apt/sources.list.d/elastic-8.x.list

# Update and install Logstash
sudo apt-get update
sudo apt-get install logstash

# Enable and start servicio
sudo systemctl enable logstash
sudo systemctl start logstash
**CentOS/RHEL:**
# Impuerto GPG clave
sudo rpm --impuerto https://artifacts.elastic.co/GPG-clave-elasticsearch

# Create repository file
cat << EOF|sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgclave=https://artifacts.elastic.co/GPG-clave-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# Install Logstash
sudo yum install logstash

# Enable and start servicio
sudo systemctl enable logstash
sudo systemctl start logstash
### Docker instalación **Docker Compose Setup:**
version: '3.8'
servicios:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    puertos:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic
## configuración Basics ### Pipeline configuración Structure **Basic Pipeline (logstash.conf):**
input \\\\{
  # Input plugins
\\\\}

filter \\\\{
  # Filter plugins
\\\\}

output \\\\{
  # Output plugins
\\\\}
**Main configuración (logstash.yml):**
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb
## Input Plugins ### File Input **Basic File Input:**
input \\\\{
  file \\\\{
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "plain"
  \\\\}
\\\\}
**Advanced File Input:**
input \\\\{
  file \\\\{
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline \\\\{
      pattern => "^%\\\\{TIMESTAMP_ISO8601\\\\}"
      negate => true
      what => "previous"
    \\\\}
    add_field => \\\\{ "log_source" => "application" \\\\}
    tags => ["application", "production"]
  \\\\}
\\\\}
### Beats Input **Filebeat Input:**
input \\\\{
  beats \\\\{
    puerto => 5044
    ssl => true
    ssl_certificado => "/etc/logstash/certs/logstash.crt"
    ssl_clave => "/etc/logstash/certs/logstash.clave"
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  \\\\}
\\\\}
### Syslog Input **Syslog UDP Input:**
input \\\\{
  syslog \\\\{
    puerto => 514
    type => "syslog"
    codec => cef
  \\\\}
\\\\}
**Syslog TCP Input:**
input \\\\{
  tcp \\\\{
    puerto => 514
    type => "syslog"
    codec => line \\\\{ format => "%\\\\{message\\\\}" \\\\}
  \\\\}
\\\\}
### HTTP Input **HTTP Webhook:**
input \\\\{
  http \\\\{
    puerto => 8080
    codec => json
    additional_codecs => \\\\{
      "application/json" => "json"
      "text/plain" => "plain"
    \\\\}
    ssl => true
    ssl_certificado => "/path/to/cert.pem"
    ssl_clave => "/path/to/clave.pem"
  \\\\}
\\\\}
## Filter Plugins ### Grok Filter **Basic Grok Patterns:**
filter \\\\{
  grok \\\\{
    match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
  \\\\}
\\\\}
**Custom Grok Patterns:**
filter \\\\{
  grok \\\\{
    patterns_dir => ["/etc/logstash/patterns"]
    match => \\\\{
      "message" => "%\\\\{TIMESTAMP_ISO8601:timestamp\\\\} %\\\\{LOGLEVEL:level\\\\} %\\\\{GREEDYDATA:message\\\\}"
    \\\\}
    add_field => \\\\{ "parsed" => "true" \\\\}
    tag_on_failure => ["_grokparsefailure"]
  \\\\}
\\\\}
**Multiple Grok Patterns:**
filter \\\\{
  grok \\\\{
    match => \\\\{
      "message" => [
        "%\\\\{SYSLOGTIMESTAMP:timestamp\\\\} %\\\\{IPORhost:server\\\\} %\\\\{PROG:program\\\\}: %\\\\{GREEDYDATA:message\\\\}",
        "%\\\\{TIMESTAMP_ISO8601:timestamp\\\\} \[%\\\\{LOGLEVEL:level\\\\}\] %\\\\{GREEDYDATA:message\\\\}",
        "%\\\\{GREEDYDATA:message\\\\}"
      ]
    \\\\}
    break_on_match => true
  \\\\}
\\\\}
### Date Filter **Parse Timestamps:**
filter \\\\{
  date \\\\{
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    objetivo => "@timestamp"
    timezone => "UTC"
  \\\\}
\\\\}
**Multiple Date Formats:**
filter \\\\{
  date \\\\{
    match => [
      "timestamp",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
    objetivo => "@timestamp"
    locale => "en"
  \\\\}
\\\\}
### Mutate Filter **Field Manipulation:**
filter \\\\{
  mutate \\\\{
    # Add fields
    add_field => \\\\{
      "environment" => "production"
      "procesoed_by" => "logstash"
    \\\\}

    # Remove fields
    remove_field => [ "host", "agent" ]

    # Rename fields
    rename => \\\\{ "old_field" => "new_field" \\\\}

    # Convert field types
    convert => \\\\{
      "response_time" => "float"
      "status_code" => "integer"
    \\\\}

    # String operations
    lowercase => [ "method" ]
    uppercase => [ "level" ]
    strip => [ "message" ]
    gsub => [ "message", "/", "_" ]
  \\\\}
\\\\}
### JSON Filter **Parse JSON:**
filter \\\\{
  json \\\\{
    source => "message"
    objetivo => "parsed_json"
    skip_on_invalid_json => true
  \\\\}
\\\\}
### CSV Filter **Parse CSV Data:**
filter \\\\{
  csv \\\\{
    separator => ","
    columns => [ "timestamp", "level", "component", "message" ]
    skip_header => true
    convert => \\\\{ "timestamp" => "date" \\\\}
  \\\\}
\\\\}
### Conditional procesoing **Conditional Filters:**
filter \\\\{
  if [type] == "apache" \\\\{
    grok \\\\{
      match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
    \\\\}
  \\\\} else if [type] == "nginx" \\\\{
    grok \\\\{
      match => \\\\{ "message" => "%\\\\{NGINXACCESS\\\\}" \\\\}
    \\\\}
  \\\\}

  if [status] >= 400 \\\\{
    mutate \\\\{
      add_tag => [ "error" ]
    \\\\}
  \\\\}

  if "error" in [tags] \\\\{
    mutate \\\\{
      add_field => \\\\{ "alert_level" => "high" \\\\}
    \\\\}
  \\\\}
\\\\}
## Output Plugins ### Elasticsearch Output **Basic Elasticsearch Output:**
output \\\\{
  elasticsearch \\\\{
    hosts => ["localhost:9200"]
    index => "logstash-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}
**Advanced Elasticsearch Output:**
output \\\\{
  elasticsearch \\\\{
    hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
    index => "%\\\\{[@metadata][beat]\\\\}-%\\\\{[@metadata][version]\\\\}-%\\\\{+YYYY.MM.dd\\\\}"
    template_name => "logstash"
    template_pattern => "logstash-*"
    template_overwrite => true
    ssl => true
    ssl_certificado_verification => true
    ssl_certificado => "/path/to/cert.pem"
    ssl_clave => "/path/to/clave.pem"
    user => "logstash_writer"
    contraseña => "contraseña"
    retry_on_conflict => 3
    action => "index"
  \\\\}
\\\\}
### File Output **File Output:**
output \\\\{
  file \\\\{
    path => "/var/log/logstash/output.log"
    codec => line \\\\{ format => "%\\\\{timestamp\\\\} %\\\\{level\\\\} %\\\\{message\\\\}" \\\\}
    flush_interval => 10
  \\\\}
\\\\}
### Kafka Output **Kafka Output:**
output \\\\{
  kafka \\\\{
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "logstash-logs"
    codec => json
    compression_type => "gzip"
    batch_size => 100
    linger_ms => 10
  \\\\}
\\\\}
### Conditional Outputs **Multiple Outputs:**
output \\\\{
  if [type] == "error" \\\\{
    elasticsearch \\\\{
      hosts => ["localhost:9200"]
      index => "errors-%\\\\{+YYYY.MM.dd\\\\}"
    \\\\}
    email \\\\{
      to => "admin@company.com"
      subject => "Error Alert: %\\\\{message\\\\}"
      body => "Error occurred at %\\\\{@timestamp\\\\}: %\\\\{message\\\\}"
    \\\\}
  \\\\} else \\\\{
    elasticsearch \\\\{
      hosts => ["localhost:9200"]
      index => "logs-%\\\\{+YYYY.MM.dd\\\\}"
    \\\\}
  \\\\}
\\\\}
## Pipeline Management ### Multiple Pipelines **pipelines.yml configuración:**
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200
### Pipeline-to-Pipeline Communication **Sending Pipeline:**
output \\\\{
  pipeline \\\\{
    send_to => ["procesoing-pipeline"]
  \\\\}
\\\\}
**Receiving Pipeline:**
input \\\\{
  pipeline \\\\{
    address => "procesoing-pipeline"
  \\\\}
\\\\}
## comando Line Operations ### servicio Management **servicio Control:**
# Start Logstash
sudo systemctl start logstash

# Stop Logstash
sudo systemctl stop logstash

# Restart Logstash
sudo systemctl restart logstash

# Check status
sudo systemctl status logstash

# View logs
sudo journalctl -u logstash -f
### configuración Testing **Test configuración:**
# Test configuración sintaxis
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t

# Test with specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t

# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug
### Manual Execution **Run Logstash Manually:**
# Run with specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

# Run with inline config
/usr/share/logstash/bin/logstash -e 'input \\\\{ stdin \\\\{ \\\\} \\\\} output \\\\{ stdout \\\\{\\\\} \\\\}'

# Run with additional JVM opcións
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf
## Monitoring and Debugging ### API Monitoring **Node Stats:**
# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"

# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"

# Get proceso stats
curl -X GET "localhost:9600/_node/stats/proceso?pretty"
**Pipeline Management:**
# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"

# Get specific pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"

# Hot reload pipeline
curl -X POST "localhost:9600/_node/pipelines/main/reload"
### Análisis de Logs **Debug Logging:**
# Enable debug logging
echo 'logger.logstash.level = debug' >> /etc/logstash/log4j2.properties

# Monitor specific logger
echo 'logger.slowlog.name = slowlog' >> /etc/logstash/log4j2.properties
echo 'logger.slowlog.level = trace' >> /etc/logstash/log4j2.properties
**Performance Monitoring:**
# Monitor pipeline performance
tail -f /var/log/logstash/logstash-plain.log|grep "pipeline.stats"

# Check for slow filters
grep "slowlog" /var/log/logstash/logstash-slow.log

# Monitor memory uso
ps aux|grep logstash
jstat -gc $(pgrep -f logstash)
## Performance Tuning ### JVM Tuning **JVM Settings (jvm.opcións):**
# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g

# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m

# Memory settings
-XX:+UseLargePages
-XX:+UnlockExperimentalVMopcións
-XX:+UseCGroupMemoryLimitForHeap

# Debugging opcións
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Xloggc:/var/log/logstash/gc.log
### Pipeline Optimization **Performance Settings:**
# Pipeline workers (number of CPU cores)
pipeline.workers: 8

# Batch procesoing
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024

# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
### Filter Optimization **Efficient Filtering:**
filter \\\\{
  # Use conditionals to avoid unnecessary procesoing
  if [type] == "apache" \\\\{
    grok \\\\{
      match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
    \\\\}
  \\\\}

  # Use break_on_match for multiple patterns
  grok \\\\{
    match => \\\\{
      "message" => [
        "%\\\\{PATTERN1\\\\}",
        "%\\\\{PATTERN2\\\\}"
      ]
    \\\\}
    break_on_match => true
  \\\\}

  # Remove unnecessary fields early
  mutate \\\\{
    remove_field => [ "host", "agent", "@version" ]
  \\\\}
\\\\}
## Security configuración ### SSL/TLS Setup **Input SSL configuración:**
input \\\\{
  beats \\\\{
    puerto => 5044
    ssl => true
    ssl_certificado => "/etc/logstash/certs/logstash.crt"
    ssl_clave => "/etc/logstash/certs/logstash.clave"
    ssl_certificado_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  \\\\}
\\\\}
**Output SSL configuración:**
output \\\\{
  elasticsearch \\\\{
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificado_verification => true
    ssl_certificado => "/etc/logstash/certs/client.crt"
    ssl_clave => "/etc/logstash/certs/client.clave"
    ssl_certificado_authorities => ["/etc/logstash/certs/ca.crt"]
  \\\\}
\\\\}
### autenticación **Elasticsearch autenticación:**
output \\\\{
  elasticsearch \\\\{
    hosts => ["localhost:9200"]
    user => "logstash_writer"
    contraseña => "$\\\\{LOGSTASH_contraseña\\\\}"
    index => "logstash-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}
## solución de problemas ### Common Issues **Pipeline Not Starting:**
# Check configuración sintaxis
sudo -u logstash /usr/share/logstash/bin/logstash -t

# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/

# Check Java version
java -version
**Performance Issues:**
# Monitor resource uso
top -p $(pgrep -f logstash)
iostat -x 1

# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Analyze slow logs
grep "WARN" /var/log/logstash/logstash-plain.log
**Memory Issues:**
# Check heap uso
jstat -gc $(pgrep -f logstash)

# Monitor garbage collection
tail -f /var/log/logstash/gc.log

# Adjust heap size
echo "-Xmx4g" >> /etc/logstash/jvm.opcións
echo "-Xms4g" >> /etc/logstash/jvm.opcións
## Integration ejemplos ### ELK Stack Integration **Complete ELK Pipeline:**
input \\\\{
  beats \\\\{
    puerto => 5044
  \\\\}
\\\\}

filter \\\\{
  if [@metadata][beat] == "filebeat" \\\\{
    if [fields][log_type] == "apache" \\\\{
      grok \\\\{
        match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\}
      \\\\}
      date \\\\{
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      \\\\}
    \\\\}
  \\\\}
\\\\}

output \\\\{
  elasticsearch \\\\{
    hosts => ["elasticsearch:9200"]
    index => "%\\\\{[@metadata][beat]\\\\}-%\\\\{[@metadata][version]\\\\}-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}
### Kafka Integration **Kafka to Elasticsearch:**
input \\\\{
  kafka \\\\{
    bootstrap_servers => "kafka:9092"
    topics => ["logs"]
    group_id => "logstash"
    consumer_hilos => 4
    codec => json
  \\\\}
\\\\}

filter \\\\{
  date \\\\{
    match => [ "timestamp", "ISO8601" ]
  \\\\}
\\\\}

output \\\\{
  elasticsearch \\\\{
    hosts => ["elasticsearch:9200"]
    index => "kafka-logs-%\\\\{+YYYY.MM.dd\\\\}"
  \\\\}
\\\\}

This comprehensive Logstash cheatsheet covers instalación, configuración, pipeline management, and advanced features for effective log procesoing and data transformation.