
Airbyte Cheat Sheet

Overview

Airbyte is an open-source data integration platform that helps teams replicate data from sources to destinations. With over 300 pre-built connectors, Airbyte handles the Extract and Load (EL) part of the ELT pipeline, supporting databases, APIs, SaaS applications, files, and event streams. Users can also build custom connectors using the Connector Development Kit (CDK) in Python or low-code YAML.

Airbyte supports incremental sync modes for efficient replication, schema change handling, change data capture (CDC), transformations via dbt integration, and an API for programmatic control. It can be deployed as a self-hosted open-source platform, used as a managed service (Airbyte Cloud), or embedded in other products. Each connector handles rate limiting, pagination, authentication, and error handling for its source or destination.

Installation

Docker Compose (Self-Hosted)

# Clone Airbyte
git clone --depth=1 https://github.com/airbytehq/airbyte.git
cd airbyte

# Start Airbyte
./run-ab-platform.sh

# Access UI at http://localhost:8000
# Default credentials: airbyte / password

# Or start the stack with Docker Compose directly
docker compose up -d

Kubernetes with Helm

# Add Airbyte Helm repo
helm repo add airbyte https://airbytehq.github.io/helm-charts
helm repo update

# Install Airbyte
helm install airbyte airbyte/airbyte \
    --namespace airbyte \
    --create-namespace \
    --values values.yaml

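Airbyte CLI (octavia-cli and abctl)

# Install the Octavia CLI (configuration as code; deprecated in newer Airbyte releases)
pip install octavia-cli

# Or install abctl, Airbyte's CLI for local deployments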
brew install airbytehq/tap/abctl  # macOS
curl -fsSL https://get.airbyte.com | bash  # Linux

# Start local Airbyte
abctl local install

# Check status
abctl local status

Core Concepts

Connection Configuration

# Example: PostgreSQL source to BigQuery destination
source:
  sourceType: postgres
  config:
    host: db.example.com
    port: 5432
    database: production
    username: airbyte_reader
    password: ${POSTGRES_PASSWORD}
    schemas:
      - public
      - analytics
    ssl_mode:
      mode: require
    replication_method:
      method: CDC
      publication: airbyte_publication
      replication_slot: airbyte_slot

destination:
  destinationType: bigquery
  config:
    project_id: my-gcp-project
    dataset_id: raw_data
    dataset_location: US
    credentials_json: ${GCP_CREDENTIALS_JSON}
    loading_method:
      method: GCS Staging
      gcs_bucket_name: airbyte-staging
      gcs_bucket_path: staging
      credential:
        credential_type: HMAC_KEY
        hmac_key_access_id: ${GCS_HMAC_KEY}
        hmac_key_secret: ${GCS_HMAC_SECRET}
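
The `${POSTGRES_PASSWORD}`-style placeholders above suggest that secrets are injected from the environment before the configuration is submitted. The following is a minimal sketch of that substitution step, assuming the configuration has already been loaded into a Python dict by your own tooling. The function name `resolve_placeholders` is hypothetical and not part of Airbyte.

```python
import os
import re
from typing import Any, Mapping

# Matches ${NAME} where NAME is a typical environment variable name
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_placeholders(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace ${NAME} in strings with env[NAME]; fail loudly if NAME is unset."""
    if isinstance(value, str):
        def _substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]

        return _PLACEHOLDER.sub(_substitute, value)
    if isinstance(value, dict):
        return {key: resolve_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_placeholders(item, env) for item in value]
    # Numbers, booleans, and None pass through unchanged
    return value


# Example with an explicit mapping instead of the real environment
config = {"host": "db.example.com", "port": 5432, "password": "${POSTGRES_PASSWORD}"}
resolved = resolve_placeholders(config, {"POSTGRES_PASSWORD": "s3cret"})
```

Raising on a missing variable, rather than leaving the placeholder in place, keeps a literal `${...}` string from being sent to the source as a password.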

Sync Modes

| Mode | Description | Use Case |
|------|-------------|----------|
| Full Refresh - Overwrite | Replace destination data completely | Small reference tables |
| Full Refresh - Append | Append all source data each sync | Audit/history tables |
| Incremental - Append | Only sync new/changed records | Event logs, append-only |
| Incremental - Append + Deduped | Sync changes, dedupe at destination | SCD Type 1 (latest state) |
| CDC | Capture inserts, updates, deletes via WAL | Real-time replication |
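
The difference between the two incremental modes can be illustrated with a small in-memory simulation. This is a sketch of the idea only, not how Airbyte implements it: the function names are hypothetical, the cursor is a plain integer, and the "destination" is a Python list.

```python
from typing import Any, Dict, List, Optional, Tuple

Record = Dict[str, Any]


def incremental_read(source: List[Record], cursor_field: str,
                     state: Optional[Any]) -> Tuple[List[Record], Optional[Any]]:
    """Return records whose cursor value is greater than the saved state, plus the new state."""
    new = [r for r in source if state is None or r[cursor_field] > state]
    new_state = max((r[cursor_field] for r in new), default=state)
    return new, new_state


def append(destination: List[Record], batch: List[Record]) -> List[Record]:
    """Incremental - Append: keep every synced version of every record."""
    return destination + batch


def append_dedup(destination: List[Record], batch: List[Record],
                 primary_key: str, cursor_field: str) -> List[Record]:
    """Incremental - Append + Deduped: keep only the latest version per primary key."""
    latest: Dict[Any, Record] = {}
    for rec in destination + batch:
        key = rec[primary_key]
        if key not in latest or rec[cursor_field] >= latest[key][cursor_field]:
            latest[key] = rec
    return list(latest.values())


# Simulated source table at the time of the first sync
source = [
    {"id": 1, "status": "new", "updated_at": 1},
    {"id": 2, "status": "new", "updated_at": 2},
]
batch1, state = incremental_read(source, "updated_at", None)

# Row 1 is updated in place before the second sync
source[0] = {"id": 1, "status": "paid", "updated_at": 3}
batch2, state = incremental_read(source, "updated_at", state)

appended = append(append([], batch1), batch2)               # both versions of id 1 are kept
deduped = append_dedup(batch1, batch2, "id", "updated_at")  # only the latest version of id 1
```

The second read returns only the changed row because its cursor value exceeds the saved state. Append keeps the old and new versions side by side, while Append + Deduped collapses them to one row per primary key.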

API Reference

REST API

# Base URL: http://localhost:8000/api/v1

# List workspaces
curl http://localhost:8000/api/v1/workspaces/list \
    -H "Content-Type: application/json" \
    -d '{}'

# Create a source
curl -X POST http://localhost:8000/api/v1/sources/create \
    -H "Content-Type: application/json" \
    -d '{
        "workspaceId": "workspace-uuid",
        "name": "Production PostgreSQL",
        "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
        "connectionConfiguration": {
            "host": "db.example.com",
            "port": 5432,
            "database": "production",
            "username": "airbyte",
            "password": "password",
            "schemas": ["public"]
        }
    }'

# Create a destination
curl -X POST http://localhost:8000/api/v1/destinations/create \
    -H "Content-Type: application/json" \
    -d '{
        "workspaceId": "workspace-uuid",
        "name": "BigQuery Warehouse",
        "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
        "connectionConfiguration": {
            "project_id": "my-project",
            "dataset_id": "raw_data",
            "dataset_location": "US",
            "credentials_json": "{...}"
        }
    }'

# Create a connection
curl -X POST http://localhost:8000/api/v1/connections/create \
    -H "Content-Type: application/json" \
    -d '{
        "sourceId": "source-uuid",
        "destinationId": "destination-uuid",
        "name": "PG to BQ Sync",
        "syncCatalog": {
            "streams": [
                {
                    "stream": {"name": "orders", "namespace": "public"},
                    "config": {
                        "syncMode": "incremental",
                        "destinationSyncMode": "append_dedup",
                        "cursorField": ["updated_at"],
                        "primaryKey": [["id"]]
                    }
                }
            ]
        },
        "scheduleType": "cron",
        "scheduleData": {
            "cron": {"cronExpression": "0 0 */6 * * ?", "cronTimeZone": "UTC"}
        },
        "status": "active"
    }'

# Trigger manual sync
curl -X POST http://localhost:8000/api/v1/connections/sync \
    -H "Content-Type: application/json" \
    -d '{"connectionId": "connection-uuid"}'

# Get sync history
curl -X POST http://localhost:8000/api/v1/jobs/list \
    -H "Content-Type: application/json" \
    -d '{"configTypes": ["sync"], "configId": "connection-uuid"}'

# Check connection status
curl -X POST http://localhost:8000/api/v1/connections/get \
    -H "Content-Type: application/json" \
    -d '{"connectionId": "connection-uuid"}'
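
The same calls can be scripted. Below is a minimal sketch using only the Python standard library, assuming the base URL from the curl examples and an instance that accepts unauthenticated requests; if basic auth is enabled you would need to add an Authorization header. The helper names `build_request`, `call_api`, and `trigger_sync` are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict

BASE_URL = "http://localhost:8000/api/v1"  # same base URL as the curl examples


def build_request(endpoint: str, payload: Dict[str, Any],
                  base_url: str = BASE_URL) -> urllib.request.Request:
    """Build a JSON POST request for an API endpoint such as 'connections/sync'."""
    return urllib.request.Request(
        url=f"{base_url}/{endpoint.lstrip('/')}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_api(endpoint: str, payload: Dict[str, Any],
             base_url: str = BASE_URL, timeout: float = 30.0) -> Dict[str, Any]:
    """Send the request and decode the JSON response body."""
    request = build_request(endpoint, payload, base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def trigger_sync(connection_id: str) -> Dict[str, Any]:
    """Equivalent of the 'Trigger manual sync' curl call."""
    return call_api("connections/sync", {"connectionId": connection_id})


# Building a request does not touch the network, so it is safe to inspect
request = build_request("connections/sync", {"connectionId": "connection-uuid"})
```

Separating request construction from sending makes the payloads easy to inspect or unit test without a running Airbyte instance.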
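Example Connectors

| Source | Destination |
|--------|-------------|
| PostgreSQL | BigQuery |
| MySQL | Snowflake |
| MongoDB | Redshift |
| Salesforce | PostgreSQL |
| Stripe | Databricks |
| HubSpot | ClickHouse |
| Google Sheets | DuckDB |
| S3 (CSV/Parquet) | S3 (Parquet/JSON) |
| GitHub | BigQuery |
| Slack | PostgreSQL |
| Jira | Snowflake |
| Shopify | Redshift |
| Notion | PostgreSQL |
| Google Analytics | BigQuery |
| Facebook Ads | Snowflake |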

Configuration

Environment Variables

# .env file for docker deployment
BASIC_AUTH_USERNAME=airbyte
BASIC_AUTH_PASSWORD=password
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
DATABASE_USER=airbyte
DATABASE_PASSWORD=airbyte
TRACKING_STRATEGY=segment
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001
CONNECTOR_BUILDER_SERVER_API_HOST=airbyte-connector-builder-server:80
WEBAPP_URL=http://localhost:8000
API_URL=/api/v1/

Normalization and dbt

# Basic normalization flattens the raw JSON records into typed tables.
# Custom dbt transformations can run after a sync as an operation
# (availability depends on the Airbyte version and destination):

operations:
  - name: "Custom dbt transformation"
    operatorType: dbt
    dbt:
      gitRepoUrl: "https://github.com/org/dbt-project.git"
      gitRepoBranch: "main"
      dockerImage: "fishtownanalytics/dbt:1.7.0"
      dbtArguments: "run --select staging"

Advanced Usage

Custom Connector (CDK)

# Install Airbyte CDK
pip install airbyte-cdk

# Generate connector scaffold
airbyte-cdk scaffold source my-custom-api
cd source-my-custom-api

# source_my_custom_api/source.py
from typing import Any, Iterable, List, Mapping, Optional, Tuple

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream


class MyApiStream(HttpStream):
    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def __init__(self, api_key: str, **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Returning None tells the CDK that there are no more pages
        data = response.json()
        if data.get("has_more"):
            return {"page": data["page"] + 1}
        return None

    def request_params(self, next_page_token=None, **kwargs) -> Mapping[str, Any]:
        # The token from next_page_token() is merged into the query string
        params = {"per_page": 100}
        if next_page_token:
            params.update(next_page_token)
        return params

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        return {"Authorization": f"Bearer {self.api_key}"}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        yield from response.json()["data"]

    def path(self, **kwargs) -> str:
        # Appended to url_base
        return "records"


class SourceMyCustomApi(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Optional[Any]]:
        try:
            stream = MyApiStream(api_key=config["api_key"])
            # Read at most one record; an empty stream still counts as a successful check
            next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)), None)
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config) -> List[Stream]:
        return [MyApiStream(api_key=config["api_key"])]
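
The pagination contract used by `MyApiStream` can be tried without the CDK or a network. The sketch below reimplements the same `next_page_token` and `request_params` logic as plain functions and drives them with a loop. The `read_all` loop and the `fake_fetch` in-memory API are hypothetical stand-ins for what the CDK and the HTTP layer do, not the CDK's actual code.

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]


def next_page_token(page: Page) -> Optional[Dict[str, int]]:
    """Same rule as MyApiStream.next_page_token, applied to a decoded response body."""
    if page.get("has_more"):
        return {"page": page["page"] + 1}
    return None


def request_params(token: Optional[Dict[str, int]]) -> Dict[str, int]:
    """Same rule as MyApiStream.request_params: merge the token into the query parameters."""
    params = {"per_page": 100}
    if token:
        params.update(token)
    return params


def read_all(fetch: Callable[[Dict[str, int]], Page]) -> Iterator[Dict[str, Any]]:
    """Request pages until next_page_token returns None, yielding every record."""
    token = None
    while True:
        page = fetch(request_params(token))
        yield from page["data"]
        token = next_page_token(page)
        if token is None:
            break


# Hypothetical two-page API held in memory, standing in for HTTP calls
FAKE_PAGES = {
    1: {"page": 1, "has_more": True, "data": [{"id": 1}, {"id": 2}]},
    2: {"page": 2, "has_more": False, "data": [{"id": 3}]},
}


def fake_fetch(params: Dict[str, int]) -> Page:
    # The first request carries no "page" parameter, so default to page 1
    return FAKE_PAGES[params.get("page", 1)]


records = list(read_all(fake_fetch))
```

The first request carries only `per_page`; each later request adds the `page` value returned by the previous response, and the loop stops once `has_more` is false.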

Low-Code Connector (YAML)

# source_my_api/manifest.yaml
version: "0.80.0"
type: DeclarativeSource

definitions:
  requester:
    type: HttpRequester
    url_base: "https://api.example.com/v1"
    http_method: "GET"
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['api_key'] }}"
  paginator:
    type: DefaultPaginator
    page_token_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: page
    pagination_strategy:
      type: PageIncrement
      start_from_page: 1
  retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/requester"
    paginator:
      $ref: "#/definitions/paginator"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["data"]

streams:
  - type: DeclarativeStream
    name: users
    primary_key: id
    retriever:
      $ref: "#/definitions/retriever"
      # Override the shared requester with a stream-specific path
      requester:
        $ref: "#/definitions/requester"
        path: "/users"

check:
  type: CheckStream
  stream_names: ["users"]
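
The manifest relies on `$ref` pointers with sibling keys (for example, `path: "/users"` next to a `$ref` to the shared requester). The following sketch shows one way such references can be expanded, assuming the manifest has already been parsed into a dict. It is a simplified illustration with hypothetical function names, not the CDK's resolver, and it does not detect circular references.

```python
import copy
from typing import Any, Dict


def lookup(root: Dict[str, Any], ref: str) -> Any:
    """Follow a '#/a/b/c' pointer starting from the manifest root."""
    if not ref.startswith("#/"):
        raise ValueError(f"unsupported reference: {ref}")
    node: Any = root
    for part in ref[2:].split("/"):
        node = node[part]
    return node


def resolve(node: Any, root: Dict[str, Any]) -> Any:
    """Recursively expand $ref keys; sibling keys override the referenced values."""
    if isinstance(node, list):
        return [resolve(item, root) for item in node]
    if not isinstance(node, dict):
        return node
    siblings = {key: resolve(value, root) for key, value in node.items() if key != "$ref"}
    if "$ref" in node:
        base = resolve(copy.deepcopy(lookup(root, node["$ref"])), root)
        return {**base, **siblings}  # shallow merge: siblings win
    return siblings


# Trimmed-down version of the manifest above, as a parsed dict
manifest = {
    "definitions": {
        "requester": {"url_base": "https://api.example.com/v1", "http_method": "GET"},
        "retriever": {
            "requester": {"$ref": "#/definitions/requester"},
            "record_selector": {"extractor": {"field_path": ["data"]}},
        },
    },
    "streams": [
        {
            "name": "users",
            "retriever": {
                "$ref": "#/definitions/retriever",
                "requester": {"$ref": "#/definitions/requester", "path": "/users"},
            },
        }
    ],
}

users_stream = resolve(manifest, manifest)["streams"][0]
```

After resolution, the `users` stream's requester contains the shared `url_base` and `http_method` plus its own `path`, which is why the stream only needs to state what differs from the shared definition.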

Troubleshooting

| Issue | Solution |
|-------|----------|
| Sync failing with timeout | Increase SYNC_JOB_MAX_TIMEOUT_DAYS. Check source API rate limits |
| Connector OOM | Increase worker memory in Docker/K8s config. Reduce batch size |
| Schema changes breaking sync | Enable auto-detect schema changes. Reset connection if needed |
| CDC replication slot growing | Ensure sync runs regularly. Monitor pg_replication_slots |
| Slow initial sync | Use multiple streams in parallel. Enable staging for cloud destinations |
| Destination write errors | Check destination permissions. Verify schema compatibility |
| OAuth token expired | Re-authenticate in Airbyte UI. Check token refresh configuration |
| Docker disk space full | Prune old Docker images and volumes. Increase disk allocation |
| Temporal workflow failures | Check Temporal UI for stuck workflows. Restart Temporal service |
| Custom connector errors | Test locally with python main.py spec/check/discover/read. Check CDK version |
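
For the "CDC replication slot growing" row, monitoring `pg_replication_slots` can be sketched as a query plus a small filter. The SQL uses PostgreSQL 10+ function names. The threshold, the `SlotLag` type, and `slots_over_threshold` are hypothetical, and running the query requires a database driver of your choice, which is not shown here.

```python
from typing import Iterable, List, NamedTuple, Optional, Tuple

# WAL bytes each replication slot is holding back (PostgreSQL 10+)
SLOT_LAG_SQL = """
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
FROM pg_replication_slots
"""


class SlotLag(NamedTuple):
    slot_name: str
    active: bool
    retained_bytes: Optional[int]  # NULL when the slot has no restart_lsn yet


def slots_over_threshold(rows: Iterable[Tuple[str, bool, Optional[int]]],
                         max_bytes: int) -> List[SlotLag]:
    """Return the slots retaining more WAL than max_bytes."""
    slots = (SlotLag(*row) for row in rows)
    return [slot for slot in slots if (slot.retained_bytes or 0) > max_bytes]


# Example rows shaped like the query result (values are made up for illustration)
example_rows = [
    ("airbyte_slot", False, 5 * 1024**3),
    ("other_slot", True, 1024),
    ("new_slot", False, None),
]
flagged = slots_over_threshold(example_rows, max_bytes=1024**3)
```

An inactive slot with a large retained size usually means the consumer has not read from it recently, which matches the advice to make sure syncs run regularly.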