# Logstash Cheatsheet

Logstash is a powerful data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash," such as Elasticsearch. It is part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization.

## Installation and Setup

### Package Installation

**Ubuntu/Debian:**
```bash
# Import Elasticsearch GPG key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

# Add Elastic repository
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main"|sudo tee /etc/apt/sources.list.d/elastic-8.x.list

# Update and install Logstash
sudo apt-get update
sudo apt-get install logstash

# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
```

**CentOS/RHEL:**
```bash
# Import GPG key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# Create repository file
cat << EOF | sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# Install Logstash
sudo yum install logstash

# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
```

### Docker Installation

**Docker Compose Setup:**
```yaml
version: '3.8'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic

# Top-level definition for the network referenced by the service above
networks:
  elastic:
    driver: bridge
```

## Configuration Basics

### Pipeline Configuration Structure

**Basic Pipeline (logstash.conf):**
```ruby
input {
  # Input plugins
}

filter {
  # Filter plugins
}

output {
  # Output plugins
}
```
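For a quick smoke test of this three-stage structure, a minimal but complete pipeline can read JSON lines from stdin and pretty-print the resulting events (a sketch using the stock stdin/stdout plugins; the added field is arbitrary):

```ruby
# Minimal end-to-end pipeline: JSON lines in, annotated events out
input {
  stdin {
    codec => json_lines
  }
}

filter {
  mutate {
    add_field => { "pipeline" => "smoke-test" }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```

Run it with `echo '{"msg":"hello"}' | /usr/share/logstash/bin/logstash -f minimal.conf` to see the parsed event.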

**Main Configuration (logstash.yml):**
```yaml
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb
```

## Input Plugins

### File Input

**Basic File Input:**
```ruby
input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "plain"
  }
}
```

**Advanced File Input:**
```ruby
input {
  file {
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    add_field => { "log_source" => "application" }
    tags => ["application", "production"]
  }
}
```
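The multiline codec above merges any line that does not begin with an ISO8601 timestamp into the previous event, which keeps multi-line entries such as stack traces together. For illustration (hypothetical log content), these three physical lines would become a single event:

```
2024-01-15T10:32:07.123Z ERROR Unhandled exception
java.lang.NullPointerException: user was null
    at com.example.App.handle(App.java:42)
```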

### Beats Input

**Filebeat Input:**
```ruby
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
}
```
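The shipping side must mirror these settings. A sketch of the matching `filebeat.yml` output section (hostnames and paths are placeholders):

```yaml
# filebeat.yml — ship to the TLS-enabled beats input above
output.logstash:
  hosts: ["logstash.example.com:5044"]
  ssl.certificate_authorities: ["/etc/filebeat/certs/ca.crt"]
  ssl.certificate: "/etc/filebeat/certs/filebeat.crt"   # client cert, required by force_peer
  ssl.key: "/etc/filebeat/certs/filebeat.key"
```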

### Syslog Input

**Syslog UDP Input:**
```ruby
input {
  syslog {
    port => 514    # ports below 1024 require root or CAP_NET_BIND_SERVICE
    type => "syslog"
    codec => cef
  }
}
```

**Syslog TCP Input:**
```ruby
input {
  tcp {
    port => 514
    type => "syslog"
    codec => line { format => "%{message}" }
  }
}
```

### HTTP Input

**HTTP Webhook:**
```ruby
input {
  http {
    port => 8080
    codec => json
    additional_codecs => {
      "application/json" => "json"
      "text/plain" => "plain"
    }
    ssl => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
  }
}
```
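To verify the webhook, post a test document; each HTTP request becomes one event (`-k` skips certificate verification, acceptable only with self-signed test certificates):

```bash
curl -k -X POST "https://localhost:8080" \
  -H "Content-Type: application/json" \
  -d '{"service": "checkout", "level": "info", "message": "test event"}'
```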

## Filter Plugins

### Grok Filter

**Basic Grok Patterns:**
```ruby
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
```

**Custom Grok Patterns:**
```ruby
filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    }
    add_field => { "parsed" => "true" }
    tag_on_failure => ["_grokparsefailure"]
  }
}
```
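`patterns_dir` points to plain-text files containing one `NAME regex` definition per line. A sketch of a custom pattern file (the pattern names here are invented for illustration):

```
# /etc/logstash/patterns/custom
ORDERID ORD-[0-9]{8}
SESSIONID [A-Fa-f0-9]{32}
```

Once loaded, they are referenced like any built-in pattern, e.g. `%{ORDERID:order_id}`.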

**Multiple Grok Patterns:**
```ruby
filter {
  grok {
    match => {
      "message" => [
        "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{PROG:program}: %{GREEDYDATA:message}",
        "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}",
        "%{GREEDYDATA:message}"
      ]
    }
    break_on_match => true
  }
}
```

### Date Filter

**Parse Timestamps:**
```ruby
filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
    timezone => "UTC"
  }
}
```

**Multiple Date Formats:**
```ruby
filter {
  date {
    match => [
      "timestamp",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
    target => "@timestamp"
    locale => "en"
  }
}
```

### Mutate Filter

**Field Manipulation:**
```ruby
filter {
  mutate {
    # Add fields
    add_field => {
      "environment" => "production"
      "processed_by" => "logstash"
    }

    # Remove fields
    remove_field => [ "host", "agent" ]

    # Rename fields
    rename => { "old_field" => "new_field" }

    # Convert field types
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }

    # String operations
    lowercase => [ "method" ]
    uppercase => [ "level" ]
    strip => [ "message" ]
    gsub => [ "message", "/", "_" ]
  }
}
```

### JSON Filter

**Parse JSON:**
```ruby
filter {
  json {
    source => "message"
    target => "parsed_json"
    skip_on_invalid_json => true
  }
}
```
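For example, if `message` holds the string `{"user":"bob","action":"login"}`, the filter above leaves `message` untouched and adds the parsed structure under `parsed_json` — roughly this event (a hypothetical rubydebug-style sketch):

```
{
    "message" => "{\"user\":\"bob\",\"action\":\"login\"}",
    "parsed_json" => {
        "user" => "bob",
        "action" => "login"
    }
}
```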

### CSV Filter

**Parse CSV Data:**
```ruby
filter {
  csv {
    separator => ","
    columns => [ "timestamp", "level", "component", "message" ]
    skip_header => true
    convert => { "timestamp" => "date" }
  }
}
```

### Conditional Processing

**Conditional Filters:**
```ruby
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }

  if [status] >= 400 {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if "error" in [tags] {
    mutate {
      add_field => { "alert_level" => "high" }
    }
  }
}
```
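Conditionals are not limited to equality tests; regular-expression matches, negation, and membership checks are also supported. A sketch of common forms:

```ruby
filter {
  # Regular-expression match against a field value
  if [message] =~ /timeout/ {
    mutate { add_tag => [ "timeout" ] }
  }

  # Negation: field is absent or falsy
  if ![user_id] {
    mutate { add_field => { "user_id" => "anonymous" } }
  }

  # Membership test against a list of values
  if [level] in ["ERROR", "FATAL"] {
    mutate { add_tag => [ "needs_review" ] }
  }
}
```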

## Output Plugins

### Elasticsearch Output

**Basic Elasticsearch Output:**
```ruby
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```

**Advanced Elasticsearch Output:**
```ruby
output {
  elasticsearch {
    hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    template_name => "logstash"
    template_pattern => "logstash-*"
    template_overwrite => true
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
    user => "logstash_writer"
    password => "password"
    retry_on_conflict => 3
    action => "index"
  }
}
```
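For time-based indices, the date-suffix pattern above can be replaced with index lifecycle management. A sketch of the relevant options (the alias and policy names are placeholders that must exist in Elasticsearch):

```ruby
output {
  elasticsearch {
    hosts => ["es-node1:9200"]
    ilm_enabled => true
    ilm_rollover_alias => "logstash"    # write alias that ILM rolls over
    ilm_pattern => "{now/d}-000001"     # naming pattern for backing indices
    ilm_policy => "logstash-policy"     # ILM policy to apply
  }
}
```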

### File Output

**File Output:**
```ruby
output {
  file {
    path => "/var/log/logstash/output.log"
    codec => line { format => "%{timestamp} %{level} %{message}" }
    flush_interval => 10
  }
}
```

### Kafka Output

**Kafka Output:**
```ruby
output {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "logstash-logs"
    codec => json
    compression_type => "gzip"
    batch_size => 100
    linger_ms => 10
  }
}
```

### Conditional Outputs

**Multiple Outputs:**
```ruby
output {
  if [type] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
    email {
      to => "admin@company.com"
      subject => "Error Alert: %{message}"
      body => "Error occurred at %{@timestamp}: %{message}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
```

## Pipeline Management

### Multiple Pipelines

**pipelines.yml Configuration:**
```yaml
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200
```

### Pipeline-to-Pipeline Communication

**Sending Pipeline:**
```ruby
output {
  pipeline {
    send_to => ["processing-pipeline"]
  }
}
```

**Receiving Pipeline:**
```ruby
input {
  pipeline {
    address => "processing-pipeline"
  }
}
```
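`send_to` can sit inside conditionals, enabling the distributor pattern: a single intake pipeline fanning events out to specialized downstream pipelines (the pipeline addresses here are placeholders):

```ruby
output {
  if [type] == "apache" {
    pipeline { send_to => ["apache-pipeline"] }
  } else {
    pipeline { send_to => ["fallback-pipeline"] }
  }
}
```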

## Command Line Operations

### Service Management

**Service Control:**
```bash
# Start Logstash
sudo systemctl start logstash

# Stop Logstash
sudo systemctl stop logstash

# Restart Logstash
sudo systemctl restart logstash

# Check status
sudo systemctl status logstash

# View logs
sudo journalctl -u logstash -f
```

### Configuration Testing

**Test Configuration:**
```bash
# Test configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t

# Test with specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t

# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug
```

### Manual Execution

**Run Logstash Manually:**
```bash
# Run with specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

# Run with inline config
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'

# Run with additional JVM options
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf
```

## Monitoring and Debugging

### API Monitoring

**Node Stats:**
```bash
# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"

# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"

# Get process stats
curl -X GET "localhost:9600/_node/stats/process?pretty"
```

**Pipeline Management:**
```bash
# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"

# Get specific pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"

# Reload pipelines (the 9600 API is read-only; use --config.reload.automatic
# at startup, or send SIGHUP to reload config on demand)
kill -SIGHUP $(pgrep -f logstash)
```

### Log Analysis

**Debug Logging:**
```bash
# Enable debug logging
echo 'logger.logstash.level = debug' >> /etc/logstash/log4j2.properties

# Monitor specific logger
echo 'logger.slowlog.name = slowlog' >> /etc/logstash/log4j2.properties
echo 'logger.slowlog.level = trace' >> /etc/logstash/log4j2.properties
```

**Execution Monitoring:**
```bash
# Monitor pipeline performance
tail -f /var/log/logstash/logstash-plain.log | grep "pipeline.stats"

# Check for slow filters
grep "slowlog" /var/log/logstash/logstash-slow.log

# Monitor memory usage
ps aux | grep logstash
jstat -gc $(pgrep -f logstash)
```

## Performance Tuning

### JVM Tuning

**JVM Settings (jvm.options):**
```
# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g

# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m

# Memory settings
# (the old -XX:+UseCGroupMemoryLimitForHeap flag was removed in modern JDKs;
#  container memory limits are respected automatically since JDK 10)
-XX:+UseLargePages

# GC logging (unified logging syntax; replaces the JDK 8-era
# -XX:+PrintGCDetails/-Xloggc flags, which newer JDKs reject)
-Xlog:gc*:file=/var/log/logstash/gc.log
```

### Pipeline Optimization

**Performance Settings:**
```yaml
# Pipeline workers (number of CPU cores)
pipeline.workers: 8

# Batch processing
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024

# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
```
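Events rejected by the Elasticsearch output land in the dead letter queue instead of being lost. A separate pipeline can replay them with the dead_letter_queue input plugin; a sketch assuming the default DLQ path and a pipeline id of `main`:

```ruby
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "main"      # which pipeline's DLQ to consume
    commit_offsets => true     # remember the read position across restarts
  }
}

output {
  # Inspect failed events before deciding how to re-index them
  stdout { codec => rubydebug }
}
```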

### Filter Optimization

**Efficient Filtering:**
```ruby
filter {
  # Use conditionals to avoid unnecessary processing
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  # Use break_on_match for multiple patterns
  grok {
    match => {
      "message" => [
        "%{PATTERN1}",
        "%{PATTERN2}"
      ]
    }
    break_on_match => true
  }

  # Remove unnecessary fields early
  mutate {
    remove_field => [ "host", "agent", "@version" ]
  }
}
```

## Security Configuration

### SSL/TLS Setup

**Input SSL Configuration:**
```ruby
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  }
}
```

**Output SSL Configuration:**
```ruby
output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/etc/logstash/certs/client.crt"
    ssl_key => "/etc/logstash/certs/client.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
  }
}
```

### Authentication

**Elasticsearch Authentication:**
```ruby
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
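The `${LOGSTASH_PASSWORD}` reference is resolved from the environment or, preferably, from the Logstash keystore, which keeps credentials out of plain-text configs. A sketch of creating the keystore entry (the secret name must match the reference):

```bash
# Create a keystore and store the password referenced in the config above
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add LOGSTASH_PASSWORD
```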

## Troubleshooting

### Common Issues

**Pipeline Won't Start:**
```bash
# Check configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash -t

# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/

# Check Java version
java -version
```

**Performance Issues:**
```bash
# Monitor resource usage
top -p $(pgrep -f logstash)
iostat -x 1

# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# Analyze slow logs
grep "WARN" /var/log/logstash/logstash-plain.log
```

**Memory Issues:**
```bash
# Check heap usage
jstat -gc $(pgrep -f logstash)

# Monitor garbage collection
tail -f /var/log/logstash/gc.log

# Adjust heap size
echo "-Xmx4g" >> /etc/logstash/jvm.options
echo "-Xms4g" >> /etc/logstash/jvm.options
```

## Integration Examples

### ELK Stack Integration

**Complete ELK Pipeline:**
```ruby
input {
  beats {
    port => 5044
  }
}

filter {
  if [@metadata][beat] == "filebeat" {
    if [fields][log_type] == "apache" {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
```

### Kafka Integration

**Kafka to Elasticsearch:**
```ruby
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["logs"]
    group_id => "logstash"
    consumer_threads => 4
    codec => json
  }
}

filter {
  date {
    match => [ "timestamp", "ISO8601" ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "kafka-logs-%{+YYYY.MM.dd}"
  }
}
```

This comprehensive Logstash cheatsheet covers installation, configuration, pipeline management, and advanced features for effective log processing and data transformation.