A contract-driven data quality pipeline that profiles datasets, builds historical baselines, detects anomalies, computes data quality scores, performs root-cause analysis, and triggers alerts. Supports multiple datasets with strict isolation.
Features
- YAML-based data contracts
- Dataset registry and isolation (dataset_id)
- Automated schema generation
- Data ingestion from CSV sources
- Profiling (record counts, null metrics)
- Rolling baseline construction (30-day window)
- Drift detection:
  - Schema drift
  - Distribution drift
  - Referential drift
- Anomaly detection and aggregation
- Data Quality score computation
- Root cause analysis
- Alerting (console + mock email/Slack)
- Dockerized execution
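As a rough illustration of the profiling and rolling-baseline features listed above, the sketch below computes a record count and per-column null rates, builds a 30-day baseline for one metric, and flags a value that strays from it. It uses only the standard library so it runs anywhere. The function names, the z-score rule, and the threshold of 3.0 are assumptions made for this example, not the project's actual code.

```python
from datetime import date, timedelta
from statistics import mean, pstdev


def profile_rows(rows, columns):
    """Return the record count and per-column null rate for a list of dict rows."""
    total = len(rows)
    null_rate = {}
    for col in columns:
        # Treat both None and the empty string as null (an assumption).
        nulls = sum(1 for row in rows if row.get(col) in (None, ""))
        null_rate[col] = nulls / total if total else 0.0
    return {"record_count": total, "null_rate": null_rate}


def rolling_baseline(history, run_date, window_days=30):
    """Mean and population std dev of a metric over the window before run_date.

    history maps date -> metric value; run_date itself is excluded.
    """
    start = run_date - timedelta(days=window_days)
    values = [v for d, v in history.items() if start <= d < run_date]
    if not values:
        return None
    return {"mean": mean(values), "std": pstdev(values), "n": len(values)}


def is_anomalous(value, baseline, z_threshold=3.0):
    """Flag value if it lies more than z_threshold std devs from the baseline mean."""
    if baseline is None:
        return False  # no history yet, nothing to compare against
    if baseline["std"] == 0:
        return value != baseline["mean"]
    return abs(value - baseline["mean"]) / baseline["std"] > z_threshold
```

With seven days of history where the record count is always 100, a run that ingests 50 records would be flagged, while a run with 100 records would not.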
Tech Stack
- Python 3
- PostgreSQL
- pandas
- psycopg2
- PyYAML
- Docker
Project Structure
DQ/
├── run_pipeline.py
├── contract_validator.py
├── contracts/
├── registry/
├── schema/
├── ingestion/
├── profiling/
├── baseline/
├── drift/
├── anomaly/
├── aggregation/
├── scoring/
├── root_cause/
├── alerting/
├── migrations/
├── data/
│ └── raw/
└── docker/
Prerequisites
- Python 3.9+
- PostgreSQL
- Docker (optional)
Database Setup
Create the database, then apply the schema:
psql -d data_quality_db -f migrations/create_all_tables.sql
Required schemas: dq, raw
Dataset Registration
Each dataset must be registered once.
python registry/dataset_registry.py contracts/data_contract.yaml --dataset orders_pipeline
python registry/dataset_registry.py contracts/adult_income.yaml --dataset adult_income
This inserts one record per dataset into dq.dq_datasets and assigns each dataset a unique dataset_id.
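The register-once behaviour can be sketched as an insert that is skipped when the dataset name already exists, followed by a lookup of its dataset_id. The example below uses an in-memory SQLite database as a stand-in for PostgreSQL so that it is runnable on its own. The column names (dataset_name, contract_path) and the register_dataset helper are hypothetical; only the dq_datasets table name and the dataset_id column come from this README.

```python
import sqlite3


def register_dataset(conn, name, contract_path):
    """Insert the dataset if it is new and return its dataset_id either way."""
    conn.execute(
        "INSERT OR IGNORE INTO dq_datasets (dataset_name, contract_path) "
        "VALUES (?, ?)",
        (name, contract_path),
    )
    conn.commit()
    row = conn.execute(
        "SELECT dataset_id FROM dq_datasets WHERE dataset_name = ?", (name,)
    ).fetchone()
    return row[0]


# In-memory SQLite stand-in; the real pipeline targets PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dq_datasets (
        dataset_id INTEGER PRIMARY KEY AUTOINCREMENT,
        dataset_name TEXT NOT NULL UNIQUE,
        contract_path TEXT NOT NULL
    )
    """
)

orders_id = register_dataset(conn, "orders_pipeline", "contracts/data_contract.yaml")
adult_id = register_dataset(conn, "adult_income", "contracts/adult_income.yaml")
# Registering again is a no-op that returns the existing id.
adult_id_again = register_dataset(conn, "adult_income", "contracts/adult_income.yaml")
```

In PostgreSQL, the analogous statement would be INSERT ... ON CONFLICT DO NOTHING against a unique constraint on the dataset name.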
Running the Pipeline (Local)
Run without date arguments:
python run_pipeline.py contracts/adult_income.yaml --dataset adult_income
Run for a specific date:
python run_pipeline.py contracts/adult_income.yaml --dataset adult_income --run-date 2026-01-08
Run over a date range (backfill):
python run_pipeline.py contracts/adult_income.yaml --dataset adult_income --start-date 2026-01-01 --end-date 2026-01-07
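One way the command-line arguments above could map to a list of run dates is sketched here with argparse. The flag names match the commands in this README, but the helper names, the fallback to today's date when no date is given, and the inclusive end date are assumptions for illustration; run_pipeline.py may behave differently.

```python
import argparse
from datetime import date, timedelta


def build_parser():
    """Parser mirroring the flags used in the commands above."""
    parser = argparse.ArgumentParser(prog="run_pipeline.py")
    parser.add_argument("contract", help="path to the YAML data contract")
    parser.add_argument("--dataset", required=True, help="registered dataset name")
    parser.add_argument("--run-date", type=date.fromisoformat)
    parser.add_argument("--start-date", type=date.fromisoformat)
    parser.add_argument("--end-date", type=date.fromisoformat)
    return parser


def resolve_run_dates(args, today=None):
    """Expand the parsed arguments into the list of dates to process."""
    if args.start_date or args.end_date:
        if not (args.start_date and args.end_date):
            raise ValueError("--start-date and --end-date must be given together")
        if args.end_date < args.start_date:
            raise ValueError("--end-date must not be earlier than --start-date")
        days = (args.end_date - args.start_date).days
        # Assumption: the end date is included in the backfill range.
        return [args.start_date + timedelta(days=i) for i in range(days + 1)]
    if args.run_date:
        return [args.run_date]
    # Assumption: with no date arguments, the run date is today.
    return [today or date.today()]
```

Under these assumptions, the backfill command above would expand to seven run dates, 2026-01-01 through 2026-01-07.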
Running the Pipeline (Docker)
Build the image:
docker build -t dq-pipeline .
Run the container (replace the host path in -v with your local project directory):
docker run --rm -e DB_HOST=host.docker.internal -e DB_NAME=data_quality_db -e DB_USER=postgres -e DB_PASSWORD=your_password -e DB_PORT=5432 -v C:\Users\LAKSHA\Downloads\DQ:/app dq-pipeline contracts/adult_income.yaml --dataset adult_income
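The -e flags above pass the database settings into the container as environment variables. A minimal sketch of how the pipeline might read them is shown below. The load_db_config helper, the defaults for DB_HOST and DB_PORT, and the choice of which variables are mandatory are assumptions; only the variable names come from the docker run command.

```python
import os


def load_db_config(env=None):
    """Build connection settings from DB_* environment variables."""
    env = os.environ if env is None else env
    required = ("DB_NAME", "DB_USER", "DB_PASSWORD")
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        # Assumed defaults for a local PostgreSQL instance.
        "host": env.get("DB_HOST", "localhost"),
        "port": int(env.get("DB_PORT", "5432")),
        "dbname": env["DB_NAME"],
        "user": env["DB_USER"],
        "password": env["DB_PASSWORD"],
    }
```

The resulting dictionary uses keyword names that psycopg2.connect accepts, so it could be passed as psycopg2.connect(**load_db_config()).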
Pipeline Steps
- Contract validation
- Schema generation
- Data ingestion
- Schema drift detection
- Profiling
- Baseline construction
- Baseline audit
- Comparison engine
- Distribution drift detection
- Referential drift detection
- Anomaly detection
- Aggregation
- Scoring
- Root cause analysis
- Alerting
Each step is logged in dq.dq_run_history.
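Per-step logging of this kind can be sketched as a small runner that executes each step in order and records its outcome. In the example below an in-memory list stands in for dq.dq_run_history. The record fields, the status values, and the stop-on-first-failure behaviour are assumptions for illustration, not the project's actual schema or logic.

```python
from datetime import datetime, timezone


def run_steps(steps, run_log):
    """Run (name, callable) pairs in order, appending one log record per step.

    Stops at the first failing step and returns False; returns True otherwise.
    """
    for name, func in steps:
        started = datetime.now(timezone.utc)
        try:
            func()
            status, error = "SUCCESS", None
        except Exception as exc:
            status, error = "FAILED", str(exc)
        # One record per step; a real pipeline would insert a table row here.
        run_log.append(
            {
                "step": name,
                "status": status,
                "error": error,
                "started_at": started,
                "finished_at": datetime.now(timezone.utc),
            }
        )
        if status == "FAILED":
            return False
    return True
```

Recording the failure before stopping means the log always shows which step broke, which is useful when a rerun is needed.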
Database Tables
- dq_datasets
- dq_run_history
- dq_current_stats
- dq_baseline_stats
- dq_anomalies
- dq_aggregated_anomalies
- dq_score_history
- dq_root_causes
Sample Output
DQ Score computed | dataset_id=2 | score=50 | status=CRITICAL | top_issue=schema
ALERT TRIGGERED
Run Date : 2026-01-08
DQ Score : 50 (CRITICAL)
Top Issue : schema
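The mapping from a score to a status and an alert message might look like the sketch below. The only data point taken from this README is that a score of 50 is reported as CRITICAL. The threshold values (70 and 90), the WARNING and OK status names, and both function names are hypothetical.

```python
def classify_score(score, warning_below=90, critical_below=70):
    """Map a DQ score (0-100) to a status label using assumed thresholds."""
    if score < critical_below:
        return "CRITICAL"
    if score < warning_below:
        return "WARNING"
    return "OK"


def format_alert(run_date, score, top_issue):
    """Return the console alert text, or None when no alert is needed."""
    status = classify_score(score)
    if status == "OK":
        return None
    return "\n".join(
        [
            "ALERT TRIGGERED",
            f"Run Date : {run_date}",
            f"DQ Score : {score} ({status})",
            f"Top Issue : {top_issue}",
        ]
    )


print(format_alert("2026-01-08", 50, "schema"))
```

Keeping the thresholds as parameters makes it easy to tune them per dataset without touching the alert formatting.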
Notes
- The pipeline is idempotent.
- Each dataset is fully isolated via dataset_id.
- Backfills and reruns are supported.
- CSV ingestion assumes that source files exist under data/raw/<dataset_name>/.
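The directory convention for CSV sources can be checked up front with a small helper like the one below, so that a missing folder fails with a clear message before ingestion starts. The find_csv_files name, the *.csv glob pattern, and the decision to raise an error on an empty folder are assumptions for this sketch.

```python
from pathlib import Path


def find_csv_files(dataset_name, root="data/raw"):
    """Return the sorted CSV files under <root>/<dataset_name>/."""
    folder = Path(root) / dataset_name
    if not folder.is_dir():
        raise FileNotFoundError(f"expected CSV folder not found: {folder}")
    files = sorted(folder.glob("*.csv"))
    if not files:
        raise FileNotFoundError(f"no CSV files found in {folder}")
    return files
```

For the adult_income dataset, find_csv_files("adult_income") would look in data/raw/adult_income/ relative to the working directory.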
License
MIT