# Logstash Cheatsheet
Logstash is a powerful data-processing pipeline that ingests data from multiple sources, transforms it, and sends it to your favorite "stash," such as Elasticsearch. It is part of the Elastic Stack and excels at parsing, filtering, and enriching log data for analysis and visualization.
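A minimal sketch of that three-stage model, reading lines from stdin, tagging them in a filter, and printing structured events to stdout (the added field name is purely illustrative):

```
input {
  stdin { }
}
filter {
  mutate {
    # make the transform stage visible in the output
    add_field => { "pipeline_stage" => "demo" }
  }
}
output {
  stdout { codec => rubydebug }
}
```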
## Installation and Setup
### Package Installation
**Ubuntu/Debian:**
**CentOS/RHEL:**
### Docker Installation
**Docker Compose Setup:**
## Configuration Basics
### Pipeline Configuration Structure
**Basic Pipeline (logstash.conf):**
**Main Settings (logstash.yml):**
## Input Plugins
### File Input
**Basic File Input:**
**Advanced File Input:**
### Beats Input
**Filebeat Input:**
### Syslog Input
**Syslog UDP Input:**
**Syslog TCP Input:**
### HTTP Input
**HTTP Webhook:**
## Filter Plugins
### Grok Filter
**Basic Grok Patterns:**
**Custom Grok Patterns:**
**Multiple Grok Patterns:**
### Date Filter
**Parse Timestamps:**
**Multiple Date Formats:**
### Mutate Filter
**Field Manipulation:**
### JSON Filter
**Parse JSON:**
### CSV Filter
**Parse CSV Data:**
### Conditional Processing
**Conditional Filters:**
## Output Plugins
### Elasticsearch Output
**Basic Elasticsearch Output:**
**Advanced Elasticsearch Output:**
### File Output
**File Output:**
### Kafka Output
**Kafka Output:**
### Conditional Outputs
**Multiple Outputs:**
## Pipeline Management
### Multiple Pipelines
**pipelines.yml Configuration:**
### Pipeline-to-Pipeline Communication
**Sending Pipeline:**
**Receiving Pipeline:**
## Command Line Operations
### Service Management
**Service Control:**
### Configuration Testing
**Test Configuration:**
### Manual Execution
**Run Logstash Manually:**
## Monitoring and Debugging
### API Monitoring
**Node Stats:**
**Pipeline Management:**
### Log Analysis
**Debug Logging:**
**Performance Monitoring:**
## Performance Tuning
### JVM Tuning
**JVM Settings (jvm.options):**
### Pipeline Optimization
**Performance Settings:**
### Filter Optimization
**Efficient Filtering:**
## Security Configuration
### SSL/TLS Setup
**Input SSL Configuration:**
**Output SSL Configuration:**
### Authentication
**Elasticsearch Authentication:**
## Troubleshooting
### Common Issues
**Pipeline Won't Start:**
**Performance Issues:**
## Integration Examples
### ELK Stack Integration
**Complete ELK Pipeline:**
### Kafka Integration
**Kafka a Elasticsearch:**
# Import Elasticsearch GPG key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch|sudo apt-key add -
# Add Elastic repository
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main"|sudo tee /etc/apt/sources.list.d/elastic-8.x.list
# Update and install Logstash
sudo apt-get update
sudo apt-get install logstash
# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
# Import GPG key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# Create repository file
cat << EOF|sudo tee /etc/yum.repos.d/elastic.repo
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# Install Logstash
sudo yum install logstash
# Enable and start service
sudo systemctl enable logstash
sudo systemctl start logstash
version: '3.8'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "9600:9600"
    networks:
      - elastic
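To bring this service up, something like the following should work, assuming the `elastic` network is defined elsewhere in the Compose file or created beforehand:

```bash
# Create the shared network once (if not already defined in the Compose file)
docker network create elastic
# Start Logstash in the background and follow its logs
docker compose up -d logstash
docker logs -f logstash
```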
input {
  # Input plugins
}

filter {
  # Filter plugins
}

output {
  # Output plugins
}
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1gb
input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "plain"
  }
}

input {
  file {
    path => ["/var/log/*.log", "/var/log/app/*.log"]
    exclude => "*.gz"
    start_position => "end"
    sincedb_path => "/var/lib/logstash/sincedb"
    discover_interval => 15
    stat_interval => 1
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    add_field => { "log_source" => "application" }
    tags => ["application", "production"]
  }
}
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
}
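For reference, the client side of this Beats input is configured in Filebeat; a hedged sketch with placeholder hostname and certificate paths:

```yaml
# filebeat.yml (client side) -- hostname and paths are assumptions
output.logstash:
  hosts: ["logstash.example.com:5044"]
  ssl.certificate_authorities: ["/etc/filebeat/certs/ca.crt"]
  ssl.certificate: "/etc/filebeat/certs/filebeat.crt"
  ssl.key: "/etc/filebeat/certs/filebeat.key"
```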
input {
  tcp {
    port => 514
    type => "syslog"
    codec => line { format => "%{message}" }
  }
}
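The UDP variant referenced above has the same shape; a minimal sketch (an unprivileged port is used here as an assumption, since binding 514 requires root):

```
input {
  udp {
    port => 5514
    type => "syslog"
  }
}
```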
input {
  http {
    port => 8080
    codec => json
    additional_codecs => {
      "application/json" => "json"
      "text/plain" => "plain"
    }
    ssl => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    }
    add_field => { "parsed" => "true" }
    tag_on_failure => ["_grokparsefailure"]
  }
}
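The `patterns_dir` option points at plain-text files of `NAME pattern` definitions; a hypothetical example of such a file:

```
# /etc/logstash/patterns/custom (hypothetical contents)
APP_TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}
APP_LOGLINE %{APP_TIMESTAMP:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:msg}
```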
filter {
  grok {
    match => {
      "message" => [
        "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{PROG:program}: %{GREEDYDATA:message}",
        "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}",
        "%{GREEDYDATA:message}"
      ]
    }
    break_on_match => true
  }
}
filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
    timezone => "UTC"
  }
}

filter {
  date {
    match => [
      "timestamp",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
    target => "@timestamp"
    locale => "en"
  }
}

filter {
  mutate {
    # Add fields
    add_field => {
      "environment" => "production"
      "processed_by" => "logstash"
    }

    # Remove fields
    remove_field => [ "host", "agent" ]

    # Rename fields
    rename => { "old_field" => "new_field" }

    # Convert field types
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }

    # String operations
    lowercase => [ "method" ]
    uppercase => [ "level" ]
    strip => [ "message" ]
    gsub => [ "message", "/", "_" ]
  }
}

filter {
  json {
    source => "message"
    target => "parsed_json"
    skip_on_invalid_json => true
  }
}

filter {
  csv {
    separator => ","
    columns => [ "timestamp", "level", "component", "message" ]
    skip_header => true
    convert => { "timestamp" => "date" }
  }
}
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }

  if [status] >= 400 {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if "error" in [tags] {
    mutate {
      add_field => { "alert_level" => "high" }
    }
  }
}
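Conditionals can also discard events outright with the `drop` filter; a small sketch that assumes a `level` field parsed earlier:

```
filter {
  if [level] == "DEBUG" {
    drop { }   # the event never reaches any output
  }
}
```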
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}

output {
  elasticsearch {
    hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    template_name => "logstash"
    template_pattern => "logstash-*"
    template_overwrite => true
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
    user => "logstash_writer"
    password => "password"
    retry_on_conflict => 3
    action => "index"
  }
}
output {
  file {
    path => "/var/log/logstash/output.log"
    codec => line { format => "%{timestamp} %{level} %{message}" }
    flush_interval => 10
  }
}

output {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "logstash-logs"
    codec => json
    compression_type => "gzip"
    batch_size => 100
    linger_ms => 10
  }
}
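To spot-check that events are reaching the topic, a console consumer from the Kafka distribution can be pointed at it (script name and path vary by packaging):

```bash
# Read a few events back from the topic written above
kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic logstash-logs \
  --from-beginning \
  --max-messages 5
```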
output {
  if [type] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
    email {
      to => "admin@company.com"
      subject => "Error Alert: %{message}"
      body => "Error occurred at %{@timestamp}: %{message}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 2
  pipeline.batch.size: 125

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 1
  pipeline.batch.size: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 200
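For the sending and receiving pipelines mentioned above, the `pipeline` output and input plugins connect two `pipelines.yml` entries through a named virtual address; a sketch (the address name is an assumption):

```
# Sending pipeline: forward events to a virtual address
output {
  pipeline {
    send_to => ["enriched-logs"]
  }
}

# Receiving pipeline: consume from the same virtual address
input {
  pipeline {
    address => "enriched-logs"
  }
}
```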
# Start Logstash
sudo systemctl start logstash
# Stop Logstash
sudo systemctl stop logstash
# Restart Logstash
sudo systemctl restart logstash
# Check status
sudo systemctl status logstash
# View logs
sudo journalctl -u logstash -f
# Test configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
# Test with specific config file
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t
# Run with debug output
sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug
# Run with specific config
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
# Run with inline config
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
# Run with additional JVM options
LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf
# Get node information
curl -X GET "localhost:9600/_node/stats?pretty"
# Get pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# Get JVM stats
curl -X GET "localhost:9600/_node/stats/jvm?pretty"
# Get process stats
curl -X GET "localhost:9600/_node/stats/process?pretty"
# List pipelines
curl -X GET "localhost:9600/_node/pipelines?pretty"
# Get specific pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty"
# Reload pipeline configuration without a restart
# (enable config.reload.automatic in logstash.yml, or force a reload with SIGHUP)
kill -SIGHUP $(pgrep -f logstash)
# Enable debug logging
echo 'logger.logstash.level = debug' >> /etc/logstash/log4j2.properties
# Monitor specific logger
echo 'logger.slowlog.name = slowlog' >> /etc/logstash/log4j2.properties
echo 'logger.slowlog.level = trace' >> /etc/logstash/log4j2.properties
# Monitor pipeline performance
tail -f /var/log/logstash/logstash-plain.log|grep "pipeline.stats"
# Check for slow filters
grep "slowlog" /var/log/logstash/logstash-slow.log
# Monitor memory usage
ps aux|grep logstash
jstat -gc $(pgrep -f logstash)
# Heap size (adjust based on available memory)
-Xms2g
-Xmx2g
# Garbage collection
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
# Memory settings
-XX:+UseLargePages
# GC logging (unified logging syntax; the legacy -XX:+PrintGC* flags are not
# accepted by the JDK 17 bundle shipped with Logstash 8.x)
-Xlog:gc*:file=/var/log/logstash/gc.log
# Pipeline workers (number of CPU cores)
pipeline.workers: 8
# Batch processing
pipeline.batch.size: 1000
pipeline.batch.delay: 50
# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
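Events that land in the dead letter queue can later be replayed with the `dead_letter_queue` input plugin; a minimal sketch assuming the default data path from earlier and the `main` pipeline id:

```
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "main"
    commit_offsets => true
  }
}
```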
filter {
  # Use conditionals to avoid unnecessary processing
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  # Use break_on_match for multiple patterns
  grok {
    match => {
      "message" => [
        "%{PATTERN1}",
        "%{PATTERN2}"
      ]
    }
    break_on_match => true
  }

  # Remove unnecessary fields early
  mutate {
    remove_field => [ "host", "agent", "@version" ]
  }
}
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  }
}

output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/etc/logstash/certs/client.crt"
    ssl_key => "/etc/logstash/certs/client.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
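The `${LOGSTASH_PASSWORD}` reference is resolved from the environment or from the Logstash keystore; two common ways to provide it (paths assume a package install):

```bash
# Option 1: environment variable for the systemd service
sudo systemctl edit logstash    # add: Environment="LOGSTASH_PASSWORD=changeme"

# Option 2: store the secret in the Logstash keystore
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
sudo -u logstash /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add LOGSTASH_PASSWORD
```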
# Check configuration syntax
sudo -u logstash /usr/share/logstash/bin/logstash -t
# Check file permissions
ls -la /etc/logstash/conf.d/
sudo chown -R logstash:logstash /etc/logstash/
# Check Java version
java -version
# Monitor resource usage
top -p $(pgrep -f logstash)
iostat -x 1
# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"
# Analyze slow logs
grep "WARN" /var/log/logstash/logstash-plain.log
**Memory Issues:**
# Check heap usage
jstat -gc $(pgrep -f logstash)
# Monitor garbage collection
tail -f /var/log/logstash/gc.log
# Adjust heap size
echo "-Xmx4g" >> /etc/logstash/jvm.options
echo "-Xms4g" >> /etc/logstash/jvm.options
input {
  beats {
    port => 5044
  }
}

filter {
  if [@metadata][beat] == "filebeat" {
    if [fields][log_type] == "apache" {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["logs"]
    group_id => "logstash"
    consumer_threads => 4
    codec => json
  }
}

filter {
  date {
    match => [ "timestamp", "ISO8601" ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "kafka-logs-%{+YYYY.MM.dd}"
  }
}
This comprehensive Logstash cheatsheet covers installation, configuration, pipeline management, and advanced features for effective log processing and data transformation.