Salta ai contenuti

Source: Input plugins

PiattaformaComando
Ubuntu/Debian (td-agent)`curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-td-agent4.sh \
RHEL/CentOS`curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh \
macOSbrew install fluentd
Ruby Gemgem install fluentd
Dockerdocker pull fluent/fluentd:latest
KubernetesDistribuisci come DaemonSet (vedi sezione Configurazione)
ComandoDescrizione
fluentd -c fluent.confAvvia Fluentd con il file di configurazione specificato
fluentd -c fluent.conf -vvEsegui con output di debug dettagliato
fluentd -c fluent.conf --dry-runConvalida configurazione senza avviare
fluentd --setup ./fluentCreare la struttura di directory di configurazione predefinita
fluentd --versionVisualizza informazioni sulla versione di Fluentd
sudo systemctl start td-agentAvvia il servizio td-agent (Linux)
sudo systemctl stop td-agentArrestare il servizio td-agent
sudo systemctl restart td-agentRiavvia il servizio td-agent
sudo systemctl status td-agentControlla lo stato del servizio td-agent
sudo systemctl reload td-agentRicarica configurazione senza riavviare
sudo systemctl enable td-agentAbilita td-agent all’avvio automatico
sudo journalctl -u td-agent -fSegui i log del servizio td-agent in tempo reale
`echo ’{“msg”:“test”}’ \fluent-cat debug.test`
curl -X POST -d 'json={"event":"test"}' http://localhost:8888/test.cycleInvia log di test HTTP
`td-agent-gem list \grep fluent-plugin`
ComandoDescrizione
fluentd -c fluent.conf -d /var/run/fluentd.pidEsegui Fluentd in modalità daemon con file PID
fluentd -c fluent.conf -o /var/log/fluentd.logEsegui con output su file di log specifico
fluentd -c fluent.conf --workers 4Esegui con più processi worker
fluentd -c fluent.conf -vvvEsegui con registrazione a livello di traccia per il debug
fluentd --show-plugin-config=input:tailVisualizza le opzioni di configurazione per il plugin specifico
td-agent-gem install fluent-plugin-elasticsearchInstalla il plugin di output Elasticsearch
td-agent-gem install fluent-plugin-kafka -v 0.17.5Installa una versione specifica del plugin Kafka
td-agent-gem update fluent-plugin-s3Aggiorna il plugin S3 all’ultima versione
td-agent-gem uninstall fluent-plugin-mongoRimuovi plugin MongoDB
td-agent-gem search -r fluent-pluginCerca plugin disponibili nel repository
fluent-cat --host 192.168.1.100 --port 24224 app.logsInvia log all’istanza remota di Fluentd
fluent-cat app.logs < /path/to/logfile.jsonInvia il contenuto del file di log a Fluentd
docker run -d -p 24224:24224 -v /data/fluentd:/fluentd/etc fluent/fluentdEsegui Fluentd in Docker con configurazione montata
sudo kill -USR1 $(cat /var/run/td-agent/td-agent.pid)Ricarica Fluentd con eleganza (riapri i file di log)
sudo kill -USR2 $(cat /var/run/td-agent/td-agent.pid)Riaprire i file di log di Fluentd senza ricaricamento
/etc/td-agent/td-agent.conf## Utilizzo Avanzato
./fluent/fluent.conf## Configurazione
/fluentd/etc/fluent.conf### Posizioni dei File di Configurazione Principali
  • td-agent (Linux): ```ruby

Source: Input plugins

@type forward port 24224 bind 0.0.0.0

Filter: Process/transform logs

<filter app.**> @type record_transformer hostname ”#{Socket.gethostname}” tag ${tag}

Match: Output plugins

<match app.**> @type elasticsearch host elasticsearch.local port 9200 index_name fluentd type_name fluentd

- **Installazione Gem**: ```ruby
# Forward input (receive from other Fluentd instances)
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# Tail log files
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/td-agent/nginx-access.pos
  tag nginx.access
  <parse>
    @type nginx
  </parse>
</source>

# HTTP input
<source>
  @type http
  port 8888
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
</source>

