# ETL Pipelines
ETL (Extract, Transform, Load) is the foundational pattern in data engineering. It describes the process of pulling data from source systems, applying transformations, and loading the results into a target system. While the concept is simple, building ETL pipelines that are reliable, scalable, and maintainable is one of the most challenging problems in the field.
## ETL vs ELT
The classic ETL pattern transforms data before loading it into the target. The modern ELT pattern loads raw data first and transforms it inside the target system (typically a cloud data warehouse).
ETL (Extract → Transform → Load):

```
┌──────────┐    ┌──────────────┐    ┌──────────────┐
│  Source  │───▶│  Transform   │───▶│  Warehouse   │
│  Systems │    │ (Spark, etc) │    │ (Clean data) │
└──────────┘    └──────────────┘    └──────────────┘
```

ELT (Extract → Load → Transform):

```
┌──────────┐    ┌──────────────┐    ┌──────────────┐
│  Source  │───▶│  Warehouse   │───▶│  Transform   │
│  Systems │    │  (Raw data)  │    │  (dbt, SQL)  │
└──────────┘    └──────────────┘    └──────────────┘
```

### Comparison
| Aspect | ETL | ELT |
|---|---|---|
| Transform location | External compute (Spark, Python) | Inside the warehouse (SQL, dbt) |
| Data in warehouse | Only transformed data | Raw + transformed data |
| Flexibility | Must redesign pipeline for new transforms | Re-transform raw data anytime |
| Cost model | Pay for external compute | Pay for warehouse compute |
| Latency | Higher — transform before load | Lower — load first, transform later |
| Best for | Complex non-SQL transforms, legacy systems | Cloud warehouses, SQL-centric teams |
| Data retention | Raw data may not be preserved | Raw data always available |
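The ELT flow in the table above can be sketched end to end in a few lines, using Python's built-in sqlite3 as a stand-in for a cloud warehouse (the table and column names here are illustrative, not from any specific system):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for a cloud warehouse

# 1. Extract + Load: raw data lands in the warehouse untouched
conn.execute("CREATE TABLE raw_orders (id INTEGER, status TEXT, amount_cents INTEGER)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [(1, "SHIPPED", 1250), (2, "Pending", 500), (3, None, 999)],
)

# 2. Transform: SQL inside the warehouse builds the clean table,
#    while raw_orders stays available for re-transformation later
conn.execute("""
    CREATE TABLE clean_orders AS
    SELECT id AS order_id,
           LOWER(status) AS order_status,
           amount_cents / 100.0 AS order_amount
    FROM raw_orders
    WHERE status IS NOT NULL
""")

rows = conn.execute(
    "SELECT order_id, order_status, order_amount FROM clean_orders"
).fetchall()
print(rows)  # [(1, 'shipped', 12.5), (2, 'pending', 5.0)]
```

Because the raw table is preserved, a changed transformation only requires re-running the SQL, not re-extracting from the source.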
## Pipeline Design Patterns
### Pattern 1: Full Load
The simplest approach — extract all data from the source and replace the target table entirely.
```
Source Table (10M rows)
      │
      ▼  Extract ALL rows
Staging Table
      │
      ▼  TRUNCATE + INSERT
Target Table (10M rows, fully replaced)
```

**Pros:** Simple; guarantees target matches source
**Cons:** Slow and expensive for large tables; no historical tracking
### Pattern 2: Incremental Load
Extract only new or changed records since the last run.
```
Source Table (10M rows total, 5K new since last run)
      │
      ▼  Extract WHERE updated_at > last_run_timestamp
Staging Table (5K rows)
      │
      ▼  MERGE / UPSERT into target
Target Table (10M rows, 5K updated)
```

**Pros:** Much faster and cheaper than full load
**Cons:** Requires a reliable change-tracking column; deletes are hard to detect
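A minimal sketch of the incremental pattern, again with sqlite3 standing in for both source and target; the `updated_at` watermark column and table names are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
conn.executemany("INSERT INTO source VALUES (?, ?, ?)", [
    (1, "shipped", "2024-01-01"),
    (2, "pending", "2024-01-03"),   # changed since the last run
    (3, "pending", "2024-01-03"),   # new since the last run
])
conn.executemany("INSERT INTO target VALUES (?, ?, ?)", [
    (1, "shipped", "2024-01-01"),
    (2, "processing", "2024-01-02"),
])

last_run = "2024-01-02"  # watermark saved by the previous run

# Extract only rows changed since the watermark...
changed = conn.execute(
    "SELECT id, status, updated_at FROM source WHERE updated_at > ?", (last_run,)
).fetchall()

# ...then upsert them into the target
conn.executemany(
    """INSERT INTO target (id, status, updated_at) VALUES (?, ?, ?)
       ON CONFLICT(id) DO UPDATE SET status = excluded.status,
                                     updated_at = excluded.updated_at""",
    changed,
)

final = conn.execute("SELECT id, status FROM target ORDER BY id").fetchall()
print(final)  # [(1, 'shipped'), (2, 'pending'), (3, 'pending')]
```

Note what this sketch cannot do: a row deleted from `source` would silently remain in `target`, which is exactly the weakness the CDC pattern below addresses.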
### Pattern 3: Change Data Capture (CDC)
Capture every change (INSERT, UPDATE, DELETE) from the source database log.
```
Source Database
      │
      ▼  Read transaction log (binlog / WAL)
CDC Tool (Debezium, AWS DMS)
      │
      ▼  Stream change events
Message Queue (Kafka)
      │
      ▼  Apply changes
Target Table
```

**Pros:** Captures all changes including deletes; near real-time; minimal source impact
**Cons:** Complex setup; requires access to database logs; schema changes need handling
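Applying a stream of change events to a target can be sketched in plain Python. The event shape below is an assumption for illustration, loosely modeled on Debezium-style op codes ("c" create, "u" update, "d" delete), not any tool's exact payload format:

```python
# Hypothetical change events replayed in log order
events = [
    {"op": "c", "key": 1, "row": {"id": 1, "status": "pending"}},
    {"op": "c", "key": 2, "row": {"id": 2, "status": "pending"}},
    {"op": "u", "key": 1, "row": {"id": 1, "status": "shipped"}},
    {"op": "d", "key": 2, "row": None},
]

def apply_cdc(target: dict, events: list) -> dict:
    """Replay change events in order to reconstruct current state."""
    for e in events:
        if e["op"] in ("c", "u"):
            target[e["key"]] = e["row"]    # insert or overwrite
        elif e["op"] == "d":
            target.pop(e["key"], None)     # deletes are captured too
    return target

state = apply_cdc({}, events)
print(state)  # {1: {'id': 1, 'status': 'shipped'}}
```

The key property: because deletes arrive as explicit events, the target converges to the source's current state without ever rescanning the source table.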
### Pattern 4: Slowly Changing Dimensions (SCD)
Track historical changes to dimension data over time.
| SCD Type | Strategy | Example |
|---|---|---|
| Type 1 | Overwrite the old value | Customer address updated in place |
| Type 2 | Add a new row with versioning | New row with effective_date and expiry_date columns |
| Type 3 | Add a column for the old value | current_address and previous_address columns |
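Types 1 and 3 are simple enough to sketch directly on a single record (the column names are illustrative); Type 2, which needs versioning columns, is shown in full code below:

```python
# One customer record before and after an address change
old = {"customer_id": 7, "address": "12 Oak St"}
new_address = "99 Elm Ave"

# Type 1: overwrite in place; history is lost
scd1 = {**old, "address": new_address}

# Type 3: keep one level of history in a dedicated column
scd3 = {
    "customer_id": old["customer_id"],
    "current_address": new_address,
    "previous_address": old["address"],
}

print(scd1)  # {'customer_id': 7, 'address': '99 Elm Ave'}
print(scd3)  # {'customer_id': 7, 'current_address': '99 Elm Ave', 'previous_address': '12 Oak St'}
```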
```python
import pandas as pd
from datetime import datetime


def apply_scd_type_2(
    existing_df: pd.DataFrame, incoming_df: pd.DataFrame, key_col: str
) -> pd.DataFrame:
    """Apply SCD Type 2 logic — keep history of changes."""
    now = datetime.utcnow()

    # Compare incoming rows against the *current* version of each record,
    # not against already-expired historical rows
    current = existing_df[existing_df["is_current"] == True]
    merged = current.merge(incoming_df, on=key_col, suffixes=("_old", "_new"))

    # Identify rows where values changed
    change_cols = [c for c in incoming_df.columns if c != key_col]
    changed_mask = pd.Series(False, index=merged.index)
    for col in change_cols:
        changed_mask |= merged[f"{col}_old"] != merged[f"{col}_new"]

    changed_keys = merged.loc[changed_mask, key_col]

    # Close old records (set expiry date)
    existing_df.loc[
        existing_df[key_col].isin(changed_keys)
        & (existing_df["is_current"] == True),
        ["expiry_date", "is_current"],
    ] = [now, False]

    # Create new current records for the changed keys
    new_records = incoming_df[incoming_df[key_col].isin(changed_keys)].copy()
    new_records["effective_date"] = now
    new_records["expiry_date"] = None
    new_records["is_current"] = True

    # Handle truly new records (not in existing data)
    new_keys = set(incoming_df[key_col]) - set(existing_df[key_col])
    brand_new = incoming_df[incoming_df[key_col].isin(new_keys)].copy()
    brand_new["effective_date"] = now
    brand_new["expiry_date"] = None
    brand_new["is_current"] = True

    return pd.concat([existing_df, new_records, brand_new], ignore_index=True)
```

```javascript
function applyScdType2(existingRows, incomingRows, keyCol) {
  /** Apply SCD Type 2 logic — keep history of changes. */
  const now = new Date();

  // Index only the *current* version of each record
  const existingMap = new Map();
  existingRows.forEach((row) => {
    if (row.is_current) existingMap.set(row[keyCol], row);
  });

  const result = [...existingRows];
  const newRecords = [];

  for (const incoming of incomingRows) {
    const key = incoming[keyCol];
    const existing = existingMap.get(key);

    if (!existing) {
      // Brand new record
      newRecords.push({
        ...incoming,
        effective_date: now,
        expiry_date: null,
        is_current: true,
      });
    } else if (hasChanged(existing, incoming, keyCol)) {
      // Close the old record
      existing.expiry_date = now;
      existing.is_current = false;

      // Add a new current record
      newRecords.push({
        ...incoming,
        effective_date: now,
        expiry_date: null,
        is_current: true,
      });
    }
  }

  return result.concat(newRecords);
}

function hasChanged(existing, incoming, keyCol) {
  return Object.keys(incoming).some(
    (col) => col !== keyCol && existing[col] !== incoming[col]
  );
}
```

## Apache Airflow
Apache Airflow is the most widely used open-source orchestration platform for data pipelines. It allows you to define workflows as Directed Acyclic Graphs (DAGs) in Python code.
### Core Concepts
```
┌─────────────────────────────────────────────────────────┐
│                  Airflow Architecture                   │
│                                                         │
│  ┌───────────┐    ┌────────────┐    ┌──────────────┐    │
│  │ Scheduler │───▶│  Executor  │───▶│   Workers    │    │
│  │           │    │            │    │ (run tasks)  │    │
│  └───────────┘    └────────────┘    └──────────────┘    │
│        │                                                │
│        ▼                                                │
│  ┌───────────┐    ┌────────────┐                        │
│  │ Metadata  │    │   Web UI   │                        │
│  │    DB     │    │ (monitor)  │                        │
│  └───────────┘    └────────────┘                        │
└─────────────────────────────────────────────────────────┘
```

| Concept | Description |
|---|---|
| DAG | A Directed Acyclic Graph that defines the workflow structure and dependencies |
| Task | A single unit of work within a DAG (e.g., run a query, call an API) |
| Operator | A template for a task (PythonOperator, BashOperator, SQLOperator) |
| Sensor | A special operator that waits for an external condition to be met |
| XCom | Cross-communication mechanism for passing small data between tasks |
| Connection | Stored credentials for external systems (databases, APIs, cloud) |
| Variable | Global configuration values accessible from any DAG |
### Writing an Airflow DAG
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-alerts@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_orders_pipeline",
    default_args=default_args,
    description="Extract orders, transform, and load into warehouse",
    schedule_interval="0 2 * * *",  # Run at 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["orders", "warehouse"],
) as dag:

    def extract_orders(**context):
        """Extract orders from the source API."""
        import requests

        execution_date = context["ds"]
        response = requests.get(
            f"https://api.example.com/orders?date={execution_date}",
            timeout=60,
        )
        response.raise_for_status()
        orders = response.json()
        # Push to XCom for downstream tasks
        context["ti"].xcom_push(key="order_count", value=len(orders))
        return orders

    def validate_data(**context):
        """Validate extracted data meets quality expectations."""
        order_count = context["ti"].xcom_pull(
            task_ids="extract", key="order_count"
        )
        if order_count == 0:
            raise ValueError(f"No orders extracted for {context['ds']}")

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_orders,
    )

    validate = PythonOperator(
        task_id="validate",
        python_callable=validate_data,
    )

    transform_load = PostgresOperator(
        task_id="transform_and_load",
        postgres_conn_id="warehouse",
        sql="sql/transform_orders.sql",
        parameters={"execution_date": "{{ ds }}"},
    )

    update_aggregates = PostgresOperator(
        task_id="update_aggregates",
        postgres_conn_id="warehouse",
        sql="sql/update_daily_aggregates.sql",
    )

    # Define task dependencies
    extract >> validate >> transform_load >> update_aggregates
```

### Airflow Best Practices
| Practice | Why |
|---|---|
| Keep DAG files lightweight | DAG parsing happens every few seconds |
| Use catchup=False for new DAGs | Prevents accidental historical backfills |
| Set meaningful retries and retry_delay | Transient failures are common |
| Use task groups for organization | Improves readability in the web UI |
| Parameterize with Jinja templates | Use {{ ds }}, {{ execution_date }} for date logic |
| Test DAGs locally before deploying | Use airflow dags test command |
## dbt (Data Build Tool)
dbt is the standard tool for the “T” in ELT. It allows data teams to write modular SQL transformations that are version-controlled, tested, and documented.
### How dbt Works
```
┌───────────────────────────────────────────────────────┐
│                    dbt Workflow                       │
│                                                       │
│  1. Write SQL models (SELECT statements)              │
│  2. Define dependencies between models                │
│  3. dbt compiles SQL and runs it in the warehouse     │
│  4. dbt runs tests to validate the output             │
│  5. dbt generates documentation automatically         │
│                                                       │
│  models/                                              │
│  ├── staging/                                         │
│  │   ├── stg_orders.sql                               │
│  │   └── stg_customers.sql                            │
│  ├── intermediate/                                    │
│  │   └── int_order_items.sql                          │
│  └── marts/                                           │
│      ├── fct_orders.sql                               │
│      └── dim_customers.sql                            │
└───────────────────────────────────────────────────────┘
```

### dbt Model Examples
```sql
-- models/staging/stg_orders.sql
-- Staging models clean and standardize raw source data

WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        LOWER(status) AS order_status,
        amount_cents / 100.0 AS order_amount,
        created_at::timestamp AS ordered_at,
        updated_at::timestamp AS last_updated_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed
```

```sql
-- models/marts/fct_orders.sql
-- Fact models join staging models to create analytical datasets

{{
    config(
        materialized='incremental',
        unique_key='order_id',
        partition_by={
            "field": "ordered_at",
            "data_type": "timestamp",
            "granularity": "day"
        }
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

order_items AS (
    SELECT * FROM {{ ref('int_order_items') }}
)

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_status,
    o.order_amount,
    oi.total_items,
    oi.total_quantity,
    o.ordered_at,
    o.last_updated_at
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
LEFT JOIN order_items oi ON o.order_id = oi.order_id

{% if is_incremental() %}
WHERE o.last_updated_at > (SELECT MAX(last_updated_at) FROM {{ this }})
{% endif %}
```

Tests and documentation are declared alongside the models in YAML:

```yaml
version: 2

models:
  - name: fct_orders
    description: "One row per order with customer and item details"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Foreign key to dim_customers"
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_amount
        description: "Total order amount in dollars"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
```

## Orchestration and Scheduling
### Scheduling Strategies
| Strategy | Cron Expression | Use Case |
|---|---|---|
| Daily at 2 AM | 0 2 * * * | Most batch pipelines |
| Hourly | 0 * * * * | Near-real-time reporting |
| Every 15 minutes | */15 * * * * | Frequent data refresh |
| Weekdays at 6 AM | 0 6 * * 1-5 | Business-hours reporting |
| First of month | 0 0 1 * * | Monthly aggregation |
| Event-driven | N/A (trigger-based) | Respond to upstream events |
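A cron expression's five fields are minute, hour, day-of-month, month, and day-of-week. A minimal matcher for just the subset used in the table (`*`, steps like `*/15`, ranges like `1-5`, and plain numbers) can be sketched as follows; note that real cron has extra quirks this sketch ignores, such as OR semantics when both day fields are restricted:

```python
from datetime import datetime

def field_matches(field: str, value: int) -> bool:
    """Match one cron field: *, */step, a-b ranges, or a plain number."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    if "-" in field:
        lo, hi = map(int, field.split("-"))
        return lo <= value <= hi
    return value == int(field)

def cron_matches(expr: str, dt: datetime) -> bool:
    """True if dt is a firing time for 'minute hour dom month dow'."""
    minute, hour, dom, month, dow = expr.split()
    return (field_matches(minute, dt.minute)
            and field_matches(hour, dt.hour)
            and field_matches(dom, dt.day)
            and field_matches(month, dt.month)
            and field_matches(dow, dt.isoweekday() % 7))  # cron: 0 = Sunday

print(cron_matches("0 2 * * *", datetime(2024, 1, 1, 2, 0)))    # True
print(cron_matches("0 6 * * 1-5", datetime(2024, 1, 6, 6, 0)))  # False (a Saturday)
```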
### Orchestration Tools Compared
| Feature | Airflow | Dagster | Prefect | Mage |
|---|---|---|---|---|
| Language | Python | Python | Python | Python |
| Scheduling | Cron-based | Cron + sensors | Cron + events | Cron + events |
| UI | Functional | Modern | Modern | Modern |
| Testing | Manual | Built-in | Built-in | Built-in |
| Data awareness | Low (task-centric) | High (asset-centric) | Medium | High |
| Learning curve | Moderate | Moderate | Low | Low |
| Community | Very large | Growing | Growing | Growing |
| Best for | Complex workflows | Data-asset pipelines | Simple orchestration | Interactive pipelines |
## Pipeline Reliability Patterns
Building pipelines that run correctly once is easy. Building pipelines that run correctly every time requires deliberate design.
### Idempotency
An idempotent pipeline produces the same result whether it runs once or multiple times for the same input.
```python
from sqlalchemy import text

# NON-IDEMPOTENT: Running twice doubles the data
def load_orders_bad(df, engine):
    df.to_sql("orders", engine, if_exists="append", index=False)


# IDEMPOTENT: Running twice produces the same result
def load_orders_good(df, engine, execution_date):
    with engine.begin() as conn:
        # Delete existing data for this date partition
        conn.execute(
            text("DELETE FROM orders WHERE order_date = :dt"),
            {"dt": execution_date},
        )
        # Insert fresh data
        df.to_sql("orders", conn, if_exists="append", index=False)
```

```javascript
// NON-IDEMPOTENT: Running twice doubles the data
async function loadOrdersBad(data, db) {
  await db("orders").insert(data);
}

// IDEMPOTENT: Running twice produces the same result
async function loadOrdersGood(data, db, executionDate) {
  await db.transaction(async (trx) => {
    // Delete existing data for this date partition
    await trx("orders").where("order_date", executionDate).del();
    // Insert fresh data
    await trx("orders").insert(data);
  });
}
```

### Retry and Dead Letter Queues

```
┌──────────┐     ┌──────────┐     ┌──────────┐
│  Source  │────▶│ Pipeline │────▶│  Target  │
└──────────┘     └────┬─────┘     └──────────┘
                      │ On failure:
              ┌───────▼────────┐
              │ Retry (3x with │
              │ exp. backoff)  │
              └───────┬────────┘
                      │ Still failing:
              ┌───────▼────────┐
              │  Dead Letter   │
              │  Queue (DLQ)   │
              │  for manual    │
              │  investigation │
              └────────────────┘
```

### Key Reliability Principles
| Principle | Description |
|---|---|
| Idempotency | Same input always produces same output, regardless of run count |
| Atomicity | Pipeline steps succeed or fail as a unit — no partial writes |
| Retry with backoff | Transient failures are retried with exponential delay |
| Dead letter queues | Failed records are captured for investigation, not silently dropped |
| Data validation | Validate data at each stage — row counts, schema, null checks |
| Alerting | Notify the team immediately when a pipeline fails or data quality drops |
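The retry-then-DLQ flow can be sketched in a few lines of Python; the function names and the in-memory list standing in for the dead letter queue are illustrative:

```python
import time

def process_with_retry(record, process, dead_letter_queue, max_retries=3, base_delay=1.0):
    """Retry a failing record with exponential backoff; park it in the DLQ if it never succeeds."""
    for attempt in range(max_retries):
        try:
            return process(record)
        except Exception as exc:
            if attempt == max_retries - 1:
                # Still failing: capture for investigation, never drop silently
                dead_letter_queue.append({"record": record, "error": str(exc)})
                return None
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...

# Usage: a processor that rejects negative amounts
dlq = []
def process(order):
    if order["amount"] < 0:
        raise ValueError("negative amount")
    return order["amount"]

results = [process_with_retry(o, process, dlq, base_delay=0.01)
           for o in [{"amount": 10}, {"amount": -5}]]
print(results)   # [10, None]
print(len(dlq))  # 1
```

The bad record ends up in the DLQ with its error message attached, so the run can finish while the failure remains visible for follow-up.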
## Quick Reference: Pipeline Checklist
| Area | Question |
|---|---|
| Extraction | Is the extract incremental or full? How do you detect changes? |
| Transformation | Are transforms in SQL (ELT) or code (ETL)? Are they idempotent? |
| Loading | Is the load atomic? What happens on partial failure? |
| Scheduling | What is the SLA? What happens if a run is delayed? |
| Monitoring | Are you alerted on failure? Do you track row counts and freshness? |
| Recovery | Can you rerun a failed pipeline safely? Is backfill supported? |
| Documentation | Is the pipeline documented? Are data contracts defined? |