Apache NiFi Cheat Sheet

Overview

Apache NiFi is a data logistics platform that automates the flow of data between systems. Built on the concepts of flow-based programming, it provides a web-based UI where users design data flows by dragging and dropping processors, linking them with connections that carry named relationships (such as success or failure), and configuring routing rules. NiFi records provenance for every FlowFile that passes through the system, which suits regulated industries that need an audit trail.

NiFi handles data ingestion from hundreds of sources including files, APIs, databases, message queues, IoT devices, and cloud services. It provides back-pressure management, prioritized queuing, guaranteed delivery, and data loss prevention. The MiNiFi subproject extends NiFi’s reach to edge devices. NiFi runs as a clustered service for horizontal scalability and integrates tightly with the Hadoop ecosystem, Kafka, and cloud platforms.

Installation

# Download Apache NiFi
# (releases that are no longer current move to https://archive.apache.org/dist/nifi/)
wget https://downloads.apache.org/nifi/1.27.0/nifi-1.27.0-bin.zip
unzip nifi-1.27.0-bin.zip
cd nifi-1.27.0

# Start NiFi
./bin/nifi.sh start

# Check status
./bin/nifi.sh status

# Stop NiFi
./bin/nifi.sh stop

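# Access the web UI at https://localhost:8443/nifi
# On first start, a random username and password are written to logs/nifi-app.log
# (search for "Generated Username" and "Generated Password").
# To set your own credentials (the password must be at least 12 characters):
#   ./bin/nifi.sh set-single-user-credentials <username> <password>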
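Picking the generated credentials out of a long log by eye is tedious. The sketch below extracts them with a regular expression, assuming the log lines have the form `Generated Username [...]` and `Generated Password [...]`. The function name `find_generated_credentials` is hypothetical, and the line format should be checked against your own log.

```python
import re
from pathlib import Path

# Assumed log format: "Generated Username [value]" / "Generated Password [value]"
_CREDENTIAL_RE = re.compile(r"Generated (Username|Password) \[([^\]]+)\]")


def find_generated_credentials(log_text: str) -> dict:
    """Return {'username': ..., 'password': ...} for the entries found in log_text.

    If the log contains several generated pairs, the last one wins.
    """
    credentials = {}
    for kind, value in _CREDENTIAL_RE.findall(log_text):
        credentials[kind.lower()] = value
    return credentials


if __name__ == "__main__":
    log_path = Path("logs/nifi-app.log")  # relative to the NiFi install directory
    if log_path.exists():
        print(find_generated_credentials(log_path.read_text(errors="replace")))
```

An empty result usually means credentials were set explicitly, or the log has rolled over since the first start.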

Docker Installation

docker run -d \
    --name nifi \
    -p 8443:8443 \
    -e SINGLE_USER_CREDENTIALS_USERNAME=admin \
    -e SINGLE_USER_CREDENTIALS_PASSWORD=admin12345678 \
    -e NIFI_WEB_HTTPS_PORT=8443 \
    apache/nifi:1.27.0
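NiFi's single-user login requires a password of at least 12 characters, which is easy to miss when the value is passed through an environment variable. The sketch below builds the same `docker run` command and checks the length first. The helper name `build_docker_run` and its defaults are hypothetical; only the image name and environment variables come from the command above.

```python
import shlex

MIN_PASSWORD_LENGTH = 12  # minimum accepted by NiFi's single-user login


def build_docker_run(username: str, password: str, port: int = 8443,
                     image: str = "apache/nifi:1.27.0") -> str:
    """Return a shell-safe docker run command, refusing passwords that are too short."""
    if len(password) < MIN_PASSWORD_LENGTH:
        raise ValueError(
            f"password must be at least {MIN_PASSWORD_LENGTH} characters"
        )
    args = [
        "docker", "run", "-d",
        "--name", "nifi",
        "-p", f"{port}:{port}",
        "-e", f"SINGLE_USER_CREDENTIALS_USERNAME={username}",
        "-e", f"SINGLE_USER_CREDENTIALS_PASSWORD={password}",
        "-e", f"NIFI_WEB_HTTPS_PORT={port}",
        image,
    ]
    # shlex.join quotes any argument that contains shell metacharacters
    return shlex.join(args)


if __name__ == "__main__":
    print(build_docker_run("admin", "admin12345678"))
```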

NiFi Toolkit

# Download NiFi Toolkit for CLI operations
wget https://downloads.apache.org/nifi/1.27.0/nifi-toolkit-1.27.0-bin.zip
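unzip nifi-toolkit-1.27.0-bin.zip
cd nifi-toolkit-1.27.0

# Verify CLI connectivity by printing the current user
# (-u sets the NiFi base URL; a secured instance also needs truststore and credential options)
./bin/cli.sh nifi current-user -u https://localhost:8443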

Core Processors

| Processor | Description |
|---|---|
| GetFile | Reads files from a local directory |
| PutFile | Writes FlowFiles to a local directory |
| GetSFTP / PutSFTP | Transfer files via SFTP |
| GetHTTP / InvokeHTTP | Make HTTP requests to APIs |
| ListenHTTP | Start an HTTP server to receive data |
| ConsumeKafka / PublishKafka | Read/write Kafka topics |
| QueryDatabaseTable | Incrementally fetch rows from a database table via JDBC |
| PutDatabaseRecord | Insert records into databases |
| ExecuteSQL / ExecuteScript | Run SQL queries or custom scripts |
| ConvertRecord | Convert between data formats (JSON, Avro, CSV) |
| SplitJson / SplitRecord | Split FlowFiles into individual records |
| MergeContent | Merge multiple FlowFiles into one |
| RouteOnAttribute | Route based on FlowFile attribute values |
| RouteOnContent | Route based on FlowFile content |
| UpdateAttribute | Add or modify FlowFile attributes |
| EvaluateJsonPath | Extract values from JSON into attributes |
| JoltTransformJSON | Transform JSON using JOLT specifications |
| ReplaceText | Find and replace text content |
| PutS3Object / FetchS3Object | Write/read AWS S3 objects |
| PutGCSObject / FetchGCSObject | Write/read Google Cloud Storage objects |
| PublishJMS / ConsumeJMS | JMS messaging integration |

CLI Commands

| Command | Description |
|---|---|
| ./bin/nifi.sh start | Start NiFi |
| ./bin/nifi.sh stop | Stop NiFi |
| ./bin/nifi.sh restart | Restart NiFi |
| ./bin/nifi.sh status | Check NiFi status |
| ./bin/nifi.sh dump | Thread dump for debugging |
| cli.sh nifi get-root-id | Get root process group ID |
| cli.sh nifi pg-list | List process groups |
| cli.sh nifi pg-import -i <id> -f flow.json | Import a flow definition |
| cli.sh nifi pg-export -o flow.json -pgid <id> | Export a flow definition |
| cli.sh nifi pg-start -pgid <id> | Start a process group |
| cli.sh nifi pg-stop -pgid <id> | Stop a process group |
| cli.sh registry list-buckets | List NiFi Registry buckets |
| cli.sh registry create-bucket -bn my-bucket | Create a registry bucket |

Configuration

nifi.properties Key Settings

# Web properties
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443

# Cluster settings
nifi.cluster.is.node=true
nifi.cluster.node.address=node1.example.com
nifi.cluster.node.protocol.port=11443
nifi.zookeeper.connect.string=zk1:2181,zk2:2181,zk3:2181

# Content repository
nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.usage.percentage=50%
nifi.content.repository.archive.max.retention.period=7 days

# Provenance repository
nifi.provenance.repository.directory.default=./provenance_repository
nifi.provenance.repository.max.storage.time=30 days
nifi.provenance.repository.max.storage.size=10 GB

# Performance tuning
nifi.queue.swap.threshold=20000
nifi.bored.yield.duration=10 millis
nifi.content.claim.max.appendable.size=1 MB

Flow Definition (JSON)

{
  "rootGroup": {
    "name": "ETL Pipeline",
    "processors": [
      {
        "name": "GetSourceData",
        "type": "org.apache.nifi.processors.standard.GetFile",
        "config": {
          "properties": {
            "Input Directory": "/data/incoming",
            "File Filter": "[^.].*\\.csv",
            "Keep Source File": "false"
          },
          "schedulingPeriod": "10 sec",
          "schedulingStrategy": "TIMER_DRIVEN"
        }
      }
    ]
  }
}
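The File Filter property is a regular expression, so a literal dot has to be escaped (and the backslash doubled inside JSON); an unescaped dot matches any character. The sketch below compares the two forms, assuming the filter must match the whole file name. It uses Python's `re` module as a stand-in for Java's regex engine, which behaves the same way for these patterns. `matches_file_filter` is a hypothetical helper.

```python
import re


def matches_file_filter(pattern: str, filename: str) -> bool:
    """Return True when the pattern matches the entire file name."""
    return re.fullmatch(pattern, filename) is not None


ESCAPED = r"[^.].*\.csv"    # not hidden, ends in a literal ".csv"
UNESCAPED = r"[^.].*.csv"   # the last dot matches any character

if __name__ == "__main__":
    for name in ("orders.csv", ".hidden.csv", "orders_csv", "orders.csv.bak"):
        print(name,
              matches_file_filter(ESCAPED, name),
              matches_file_filter(UNESCAPED, name))
```

With the unescaped pattern, a name such as `orders_csv` slips through, which is rarely what is intended.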

Advanced Usage

Expression Language

# FlowFile attribute references
${filename}
${filename:substringBefore('.')}
${fileSize:toNumber():divide(1024)}

# Date/time functions
${now():format('yyyy-MM-dd')}
${now():toNumber():minus(86400000):format('yyyy-MM-dd')}

# Conditional logic
${filename:endsWith('.csv'):ifElse('CSV','OTHER')}
${status:equals('active'):and(${age:gt(18)})}

# String manipulation
${hostname:toUpper()}
${message:replaceAll('[^a-zA-Z0-9]', '_')}

# Generate a random UUID (the FlowFile's own ID is the attribute ${uuid})
${UUID()}

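To make the results of these expressions concrete, the sketch below gives rough Python equivalents for a few of them. It only illustrates the arithmetic and string handling and is not how NiFi evaluates Expression Language. All function names are hypothetical, and the date helper defaults to UTC for reproducibility, whereas NiFi formats in the JVM's time zone unless told otherwise.

```python
import re
from datetime import datetime, timezone

MILLIS_PER_DAY = 86_400_000


def substring_before(subject: str, delimiter: str) -> str:
    """${filename:substringBefore('.')}: text before the first delimiter."""
    return subject.partition(delimiter)[0]


def size_in_kib(file_size) -> int:
    """${fileSize:toNumber():divide(1024)}: whole-number division."""
    return int(file_size) // 1024


def yesterday(now_ms: int, tz=timezone.utc) -> str:
    """${now():toNumber():minus(86400000):format('yyyy-MM-dd')}"""
    moment = datetime.fromtimestamp((now_ms - MILLIS_PER_DAY) / 1000, tz=tz)
    return moment.strftime("%Y-%m-%d")


def classify(filename: str) -> str:
    """${filename:endsWith('.csv'):ifElse('CSV','OTHER')}"""
    return "CSV" if filename.endswith(".csv") else "OTHER"


def sanitize(message: str) -> str:
    """${message:replaceAll('[^a-zA-Z0-9]', '_')}"""
    return re.sub(r"[^a-zA-Z0-9]", "_", message)


if __name__ == "__main__":
    print(substring_before("report.2024.csv", "."))
    print(size_in_kib("4096"))
    print(classify("orders.csv"), classify("orders.json"))
    print(sanitize("order #42 (rush)"))
```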
Record-Based Processing

// Avro schema for the record reader/writer used by ConvertRecord
// (JSON has no comments; omit these two lines when pasting the schema)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
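Before wiring a schema into a flow, it can help to check a few sample records against it. The sketch below does a minimal structural check using only the standard library. It is not an Avro implementation: it handles a handful of primitive types, and the names `validate_record` and `ORDER_SCHEMA` are hypothetical.

```python
import json

ORDER_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
""")

# Python types accepted for each supported Avro primitive
_ACCEPTED_TYPES = {
    "string": (str,),
    "double": (float, int),
    "float": (float, int),
    "int": (int,),
    "long": (int,),
}


def validate_record(record: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the record fits the schema."""
    errors = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            errors.append(f"missing field: {name}")
            continue
        accepted = _ACCEPTED_TYPES.get(avro_type)
        value = record[name]
        if accepted is None:
            errors.append(f"unsupported type in sketch: {avro_type}")
        elif isinstance(value, bool) or not isinstance(value, accepted):
            # bool is rejected explicitly because it is a subclass of int
            errors.append(f"{name}: expected {avro_type}, got {type(value).__name__}")
    return errors


if __name__ == "__main__":
    sample = {"order_id": "A-1", "customer_id": "C-9",
              "amount": 12.5, "order_date": "2024-01-31"}
    print(validate_record(sample, ORDER_SCHEMA))
```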

NiFi Registry Version Control

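# Connect NiFi to Registry by adding a Registry Client
# (UI: Controller Settings > Registry Clients), or via CLI:
cli.sh nifi create-reg-client -rcn registry -rcu http://registry:18080

# Create a bucket and a flow entry (each command prints the new ID)
cli.sh registry create-bucket -bn production-flows
cli.sh registry create-flow -b <bucket-id> -fn etl-pipeline

# Export a process group and import it as a new flow version
# Option names differ between toolkit versions; confirm with
# `cli.sh registry import-flow-version help`
cli.sh nifi pg-export -pgid <group-id> -o flow.json
cli.sh registry import-flow-version \
    -f <flow-id> \
    -i flow.json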

Custom Processor Development

# Generate processor project with Maven archetype
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.nifi \
    -DarchetypeArtifactId=nifi-processor-bundle-archetype \
    -DarchetypeVersion=1.27.0 \
    -DnifiVersion=1.27.0 \
    -DgroupId=com.example \
    -DartifactId=my-processors \
    -DartifactBaseName=my-processors

# Build and deploy (restart NiFi afterwards so the new NAR is loaded)
cd my-processors
mvn clean package
cp nifi-my-processors-nar/target/*.nar "$NIFI_HOME/lib/"

Troubleshooting

| Issue | Solution |
|---|---|
| NiFi won’t start | Check logs/nifi-app.log and logs/nifi-bootstrap.log for errors. Verify Java 11+ is installed and that JAVA_HOME points to it |
| Processor in invalid state | Check the processor configuration. Required properties may be missing or a referenced controller service may be disabled |
| Back-pressure causing stalls | Increase the connection's back-pressure thresholds or add more processing capacity downstream |
| Content repository full | Increase disk space or reduce nifi.content.repository.archive.max.retention.period |
| Out of memory | Increase heap in conf/bootstrap.conf: java.arg.3=-Xmx4g |
| Cluster node disconnected | Check ZooKeeper connectivity and firewall rules between nodes |
| Slow provenance queries | Reduce nifi.provenance.repository.max.storage.time or switch to WriteAheadProvenanceRepository |
| SSL/TLS certificate errors | Verify keystore/truststore paths and passwords in nifi.properties |
| FlowFile stuck in queue | Check downstream processor errors. Use the Empty Queue option if the data can safely be discarded |
| Bulletin board alerts | Click the bulletin icon to see warnings. Address processor-level errors |
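For the back-pressure entry, it helps to know when a connection counts as full. The sketch below models the rule under stated assumptions: a connection applies back-pressure once either its object count or its queued data size reaches the configured threshold, with NiFi's defaults for new connections being 10,000 objects and 1 GB. Treating a threshold of 0 as "disabled" is an assumption of this sketch, and `back_pressure_applied` is a hypothetical name.

```python
GIB = 1024 ** 3  # bytes in 1 GB as NiFi sizes it


def back_pressure_applied(queued_count: int, queued_bytes: int,
                          object_threshold: int = 10_000,
                          size_threshold_bytes: int = GIB) -> bool:
    """Return True when the upstream component would stop being scheduled."""
    count_full = object_threshold > 0 and queued_count >= object_threshold
    size_full = size_threshold_bytes > 0 and queued_bytes >= size_threshold_bytes
    return count_full or size_full


if __name__ == "__main__":
    # Many small FlowFiles trip the object threshold long before the size threshold
    print(back_pressure_applied(queued_count=10_000, queued_bytes=5 * 1024 ** 2))
    # A few large FlowFiles trip the size threshold instead
    print(back_pressure_applied(queued_count=12, queued_bytes=2 * GIB))
```

Which threshold is being hit tells you which one to raise: flows of many tiny FlowFiles usually need a higher object threshold, while flows of large files need a higher size threshold.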