# Syslog input
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag system.syslog
</source>
  • Docker: ```ruby

Add/modify record fields

<filter app.**> @type record_transformer hostname ”#{Socket.gethostname}” environment production timestamp ${time}

Parse unstructured logs

@type parser key_name message @type json

Grep filter (include/exclude)

<filter app.**> @type grep key level pattern /^(ERROR|FATAL)$/

Modify tag

<match app.raw.**> @type rewrite_tag_filter key level pattern /^ERROR$/ tag app.error.${tag}

```ruby
# Elasticsearch output
<match app.**>
  @type elasticsearch
  host elasticsearch.local
  port 9200
  logstash_format true
  logstash_prefix fluentd
  <buffer>
    @type file
    path /var/log/fluentd/buffer/elasticsearch
    flush_interval 10s
    retry_max_interval 300s
  </buffer>
</match>

# S3 output
<match logs.**>
  @type s3
  aws_key_id YOUR_AWS_KEY_ID
  aws_sec_key YOUR_AWS_SECRET_KEY
  s3_bucket your-bucket-name
  s3_region us-east-1
  path logs/
  time_slice_format %Y%m%d%H
  <buffer time>
    timekey 3600
    timekey_wait 10m
  </buffer>
</match>

# File output
<match debug.**>
  @type file
  path /var/log/fluentd/output
  <buffer>
    timekey 1d
    timekey_use_utc true
  </buffer>
</match>

# Forward to another Fluentd
<match forward.**>
  @type forward
  <server>
    host 192.168.1.100
    port 24224
  </server>
  <buffer>
    @type file
    path /var/log/fluentd/buffer/forward
  </buffer>
</match>

# Stdout (debugging)
<match debug.**>
  @type stdout
</match>
```### Struttura di Configurazione Base
```ruby
<match pattern.**>
  @type elasticsearch
  
  # File buffer with advanced settings
  <buffer>
    @type file
    path /var/log/fluentd/buffer
    
    # Flush settings
    flush_mode interval
    flush_interval 10s
    flush_at_shutdown true
    
    # Retry settings
    retry_type exponential_backoff
    retry_wait 10s
    retry_max_interval 300s
    retry_timeout 72h
    retry_max_times 17
    
    # Chunk settings
    chunk_limit_size 5M
    queue_limit_length 32
    overflow_action drop_oldest_chunk
    
    # Compression
    compress gzip
  </buffer>
</match>

# Memory buffer for high-performance
<match fast.**>
  @type forward
  <buffer>
    @type memory
    flush_interval 5s
    chunk_limit_size 1M
    queue_limit_length 64
  </buffer>
</match>
<system>
  workers 4
  root_dir /var/log/fluentd
</system>

# Worker-specific sources
<worker 0>
  <source>
    @type forward
    port 24224
  </source>
</worker>

