
Data Quality

Data quality is the measure of how well data serves its intended purpose. A pipeline that delivers data on time but fills a dashboard with incorrect numbers is worse than no pipeline at all — it creates confident bad decisions. Data quality is not a one-time check; it is a continuous discipline woven into every stage of the data lifecycle.


Dimensions of Data Quality

Data quality is not a single metric — it is a multi-dimensional concept. Each dimension captures a different aspect of “good” data.

Dimension     Definition                                           Example Check
---------     ----------                                           -------------
Accuracy      Data correctly represents the real-world entity      Customer email matches the actual email address
Completeness  Required fields are populated, no unexpected NULLs   99% of orders have a shipping_address
Consistency   Same data does not contradict itself across systems  Order count in source matches order count in warehouse
Timeliness    Data is available when expected                      Daily sales report is ready by 7 AM
Uniqueness    No unintended duplicates                             Each order_id appears exactly once
Validity      Data conforms to defined rules and formats           Email contains @, age is between 0 and 150
┌──────────────────────────────────────────────────┐
│             Data Quality Dimensions              │
│                                                  │
│  ┌──────────┐  ┌────────────┐  ┌──────────────┐  │
│  │ Accuracy │  │Completeness│  │ Consistency  │  │
│  │          │  │            │  │              │  │
│  │ Is it    │  │ Is it all  │  │ Does it      │  │
│  │ correct? │  │ there?     │  │ agree across │  │
│  │          │  │            │  │ systems?     │  │
│  └──────────┘  └────────────┘  └──────────────┘  │
│                                                  │
│  ┌──────────┐  ┌────────────┐  ┌──────────────┐  │
│  │Timeliness│  │ Uniqueness │  │ Validity     │  │
│  │          │  │            │  │              │  │
│  │ Is it on │  │ Is it free │  │ Does it      │  │
│  │ time?    │  │ of dupes?  │  │ follow the   │  │
│  │          │  │            │  │ rules?       │  │
│  └──────────┘  └────────────┘  └──────────────┘  │
└──────────────────────────────────────────────────┘
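Several of these dimensions can be measured directly with a few lines of pandas. A minimal sketch (the column names `shipping_address`, `order_id`, and `email` are illustrative) that scores completeness, uniqueness, and validity for a DataFrame:

```python
import pandas as pd

def quality_scores(df: pd.DataFrame) -> dict:
    """Score three quality dimensions on a 0.0-1.0 scale."""
    total = len(df)
    return {
        # Completeness: share of rows with a populated shipping_address
        "completeness": float(df["shipping_address"].notna().sum() / total),
        # Uniqueness: share of order_ids that appear exactly once
        "uniqueness": float((~df["order_id"].duplicated(keep=False)).sum() / total),
        # Validity: share of emails that contain an "@"
        "validity": float(df["email"].str.contains("@", na=False).sum() / total),
    }

orders = pd.DataFrame({
    "order_id": ["A1", "A2", "A2", "A3"],
    "shipping_address": ["12 Elm St", None, "9 Oak Ave", "3 Pine Rd"],
    "email": ["a@x.com", "b@x.com", "bad-email", "c@x.com"],
})
print(quality_scores(orders))
# → {'completeness': 0.75, 'uniqueness': 0.5, 'validity': 0.75}
```

In practice each score would be compared against a threshold (e.g., completeness >= 0.99) and tracked over time, so that a slow decay is caught before it becomes an incident.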

Data Validation

Data validation is the practice of programmatically checking data at each stage of the pipeline to catch issues before they propagate downstream.

Where to Validate

┌──────────┐  Validate   ┌───────────┐  Validate   ┌───────────┐  Validate   ┌──────────┐
│  Source  │────(1)─────▶│  Staging  │────(2)─────▶│ Transform │────(3)─────▶│  Serve   │
└──────────┘             └───────────┘             └───────────┘             └──────────┘

(1) Source validation: row count, schema match, freshness
(2) Staging validation: nulls, duplicates, data types, ranges
(3) Transform validation: business rules, referential integrity, aggregation checks
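Stage (1) checks are usually simple enough to hand-roll before any heavier tooling is introduced. A sketch of source validation (row count, schema match, freshness) over a raw extract; the expected column set and freshness window are illustrative assumptions:

```python
from datetime import datetime, timedelta, timezone

def validate_source(rows: list[dict], expected_columns: set[str],
                    min_rows: int, max_age: timedelta) -> list[str]:
    """Return a list of human-readable failures (empty list means pass)."""
    failures = []
    # Row count: an empty or tiny extract usually means the upstream
    # export failed part-way through.
    if len(rows) < min_rows:
        failures.append(f"row count {len(rows)} below minimum {min_rows}")
    if not rows:
        return failures  # nothing else to check
    # Schema match: the first record must carry exactly the expected columns.
    if set(rows[0]) != expected_columns:
        failures.append(f"schema mismatch: got {sorted(rows[0])}")
    # Freshness: the newest record must fall inside the allowed window.
    newest = max(datetime.fromisoformat(r["updated_at"]) for r in rows)
    if datetime.now(timezone.utc) - newest > max_age:
        failures.append(f"stale data: newest record is {newest}")
    return failures

now = datetime.now(timezone.utc).isoformat()
rows = [{"order_id": "A1", "updated_at": now}]
print(validate_source(rows, {"order_id", "updated_at"},
                      min_rows=1, max_age=timedelta(hours=1)))
# → []
```

Returning failures as data (rather than raising on the first one) lets the pipeline log every problem with a batch in one pass.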

Common Validation Checks

Check Type    Example                                                        When to Use
----------    -------                                                        -----------
Schema        Column names and types match expected schema                   After ingestion
Null check    customer_id is never NULL                                      After ingestion, after transform
Uniqueness    order_id has no duplicates                                     After ingestion
Range         order_amount is between 0 and 1,000,000                        After transform
Referential   Every customer_id in facts exists in dim_customers             After loading
Freshness     Most recent record is within the last 24 hours                 Before serving
Volume        Row count is within expected range (not zero, not 10x normal)  After ingestion
Distribution  Column value distribution has not changed dramatically         After transform
Many of these checks can be implemented directly with a small, chainable validator:

import pandas as pd
from dataclasses import dataclass


@dataclass
class ValidationResult:
    check_name: str
    passed: bool
    details: str
    severity: str  # "critical", "warning", "info"


class DataValidator:
    """Validate a DataFrame against a set of quality rules."""

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.results: list[ValidationResult] = []

    def check_not_null(self, column: str, severity: str = "critical"):
        null_count = self.df[column].isnull().sum()
        self.results.append(ValidationResult(
            check_name=f"not_null({column})",
            passed=null_count == 0,
            details=f"{null_count} null values found",
            severity=severity,
        ))
        return self

    def check_unique(self, column: str, severity: str = "critical"):
        duplicate_count = self.df[column].duplicated().sum()
        self.results.append(ValidationResult(
            check_name=f"unique({column})",
            passed=duplicate_count == 0,
            details=f"{duplicate_count} duplicates found",
            severity=severity,
        ))
        return self

    def check_range(self, column: str, min_val: float,
                    max_val: float, severity: str = "warning"):
        out_of_range = (
            (self.df[column] < min_val) | (self.df[column] > max_val)
        ).sum()
        self.results.append(ValidationResult(
            check_name=f"range({column}, {min_val}-{max_val})",
            passed=out_of_range == 0,
            details=f"{out_of_range} values out of range",
            severity=severity,
        ))
        return self

    def check_row_count(self, min_rows: int, max_rows: int,
                        severity: str = "critical"):
        count = len(self.df)
        self.results.append(ValidationResult(
            check_name=f"row_count({min_rows}-{max_rows})",
            passed=min_rows <= count <= max_rows,
            details=f"Actual row count: {count}",
            severity=severity,
        ))
        return self

    def check_referential_integrity(self, column: str,
                                    reference_values: set,
                                    severity: str = "critical"):
        orphans = set(self.df[column].dropna()) - reference_values
        self.results.append(ValidationResult(
            check_name=f"referential({column})",
            passed=len(orphans) == 0,
            details=f"{len(orphans)} orphan keys found",
            severity=severity,
        ))
        return self

    def validate(self) -> bool:
        """Run all checks and return True if all critical checks passed."""
        critical_failures = [
            r for r in self.results
            if not r.passed and r.severity == "critical"
        ]
        for r in self.results:
            status = "PASS" if r.passed else "FAIL"
            print(f"  [{status}] {r.check_name}: {r.details}")
        return len(critical_failures) == 0


# Usage (engine is an existing SQLAlchemy engine)
orders = pd.read_parquet("s3://warehouse/staging/orders/")
valid_customer_ids = set(
    pd.read_sql("SELECT customer_id FROM dim_customers", engine)["customer_id"]
)

is_valid = (
    DataValidator(orders)
    .check_not_null("order_id")
    .check_not_null("customer_id")
    .check_unique("order_id")
    .check_range("amount", 0, 1_000_000)
    .check_row_count(1000, 500_000)
    .check_referential_integrity("customer_id", valid_customer_ids)
    .validate()
)

if not is_valid:
    raise RuntimeError("Data validation failed — pipeline halted")

Great Expectations

Great Expectations is the leading open-source framework for data validation, documentation, and profiling. It provides a rich library of built-in “expectations” (validation rules) and generates human-readable data docs.

Core Concepts

Concept            Description
-------            -----------
Expectation        A declarative assertion about data (e.g., “column X should never be NULL”)
Expectation Suite  A collection of expectations for a specific dataset
Checkpoint         A runnable validation job that tests data against an expectation suite
Data Docs          Auto-generated HTML documentation of validation results
Data Source        Connection to the data being validated (Pandas, Spark, SQL)

Example

import great_expectations as gx

# Initialize context
context = gx.get_context()

# Connect to data (orders_df is the pandas DataFrame to validate,
# loaded elsewhere in the pipeline)
datasource = context.sources.add_pandas("orders_source")
data_asset = datasource.add_dataframe_asset(name="orders")
batch_request = data_asset.build_batch_request(dataframe=orders_df)

# Build expectation suite
suite = context.add_expectation_suite("orders_quality_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "processing", "shipped",
                   "delivered", "cancelled"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000, max_value=500000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="amount", min_value=10, max_value=500
    )
)

# Run validation checkpoint
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[{
        "batch_request": batch_request,
        "expectation_suite_name": "orders_quality_suite",
    }],
)
result = checkpoint.run()

if not result.success:
    # Alert the team, halt the pipeline
    failed_expectations = [
        r for r in result.results
        if not r.success
    ]
    raise RuntimeError(
        f"Data validation failed: {len(failed_expectations)} checks failed"
    )

Data Contracts

A data contract is a formal agreement between the producer and consumer of a dataset. It defines the schema, quality expectations, SLAs, and ownership — treating data as a product with a clear interface.

┌────────────────────────────────────────────────────────┐
│                     Data Contract                      │
│                                                        │
│  Producer: Orders Service (Backend Team)               │
│  Consumer: Analytics Team, ML Team                     │
│                                                        │
│  Schema:                                               │
│    - order_id: STRING (required, unique)               │
│    - customer_id: STRING (required, FK to customers)   │
│    - amount: DECIMAL (required, range 0-1000000)       │
│    - status: ENUM (pending | shipped | done)           │
│    - created_at: TIMESTAMP (required)                  │
│                                                        │
│  SLA:                                                  │
│    - Freshness: data available within 1 hour of event  │
│    - Completeness: less than 0.1% null rate on required│
│    - Availability: 99.9% uptime for the data endpoint  │
│                                                        │
│  Owner: backend-team@company.com                       │
│  On-call: #data-incidents Slack channel                │
└────────────────────────────────────────────────────────┘

Data Contract as Code

contracts/orders.yaml
apiVersion: v1
kind: DataContract
metadata:
  name: orders
  version: "2.1.0"
  owner: backend-team
  contact: backend-team@company.com
  slack: "#orders-data"
schema:
  type: object
  properties:
    order_id:
      type: string
      description: "Unique order identifier"
      required: true
      unique: true
    customer_id:
      type: string
      description: "Foreign key to customers dataset"
      required: true
    amount:
      type: number
      description: "Order total in USD"
      required: true
      minimum: 0
      maximum: 1000000
    status:
      type: string
      description: "Current order status"
      required: true
      enum: ["pending", "processing", "shipped", "delivered", "cancelled"]
    created_at:
      type: timestamp
      description: "When the order was placed"
      required: true
quality:
  freshness:
    max_delay: "1 hour"
  completeness:
    max_null_rate: 0.001  # 0.1%
  volume:
    min_rows_per_day: 1000
    max_rows_per_day: 500000
sla:
  availability: "99.9%"
  support_hours: "24/7"
  incident_response: "30 minutes"
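A contract is only useful if it is enforced. A minimal hand-rolled sketch (not any particular contract framework) that checks individual records against the schema portion of a parsed contract; the `contract` dict below mirrors what `yaml.safe_load` would produce for the file above, trimmed to the fields the check uses:

```python
# The contract as it would look after parsing the YAML; only the
# fields this sketch inspects are shown.
contract = {
    "schema": {
        "properties": {
            "order_id": {"type": "string", "required": True},
            "amount": {"type": "number", "required": True,
                       "minimum": 0, "maximum": 1_000_000},
            "status": {"type": "string", "required": True,
                       "enum": ["pending", "processing", "shipped",
                                "delivered", "cancelled"]},
        }
    }
}

TYPE_MAP = {"string": str, "number": (int, float)}

def violations(record: dict, contract: dict) -> list[str]:
    """Check one record against the contract's schema; return violations."""
    problems = []
    for name, spec in contract["schema"]["properties"].items():
        value = record.get(name)
        if value is None:
            if spec.get("required"):
                problems.append(f"{name}: required field is missing")
            continue
        if not isinstance(value, TYPE_MAP[spec["type"]]):
            problems.append(f"{name}: expected {spec['type']}")
            continue
        if "minimum" in spec and value < spec["minimum"]:
            problems.append(f"{name}: {value} below minimum {spec['minimum']}")
        if "maximum" in spec and value > spec["maximum"]:
            problems.append(f"{name}: {value} above maximum {spec['maximum']}")
        if "enum" in spec and value not in spec["enum"]:
            problems.append(f"{name}: {value!r} not in allowed set")
    return problems

print(violations({"order_id": "A1", "amount": 42.5, "status": "shipped"}, contract))
# → []
```

Run in the producer's CI, a check like this catches a contract-breaking change before it ships; run at ingestion, it catches drift the producer missed.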

Data Lineage

Data lineage tracks where data comes from, how it is transformed, and where it goes. It answers the question: “If this number on the dashboard is wrong, which pipeline, transformation, or source is responsible?”

┌────────────────────────────────────────────────────────────────┐
│                       Data Lineage Graph                       │
│                                                                │
│  Source Systems    Transformations         Consumption         │
│                                                                │
│  ┌──────────┐      ┌──────────────┐                            │
│  │  Orders  │─────▶│  stg_orders  │──┐                         │
│  │ Database │      └──────────────┘  │   ┌──────────────┐      │
│  └──────────┘                        ├──▶│  fct_orders  │──┐   │
│  ┌──────────┐      ┌──────────────┐  │   └──────────────┘  │   │
│  │ Payments │─────▶│ stg_payments │──┘                     │   │
│  │   API    │      └──────────────┘                        │   │
│  └──────────┘                                              │   │
│  ┌──────────┐      ┌──────────────┐                        │   │
│  │   CRM    │─────▶│dim_customers │────────────────────────┤   │
│  │  System  │      └──────────────┘                        │   │
│  └──────────┘                                              │   │
│            ┌─────────────────────────────┐                 │   │
│            │ Revenue Dashboard (Looker)  │◀────────────────┤   │
│            └─────────────────────────────┘                 │   │
│            ┌─────────────────────────────┐                 │   │
│            │ ML Churn Model              │◀────────────────┘   │
│            └─────────────────────────────┘                     │
└────────────────────────────────────────────────────────────────┘
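A lineage graph earns its keep when you can traverse it. A small sketch of the graph above as an adjacency map (node names are shortened for illustration), with an upstream walk that answers the incident question directly:

```python
# Edges point downstream: dataset -> [things built from it].
LINEAGE = {
    "orders_db": ["stg_orders"],
    "payments_api": ["stg_payments"],
    "crm": ["dim_customers"],
    "stg_orders": ["fct_orders"],
    "stg_payments": ["fct_orders"],
    "fct_orders": ["revenue_dashboard", "ml_churn_model"],
    "dim_customers": ["revenue_dashboard", "ml_churn_model"],
}

def upstream(node: str) -> set[str]:
    """All ancestors of `node`: every dataset whose failure could break it."""
    parents = {src for src, dsts in LINEAGE.items() if node in dsts}
    for p in set(parents):
        parents |= upstream(p)
    return parents

print(sorted(upstream("revenue_dashboard")))
# → ['crm', 'dim_customers', 'fct_orders', 'orders_db',
#    'payments_api', 'stg_orders', 'stg_payments']
```

The same graph walked in the other direction gives impact analysis: everything downstream of a broken table is the set of consumers to notify.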

Lineage Tools

Tool         Type                      Key Feature
----         ----                      -----------
DataHub      Open-source catalog       Automatic lineage from Airflow, Spark, dbt
Amundsen     Open-source catalog       Search-oriented data discovery
OpenLineage  Open standard             Vendor-neutral lineage specification
dbt          Transform tool            Built-in column-level lineage
Atlan        Commercial catalog        Active metadata management
Monte Carlo  Commercial observability  Automated lineage + anomaly detection

Data Governance

Data governance is the set of policies, processes, and standards that ensure data is managed as a strategic asset. It covers who can access data, how data is classified, and how compliance requirements are met.

Key Governance Components

Component            Description                                   Example
---------            -----------                                   -------
Data Classification  Categorize data by sensitivity level          Public, Internal, Confidential, Restricted
Access Control       Define who can read, write, and share data    Role-based access with least privilege
Data Retention       How long data is kept and when it is deleted  Delete PII after 2 years per GDPR
Audit Logging        Track who accessed or modified data           Query logs, access logs, change history
Data Stewardship     Assign ownership and accountability           Each dataset has a designated owner
Compliance           Meet regulatory requirements                  GDPR, HIPAA, SOC 2, CCPA

Classification Example

Level         Definition                       Examples                            Access
-----         ----------                       --------                            ------
Public        No business impact if disclosed  Product catalog, public docs        Anyone
Internal      For internal use only            Revenue reports, roadmaps           Employees
Confidential  Could cause damage if disclosed  Customer PII, financial data        Need-to-know
Restricted    Severe damage if disclosed       SSNs, payment card data, passwords  Minimal, audited
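Classification only helps if it is enforced at access time. A toy sketch of a least-privilege check; the roles and their clearances are illustrative:

```python
from enum import IntEnum

class Classification(IntEnum):
    """Higher value means more sensitive; ordering enables simple comparisons."""
    PUBLIC = 0
    INTERNAL = 1
    CONFIDENTIAL = 2
    RESTRICTED = 3

# Hypothetical mapping: role -> highest classification that role may read.
ROLE_CLEARANCE = {
    "anonymous": Classification.PUBLIC,
    "employee": Classification.INTERNAL,
    "analyst": Classification.CONFIDENTIAL,
    "security_admin": Classification.RESTRICTED,
}

def can_read(role: str, dataset_level: Classification) -> bool:
    """Least privilege: clearance must meet or exceed the data's level."""
    # Unknown roles default to the lowest clearance.
    return ROLE_CLEARANCE.get(role, Classification.PUBLIC) >= dataset_level

print(can_read("employee", Classification.INTERNAL))      # → True
print(can_read("employee", Classification.CONFIDENTIAL))  # → False
```

Real systems push this logic into the warehouse (row- and column-level policies) rather than application code, but the comparison is the same.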

Data Observability

Data observability extends the principles of application observability (metrics, logs, traces) to data systems. It provides continuous, automated monitoring of data health.

The Five Pillars of Data Observability

Pillar        Question It Answers                                 How to Monitor
------        -------------------                                 --------------
Freshness     Is data arriving on time?                           Track the timestamp of the most recent record
Volume        Is the right amount of data arriving?               Monitor row counts over time, alert on anomalies
Schema        Has the data structure changed?                     Detect column additions, removals, or type changes
Distribution  Are data values within expected ranges?             Monitor statistical properties (mean, stddev, percentiles)
Lineage       Where did this data come from? What depends on it?  Track upstream/downstream dependencies
┌──────────────────────────────────────────────────────────┐
│               Data Observability Dashboard               │
│                                                          │
│  Freshness:     ████████████████████░░    95% on-time    │
│  Volume:        ████████████████████████  Normal range   │
│  Schema:        ████████████████████████  No changes     │
│  Distribution:  ██████████████████░░░░    2 anomalies    │
│  Lineage:       ████████████████████████  All mapped     │
│                                                          │
│  Recent Alerts:                                          │
│   [WARNING] fct_orders: row count 40% below expected     │
│   [WARNING] stg_payments: amount mean shifted 2 stddev   │
│   [OK] dim_customers: all checks passing                 │
└──────────────────────────────────────────────────────────┘

Building Observability In

from datetime import datetime, timedelta


class DataObserver:
    """Monitor data health metrics and detect anomalies."""

    def __init__(self, table_name: str, db_engine):
        self.table_name = table_name
        self.engine = db_engine
        self.alerts = []

    def check_freshness(self, max_delay_hours: int = 1):
        """Alert if the most recent record is older than expected."""
        result = self.engine.execute(
            f"SELECT MAX(updated_at) FROM {self.table_name}"
        ).scalar()
        if result is None:
            self.alerts.append(
                f"CRITICAL: {self.table_name} has no data"
            )
            return
        age = datetime.utcnow() - result
        if age > timedelta(hours=max_delay_hours):
            self.alerts.append(
                f"WARNING: {self.table_name} is "
                f"{age.total_seconds() / 3600:.1f}h stale "
                f"(max: {max_delay_hours}h)"
            )

    def check_volume(self, expected_min: int, expected_max: int,
                     partition_column: str | None = None,
                     partition_value: str | None = None):
        """Alert if row count is outside expected range."""
        query = f"SELECT COUNT(*) FROM {self.table_name}"
        if partition_column and partition_value:
            query += f" WHERE {partition_column} = '{partition_value}'"
        count = self.engine.execute(query).scalar()
        if count < expected_min or count > expected_max:
            self.alerts.append(
                f"WARNING: {self.table_name} row count "
                f"{count} outside range "
                f"[{expected_min}, {expected_max}]"
            )

    def check_distribution(self, column: str,
                           expected_mean: float,
                           max_stddev_shift: float = 2.0):
        """Alert if a column's mean has shifted significantly."""
        result = self.engine.execute(
            f"SELECT AVG({column}), STDDEV({column}) "
            f"FROM {self.table_name}"
        ).fetchone()
        actual_mean, stddev = result
        if stddev and abs(actual_mean - expected_mean) > (
            max_stddev_shift * stddev
        ):
            self.alerts.append(
                f"WARNING: {self.table_name}.{column} "
                f"mean shifted to {actual_mean:.2f} "
                f"(expected ~{expected_mean:.2f})"
            )

    def report(self):
        """Print and return all alerts."""
        if not self.alerts:
            print(f"[OK] {self.table_name}: all checks passing")
        else:
            for alert in self.alerts:
                print(f"  {alert}")
        return self.alerts


# Usage in a pipeline (engine and the alerting helpers already exist)
observer = DataObserver("fact_orders", engine)
observer.check_freshness(max_delay_hours=2)
observer.check_volume(expected_min=5000, expected_max=100000,
                      partition_column="order_date",
                      partition_value="2024-06-15")
observer.check_distribution("amount", expected_mean=75.0)

alerts = observer.report()
if any("CRITICAL" in a for a in alerts):
    send_pagerduty_alert(alerts)
elif alerts:
    send_slack_notification(alerts)

Quick Reference: Data Quality Checklist

Stage      Check                                      Tool
-----      -----                                      ----
Ingestion  Schema matches contract                    Great Expectations, custom validators
Ingestion  Row count within expected range            Airflow sensors, custom checks
Ingestion  No unexpected NULLs in required columns    Great Expectations, dbt tests
Transform  Uniqueness of primary keys                 dbt tests, Great Expectations
Transform  Referential integrity between tables       dbt tests
Transform  Business rule validation                   Custom SQL tests
Serving    Data freshness within SLA                  Data observability tools
Serving    Distribution anomaly detection             Monte Carlo, custom monitoring
Ongoing    Lineage is tracked and up to date          DataHub, OpenLineage
Ongoing    Data contracts are versioned and enforced  Contract YAML + CI/CD

Next Steps