Data Quality
Data quality is the measure of how well data serves its intended purpose. A pipeline that delivers data on time but fills a dashboard with incorrect numbers is worse than no pipeline at all — it creates confident bad decisions. Data quality is not a one-time check; it is a continuous discipline woven into every stage of the data lifecycle.
Dimensions of Data Quality
Data quality is not a single metric — it is a multi-dimensional concept. Each dimension captures a different aspect of “good” data.
| Dimension | Definition | Example Check |
|---|---|---|
| Accuracy | Data correctly represents the real-world entity | Customer email matches the actual email address |
| Completeness | Required fields are populated, no unexpected NULLs | 99% of orders have a shipping_address |
| Consistency | Same data does not contradict itself across systems | Order count in source matches order count in warehouse |
| Timeliness | Data is available when expected | Daily sales report is ready by 7 AM |
| Uniqueness | No unintended duplicates | Each order_id appears exactly once |
| Validity | Data conforms to defined rules and formats | Email contains @, age is between 0 and 150 |
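Two of these dimensions are cheap to measure directly. The toy snippet below computes completeness and uniqueness over a handful of records; the records themselves are invented for illustration:

```python
# Toy batch of records (invented for illustration)
records = [
    {"order_id": "a1", "shipping_address": "12 Elm St"},
    {"order_id": "a2", "shipping_address": None},
    {"order_id": "a2", "shipping_address": "9 Oak Ave"},  # duplicate order_id
]

# Completeness: share of rows with a populated shipping_address
completeness = sum(r["shipping_address"] is not None for r in records) / len(records)

# Uniqueness: does every order_id appear exactly once?
ids = [r["order_id"] for r in records]
is_unique = len(ids) == len(set(ids))

print(f"completeness={completeness:.0%}, unique={is_unique}")
# → completeness=67%, unique=False
```

Accuracy and consistency are harder to score this way: both need a second source of truth to compare against, which is why cross-system reconciliation checks exist.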
```
┌──────────────────────────────────────────────────────────┐
│                 Data Quality Dimensions                  │
│                                                          │
│   ┌──────────┐   ┌────────────┐   ┌─────────────┐        │
│   │ Accuracy │   │Completeness│   │ Consistency │        │
│   │          │   │            │   │             │        │
│   │ Is it    │   │ Is it all  │   │ Does it     │        │
│   │ correct? │   │ there?     │   │ agree across│        │
│   │          │   │            │   │ systems?    │        │
│   └──────────┘   └────────────┘   └─────────────┘        │
│                                                          │
│   ┌──────────┐   ┌────────────┐   ┌─────────────┐        │
│   │Timeliness│   │ Uniqueness │   │ Validity    │        │
│   │          │   │            │   │             │        │
│   │ Is it on │   │ Is it free │   │ Does it     │        │
│   │ time?    │   │ of dupes?  │   │ follow the  │        │
│   │          │   │            │   │ rules?      │        │
│   └──────────┘   └────────────┘   └─────────────┘        │
└──────────────────────────────────────────────────────────┘
```

Data Validation
Data validation is the practice of programmatically checking data at each stage of the pipeline to catch issues before they propagate downstream.
Where to Validate
```
┌──────────┐   Validate   ┌───────────┐   Validate   ┌──────────┐   Validate   ┌──────────┐
│  Source  │─────(1)─────▶│  Staging  │─────(2)─────▶│Transform │─────(3)─────▶│  Serve   │
└──────────┘              └───────────┘              └──────────┘              └──────────┘
```

(1) Source validation: row count, schema match, freshness
(2) Staging validation: nulls, duplicates, data types, ranges
(3) Transform validation: business rules, referential integrity, aggregation checks

Common Validation Checks
| Check Type | Example | When to Use |
|---|---|---|
| Schema | Column names and types match expected schema | After ingestion |
| Null check | customer_id is never NULL | After ingestion, after transform |
| Uniqueness | order_id has no duplicates | After ingestion |
| Range | order_amount is between 0 and 1,000,000 | After transform |
| Referential | Every customer_id in facts exists in dim_customers | After loading |
| Freshness | Most recent record is within the last 24 hours | Before serving |
| Volume | Row count is within expected range (not zero, not 10x normal) | After ingestion |
| Distribution | Column value distribution has not changed dramatically | After transform |
```python
import pandas as pd
from dataclasses import dataclass


@dataclass
class ValidationResult:
    check_name: str
    passed: bool
    details: str
    severity: str  # "critical", "warning", "info"


class DataValidator:
    """Validate a DataFrame against a set of quality rules."""

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.results: list[ValidationResult] = []

    def check_not_null(self, column: str, severity: str = "critical"):
        null_count = int(self.df[column].isnull().sum())
        self.results.append(ValidationResult(
            check_name=f"not_null({column})",
            passed=null_count == 0,
            details=f"{null_count} null values found",
            severity=severity,
        ))
        return self

    def check_unique(self, column: str, severity: str = "critical"):
        duplicate_count = int(self.df[column].duplicated().sum())
        self.results.append(ValidationResult(
            check_name=f"unique({column})",
            passed=duplicate_count == 0,
            details=f"{duplicate_count} duplicates found",
            severity=severity,
        ))
        return self

    def check_range(self, column: str, min_val: float, max_val: float,
                    severity: str = "warning"):
        out_of_range = int((
            (self.df[column] < min_val) | (self.df[column] > max_val)
        ).sum())
        self.results.append(ValidationResult(
            check_name=f"range({column}, {min_val}-{max_val})",
            passed=out_of_range == 0,
            details=f"{out_of_range} values out of range",
            severity=severity,
        ))
        return self

    def check_row_count(self, min_rows: int, max_rows: int,
                        severity: str = "critical"):
        count = len(self.df)
        self.results.append(ValidationResult(
            check_name=f"row_count({min_rows}-{max_rows})",
            passed=min_rows <= count <= max_rows,
            details=f"Actual row count: {count}",
            severity=severity,
        ))
        return self

    def check_referential_integrity(self, column: str, reference_values: set,
                                    severity: str = "critical"):
        orphans = set(self.df[column].dropna()) - reference_values
        self.results.append(ValidationResult(
            check_name=f"referential({column})",
            passed=len(orphans) == 0,
            details=f"{len(orphans)} orphan keys found",
            severity=severity,
        ))
        return self

    def validate(self) -> bool:
        """Run all checks and return True if all critical checks passed."""
        critical_failures = [
            r for r in self.results
            if not r.passed and r.severity == "critical"
        ]
        for r in self.results:
            status = "PASS" if r.passed else "FAIL"
            print(f"  [{status}] {r.check_name}: {r.details}")
        return len(critical_failures) == 0


# Usage ("engine" is assumed to be an existing SQLAlchemy engine)
orders = pd.read_parquet("s3://warehouse/staging/orders/")
valid_customer_ids = set(
    pd.read_sql("SELECT customer_id FROM dim_customers", engine)["customer_id"]
)

is_valid = (
    DataValidator(orders)
    .check_not_null("order_id")
    .check_not_null("customer_id")
    .check_unique("order_id")
    .check_range("amount", 0, 1_000_000)
    .check_row_count(1000, 500_000)
    .check_referential_integrity("customer_id", valid_customer_ids)
    .validate()
)

if not is_valid:
    raise RuntimeError("Data validation failed — pipeline halted")
```

The same validator pattern in JavaScript:

```javascript
class DataValidator {
  constructor(data) {
    this.data = data;
    this.results = [];
  }

  checkNotNull(column, severity = "critical") {
    const nullCount = this.data.filter(
      (row) => row[column] == null || row[column] === ""
    ).length;
    this.results.push({
      checkName: `not_null(${column})`,
      passed: nullCount === 0,
      details: `${nullCount} null values found`,
      severity,
    });
    return this;
  }

  checkUnique(column, severity = "critical") {
    const seen = new Set();
    let dupeCount = 0;
    for (const row of this.data) {
      if (seen.has(row[column])) dupeCount++;
      seen.add(row[column]);
    }
    this.results.push({
      checkName: `unique(${column})`,
      passed: dupeCount === 0,
      details: `${dupeCount} duplicates found`,
      severity,
    });
    return this;
  }

  checkRange(column, min, max, severity = "warning") {
    const outOfRange = this.data.filter(
      (row) => row[column] < min || row[column] > max
    ).length;
    this.results.push({
      checkName: `range(${column}, ${min}-${max})`,
      passed: outOfRange === 0,
      details: `${outOfRange} values out of range`,
      severity,
    });
    return this;
  }

  checkRowCount(min, max, severity = "critical") {
    const count = this.data.length;
    this.results.push({
      checkName: `row_count(${min}-${max})`,
      passed: count >= min && count <= max,
      details: `Actual row count: ${count}`,
      severity,
    });
    return this;
  }

  validate() {
    const criticalFailures = this.results.filter(
      (r) => !r.passed && r.severity === "critical"
    );
    for (const r of this.results) {
      const status = r.passed ? "PASS" : "FAIL";
      console.log(`  [${status}] ${r.checkName}: ${r.details}`);
    }
    return criticalFailures.length === 0;
  }
}

// Usage
const isValid = new DataValidator(orders)
  .checkNotNull("order_id")
  .checkNotNull("customer_id")
  .checkUnique("order_id")
  .checkRange("amount", 0, 1000000)
  .checkRowCount(1000, 500000)
  .validate();

if (!isValid) {
  throw new Error("Data validation failed — pipeline halted");
}
```

Great Expectations
Great Expectations is a widely used open-source framework for data validation, documentation, and profiling. It provides a rich library of built-in “expectations” (validation rules) and generates human-readable data docs.
Core Concepts
| Concept | Description |
|---|---|
| Expectation | A declarative assertion about data (e.g., “column X should never be NULL”) |
| Expectation Suite | A collection of expectations for a specific dataset |
| Checkpoint | A runnable validation job that tests data against an expectation suite |
| Data Docs | Auto-generated HTML documentation of validation results |
| Data Source | Connection to the data being validated (Pandas, Spark, SQL) |
Example
```python
# Note: API names vary across Great Expectations versions; this sketch
# follows the fluent datasource API. "orders" is a pandas DataFrame
# loaded earlier in the pipeline.
import great_expectations as gx

# Initialize context
context = gx.get_context()

# Connect to data
datasource = context.sources.add_pandas("orders_source")
data_asset = datasource.add_dataframe_asset(name="orders")
batch_request = data_asset.build_batch_request(dataframe=orders)

# Build expectation suite
suite = context.add_expectation_suite("orders_quality_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "processing", "shipped", "delivered", "cancelled"],
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000, max_value=500000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="amount", min_value=10, max_value=500
    )
)

# Run validation checkpoint
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[{
        "batch_request": batch_request,
        "expectation_suite_name": "orders_quality_suite",
    }],
)

result = checkpoint.run()

if not result.success:
    # Alert the team, halt the pipeline
    failed_expectations = [r for r in result.results if not r.success]
    raise RuntimeError(
        f"Data validation failed: {len(failed_expectations)} checks failed"
    )
```

Data Contracts
A data contract is a formal agreement between the producer and consumer of a dataset. It defines the schema, quality expectations, SLAs, and ownership — treating data as a product with a clear interface.
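Mechanically, enforcing a contract means checking every batch against it before publishing. A minimal, self-contained sketch (the record shape, column list, and null-rate threshold are invented for illustration):

```python
def enforce_contract(rows, required_columns, max_null_rate=0.001):
    """Return a list of contract violations for a batch of records (dicts)."""
    if not rows:
        return ["Empty batch"]
    errors = []
    for col in required_columns:
        nulls = sum(1 for row in rows if row.get(col) is None)
        if nulls == len(rows):
            # An entirely absent or all-null column is treated as missing
            errors.append(f"Missing required column: {col}")
        elif nulls / len(rows) > max_null_rate:
            errors.append(
                f"Column {col} null rate {nulls / len(rows):.2%} exceeds max"
            )
    return errors

rows = [
    {"order_id": "a1", "customer_id": "c1"},
    {"order_id": "a2", "customer_id": None},
]
violations = enforce_contract(rows, ["order_id", "customer_id", "amount"])
print(violations)  # "amount" is missing; "customer_id" is 50% null
```

A real enforcement step would run this in CI or at the pipeline boundary and refuse to publish when the list is non-empty.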
```
┌──────────────────────────────────────────────────────────┐
│                      Data Contract                       │
│                                                          │
│  Producer: Orders Service (Backend Team)                 │
│  Consumer: Analytics Team, ML Team                       │
│                                                          │
│  Schema:                                                 │
│    - order_id:    STRING    (required, unique)           │
│    - customer_id: STRING    (required, FK to customers)  │
│    - amount:      DECIMAL   (required, range 0-1000000)  │
│    - status:      ENUM      (pending | shipped | done)   │
│    - created_at:  TIMESTAMP (required)                   │
│                                                          │
│  SLA:                                                    │
│    - Freshness: data available within 1 hour of event    │
│    - Completeness: less than 0.1% null rate on required  │
│    - Availability: 99.9% uptime for the data endpoint    │
│                                                          │
│  Owner: backend-team@company.com                         │
│  On-call: #data-incidents Slack channel                  │
└──────────────────────────────────────────────────────────┘
```

Data Contract as Code
```yaml
apiVersion: v1
kind: DataContract
metadata:
  name: orders
  version: "2.1.0"
  owner: backend-team
  contact: backend-team@company.com
  slack: "#orders-data"

schema:
  type: object
  properties:
    order_id:
      type: string
      description: "Unique order identifier"
      required: true
      unique: true
    customer_id:
      type: string
      description: "Foreign key to customers dataset"
      required: true
    amount:
      type: number
      description: "Order total in USD"
      required: true
      minimum: 0
      maximum: 1000000
    status:
      type: string
      description: "Current order status"
      required: true
      enum: ["pending", "processing", "shipped", "delivered", "cancelled"]
    created_at:
      type: timestamp
      description: "When the order was placed"
      required: true

quality:
  freshness:
    max_delay: "1 hour"
  completeness:
    max_null_rate: 0.001  # 0.1%
  volume:
    min_rows_per_day: 1000
    max_rows_per_day: 500000

sla:
  availability: "99.9%"
  support_hours: "24/7"
  incident_response: "30 minutes"
```

The same contract can be expressed and enforced in code:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class Severity(Enum):
    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"


@dataclass
class ColumnContract:
    name: str
    dtype: str
    required: bool = True
    unique: bool = False
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    allowed_values: Optional[list[str]] = None
    description: str = ""


@dataclass
class QualityContract:
    max_null_rate: float = 0.001
    max_freshness_hours: int = 1
    min_daily_rows: int = 0
    max_daily_rows: float = float("inf")


@dataclass
class DataContract:
    name: str
    version: str
    owner: str
    columns: list[ColumnContract]
    quality: QualityContract = field(default_factory=QualityContract)

    def validate_schema(self, df) -> list[str]:
        """Check that a DataFrame conforms to this contract."""
        errors = []
        for col in self.columns:
            if col.name not in df.columns:
                if col.required:
                    errors.append(f"Missing required column: {col.name}")
            else:
                null_rate = df[col.name].isnull().mean()
                if null_rate > self.quality.max_null_rate:
                    errors.append(
                        f"Column {col.name} null rate {null_rate:.4f} "
                        f"exceeds max {self.quality.max_null_rate}"
                    )
        return errors


# Define the contract
orders_contract = DataContract(
    name="orders",
    version="2.1.0",
    owner="backend-team",
    columns=[
        ColumnContract("order_id", "string", unique=True),
        ColumnContract("customer_id", "string"),
        ColumnContract("amount", "float", min_value=0, max_value=1_000_000),
        ColumnContract("status", "string", allowed_values=[
            "pending", "processing", "shipped", "delivered", "cancelled",
        ]),
        ColumnContract("created_at", "timestamp"),
    ],
)
```

Data Lineage
Data lineage tracks where data comes from, how it is transformed, and where it goes. It answers the question: “If this number on the dashboard is wrong, which pipeline, transformation, or source is responsible?”
```
┌──────────────────────────────────────────────────────────────────┐
│                        Data Lineage Graph                        │
│                                                                  │
│  Source Systems         Transformations          Consumption     │
│                                                                  │
│  ┌──────────┐         ┌──────────────┐     ┌──────────────┐      │
│  │ Orders   │────────▶│  stg_orders  │────▶│              │      │
│  │ Database │         └──────────────┘     │              │      │
│  └──────────┘                              │  fct_orders  │──┐   │
│  ┌──────────┐         ┌──────────────┐     │              │  │   │
│  │ Payments │────────▶│ stg_payments │────▶│              │  │   │
│  │ API      │         └──────────────┘     └──────────────┘  │   │
│  └──────────┘                                                │   │
│                                                              │   │
│  ┌──────────┐         ┌──────────────┐                       │   │
│  │ CRM      │────────▶│dim_customers │───────────────────┐   │   │
│  │ System   │         └──────────────┘                   │   │   │
│  └──────────┘                                            │   │   │
│                                                          │   │   │
│            ┌─────────────────────────────┐               │   │   │
│            │ Revenue Dashboard (Looker)  │◀──────────────┴───┤   │
│            └─────────────────────────────┘                   │   │
│            ┌─────────────────────────────┐                   │   │
│            │ ML Churn Model              │◀─────────────────-┘   │
│            └─────────────────────────────┘                       │
└──────────────────────────────────────────────────────────────────┘
```

Lineage Tools
| Tool | Type | Key Feature |
|---|---|---|
| DataHub | Open-source catalog | Automatic lineage from Airflow, Spark, dbt |
| Amundsen | Open-source catalog | Search-oriented data discovery |
| OpenLineage | Open standard | Vendor-neutral lineage specification |
| dbt | Transform tool | Built-in column-level lineage |
| Atlan | Commercial catalog | Active metadata management |
| Monte Carlo | Commercial observability | Automated lineage + anomaly detection |
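Under the hood, lineage is a dependency graph, so “what breaks downstream if this table breaks?” is a graph traversal. A minimal sketch, with a hand-written edge map loosely mirroring the example graph earlier in this section (a real catalog tool would supply these edges):

```python
from collections import deque

# downstream[x] = datasets that consume x directly (hand-written example)
downstream = {
    "orders_db": ["stg_orders"],
    "stg_orders": ["fct_orders"],
    "stg_payments": ["fct_orders"],
    "fct_orders": ["revenue_dashboard", "churn_model"],
    "dim_customers": ["revenue_dashboard"],
}

def impacted(node: str) -> set[str]:
    """Breadth-first walk returning every asset downstream of `node`."""
    seen, queue = set(), deque([node])
    while queue:
        for child in downstream.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen

print(sorted(impacted("stg_orders")))
# → ['churn_model', 'fct_orders', 'revenue_dashboard']
```

The same walk run over a reversed (upstream) map answers the root-cause question: which sources and transformations feed a broken dashboard number.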
Data Governance
Data governance is the set of policies, processes, and standards that ensure data is managed as a strategic asset. It covers who can access data, how data is classified, and how compliance requirements are met.
Key Governance Components
| Component | Description | Example |
|---|---|---|
| Data Classification | Categorize data by sensitivity level | Public, Internal, Confidential, Restricted |
| Access Control | Define who can read, write, and share data | Role-based access with least privilege |
| Data Retention | How long data is kept and when it is deleted | Delete PII after 2 years per GDPR |
| Audit Logging | Track who accessed or modified data | Query logs, access logs, change history |
| Data Stewardship | Assign ownership and accountability | Each dataset has a designated owner |
| Compliance | Meet regulatory requirements | GDPR, HIPAA, SOC 2, CCPA |
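As a concrete example of one component, a retention policy is usually a scheduled sweep that finds records older than the allowed window. A hedged sketch (record shapes and dates are illustrative; a production job would issue a DELETE against the warehouse rather than filter in memory):

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=2 * 365)  # e.g. delete PII after 2 years

def expired(records, now):
    """Return the records whose created_at falls outside the retention window."""
    return [r for r in records if now - r["created_at"] > RETENTION]

now = datetime(2024, 6, 15, tzinfo=timezone.utc)
records = [
    {"id": 1, "created_at": datetime(2021, 1, 1, tzinfo=timezone.utc)},  # > 2 years old
    {"id": 2, "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},  # recent
]
print([r["id"] for r in expired(records, now)])  # → [1]
```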
Classification Example
| Level | Definition | Examples | Access |
|---|---|---|---|
| Public | No business impact if disclosed | Product catalog, public docs | Anyone |
| Internal | For internal use only | Revenue reports, roadmaps | Employees |
| Confidential | Could cause damage if disclosed | Customer PII, financial data | Need-to-know |
| Restricted | Severe damage if disclosed | SSNs, payment card data, passwords | Minimal, audited |
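Classification only has teeth if it is enforced at access time. A toy least-privilege check keyed on the levels above (the role names and mapping are invented; real systems delegate this to the warehouse's role-based access control):

```python
# Ordered sensitivity levels and role ranks (invented mapping for illustration)
CLEARANCE = {"public": 0, "internal": 1, "confidential": 2, "restricted": 3}
ROLE_RANK = {"anonymous": 0, "employee": 1, "analyst": 2, "security": 3}

def can_read(role: str, classification: str) -> bool:
    """Least-privilege check: role rank must meet the data's clearance level."""
    return ROLE_RANK[role] >= CLEARANCE[classification]

print(can_read("employee", "internal"))      # → True
print(can_read("employee", "confidential"))  # → False
```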
Data Observability
Data observability extends the principles of application observability (metrics, logs, traces) to data systems. It provides continuous, automated monitoring of data health.
The Five Pillars of Data Observability
| Pillar | Question It Answers | How to Monitor |
|---|---|---|
| Freshness | Is data arriving on time? | Track the timestamp of the most recent record |
| Volume | Is the right amount of data arriving? | Monitor row counts over time, alert on anomalies |
| Schema | Has the data structure changed? | Detect column additions, removals, or type changes |
| Distribution | Are data values within expected ranges? | Monitor statistical properties (mean, stddev, percentiles) |
| Lineage | Where did this data come from? What depends on it? | Track upstream/downstream dependencies |
```
┌──────────────────────────────────────────────────────────┐
│              Data Observability Dashboard                │
│                                                          │
│  Freshness:     ████████████████████░░   95% on-time     │
│  Volume:        ████████████████████████ Normal range    │
│  Schema:        ████████████████████████ No changes      │
│  Distribution:  ██████████████████░░░░   2 anomalies     │
│  Lineage:       ████████████████████████ All mapped      │
│                                                          │
│  Recent Alerts:                                          │
│  [WARNING] fct_orders: row count 40% below expected      │
│  [WARNING] stg_payments: amount mean shifted 2 stddev    │
│  [OK]      dim_customers: all checks passing             │
└──────────────────────────────────────────────────────────┘
```

Building Observability In
```python
from datetime import datetime, timedelta


class DataObserver:
    """Monitor data health metrics and detect anomalies."""

    def __init__(self, table_name: str, db_engine):
        self.table_name = table_name
        self.engine = db_engine
        self.alerts = []

    def check_freshness(self, max_delay_hours: int = 1):
        """Alert if the most recent record is older than expected."""
        result = self.engine.execute(
            f"SELECT MAX(updated_at) FROM {self.table_name}"
        ).scalar()
        if result is None:
            self.alerts.append(f"CRITICAL: {self.table_name} has no data")
            return
        age = datetime.utcnow() - result
        if age > timedelta(hours=max_delay_hours):
            self.alerts.append(
                f"WARNING: {self.table_name} is "
                f"{age.total_seconds() / 3600:.1f}h stale "
                f"(max: {max_delay_hours}h)"
            )

    def check_volume(self, expected_min: int, expected_max: int,
                     partition_column: str = None,
                     partition_value: str = None):
        """Alert if row count is outside expected range."""
        # Note: use parameterized queries in production; f-strings are
        # shown here only for brevity.
        query = f"SELECT COUNT(*) FROM {self.table_name}"
        if partition_column and partition_value:
            query += f" WHERE {partition_column} = '{partition_value}'"
        count = self.engine.execute(query).scalar()
        if count < expected_min or count > expected_max:
            self.alerts.append(
                f"WARNING: {self.table_name} row count {count} "
                f"outside range [{expected_min}, {expected_max}]"
            )

    def check_distribution(self, column: str, expected_mean: float,
                           max_stddev_shift: float = 2.0):
        """Alert if a column's mean has shifted significantly."""
        actual_mean, stddev = self.engine.execute(
            f"SELECT AVG({column}), STDDEV({column}) FROM {self.table_name}"
        ).fetchone()
        if stddev and abs(actual_mean - expected_mean) > (
            max_stddev_shift * stddev
        ):
            self.alerts.append(
                f"WARNING: {self.table_name}.{column} "
                f"mean shifted to {actual_mean:.2f} "
                f"(expected ~{expected_mean:.2f})"
            )

    def report(self):
        """Print and return all alerts."""
        if not self.alerts:
            print(f"[OK] {self.table_name}: all checks passing")
        else:
            for alert in self.alerts:
                print(f"  {alert}")
        return self.alerts


# Usage in a pipeline ("engine" and the alerting helpers are assumed
# to exist elsewhere in the codebase)
observer = DataObserver("fact_orders", engine)
observer.check_freshness(max_delay_hours=2)
observer.check_volume(expected_min=5000, expected_max=100000,
                      partition_column="order_date",
                      partition_value="2024-06-15")
observer.check_distribution("amount", expected_mean=75.0)
alerts = observer.report()

if any("CRITICAL" in a for a in alerts):
    send_pagerduty_alert(alerts)
elif alerts:
    send_slack_notification(alerts)
```

Quick Reference: Data Quality Checklist
| Stage | Check | Tool |
|---|---|---|
| Ingestion | Schema matches contract | Great Expectations, custom validators |
| Ingestion | Row count within expected range | Airflow sensors, custom checks |
| Ingestion | No unexpected NULLs in required columns | Great Expectations, dbt tests |
| Transform | Uniqueness of primary keys | dbt tests, Great Expectations |
| Transform | Referential integrity between tables | dbt tests |
| Transform | Business rule validation | Custom SQL tests |
| Serving | Data freshness within SLA | Data observability tools |
| Serving | Distribution anomaly detection | Monte Carlo, custom monitoring |
| Ongoing | Lineage is tracked and up to date | DataHub, OpenLineage |
| Ongoing | Data contracts are versioned and enforced | Contract YAML + CI/CD |