ETL Pipelines

ETL (Extract, Transform, Load) is the foundational pattern in data engineering. It describes the process of pulling data from source systems, applying transformations, and loading the results into a target system. While the concept is simple, building ETL pipelines that are reliable, scalable, and maintainable is one of the most challenging problems in the field.


ETL vs ELT

The classic ETL pattern transforms data before loading it into the target. The modern ELT pattern loads raw data first and transforms it inside the target system (typically a cloud data warehouse).

ETL (Extract → Transform → Load):
┌──────────┐ ┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Transform │───▶│ Warehouse │
│ Systems │ │ (Spark, etc)│ │ (Clean data)│
└──────────┘ └──────────────┘ └──────────────┘
ELT (Extract → Load → Transform):
┌──────────┐ ┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Warehouse │───▶│ Transform │
│ Systems │ │ (Raw data) │ │ (dbt, SQL) │
└──────────┘ └──────────────┘ └──────────────┘

Comparison

| Aspect | ETL | ELT |
| --- | --- | --- |
| Transform location | External compute (Spark, Python) | Inside the warehouse (SQL, dbt) |
| Data in warehouse | Only transformed data | Raw + transformed data |
| Flexibility | Must redesign pipeline for new transforms | Re-transform raw data anytime |
| Cost model | Pay for external compute | Pay for warehouse compute |
| Latency | Higher (transform before load) | Lower (load first, transform later) |
| Best for | Complex non-SQL transforms, legacy systems | Cloud warehouses, SQL-centric teams |
| Data retention | Raw data may not be preserved | Raw data always available |

Pipeline Design Patterns

Pattern 1: Full Load

The simplest approach — extract all data from the source and replace the target table entirely.

Source Table (10M rows)
▼ Extract ALL rows
Staging Table
▼ TRUNCATE + INSERT
Target Table (10M rows, fully replaced)

Pros: Simple; guarantees the target matches the source.
Cons: Slow and expensive for large tables; no historical tracking.
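The full-load pattern fits in a few lines. Below is a minimal sketch using an in-memory SQLite database; the `target` table and its columns are illustrative (SQLite has no TRUNCATE, so DELETE plays that role):

```python
import sqlite3

def full_load(conn, source_rows):
    """Replace the target table entirely: truncate, then insert all rows."""
    with conn:  # one transaction: truncate + insert succeed or fail together
        conn.execute("DELETE FROM target")  # SQLite's stand-in for TRUNCATE
        conn.executemany(
            "INSERT INTO target (id, amount) VALUES (?, ?)", source_rows
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, amount REAL)")
full_load(conn, [(1, 9.99), (2, 24.50)])
full_load(conn, [(1, 9.99), (2, 24.50), (3, 5.00)])  # rerun fully replaces
```

Because the delete and insert run in one transaction, a rerun always leaves the target as an exact copy of the latest extract.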

Pattern 2: Incremental Load

Extract only new or changed records since the last run.

Source Table (10M rows total, 5K new since last run)
▼ Extract WHERE updated_at > last_run_timestamp
Staging Table (5K rows)
▼ MERGE / UPSERT into target
Target Table (10M rows, 5K updated)

Pros: Much faster and cheaper than a full load.
Cons: Requires a reliable change-tracking column; deletes are hard to detect.
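The extract-and-merge steps above can be sketched with SQLite, whose `ON CONFLICT` upsert stands in for the warehouse MERGE; the `source` and `target` tables and the `updated_at` column are illustrative:

```python
import sqlite3

def incremental_load(conn, last_run_ts):
    """Extract rows changed since last_run_ts, then upsert them into target."""
    changed = conn.execute(
        "SELECT id, amount, updated_at FROM source WHERE updated_at > ?",
        (last_run_ts,),
    ).fetchall()
    with conn:
        conn.executemany(
            # SQLite upsert; cloud warehouses use MERGE for the same effect
            "INSERT INTO target (id, amount, updated_at) VALUES (?, ?, ?)"
            " ON CONFLICT(id) DO UPDATE SET"
            " amount = excluded.amount, updated_at = excluded.updated_at",
            changed,
        )
    return len(changed)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT)")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT)")
conn.executemany("INSERT INTO source VALUES (?, ?, ?)",
                 [(1, 10.0, "2024-01-01"), (2, 20.0, "2024-01-03")])
rows_loaded = incremental_load(conn, "2024-01-02")  # only row 2 qualifies
```

Note the upsert makes the load idempotent for a given watermark: re-running with the same `last_run_ts` re-applies the same rows without duplicating them.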

Pattern 3: Change Data Capture (CDC)

Capture every change (INSERT, UPDATE, DELETE) from the source database log.

Source Database
▼ Read transaction log (binlog / WAL)
CDC Tool (Debezium, AWS DMS)
▼ Stream change events
Message Queue (Kafka)
▼ Apply changes
Target Table

Pros: Captures all changes, including deletes; near real-time; minimal source impact.
Cons: Complex setup; requires access to database logs; schema changes need handling.
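The "apply changes" step can be illustrated with a small sketch. The event shape below is loosely modeled on Debezium's change events (`op` codes `c`, `u`, `d` for create, update, delete), simplified here to plain dicts applied to a keyed in-memory table:

```python
def apply_cdc_event(table, event):
    """Apply one change event (Debezium-style op codes) to a keyed table."""
    op, key = event["op"], event["key"]
    if op in ("c", "u"):   # create / update: upsert the new row image
        table[key] = event["after"]
    elif op == "d":        # delete: remove the row if present
        table.pop(key, None)
    return table

table = {}
events = [
    {"op": "c", "key": 1, "after": {"status": "new"}},
    {"op": "u", "key": 1, "after": {"status": "shipped"}},
    {"op": "c", "key": 2, "after": {"status": "new"}},
    {"op": "d", "key": 2, "after": None},
]
for e in events:
    apply_cdc_event(table, e)
```

Applying events in log order is what lets CDC reproduce the source state exactly, deletes included, which the incremental pattern cannot do.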

Pattern 4: Slowly Changing Dimensions (SCD)

Track historical changes to dimension data over time.

| SCD Type | Strategy | Example |
| --- | --- | --- |
| Type 1 | Overwrite the old value | Customer address updated in place |
| Type 2 | Add a new row with versioning | New row with effective_date and expiry_date columns |
| Type 3 | Add a column for the old value | current_address and previous_address columns |
```python
import pandas as pd
from datetime import datetime

def apply_scd_type_2(existing_df: pd.DataFrame,
                     incoming_df: pd.DataFrame,
                     key_col: str) -> pd.DataFrame:
    """Apply SCD Type 2 logic -- keep history of changes."""
    now = datetime.utcnow()

    # Compare incoming rows against current records only, not closed history
    current_df = existing_df[existing_df["is_current"]]
    merged = current_df.merge(
        incoming_df, on=key_col, suffixes=("_old", "_new")
    )

    # Identify rows where any tracked value changed
    change_cols = [c for c in incoming_df.columns if c != key_col]
    changed_mask = pd.Series(False, index=merged.index)
    for col in change_cols:
        changed_mask |= merged[f"{col}_old"] != merged[f"{col}_new"]
    changed_keys = merged.loc[changed_mask, key_col]

    # Close old records (set expiry date)
    existing_df.loc[
        existing_df[key_col].isin(changed_keys) &
        (existing_df["is_current"] == True),
        ["expiry_date", "is_current"]
    ] = [now, False]

    # Create new current records for the changed keys
    new_records = incoming_df[incoming_df[key_col].isin(changed_keys)].copy()
    new_records["effective_date"] = now
    new_records["expiry_date"] = None
    new_records["is_current"] = True

    # Handle truly new records (not in existing data)
    new_keys = set(incoming_df[key_col]) - set(existing_df[key_col])
    brand_new = incoming_df[incoming_df[key_col].isin(new_keys)].copy()
    brand_new["effective_date"] = now
    brand_new["expiry_date"] = None
    brand_new["is_current"] = True

    return pd.concat([existing_df, new_records, brand_new], ignore_index=True)
```

Apache Airflow

Apache Airflow is the most widely used open-source orchestration platform for data pipelines. It allows you to define workflows as Directed Acyclic Graphs (DAGs) in Python code.

Core Concepts

┌─────────────────────────────────────────────────────────┐
│ Airflow Architecture │
│ │
│ ┌───────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Scheduler │──▶│ Executor │──▶│ Workers │ │
│ │ │ │ │ │ (run tasks) │ │
│ └───────────┘ └────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ ┌────────────┐ │
│ │ Metadata │ │ Web UI │ │
│ │ DB │ │ (monitor) │ │
│ └───────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────┘
| Concept | Description |
| --- | --- |
| DAG | A Directed Acyclic Graph that defines the workflow structure and dependencies |
| Task | A single unit of work within a DAG (e.g., run a query, call an API) |
| Operator | A template for a task (PythonOperator, BashOperator, SQLOperator) |
| Sensor | A special operator that waits for an external condition to be met |
| XCom | Cross-communication mechanism for passing small data between tasks |
| Connection | Stored credentials for external systems (databases, APIs, cloud) |
| Variable | Global configuration values accessible from any DAG |

Writing an Airflow DAG

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-alerts@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_orders_pipeline",
    default_args=default_args,
    description="Extract orders, transform, and load into warehouse",
    schedule_interval="0 2 * * *",  # Run at 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["orders", "warehouse"],
) as dag:

    def extract_orders(**context):
        """Extract orders from the source API."""
        import requests

        execution_date = context["ds"]
        response = requests.get(
            f"https://api.example.com/orders?date={execution_date}",
            timeout=60,
        )
        response.raise_for_status()
        orders = response.json()
        # Push to XCom for downstream tasks
        context["ti"].xcom_push(key="order_count", value=len(orders))
        return orders

    def validate_data(**context):
        """Validate extracted data meets quality expectations."""
        order_count = context["ti"].xcom_pull(
            task_ids="extract", key="order_count"
        )
        if order_count == 0:
            raise ValueError(f"No orders extracted for {context['ds']}")

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_orders,
    )
    validate = PythonOperator(
        task_id="validate",
        python_callable=validate_data,
    )
    transform_load = PostgresOperator(
        task_id="transform_and_load",
        postgres_conn_id="warehouse",
        sql="sql/transform_orders.sql",
        parameters={"execution_date": "{{ ds }}"},
    )
    update_aggregates = PostgresOperator(
        task_id="update_aggregates",
        postgres_conn_id="warehouse",
        sql="sql/update_daily_aggregates.sql",
    )

    # Define task dependencies
    extract >> validate >> transform_load >> update_aggregates
```

Airflow Best Practices

| Practice | Why |
| --- | --- |
| Keep DAG files lightweight | DAG parsing happens every few seconds |
| Use `catchup=False` for new DAGs | Prevents accidental historical backfills |
| Set meaningful `retries` and `retry_delay` | Transient failures are common |
| Use task groups for organization | Improves readability in the web UI |
| Parameterize with Jinja templates | Use `{{ ds }}`, `{{ execution_date }}` for date logic |
| Test DAGs locally before deploying | Use the `airflow dags test` command |

dbt (Data Build Tool)

dbt is the standard tool for the “T” in ELT. It allows data teams to write modular SQL transformations that are version-controlled, tested, and documented.

How dbt Works

┌───────────────────────────────────────────────────────┐
│ dbt Workflow │
│ │
│ 1. Write SQL models (SELECT statements) │
│ 2. Define dependencies between models │
│ 3. dbt compiles SQL and runs it in the warehouse │
│ 4. dbt runs tests to validate the output │
│ 5. dbt generates documentation automatically │
│ │
│ models/ │
│ ├── staging/ │
│ │ ├── stg_orders.sql │
│ │ └── stg_customers.sql │
│ ├── intermediate/ │
│ │ └── int_order_items.sql │
│ └── marts/ │
│ ├── fct_orders.sql │
│ └── dim_customers.sql │
└───────────────────────────────────────────────────────┘

dbt Model Examples

```sql
-- models/staging/stg_orders.sql
-- Staging models clean and standardize raw source data
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        LOWER(status) AS order_status,
        amount_cents / 100.0 AS order_amount,
        created_at::timestamp AS ordered_at,
        updated_at::timestamp AS last_updated_at
    FROM source
    WHERE id IS NOT NULL
)

SELECT * FROM renamed
```
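Step 4 of the workflow, testing, is configured in YAML alongside the models. A minimal sketch for the staging model above; `unique` and `not_null` are tests built into dbt, and the description text is illustrative:

```yaml
# models/staging/stg_orders.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned and standardized orders from the raw source"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_status
        tests:
          - not_null
```

Running `dbt test` executes each of these as a SQL query against the built model and fails the run if any check returns rows.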

Orchestration and Scheduling

Scheduling Strategies

| Strategy | Cron Expression | Use Case |
| --- | --- | --- |
| Daily at 2 AM | `0 2 * * *` | Most batch pipelines |
| Hourly | `0 * * * *` | Near-real-time reporting |
| Every 15 minutes | `*/15 * * * *` | Frequent data refresh |
| Weekdays at 6 AM | `0 6 * * 1-5` | Business-hours reporting |
| First of month | `0 0 1 * *` | Monthly aggregation |
| Event-driven | N/A (trigger-based) | Respond to upstream events |

Orchestration Tools Compared

| Feature | Airflow | Dagster | Prefect | Mage |
| --- | --- | --- | --- | --- |
| Language | Python | Python | Python | Python |
| Scheduling | Cron-based | Cron + sensors | Cron + events | Cron + events |
| UI | Functional | Modern | Modern | Modern |
| Testing | Manual | Built-in | Built-in | Built-in |
| Data awareness | Low (task-centric) | High (asset-centric) | Medium | High |
| Learning curve | Moderate | Moderate | Low | Low |
| Community | Very large | Growing | Growing | Growing |
| Best for | Complex workflows | Data-asset pipelines | Simple orchestration | Interactive pipelines |

Pipeline Reliability Patterns

Building pipelines that run correctly once is easy. Building pipelines that run correctly every time requires deliberate design.

Idempotency

An idempotent pipeline produces the same result whether it runs once or multiple times for the same input.

```python
from sqlalchemy import text

# NON-IDEMPOTENT: Running twice doubles the data
def load_orders_bad(df, engine):
    df.to_sql("orders", engine, if_exists="append", index=False)

# IDEMPOTENT: Running twice produces the same result
def load_orders_good(df, engine, execution_date):
    with engine.begin() as conn:
        # Delete existing data for this date partition
        conn.execute(
            text("DELETE FROM orders WHERE order_date = :dt"),
            {"dt": execution_date},
        )
        # Insert fresh data
        df.to_sql("orders", conn, if_exists="append", index=False)
```

Retry and Dead Letter Queues

┌──────────┐ ┌──────────┐ ┌──────────┐
│ Source │────▶│ Pipeline │────▶│ Target │
└──────────┘ └────┬─────┘ └──────────┘
On failure:
┌───────▼────────┐
│ Retry (3x with │
│ exp. backoff) │
└───────┬────────┘
Still failing:
┌───────▼────────┐
│ Dead Letter │
│ Queue (DLQ) │
│ for manual │
│ investigation │
└────────────────┘
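The retry-then-DLQ flow above can be sketched in a few lines. The function and handler names here are illustrative, not a standard API; the DLQ is modeled as a plain list:

```python
import time

def process_with_retry(record, handler, dead_letter_queue,
                       max_retries=3, base_delay=1.0):
    """Retry a failing handler with exponential backoff; route to a DLQ when exhausted."""
    for attempt in range(max_retries):
        try:
            return handler(record)
        except Exception as exc:
            if attempt == max_retries - 1:
                # Capture the failed record instead of silently dropping it
                dead_letter_queue.append({"record": record, "error": str(exc)})
                return None
            time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...

dlq = []
def flaky_handler(record):
    raise ConnectionError("source unreachable")

result = process_with_retry({"order_id": 42}, flaky_handler, dlq, base_delay=0.0)
```

In production the DLQ would be a durable store (a Kafka topic, an S3 prefix, an error table) so failed records survive restarts and can be replayed.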

Key Reliability Principles

| Principle | Description |
| --- | --- |
| Idempotency | Same input always produces same output, regardless of run count |
| Atomicity | Pipeline steps succeed or fail as a unit — no partial writes |
| Retry with backoff | Transient failures are retried with exponential delay |
| Dead letter queues | Failed records are captured for investigation, not silently dropped |
| Data validation | Validate data at each stage — row counts, schema, null checks |
| Alerting | Notify the team immediately when a pipeline fails or data quality drops |
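The data-validation principle can be made concrete with a small helper covering the three checks named above (row counts, schema, nulls); the function name and check set are illustrative:

```python
import pandas as pd

def validate_batch(df, expected_cols, min_rows=1, non_null_cols=()):
    """Run basic checks on a batch; return a list of failure messages (empty means pass)."""
    failures = []
    # Row-count check: catch empty or suspiciously small extracts
    if len(df) < min_rows:
        failures.append(f"row count {len(df)} below minimum {min_rows}")
    # Schema check: every expected column must be present
    missing = set(expected_cols) - set(df.columns)
    if missing:
        failures.append(f"missing columns: {sorted(missing)}")
    # Null check: required columns must be fully populated
    for col in non_null_cols:
        if col in df.columns and df[col].isna().any():
            failures.append(f"null values found in column {col}")
    return failures

good = pd.DataFrame({"order_id": [1, 2], "amount": [9.99, 24.50]})
bad = pd.DataFrame({"order_id": [1, None]})
ok = validate_batch(good, ["order_id", "amount"], non_null_cols=["order_id"])
problems = validate_batch(bad, ["order_id", "amount"], non_null_cols=["order_id"])
```

Returning messages rather than raising lets the pipeline decide whether a failure should abort the run or just trigger an alert.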

Quick Reference: Pipeline Checklist

AreaQuestion
ExtractionIs the extract incremental or full? How do you detect changes?
TransformationAre transforms in SQL (ELT) or code (ETL)? Are they idempotent?
LoadingIs the load atomic? What happens on partial failure?
SchedulingWhat is the SLA? What happens if a run is delayed?
MonitoringAre you alerted on failure? Do you track row counts and freshness?
RecoveryCan you rerun a failed pipeline safely? Is backfill supported?
DocumentationIs the pipeline documented? Are data contracts defined?

Next Steps