<worker 1-3>
  <source>
    @type tail
    path /var/log/app/*.log
    tag app.logs
  </source>
</worker>
```### Plugin Sorgente (Input)
```ruby
# Route to different pipelines using labels
<source>
  @type forward
  @label @mainstream
</source>

<source>
  @type tail
  path /var/log/secure.log
  @label @security
</source>

<label @mainstream>
  <filter **>
    @type record_transformer
    <record>
      pipeline mainstream
    </record>
  </filter>
  
  <match **>
    @type elasticsearch
    host es-main
  </match>
</label>

<label @security>
  <filter **>
    @type grep
    <regexp>
      key message
      pattern /authentication failure/
    </regexp>
  </filter>
  
  <match **>
    @type s3
    s3_bucket security-logs
  </match>
</label>
# Install Elasticsearch plugin
sudo td-agent-gem install fluent-plugin-elasticsearch

# Configure Fluentd
sudo tee /etc/td-agent/td-agent.conf > /dev/null <<'EOF'
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/td-agent/nginx-access.pos
  tag nginx.access
  <parse>
    @type nginx
  </parse>
</source>

<match nginx.access>
  @type elasticsearch
  host localhost
  port 9200
  logstash_format true
  logstash_prefix nginx
  <buffer>
    flush_interval 10s
  </buffer>
</match>
EOF

# Restart td-agent
sudo systemctl restart td-agent

# Verify logs are flowing
sudo journalctl -u td-agent -f
```### Plugin Filtro (Elaborazione)
```yaml
# Deploy Fluentd DaemonSet
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
rules:
- apiGroups: [""]
  resources: ["pods", "namespaces"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluentd
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluentd
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-system
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
    spec:
      serviceAccountName: fluentd
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch
        env:
        - name: FLUENT_ELASTICSEARCH_HOST
          value: "elasticsearch.logging.svc.cluster.local"
        - name: FLUENT_ELASTICSEARCH_PORT
          value: "9200"
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
EOF

# Check DaemonSet status
kubectl get daemonset -n kube-system fluentd
kubectl logs -n kube-system -l k8s-app=fluentd-logging --tail=50
# Install S3 plugin
sudo td-agent-gem install fluent-plugin-s3

# Configure S3 output
sudo tee /etc/td-agent/td-agent.conf > /dev/null <<'EOF'
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/td-agent/app.pos
  tag app.logs
  <parse>
    @type json
  </parse>
</source>

<match app.logs>
  @type s3
  
  aws_key_id YOUR_AWS_ACCESS_KEY
  aws_sec_key YOUR_AWS_SECRET_KEY
  s3_bucket my-application-logs
  s3_region us-east-1
  
  path logs/%Y/%m/%d/
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  
  <buffer time>
    @type file
    path /var/log/td-agent/s3
    timekey 3600
    timekey_wait 10m
    chunk_limit_size 256m
  </buffer>
  
  <format>
    @type json
  </format>
</match>
EOF

# Restart and verify
sudo systemctl restart td-agent
sudo systemctl status td-agent
```### Plugin Match (Output)
```bash
# Configure routing to multiple destinations
sudo tee /etc/td-agent/td-agent.conf > /dev/null <<'EOF'
<source>
  @type tail
  path /var/log/app/application.log
  pos_file /var/log/td-agent/app.pos
  tag app.logs
  <parse>
    @type json
  </parse>
</source>

# Copy logs to multiple destinations
<match app.logs>
  @type copy
  
  # Send to Elasticsearch
  <store>
    @type elasticsearch
    host elasticsearch.local
    port 9200
    logstash_format true
  </store>
  
  # Send to S3 for archival
  <store>
    @type s3
    s3_bucket app-logs-archive
    path logs/
    <buffer time>
      timekey 86400
    </buffer>
  </store>
  
  # Send errors to Slack
  <store>
    @type grep
    <regexp>
      key level
      pattern /^ERROR$/
    </regexp>
    @type slack
    webhook_url https://hooks.slack.com/services/YOUR/WEBHOOK/URL
    channel alerts
    username fluentd
  </store>
</match>
EOF

sudo systemctl restart td-agent
# Configure APM log forwarding
sudo tee /etc/td-agent/td-agent.conf > /dev/null <<'EOF'
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/td-agent/app.pos
  tag app.logs
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

# Enrich logs with metadata
<filter app.logs>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    environment ${ENV['ENVIRONMENT'] || 'production'}
    service_name myapp
    trace_id ${record['trace_id']}
  </record>
</filter>

# Calculate response time metrics
<filter app.logs>
  @type prometheus
  <metric>
    name http_request_duration_seconds
    type histogram
    desc HTTP request duration
    key response_time
  </metric>
</filter>

# Forward to APM system
<match app.logs>
  @type http
  endpoint http://apm-server:8200/intake/v2/events
  <buffer>
    flush_interval 5s
  </buffer>
</match>
EOF

sudo systemctl restart td-agent
```### Configurazione Buffer
`pos_file`per input di coda e impostare appropriatamente`timekey`valori nei buffer per prevenire problemi di spazio su disco. Utilizzare`rotate_age`e`rotate_size`per output di file.

- **Etichettare i log gerarchicamente**: Utilizzare tag con notazione a punti (es.,`app.production.web`) per consentire un instradamento e filtraggio flessibile. Questo permette di abbinare pattern come`app.**`o`app.production.*`.

- **Monitorare le prestazioni di Fluentd**: Tracciare la lunghezza della coda del buffer, i conteggi dei tentativi e i tassi di emissione. Utilizzare il plugin Prometheus o il monitoraggio integrato per rilevare colli di bottiglia prima che causino perdita di dati.

- **Proteggere dati sensibili**: Utilizzare`@type secure_forward`per la trasmissione di log crittografati, filtrare campi sensibili con`record_modifier`, e limitare i permessi dei file nelle configurazioni contenenti credenziali.

- **Testare modifiche di configurazione**: Utilizzare sempre`--dry-run`per convalidare la sintassi della configurazione prima del deployment. Testare la logica di routing con volumi di log ridotti prima di applicarli in produzione.

- **Utilizzare la modalità multi-worker con giudizio**: Abilitare worker per operazioni intensive della CPU (parsing, filtraggio) ma essere consapevoli che alcuni plugin non supportano la modalità multi-worker. Iniziare con 2-4 worker e monitorare l'utilizzo della CPU.

- **Implementare degradazione graduale**: Configurare`overflow_action`nei buffer per gestire il backpressure (utilizzare`drop_oldest_chunk`o`block`in base ai propri requisiti). Impostare valori`retry_timeout`ragionevoli per prevenire tentativi infiniti.

- **Separare le preoccupazioni con etichette**: Utilizzare direttive`@label`per creare pipeline di elaborazione isolate per diversi tipi di log. Questo migliora la manutenibilità e previene l'instradamento involontario.

- **Mantenere i plugin aggiornati**: Aggiornare regolarmente Fluentd e plugin per ottenere correzioni di sicurezza e miglioramenti delle prestazioni. Bloccare le versioni dei plugin in produzione per garantire coerenza.

## Risoluzione dei problemi

| Problema | Soluzione |
|-------|----------|
| **Fluentd won't start** | Check syntax: `fluentd -c fluent.conf --dry-run`. Review logs: `sudo journalctl -u td-agent -n 100`. Verify file permissions on config and buffer directories. |
| **Logs not being collected** | Verify `pos_file` exists and is writable. Check file path patterns match actual log locations. Ensure log files have read permissions. Test with `tail -f` on the log file. |
| **High memory usage** | Switch from memory buffers to file buffers. Reduce `chunk_limit_size` and `queue_limit_length`. Enable multi-worker mode to distribute load. Check for memory leaks in custom plugins. |
| **Buffer queue growing** | Increase `flush_interval` or reduce log volume. Check downstream system capacity (Elasticsearch, S3). Verify network connectivity. Review `retry_max_interval` settings. |
| **Logs being dropped** | Check buffer `overflow_action` setting. Increase `queue_limit_length` and `chunk_limit_size`. Monitor disk space for file buffers. Review `retry_timeout` configuration. |
| **Plugin installation fails** | Ensure Ruby development headers installed: `sudo apt-get install ruby-dev build-essential`. Use correct gem command: `td-agent-gem` not `gem`. Check plugin compatibility with Fluentd version. |
| **Parse errors in logs** | Validate parser configuration with sample logs. Use `@type regexp` with proper regex patterns. Add error handling: `emit_invalid_record_to_error true`. Check time format strings. |
| **Cannot connect to Elasticsearch** | Verify Elasticsearch is running: `curl http://elasticsearch:9200`. Check firewall rules. Validate credentials if using authentication. Review Elasticsearch logs for rejection reasons. |
| **Duplicate logs appearing** | Check `pos_file` location is persistent across restarts. Verify only one Fluentd instance is running. Review `read_from_head` setting (should be `false` in production). |
| **Slow log processing** | Enable multi-worker mode. Optimize regex patterns in filters. Use `@type grep` before expensive parsers. Profile with `--trace` flag to identify bottlenecks. |
| **SSL/TLS connection errors** | Verify certificate paths and permissions. Check certificate expiration dates. Ensure CA bundle is up to date. Use `verify_ssl false` for testing only (not production). || **Problemi di fuso orario** | Impostare`utc`o`localtime`nel parser del tempo. Utilizzare`time_format`con timezone:`%Y-%m-%dT%H:%M:%S%z`. Configurare correttamente il fuso orario di sistema. |