Overview
Airbyte is an open-source data integration platform that helps teams replicate data from sources to destinations. With over 300 pre-built connectors, Airbyte handles the Extract and Load (EL) part of the ELT pipeline, supporting databases, APIs, SaaS applications, files, and event streams. Users can also build custom connectors using the Connector Development Kit (CDK) in Python or low-code YAML.
Airbyte provides incremental sync modes for efficient data replication, schema change handling, change data capture, transformation capabilities via dbt integration, and a comprehensive API for programmatic control. It can be deployed as a self-hosted open-source platform, via Airbyte Cloud (managed), or embedded in other products. The platform handles rate limiting, pagination, authentication, and error handling for each connector.
Installation
Docker Compose (Self-Hosted)
# Clone Airbyte
git clone --depth=1 https://github.com/airbytehq/airbyte.git
cd airbyte
# Start Airbyte
./run-ab-platform.sh
# Access UI at http://localhost:8000
# Default credentials: airbyte / password
# Or start the services with Docker Compose directly
docker compose up -d
Kubernetes with Helm
# Add Airbyte Helm repo
helm repo add airbyte https://airbytehq.github.io/helm-charts
helm repo update
# Install Airbyte
helm install airbyte airbyte/airbyte \
  --namespace airbyte \
  --create-namespace \
  --values values.yaml
Airbyte CLI (abctl and octavia)
# Install the Octavia CLI (configuration as code)
pip install octavia-cli
# Or install abctl, the official CLI for running Airbyte locally
brew install airbytehq/tap/abctl # macOS
curl -fsSL https://get.airbyte.com | bash # Linux
# Start local Airbyte
abctl local install
# Check status
abctl local status
Core Concepts
Connection Configuration
# Example: PostgreSQL source to BigQuery destination
source:
  sourceType: postgres
  config:
    host: db.example.com
    port: 5432
    database: production
    username: airbyte_reader
    password: ${POSTGRES_PASSWORD}
    schemas:
      - public
      - analytics
    ssl_mode:
      mode: require
    replication_method:
      method: CDC
      publication: airbyte_publication
      replication_slot: airbyte_slot
destination:
  destinationType: bigquery
  config:
    project_id: my-gcp-project
    dataset_id: raw_data
    dataset_location: US
    credentials_json: ${GCP_CREDENTIALS_JSON}
    loading_method:
      method: GCS Staging
      gcs_bucket_name: airbyte-staging
      gcs_bucket_path: staging
      credential:
        credential_type: HMAC_KEY
        hmac_key_access_id: ${GCS_HMAC_KEY}
        hmac_key_secret: ${GCS_HMAC_SECRET}
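The configuration above keeps secrets out of the file by using `${VAR}` placeholders. How those placeholders get resolved depends on the tooling that loads the file, so the following is only a minimal sketch of the idea in Python. The helper name `expand_placeholders` is hypothetical and not part of Airbyte.

```python
import re
from typing import Mapping

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_placeholders(text: str, env: Mapping[str, str]) -> str:
    """Replace each ${NAME} with env[NAME]; raise KeyError if NAME is missing.

    Hypothetical helper for illustration, not an Airbyte API.
    """

    def _substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in env:
            # Failing loudly avoids writing a literal "${...}" into a config.
            raise KeyError(f"missing environment variable: {name}")
        return env[name]

    return _PLACEHOLDER.sub(_substitute, text)
```

In practice `env` would usually be `os.environ`. Raising on a missing variable is a deliberate choice here: a silently empty password tends to surface later as a confusing authentication error.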
Sync Modes
| Mode | Description | Use Case |
|---|---|---|
| Full Refresh - Overwrite | Replace destination data completely | Small reference tables |
| Full Refresh - Append | Append all source data each sync | Audit/history tables |
| Incremental - Append | Only sync new/changed records | Event logs, append-only |
| Incremental - Append + Deduped | Sync changes, dedupe at destination | SCD Type 1 (latest state) |
| CDC | Capture inserts, updates, deletes via WAL | Real-time replication |
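To make the difference between plain append and "Append + Deduped" concrete, here is a small in-memory model in Python. It is a sketch of the semantics only: real destinations perform this step in the warehouse, and the function names `append_dedup` and `next_cursor` are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Optional

Record = Dict[str, Any]


def append_dedup(existing: Iterable[Record], incoming: Iterable[Record],
                 primary_key: str, cursor_field: str) -> List[Record]:
    """Keep one record per primary key: the one with the highest cursor value."""
    latest: Dict[Any, Record] = {}
    for record in list(existing) + list(incoming):
        key = record[primary_key]
        current = latest.get(key)
        # On a tie, the later record (typically the incoming one) wins.
        if current is None or record[cursor_field] >= current[cursor_field]:
            latest[key] = record
    return sorted(latest.values(), key=lambda r: r[primary_key])


def next_cursor(records: Iterable[Record], cursor_field: str,
                previous: Optional[Any] = None) -> Optional[Any]:
    """Return the cursor value to save as state for the next incremental sync."""
    values = [r[cursor_field] for r in records]
    if previous is not None:
        values.append(previous)
    return max(values) if values else None
```

With a cursor such as `updated_at`, an incremental sync only needs to request records newer than the saved cursor, and the dedupe step collapses repeated versions of a row into its latest state.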
API Reference
REST API
# Base URL: http://localhost:8000/api/v1
# List workspaces
curl -X POST http://localhost:8000/api/v1/workspaces/list \
  -H "Content-Type: application/json" \
  -d '{}'
# Create a source
curl -X POST http://localhost:8000/api/v1/sources/create \
  -H "Content-Type: application/json" \
  -d '{
    "workspaceId": "workspace-uuid",
    "name": "Production PostgreSQL",
    "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
    "connectionConfiguration": {
      "host": "db.example.com",
      "port": 5432,
      "database": "production",
      "username": "airbyte",
      "password": "password",
      "schemas": ["public"]
    }
  }'
# Create a destination
curl -X POST http://localhost:8000/api/v1/destinations/create \
  -H "Content-Type: application/json" \
  -d '{
    "workspaceId": "workspace-uuid",
    "name": "BigQuery Warehouse",
    "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
    "connectionConfiguration": {
      "project_id": "my-project",
      "dataset_id": "raw_data",
      "dataset_location": "US",
      "credentials_json": "{...}"
    }
  }'
# Create a connection (cron expressions use Quartz syntax, which starts with a seconds field)
curl -X POST http://localhost:8000/api/v1/connections/create \
  -H "Content-Type: application/json" \
  -d '{
    "sourceId": "source-uuid",
    "destinationId": "destination-uuid",
    "name": "PG to BQ Sync",
    "syncCatalog": {
      "streams": [
        {
          "stream": {"name": "orders", "namespace": "public"},
          "config": {
            "syncMode": "incremental",
            "destinationSyncMode": "append_dedup",
            "cursorField": ["updated_at"],
            "primaryKey": [["id"]]
          }
        }
      ]
    },
    "scheduleType": "cron",
    "scheduleData": {
      "cron": {"cronExpression": "0 0 */6 * * ?", "cronTimeZone": "UTC"}
    },
    "status": "active"
  }'
# Trigger manual sync
curl -X POST http://localhost:8000/api/v1/connections/sync \
  -H "Content-Type: application/json" \
  -d '{"connectionId": "connection-uuid"}'
# Get sync history
curl -X POST http://localhost:8000/api/v1/jobs/list \
  -H "Content-Type: application/json" \
  -d '{"configTypes": ["sync"], "configId": "connection-uuid"}'
# Check connection status
curl -X POST http://localhost:8000/api/v1/connections/get \
  -H "Content-Type: application/json" \
  -d '{"connectionId": "connection-uuid"}'
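The same calls can be scripted. The sketch below uses only the Python standard library and mirrors the endpoints and payload fields shown in the curl examples. The helper names (`build_stream_config`, `build_request`, `call_api`) are hypothetical, and authentication is omitted, so treat it as a starting point rather than a complete client.

```python
import json
import urllib.request
from typing import Any, Dict

API_BASE = "http://localhost:8000/api/v1"  # base URL used in the curl examples


def build_stream_config(name: str, namespace: str,
                        cursor_field: str, primary_key: str) -> Dict[str, Any]:
    """Build one syncCatalog stream entry for incremental append + dedup."""
    return {
        "stream": {"name": name, "namespace": namespace},
        "config": {
            "syncMode": "incremental",
            "destinationSyncMode": "append_dedup",
            "cursorField": [cursor_field],
            "primaryKey": [[primary_key]],
        },
    }


def build_request(endpoint: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Prepare a JSON POST request without sending it."""
    return urllib.request.Request(
        url=f"{API_BASE}/{endpoint}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_api(endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send the request and decode the JSON response (needs a running instance)."""
    with urllib.request.urlopen(build_request(endpoint, payload)) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `call_api("connections/sync", {"connectionId": "connection-uuid"})` would trigger a manual sync. Separating `build_request` from `call_api` keeps the payload logic testable without a live server.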
Popular Connectors
| Source | Destination |
|---|---|
| PostgreSQL | BigQuery |
| MySQL | Snowflake |
| MongoDB | Redshift |
| Salesforce | PostgreSQL |
| Stripe | Databricks |
| HubSpot | ClickHouse |
| Google Sheets | DuckDB |
| S3 (CSV/Parquet) | S3 (Parquet/JSON) |
| GitHub | BigQuery |
| Slack | PostgreSQL |
| Jira | Snowflake |
| Shopify | Redshift |
| Notion | PostgreSQL |
| Google Analytics | BigQuery |
| Facebook Ads | Snowflake |
Configuration
Environment Variables
# .env file for docker deployment
BASIC_AUTH_USERNAME=airbyte
BASIC_AUTH_PASSWORD=password
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
DATABASE_USER=airbyte
DATABASE_PASSWORD=airbyte
TRACKING_STRATEGY=segment
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001
CONNECTOR_BUILDER_SERVER_API_HOST=airbyte-connector-builder-server:80
WEBAPP_URL=http://localhost:8000
API_URL=/api/v1/
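The example `.env` above uses placeholder credentials such as `password`. A small pre-deployment check can catch those before they reach a shared environment. This is a sketch with hypothetical helper names, and `EXAMPLE_VALUES` lists only the values shown in the example above, not an authoritative list of Airbyte defaults.

```python
from typing import Dict, List

# Values copied from the example .env above; an assumption, not an official list.
EXAMPLE_VALUES = {
    "BASIC_AUTH_PASSWORD": "password",
    "DATABASE_PASSWORD": "airbyte",
}


def parse_env(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    values: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def find_example_secrets(env: Dict[str, str]) -> List[str]:
    """Return the keys whose value still matches the example placeholder."""
    return sorted(key for key, value in EXAMPLE_VALUES.items() if env.get(key) == value)
```

The parser handles only the plain `KEY=VALUE` form used above. Quoting and variable interpolation, which some dotenv loaders support, are out of scope.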
Normalization and dbt
# Airbyte integrates with dbt for transformations
# Basic normalization creates clean tables from raw JSON
# Custom dbt transformations can be configured:
operations:
  - name: "Custom dbt transformation"
    operatorType: dbt
    dbt:
      gitRepoUrl: "https://github.com/org/dbt-project.git"
      gitRepoBranch: "main"
      dockerImage: "fishtownanalytics/dbt:1.7.0"
      dbtArguments: "run --select staging"
Advanced Usage
Custom Connector (CDK)
# Install Airbyte CDK
pip install airbyte-cdk
# Generate connector scaffold (the generator command differs between CDK versions; verify it against your installed version)
airbyte-cdk scaffold source my-custom-api
cd source-my-custom-api
# source_my_custom_api/source.py
from typing import List, Optional, Tuple

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream


class MyApiStream(HttpStream):
    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def __init__(self, api_key: str, **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key

    def next_page_token(self, response):
        # Request the following page while the API reports more data
        data = response.json()
        if data.get("has_more"):
            return {"page": data["page"] + 1}
        return None

    def request_params(self, next_page_token=None, **kwargs):
        params = {"per_page": 100}
        if next_page_token:
            params.update(next_page_token)
        return params

    def request_headers(self, **kwargs):
        return {"Authorization": f"Bearer {self.api_key}"}

    def parse_response(self, response, **kwargs):
        yield from response.json()["data"]

    def path(self, **kwargs) -> str:
        # Endpoint path, appended to url_base
        return "records"


class SourceMyCustomApi(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Optional[str]]:
        try:
            stream = MyApiStream(api_key=config["api_key"])
            # The None default keeps an empty result from raising StopIteration,
            # which would otherwise be reported as a failed connection
            next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)), None)
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config) -> List[Stream]:
        return [MyApiStream(api_key=config["api_key"])]
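The pagination contract in the stream above (`next_page_token` feeding `request_params`) can be exercised without the CDK or a network connection. The following is a simplified model of that loop, not the CDK's actual implementation. `FakeResponse`, `read_all`, and the `fetch` callable are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict, Iterator, Optional


class FakeResponse:
    """Stands in for an HTTP response object; only .json() is modelled."""

    def __init__(self, body: Dict[str, Any]):
        self._body = body

    def json(self) -> Dict[str, Any]:
        return self._body


def next_page_token(response: FakeResponse) -> Optional[Dict[str, int]]:
    # Same rule as the stream above: advance while the API reports has_more.
    data = response.json()
    if data.get("has_more"):
        return {"page": data["page"] + 1}
    return None


def request_params(token: Optional[Dict[str, int]] = None) -> Dict[str, int]:
    params = {"per_page": 100}
    if token:
        params.update(token)
    return params


def read_all(fetch: Callable[[Dict[str, int]], FakeResponse]) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until next_page_token returns None."""
    token: Optional[Dict[str, int]] = None
    while True:
        response = fetch(request_params(token))
        yield from response.json()["data"]
        token = next_page_token(response)
        if token is None:
            break
```

Passing a `fetch` function that serves canned pages makes it easy to check that the first request carries no `page` parameter and that the loop stops once `has_more` is false.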
Low-Code Connector (YAML)
# source_my_api/manifest.yaml
version: "0.80.0"
type: DeclarativeSource
definitions:
  requester:
    type: HttpRequester
    url_base: "https://api.example.com/v1"
    http_method: "GET"
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['api_key'] }}"
  paginator:
    type: DefaultPaginator
    page_token_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: page
    pagination_strategy:
      type: PageIncrement
      start_from_page: 1
  retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/requester"
    paginator:
      $ref: "#/definitions/paginator"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["data"]
streams:
  - type: DeclarativeStream
    name: users
    primary_key: id
    retriever:
      $ref: "#/definitions/retriever"
      requester:
        $ref: "#/definitions/requester"
        path: "/users"
check:
  type: CheckStream
  stream_names: ["users"]
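The `field_path: ["data"]` setting in the manifest tells the record selector where the records sit inside each response body. As a rough illustration, the lookup can be modelled in Python as below. This is a simplified sketch with a hypothetical function name, and the CDK's own extractor may support more than plain key lookups.

```python
from typing import Any, Dict, List, Sequence


def extract_records(body: Any, field_path: Sequence[str]) -> List[Dict[str, Any]]:
    """Walk field_path through nested dicts and return the records found there.

    Simplified model: plain key lookups only.
    """
    node = body
    for key in field_path:
        if not isinstance(node, dict) or key not in node:
            return []  # path not present in this response
        node = node[key]
    if isinstance(node, list):
        return node
    if isinstance(node, dict):
        return [node]  # a single object is treated as one record
    return []
```

For the manifest above, a response like `{"data": [{"id": 1}]}` with `field_path=["data"]` yields the list under `data`, and a response without that key yields no records.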
Troubleshooting
| Issue | Solution |
|---|---|
| Sync failing with timeout | Increase SYNC_JOB_MAX_TIMEOUT_DAYS. Check source API rate limits |
| Connector OOM | Increase worker memory in Docker/K8s config. Reduce batch size |
| Schema changes breaking sync | Enable auto-detect schema changes. Reset connection if needed |
| CDC replication slot growing | Ensure sync runs regularly. Monitor pg_replication_slots |
| Slow initial sync | Use multiple streams in parallel. Enable staging for cloud destinations |
| Destination write errors | Check destination permissions. Verify schema compatibility |
| OAuth token expired | Re-authenticate in Airbyte UI. Check token refresh configuration |
| Docker disk space full | Prune old Docker images and volumes. Increase disk allocation |
| Temporal workflow failures | Check Temporal UI for stuck workflows. Restart Temporal service |
| Custom connector errors | Test locally with python main.py spec/check/discover/read. Check CDK version |
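For the "CDC replication slot growing" row, the amount of WAL a slot is retaining can be read from `pg_replication_slots`. The sketch below pairs a query (using `pg_wal_lsn_diff` and `pg_current_wal_lsn`, available in PostgreSQL 10 and later) with a small Python filter. The function name and the threshold are assumptions chosen for illustration, and running the query is left to whichever database driver you use.

```python
from typing import Iterable, List, Optional, Tuple

# Run this with your PostgreSQL driver of choice; each row is
# (slot_name, active, retained_bytes).
SLOT_QUERY = """
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
FROM pg_replication_slots
"""

SlotRow = Tuple[str, bool, Optional[int]]


def slots_over_limit(rows: Iterable[SlotRow], limit_bytes: int) -> List[SlotRow]:
    """Return the slots retaining more WAL than limit_bytes, largest first."""
    flagged = [
        row for row in rows
        if row[2] is not None and row[2] > limit_bytes  # restart_lsn can be NULL
    ]
    return sorted(flagged, key=lambda row: row[2], reverse=True)
```

An inactive slot with steadily growing retained bytes is the usual sign that syncs have stopped consuming the slot, so in practice the `active` column is worth alerting on as well.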