Zum Inhalt

Logisch Cheatsheet

kopieren generieren
Logstash ist eine leistungsstarke Datenverarbeitungs-Pipeline, die Daten aus mehreren Quellen einnimmt, transformiert sie und sendet sie zu Ihrem Lieblings "stash" wie Elasticsearch. Es ist Teil des Elastischen Stacks und zeichnet sich durch Parsing, Filtern und Anreichern von Logdaten für Analyse und Visualisierung aus. ## Installation und Inbetriebnahme ### Installation von Paketen **Ubuntu/Debian:** ```bash # Import Elasticsearch GPG key wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch|sudo apt-key add - # Add Elastic repository echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main"|sudo tee /etc/apt/sources.list.d/elastic-8.x.list # Update and install Logstash sudo apt-get update sudo apt-get install logstash # Enable and start service sudo systemctl enable logstash sudo systemctl start logstash ```_ **CentOS/RHEL:** ```bash # Import GPG key sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch # Create repository file cat << EOF|sudo tee /etc/yum.repos.d/elastic.repo [elastic-8.x] name=Elastic repository for 8.x packages baseurl=https://artifacts.elastic.co/packages/8.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md EOF # Install Logstash sudo yum install logstash # Enable and start service sudo systemctl enable logstash sudo systemctl start logstash ```_ ### Docker Installation **Docker Compose Setup:** ```yaml version: '3.8' services: logstash: image: docker.elastic.co/logstash/logstash:8.11.0 container_name: logstash environment: - "LS_JAVA_OPTS=-Xmx1g -Xms1g" volumes: - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro - ./logstash.yml:/usr/share/logstash/config/logstash.yml:ro ports: - "5044:5044" - "9600:9600" networks: - elastic ```_ ## Konfigurationsgrundsätze ### Pipeline Konfiguration Struktur **Basic Pipeline (logstash.conf):** ```ruby input \\\\{ # Input plugins \\\\} filter \\\\{ # Filter plugins \\\\} output \\\\{ # Output plugins \\\\} ```_ ** Hauptkonfiguration (logstash.yml):** ```yaml node.name: logstash-node-1 path.data: /var/lib/logstash path.config: /etc/logstash/conf.d/*.conf path.logs: /var/log/logstash pipeline.workers: 4 pipeline.batch.size: 125 pipeline.batch.delay: 50 queue.type: memory queue.max_bytes: 1gb ```_ ## Input Plugins ### Datei Input **Basic Dateieingang:** ```ruby input \\\\{ file \\\\{ path => "/var/log/apache2/access.log" start_position => "beginning" sincedb_path => "/dev/null" codec => "plain" \\\\} \\\\} ```_ **Erweiterte Dateieingabe:** ```ruby input \\\\{ file \\\\{ path => ["/var/log/*.log", "/var/log/app/*.log"] exclude => "*.gz" start_position => "end" sincedb_path => "/var/lib/logstash/sincedb" discover_interval => 15 stat_interval => 1 codec => multiline \\\\{ pattern => "^%\\\\{TIMESTAMP_ISO8601\\\\}" negate => true what => "previous" \\\\} add_field => \\\\{ "log_source" => "application" \\\\} tags => ["application", "production"] \\\\} \\\\} ```_ ### Beats Input **Filebeat Eingang:** ```ruby input \\\\{ beats \\\\{ port => 5044 ssl => true ssl_certificate => "/etc/logstash/certs/logstash.crt" ssl_key => "/etc/logstash/certs/logstash.key" ssl_verify_mode => "force_peer" ssl_peer_metadata => true \\\\} \\\\} ```_ ### Syslog Input **Syslog UDP Eingabe:** ```ruby input \\\\{ syslog \\\\{ port => 514 type => "syslog" codec => cef \\\\} \\\\} ```_ **Syslog TCP Eingabe:** ```ruby input \\\\{ tcp \\\\{ port => 514 type => "syslog" codec => line \\\\{ format => "%\\\\{message\\\\}" \\\\} \\\\} \\\\} ```_ ### HTTP Input **HTTP Webhook:** ```ruby input \\\\{ http \\\\{ port => 8080 codec => json additional_codecs => \\\\{ "application/json" => "json" "text/plain" => "plain" \\\\} ssl => true ssl_certificate => "/path/to/cert.pem" ssl_key => "/path/to/key.pem" \\\\} \\\\} ```_ ## Filter Plugins ### Grobfilter **Basic Grok Patterns:** ```ruby filter \\\\{ grok \\\\{ match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\} \\\\} \\\\} ```_ **Custom Grok Patterns:** ```ruby filter \\\\{ grok \\\\{ patterns_dir => ["/etc/logstash/patterns"] match => \\\\{ "message" => "%\\\\{TIMESTAMP_ISO8601:timestamp\\\\} %\\\\{LOGLEVEL:level\\\\} %\\\\{GREEDYDATA:message\\\\}" \\\\} add_field => \\\\{ "parsed" => "true" \\\\} tag_on_failure => ["_grokparsefailure"] \\\\} \\\\} ```_ ** Multiple Grok Muster:** ```ruby filter \\\\{ grok \\\\{ match => \\\\{ "message" => [ "%\\\\{SYSLOGTIMESTAMP:timestamp\\\\} %\\\\{IPORHOST:server\\\\} %\\\\{PROG:program\\\\}: %\\\\{GREEDYDATA:message\\\\}", "%\\\\{TIMESTAMP_ISO8601:timestamp\\\\} \[%\\\\{LOGLEVEL:level\\\\}\] %\\\\{GREEDYDATA:message\\\\}", "%\\\\{GREEDYDATA:message\\\\}" ] \\\\} break_on_match => true \\\\} \\\\} ```_ ### Datum Filter **Parse Timestamps:** ```ruby filter \\\\{ date \\\\{ match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] target => "@timestamp" timezone => "UTC" \\\\} \\\\} ```_ **Multiple Date Formate:** ```ruby filter \\\\{ date \\\\{ match => [ "timestamp", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSSZ", "MMM dd HH:mm:ss" ] target => "@timestamp" locale => "en" \\\\} \\\\} ```_ ### Mutate Filter **Field Manipulation:** ```ruby filter \\\\{ mutate \\\\{ # Add fields add_field => \\\\{ "environment" => "production" "processed_by" => "logstash" \\\\} # Remove fields remove_field => [ "host", "agent" ] # Rename fields rename => \\\\{ "old_field" => "new_field" \\\\} # Convert field types convert => \\\\{ "response_time" => "float" "status_code" => "integer" \\\\} # String operations lowercase => [ "method" ] uppercase => [ "level" ] strip => [ "message" ] gsub => [ "message", "/", "_" ] \\\\} \\\\} ```_ ### JSON Filter **Parse JSON:** ```ruby filter \\\\{ json \\\\{ source => "message" target => "parsed_json" skip_on_invalid_json => true \\\\} \\\\} ```_ ### CSV Filter **Parse CSV Daten:** ```ruby filter \\\\{ csv \\\\{ separator => "," columns => [ "timestamp", "level", "component", "message" ] skip_header => true convert => \\\\{ "timestamp" => "date" \\\\} \\\\} \\\\} ```_ ### Bedingte Verarbeitung ** Zusätzliche Filter:** ```ruby filter \\\\{ if [type] == "apache" \\\\{ grok \\\\{ match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\} \\\\} \\\\} else if [type] == "nginx" \\\\{ grok \\\\{ match => \\\\{ "message" => "%\\\\{NGINXACCESS\\\\}" \\\\} \\\\} \\\\} if [status] >= 400 \\\\{ mutate \\\\{ add_tag => [ "error" ] \\\\} \\\\} if "error" in [tags] \\\\{ mutate \\\\{ add_field => \\\\{ "alert_level" => "high" \\\\} \\\\} \\\\} \\\\} ```_ ## Plugins ausgeben ### Elasticsearch Output **Basic Elasticsearch Ausgabe:** ```ruby output \\\\{ elasticsearch \\\\{ hosts => ["localhost:9200"] index => "logstash-%\\\\{+YYYY.MM.dd\\\\}" \\\\} \\\\} ```_ **Advanced Elasticsearch Ausgabe:** ```ruby output \\\\{ elasticsearch \\\\{ hosts => ["es-node1:9200", "es-node2:9200", "es-node3:9200"] index => "%\\\\{[@metadata][beat]\\\\}-%\\\\{[@metadata][version]\\\\}-%\\\\{+YYYY.MM.dd\\\\}" template_name => "logstash" template_pattern => "logstash-*" template_overwrite => true ssl => true ssl_certificate_verification => true ssl_certificate => "/path/to/cert.pem" ssl_key => "/path/to/key.pem" user => "logstash_writer" password => "password" retry_on_conflict => 3 action => "index" \\\\} \\\\} ```_ ### Dateiausgang **Dateiausgang:** ```ruby output \\\\{ file \\\\{ path => "/var/log/logstash/output.log" codec => line \\\\{ format => "%\\\\{timestamp\\\\} %\\\\{level\\\\} %\\\\{message\\\\}" \\\\} flush_interval => 10 \\\\} \\\\} ```_ ### Kafka Ausgabe **Kafka Ausgabe:** ```ruby output \\\\{ kafka \\\\{ bootstrap_servers => "kafka1:9092,kafka2:9092" topic_id => "logstash-logs" codec => json compression_type => "gzip" batch_size => 100 linger_ms => 10 \\\\} \\\\} ```_ ### Bedingte Ausgänge ** Multiple Outputs:** ```ruby output \\\\{ if [type] == "error" \\\\{ elasticsearch \\\\{ hosts => ["localhost:9200"] index => "errors-%\\\\{+YYYY.MM.dd\\\\}" \\\\} email \\\\{ to => "admin@company.com" subject => "Error Alert: %\\\\{message\\\\}" body => "Error occurred at %\\\\{@timestamp\\\\}: %\\\\{message\\\\}" \\\\} \\\\} else \\\\{ elasticsearch \\\\{ hosts => ["localhost:9200"] index => "logs-%\\\\{+YYYY.MM.dd\\\\}" \\\\} \\\\} \\\\} ```_ ## Pipeline Management ### Mehrere Pipelines **pipelines.yml Konfiguration:** ```yaml - pipeline.id: apache-logs path.config: "/etc/logstash/conf.d/apache.conf" pipeline.workers: 2 pipeline.batch.size: 125 - pipeline.id: nginx-logs path.config: "/etc/logstash/conf.d/nginx.conf" pipeline.workers: 1 pipeline.batch.size: 50 - pipeline.id: application-logs path.config: "/etc/logstash/conf.d/app.conf" pipeline.workers: 4 pipeline.batch.size: 200 ```_ ### Pipeline-zu-Pipeline-Kommunikation **Sending Pipeline:** ```ruby output \\\\{ pipeline \\\\{ send_to => ["processing-pipeline"] \\\\} \\\\} ```_ **Receiving Pipeline:** ```ruby input \\\\{ pipeline \\\\{ address => "processing-pipeline" \\\\} \\\\} ```_ ## Befehlsleitung ### Service Management ** Service Control:** ```bash # Start Logstash sudo systemctl start logstash # Stop Logstash sudo systemctl stop logstash # Restart Logstash sudo systemctl restart logstash # Check status sudo systemctl status logstash # View logs sudo journalctl -u logstash -f ```_ ### Konfigurationsprüfung ** Testkonfiguration:** ```bash # Test configuration syntax sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t # Test with specific config file sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf -t # Run with debug output sudo -u logstash /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --log.level debug ```_ ### Manuelle Ausführung **Run Logstash Manuell: ** ```bash # Run with specific config /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf # Run with inline config /usr/share/logstash/bin/logstash -e 'input \\\\{ stdin \\\\{ \\\\} \\\\} output \\\\{ stdout \\\\{\\\\} \\\\}' # Run with additional JVM options LS_JAVA_OPTS="-Xmx2g -Xms2g" /usr/share/logstash/bin/logstash -f config.conf ```_ ## Überwachung und Debugging ### API Monitoring ** Keine Stats:** ```bash # Get node information curl -X GET "localhost:9600/_node/stats?pretty" # Get pipeline stats curl -X GET "localhost:9600/_node/stats/pipelines?pretty" # Get JVM stats curl -X GET "localhost:9600/_node/stats/jvm?pretty" # Get process stats curl -X GET "localhost:9600/_node/stats/process?pretty" ```_ **Pipeline Management: ** ```bash # List pipelines curl -X GET "localhost:9600/_node/pipelines?pretty" # Get specific pipeline stats curl -X GET "localhost:9600/_node/stats/pipelines/main?pretty" # Hot reload pipeline curl -X POST "localhost:9600/_node/pipelines/main/reload" ```_ ### Analyse der Ergebnisse **Debug Logging: ** ```bash # Enable debug logging echo 'logger.logstash.level = debug' >> /etc/logstash/log4j2.properties # Monitor specific logger echo 'logger.slowlog.name = slowlog' >> /etc/logstash/log4j2.properties echo 'logger.slowlog.level = trace' >> /etc/logstash/log4j2.properties ```_ **Performance Monitoring: ** ```bash # Monitor pipeline performance tail -f /var/log/logstash/logstash-plain.log|grep "pipeline.stats" # Check for slow filters grep "slowlog" /var/log/logstash/logstash-slow.log # Monitor memory usage ps aux|grep logstash jstat -gc $(pgrep -f logstash) ```_ ## Leistung Tuning ### JVM Tuning **JVM Einstellungen (jvm.options):** ```bash # Heap size (adjust based on available memory) -Xms2g -Xmx2g # Garbage collection -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:G1HeapRegionSize=16m # Memory settings -XX:+UseLargePages -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap # Debugging options -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/var/log/logstash/gc.log ```_ ### Pipeline Optimierung **Leistungseinstellungen:** ```yaml # Pipeline workers (number of CPU cores) pipeline.workers: 8 # Batch processing pipeline.batch.size: 1000 pipeline.batch.delay: 50 # Queue settings queue.type: persisted queue.max_bytes: 4gb queue.checkpoint.writes: 1024 # Dead letter queue dead_letter_queue.enable: true dead_letter_queue.max_bytes: 1gb ```_ ### Filteroptimierung **Effiziente Filter:** ```ruby filter \\\\{ # Use conditionals to avoid unnecessary processing if [type] == "apache" \\\\{ grok \\\\{ match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\} \\\\} \\\\} # Use break_on_match for multiple patterns grok \\\\{ match => \\\\{ "message" => [ "%\\\\{PATTERN1\\\\}", "%\\\\{PATTERN2\\\\}" ] \\\\} break_on_match => true \\\\} # Remove unnecessary fields early mutate \\\\{ remove_field => [ "host", "agent", "@version" ] \\\\} \\\\} ```_ ## Sicherheitskonfiguration ### SSL/TLS Setup **Input SSL Konfiguration:** ```ruby input \\\\{ beats \\\\{ port => 5044 ssl => true ssl_certificate => "/etc/logstash/certs/logstash.crt" ssl_key => "/etc/logstash/certs/logstash.key" ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"] ssl_verify_mode => "force_peer" \\\\} \\\\} ```_ **Output SSL Konfiguration:** ```ruby output \\\\{ elasticsearch \\\\{ hosts => ["https://elasticsearch:9200"] ssl => true ssl_certificate_verification => true ssl_certificate => "/etc/logstash/certs/client.crt" ssl_key => "/etc/logstash/certs/client.key" ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"] \\\\} \\\\} ```_ ### Authentication **Elasticsearch Authentication:** ```ruby output \\\\{ elasticsearch \\\\{ hosts => ["localhost:9200"] user => "logstash_writer" password => "$\\\\{LOGSTASH_PASSWORD\\\\}" index => "logstash-%\\\\{+YYYY.MM.dd\\\\}" \\\\} \\\\} ```_ ## Fehlerbehebung ### Gemeinsame Themen **Pipeline startet nicht:** ```bash # Check configuration syntax sudo -u logstash /usr/share/logstash/bin/logstash -t # Check file permissions ls -la /etc/logstash/conf.d/ sudo chown -R logstash:logstash /etc/logstash/ # Check Java version java -version ```_ **Leistungsfragen:** ```bash # Monitor resource usage top -p $(pgrep -f logstash) iostat -x 1 # Check pipeline stats curl -X GET "localhost:9600/_node/stats/pipelines?pretty" # Analyze slow logs grep "WARN" /var/log/logstash/logstash-plain.log ```_ **Memory Issues:** ```bash # Check heap usage jstat -gc $(pgrep -f logstash) # Monitor garbage collection tail -f /var/log/logstash/gc.log # Adjust heap size echo "-Xmx4g" >> /etc/logstash/jvm.options echo "-Xms4g" >> /etc/logstash/jvm.options ```_ ## Integrationsbeispiele ### ELK Stack Integration **Komplett ELK Pipeline:** ```ruby input \\\\{ beats \\\\{ port => 5044 \\\\} \\\\} filter \\\\{ if [@metadata][beat] == "filebeat" \\\\{ if [fields][log_type] == "apache" \\\\{ grok \\\\{ match => \\\\{ "message" => "%\\\\{COMBINEDAPACHELOG\\\\}" \\\\} \\\\} date \\\\{ match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] \\\\} \\\\} \\\\} \\\\} output \\\\{ elasticsearch \\\\{ hosts => ["elasticsearch:9200"] index => "%\\\\{[@metadata][beat]\\\\}-%\\\\{[@metadata][version]\\\\}-%\\\\{+YYYY.MM.dd\\\\}" \\\\} \\\\} ```_ ### Integration von Kafka **Kafka zu Elastikforschung:** ```ruby input \\\\{ kafka \\\\{ bootstrap_servers => "kafka:9092" topics => ["logs"] group_id => "logstash" consumer_threads => 4 codec => json \\\\} \\\\} filter \\\\{ date \\\\{ match => [ "timestamp", "ISO8601" ] \\\\} \\\\} output \\\\{ elasticsearch \\\\{ hosts => ["elasticsearch:9200"] index => "kafka-logs-%\\\\{+YYYY.MM.dd\\\\}" \\\\} \\\\} ```_

Dieses umfassende Logstash-Catsheet umfasst Installation, Konfiguration, Pipeline-Management und erweiterte Funktionen für eine effektive Protokollverarbeitung und Datentransformation.