Overview
Apache NiFi is a powerful data logistics platform designed to automate the flow of data between systems. Built on the concepts of flow-based programming, NiFi provides a web-based UI where users can design data flows by dragging and dropping processors, connecting them with relationships, and configuring routing rules. Every piece of data is tracked through the system with full provenance, making it ideal for regulated industries.
NiFi handles data ingestion from hundreds of sources including files, APIs, databases, message queues, IoT devices, and cloud services. It provides back-pressure management, prioritized queuing, guaranteed delivery, and data loss prevention. The MiNiFi subproject extends NiFi’s reach to edge devices. NiFi runs as a clustered service for horizontal scalability and integrates tightly with the Hadoop ecosystem, Kafka, and cloud platforms.
Installation
# Download Apache NiFi (superseded releases move to https://archive.apache.org/dist/nifi/)
wget https://downloads.apache.org/nifi/1.27.0/nifi-1.27.0-bin.zip
unzip nifi-1.27.0-bin.zip
cd nifi-1.27.0
# Start NiFi
./bin/nifi.sh start
# Check status
./bin/nifi.sh status
# Stop NiFi
./bin/nifi.sh stop
# Access the web UI at https://localhost:8443/nifi
# On first start, a generated username and password are written to logs/nifi-app.log
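The generated credentials can be pulled out of the log with a short script. This is a minimal sketch that assumes the log contains lines of the form `Generated Username [...]` and `Generated Password [...]`; the function name `find_generated_credentials` is hypothetical.

```python
import re
from typing import Optional, Tuple

# Assumed log format: "Generated Username [..]" and "Generated Password [..]"
# appear in logs/nifi-app.log after the first start in single-user mode.
_USER_RE = re.compile(r"Generated Username \[([^\]]+)\]")
_PASS_RE = re.compile(r"Generated Password \[([^\]]+)\]")


def find_generated_credentials(log_text: str) -> Optional[Tuple[str, str]]:
    """Return (username, password) from log text, or None if either is missing."""
    user = _USER_RE.search(log_text)
    password = _PASS_RE.search(log_text)
    if user is None or password is None:
        return None
    return user.group(1), password.group(1)
```

To use it, read `logs/nifi-app.log` from the NiFi install directory and pass the text to the function. If it returns `None`, credentials may have been set explicitly rather than generated.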
Docker Installation
docker run -d \
  --name nifi \
  -p 8443:8443 \
  -e SINGLE_USER_CREDENTIALS_USERNAME=admin \
  -e SINGLE_USER_CREDENTIALS_PASSWORD=admin12345678 \
  -e NIFI_WEB_HTTPS_PORT=8443 \
  apache/nifi:1.27.0
# Download NiFi Toolkit for CLI operations
wget https://downloads.apache.org/nifi/1.27.0/nifi-toolkit-1.27.0-bin.zip
unzip nifi-toolkit-1.27.0-bin.zip
cd nifi-toolkit-1.27.0
# Verify that the CLI can reach NiFi
./bin/cli.sh nifi current-user -u https://localhost:8443
Core Processors
| Processor | Description |
|---|---|
| GetFile | Reads files from a local directory |
| PutFile | Writes FlowFiles to a local directory |
| GetSFTP / PutSFTP | Transfer files via SFTP |
| GetHTTP / InvokeHTTP | Make HTTP requests to APIs |
| ListenHTTP | Start an HTTP server to receive data |
| ConsumeKafka / PublishKafka | Read/write Kafka topics |
| QueryDatabaseTable | Incrementally fetch rows from a database table via JDBC |
| PutDatabaseRecord | Insert records into databases |
| ExecuteSQL / ExecuteScript | Run SQL queries or custom scripts |
| ConvertRecord | Convert between data formats (JSON, Avro, CSV) |
| SplitJson / SplitRecord | Split FlowFiles into individual records |
| MergeContent | Merge multiple FlowFiles into one |
| RouteOnAttribute | Route based on FlowFile attribute values |
| RouteOnContent | Route based on FlowFile content |
| UpdateAttribute | Add or modify FlowFile attributes |
| EvaluateJsonPath | Extract values from JSON into attributes |
| JoltTransformJSON | Transform JSON using JOLT specifications |
| ReplaceText | Find and replace text content |
| PutS3Object / FetchS3Object | Read/write AWS S3 |
| PutGCSObject / FetchGCSObject | Read/write Google Cloud Storage |
| PublishJMS / ConsumeJMS | JMS messaging integration |
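The processors above all operate on FlowFiles, which pair a set of attributes with a content payload. The following toy model is not NiFi's API; it is a sketch in plain Python, with hypothetical names, of how attribute updates and attribute-based routing relate. The routing here is simplified so that the first matching rule wins, which differs from the real processor's routing strategies.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class FlowFile:
    """Toy stand-in for a FlowFile: attributes (metadata) plus content."""
    content: bytes
    attributes: Dict[str, str] = field(default_factory=dict)


def update_attribute(ff: FlowFile, **new_attrs: str) -> FlowFile:
    """Loosely mimics UpdateAttribute: copy with added or overwritten attributes."""
    return FlowFile(ff.content, {**ff.attributes, **new_attrs})


def route_on_attribute(
    ff: FlowFile, rules: Dict[str, Callable[[Dict[str, str]], bool]]
) -> str:
    """Loosely mimics RouteOnAttribute: the first matching rule names the relationship."""
    for relationship, predicate in rules.items():
        if predicate(ff.attributes):
            return relationship
    return "unmatched"


ff = update_attribute(
    FlowFile(b"id,amount\n1,9.5\n", {"filename": "orders.csv"}), source="sftp"
)
rules = {"csv": lambda attrs: attrs.get("filename", "").endswith(".csv")}
print(route_on_attribute(ff, rules))
```

The content is never inspected during routing, which mirrors why attribute-based routing is cheap compared with content-based routing.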
CLI Commands
| Command | Description |
|---|---|
| `./bin/nifi.sh start` | Start NiFi |
| `./bin/nifi.sh stop` | Stop NiFi |
| `./bin/nifi.sh restart` | Restart NiFi |
| `./bin/nifi.sh status` | Check NiFi status |
| `./bin/nifi.sh dump` | Write a thread dump for debugging |
| `cli.sh nifi get-root-id` | Get root process group ID |
| `cli.sh nifi pg-list` | List process groups |
| `cli.sh nifi pg-import -i <id> -f flow.json` | Import a flow definition |
| `cli.sh nifi pg-export -o flow.json -pgid <id>` | Export a flow definition |
| `cli.sh nifi pg-start -pgid <id>` | Start a process group |
| `cli.sh nifi pg-stop -pgid <id>` | Stop a process group |
| `cli.sh registry list-buckets` | List NiFi Registry buckets |
| `cli.sh registry create-bucket -bn my-bucket` | Create a registry bucket |
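When these commands are scripted, it can help to build the argument list in one place. The sketch below wraps only the `pg-start` and `pg-stop` commands from the table; the `CLI` path and the helper names `pg_command` and `run` are assumptions for illustration.

```python
import subprocess
from typing import List

CLI = "./bin/cli.sh"  # assumed path, relative to the unpacked toolkit directory


def pg_command(action: str, pg_id: str) -> List[str]:
    """Build argv for `cli.sh nifi pg-start|pg-stop -pgid <id>`."""
    if action not in ("start", "stop"):
        raise ValueError(f"unsupported action: {action!r}")
    return [CLI, "nifi", f"pg-{action}", "-pgid", pg_id]


def run(argv: List[str]) -> str:
    """Run the CLI and return stdout; raises CalledProcessError on failure."""
    result = subprocess.run(argv, check=True, capture_output=True, text=True)
    return result.stdout
```

Passing the arguments as a list avoids shell quoting problems with process group IDs and file paths.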
Configuration
nifi.properties Key Settings
# Web properties
nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443
# Cluster settings
nifi.cluster.is.node=true
nifi.cluster.node.address=node1.example.com
nifi.cluster.node.protocol.port=11443
nifi.zookeeper.connect.string=zk1:2181,zk2:2181,zk3:2181
# Content repository
nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.usage.percentage=50%
nifi.content.repository.archive.max.retention.period=7 days
# Provenance repository
nifi.provenance.repository.directory.default=./provenance_repository
nifi.provenance.repository.max.storage.time=30 days
nifi.provenance.repository.max.storage.size=10 GB
# Performance tuning
nifi.queue.swap.threshold=20000
nifi.bored.yield.duration=10 millis
nifi.content.claim.max.appendable.size=1 MB
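A quick sanity check of these settings can be scripted before starting a node. This is a minimal sketch: the parser handles only plain `key=value` lines (no line continuations or other Java properties syntax), and the helper names are hypothetical.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_cluster_settings(props: Dict[str, str]) -> List[str]:
    """Return problems for a node that is flagged as a cluster member."""
    problems: List[str] = []
    if props.get("nifi.cluster.is.node") == "true":
        for key in (
            "nifi.cluster.node.address",
            "nifi.cluster.node.protocol.port",
            "nifi.zookeeper.connect.string",
        ):
            if not props.get(key):
                problems.append(f"{key} is empty or missing")
    return problems
```

An empty list from `check_cluster_settings` only means the listed keys are present; it does not verify that the hosts or ports are reachable.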
Flow Definition (JSON)
{
  "rootGroup": {
    "name": "ETL Pipeline",
    "processors": [
      {
        "name": "GetSourceData",
        "type": "org.apache.nifi.processors.standard.GetFile",
        "config": {
          "properties": {
            "Input Directory": "/data/incoming",
            "File Filter": "[^.].*\\.csv",
            "Keep Source File": "false"
          },
          "schedulingPeriod": "10 sec",
          "schedulingStrategy": "TIMER_DRIVEN"
        }
      }
    ]
  }
}
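A File Filter is a regular expression, so an unescaped `.` before `csv` matches any character rather than a literal dot. The sketch below compares the two forms using Python's `re` module, assuming the filter must match the whole filename; the helper name `accepts` is hypothetical.

```python
import re

UNESCAPED = r"[^.].*.csv"   # the last "." matches any single character
ESCAPED = r"[^.].*\.csv"    # "\." matches only a literal dot


def accepts(pattern: str, filename: str) -> bool:
    """Whole-name match, assuming the filter is applied to the entire filename."""
    return re.fullmatch(pattern, filename) is not None


for name in ("orders.csv", ".hidden.csv", "datacsv"):
    print(name, accepts(UNESCAPED, name), accepts(ESCAPED, name))
```

Both patterns reject hidden files through the leading `[^.]`, but only the escaped form rejects a name such as `datacsv`. Inside a JSON string the backslash itself must be doubled, giving `\\.`.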
Advanced Usage
Expression Language
# FlowFile attribute references
${filename}
${filename:substringBefore('.')}
${fileSize:toNumber():divide(1024)}
# Date/time functions
${now():format('yyyy-MM-dd')}
${now():toNumber():minus(86400000):format('yyyy-MM-dd')}
# Conditional logic
${filename:endsWith('.csv'):ifElse('CSV','OTHER')}
${status:equals('active'):and(${age:gt(18)})}
# String manipulation
${hostname:toUpper()}
${message:replaceAll('[^a-zA-Z0-9]', '_')}
${uuid()}
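For readers more familiar with general-purpose languages, a few of the expressions above can be approximated in Python. These are rough equivalents for illustration, not NiFi's implementation; the function names are hypothetical, whole-number division is assumed for `divide`, and time zone handling is ignored.

```python
from datetime import datetime, timedelta


def substring_before(subject: str, delimiter: str) -> str:
    """Like ${filename:substringBefore('.')}: text before the first delimiter,
    or the whole string if the delimiter is absent."""
    return subject.partition(delimiter)[0]


def size_in_kib(file_size: str) -> int:
    """Like ${fileSize:toNumber():divide(1024)}, assuming whole-number division."""
    return int(file_size) // 1024


def yesterday(now: datetime) -> str:
    """Like ${now():toNumber():minus(86400000):format('yyyy-MM-dd')}."""
    return (now - timedelta(milliseconds=86_400_000)).strftime("%Y-%m-%d")


def file_kind(filename: str) -> str:
    """Like ${filename:endsWith('.csv'):ifElse('CSV','OTHER')}."""
    return "CSV" if filename.endswith(".csv") else "OTHER"
```

Note that attribute values are strings, which is why the size example converts before dividing, mirroring the `toNumber()` call.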
Record-Based Processing
Avro schema for the record reader and writer used by ConvertRecord:
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
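Before wiring a schema into a flow, sample records can be checked against it offline. The sketch below uses only the standard library and covers just the two primitive types in the Order schema; it is not a substitute for a real Avro library, and the names `PYTHON_TYPES` and `validate` are hypothetical.

```python
import json

ORDER_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
""")

# Only the primitive types used above are mapped; other Avro types are out of scope.
PYTHON_TYPES = {"string": str, "double": (int, float)}


def validate(record: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    for field_def in schema["fields"]:
        name, avro_type = field_def["name"], field_def["type"]
        if name not in record:
            problems.append(f"missing field: {name}")
        elif not isinstance(record[name], PYTHON_TYPES[avro_type]):
            problems.append(f"{name}: expected {avro_type}")
    return problems
```

A record such as `{"order_id": "A1", "customer_id": "C9", "amount": 19.99, "order_date": "2024-03-01"}` yields an empty list, while a string in `amount` is reported as a type problem.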
NiFi Registry Version Control
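# Connect NiFi to Registry: add a Registry Client in the NiFi UI
# (Controller Settings > Registry Clients), or via CLI:
cli.sh nifi create-reg-client -rcn registry -rcu http://registry:18080
# Create a bucket for versioned flows
cli.sh registry create-bucket -bn production-flows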
# Export a flow and import it as a new version of a registry flow
cli.sh nifi pg-export -pgid <group-id> -o flow.json
cli.sh registry create-flow -b <bucket-id> -fn etl-pipeline
cli.sh registry import-flow-version \
  -f <flow-id> \
  -i flow.json
Custom Processor Development
# Generate processor project with Maven archetype
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.nifi \
  -DarchetypeArtifactId=nifi-processor-bundle-archetype \
  -DarchetypeVersion=1.27.0 \
  -DnifiVersion=1.27.0 \
  -DgroupId=com.example \
  -DartifactId=my-processors \
  -DartifactBaseName=my-processors
# Build and deploy
cd my-processors
mvn clean package
cp nifi-my-processors-nar/target/*.nar "$NIFI_HOME/lib/"
# Restart NiFi so that the new NAR in lib/ is loaded
Troubleshooting
| Issue | Solution |
|---|---|
| NiFi won’t start | Check logs/nifi-app.log for errors. Verify Java 11+ is installed |
| Processor in invalid state | Check processor configuration. Required properties may be missing |
| Back-pressure causing stalls | Increase queue thresholds or add more processing capacity |
| Content repository full | Increase disk space or reduce archive.max.retention.period |
| Out of memory | Increase heap in bootstrap.conf: java.arg.3=-Xmx4g |
| Cluster node disconnected | Check ZooKeeper connectivity and firewall rules between nodes |
| Slow provenance queries | Reduce max.storage.time or switch to WriteAheadProvenanceRepository |
| SSL/TLS certificate errors | Verify keystore/truststore paths and passwords in nifi.properties |
| FlowFile stuck in queue | Check downstream processor errors. Use Empty Queue option if safe |
| Bulletin board alerts | Click the bulletin icon to see warnings. Address processor-level errors |
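The back-pressure row is easier to reason about with a small simulation. This toy model is a sketch, not NiFi's scheduler: it assumes the upstream component is skipped whenever the queue has reached its object threshold, and the function name and parameters are hypothetical.

```python
from collections import deque
from typing import Tuple


def simulate(
    ticks: int, produce_per_tick: int, consume_per_tick: int, threshold: int
) -> Tuple[int, int]:
    """Return (final queue depth, ticks on which upstream was paused)."""
    queue: deque = deque()
    paused = 0
    for _ in range(ticks):
        # Upstream runs only while the queue is below the threshold.
        if len(queue) < threshold:
            queue.extend(range(produce_per_tick))
        else:
            paused += 1
        # Downstream drains up to its capacity on every tick.
        for _ in range(min(consume_per_tick, len(queue))):
            queue.popleft()
    return len(queue), paused


print(simulate(10, 5, 2, 10))    # slow consumer, low threshold
print(simulate(10, 5, 5, 10))    # consumer keeps up
print(simulate(10, 5, 2, 100))   # higher threshold, same slow consumer
```

In this model, raising the threshold removes the pauses but lets the queue grow, while matching consumer capacity to the producer keeps the queue empty. That is the trade-off behind "increase queue thresholds or add more processing capacity".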