Phase 1 Capstone: The 'End-to-End' Production Pipeline

Integration Hell (Fragmented Architecture)

Tags: Pipeline, Orchestration, MLflow, Capstone, Refactoring

Abstract

We have spent 19 days building the components of a robust AI system: ingestion, cleaning, training, evaluation, bias auditing, and documentation. However, a pile of high-quality car parts is not a car. If these components live in separate Jupyter Notebooks or disconnected scripts, you do not have a system; you have a science project. Today, we execute the Phase 1 Capstone. We will refactor our isolated learnings into a single, atomic, reproducible pipeline. This passes the "Monday Morning Test": Can a new engineer join the team, run one command, and rebuild the entire model state from raw data to deployment artifact?


1. Why This Topic Matters

The primary failure mode at this stage is Integration Entropy.

  • The "Notebook Trap": You have clean_data_v3.ipynb and train_model_final_v2.ipynb. In which order do they run? Did you restart the kernel? The process exists only in your head.
  • The Audit Gap: A regulator asks, "How was this model trained?" You show them the model file. They ask, "What data produced this?" You point to a CSV. They ask, "How was that CSV cleaned?" You hesitate.
  • The "Bus Factor": If you leave, the model dies because no one else knows the secret sequence of scripts required to update it.

The Engineering Standard: The process is the product. The model artifact is just the output of the process.


2. Core Concepts & Mental Models

The Directed Acyclic Graph (DAG)

We must treat our workflow as a DAG, a one-way flow of data through transformation stages.

  1. Ingest: Raw Data → Validated DataFrame.
  2. Process: DataFrame → Clean Features.
  3. Train: Features → Model Artifact.
  4. Verify: Model Artifact → Metrics & Bias Report.
  5. Publish: Artifact + Report → Model Registry / Model Card.
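The stages above can be sketched as an ordered list of functions, where each stage's output artifact becomes the next stage's input. This is a minimal illustration of the DAG idea, not the capstone pipeline itself; the stage functions and path names here are hypothetical placeholders.

```python
from typing import Callable

# Hypothetical stages: each maps an input artifact path to an output
# artifact path. Real stages would read/write files at these paths.
def ingest(path: str) -> str:
    return path.replace("raw", "validated")   # placeholder transform

def process(path: str) -> str:
    return path.replace("validated", "clean")  # placeholder transform

# The "DAG" for a linear pipeline is just an ordered list of stages.
PIPELINE: list[Callable[[str], str]] = [ingest, process]

def run(start: str) -> str:
    artifact = start
    for stage in PIPELINE:
        artifact = stage(artifact)  # output of one stage feeds the next
    return artifact
```

Because the flow is one-way, reordering or skipping a stage is immediately visible in the list, which is much harder to guarantee across scattered notebooks.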

Artifact Lineage

Every step must produce an immutable artifact saved to disk (or object storage).

  • Bad: Steps pass variables in memory (RAM). If step 3 fails, you lose step 2.
  • Good: Step 1 saves raw.parquet. Step 2 reads raw.parquet and saves clean.parquet. This allows for "Resume" capabilities and debugging.
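The "Resume" capability mentioned above can be sketched with a small caching wrapper: a step runs only if its output artifact is missing on disk. This uses JSON for simplicity; the real pipeline would use Parquet, and `cached_step` is a hypothetical helper, not part of the capstone code.

```python
import json
from pathlib import Path

def cached_step(out_path: str, fn, *args):
    """Run fn only if its output artifact does not already exist.

    If the artifact is on disk, load and return it (resume); otherwise
    compute it, persist it, and return the fresh result.
    """
    p = Path(out_path)
    if p.exists():
        return json.loads(p.read_text())  # resume from saved artifact
    result = fn(*args)
    p.write_text(json.dumps(result))      # persist for lineage/resume
    return result
```

If step 3 crashes, rerunning the pipeline skips steps 1 and 2 because their artifacts already exist, which is exactly the debugging ergonomics in-memory handoffs cannot provide.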

3. Theoretical Foundations (Trade-offs)

Orchestration: Script vs. Framework

  • The Script (Make/Python): Simple, readable, zero infrastructure. Great for single-server training. We will use this today.
  • The Framework (Airflow/Dagster/Kubeflow): Scalable, distributed, complex. Necessary when data is too big for one machine or when dependencies are cross-team.

The Decision: Do not implement Airflow for a team of one. Start with a solid pipeline.py. If the logic is sound, porting to Airflow later is trivial.


4. Production-Grade Implementation

We will build run_pipeline.py. This is the Commander. It imports logic from our src modules (which represent previous days' work) and executes them in order, wrapping everything in robust error handling and MLflow tracking.

Architecture:

  • src/ingest.py (Day 2, 3)
  • src/features.py (Day 6, 13)
  • src/train.py (Day 8, 9)
  • src/audit.py (Day 11, 12)
  • config.yaml (Centralized configuration)
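As a concrete example of what one of these modules might contain, here is a minimal sketch of `src/ingest.py`. The required column names are assumptions for the credit-default scenario, not the actual Day 2/3 schema; the point is that ingestion validates before returning.

```python
# src/ingest.py -- illustrative sketch; the column set is an assumption
import pandas as pd

REQUIRED_COLUMNS = {"loan_amount", "income", "default"}  # assumed schema

def load_and_validate(path: str) -> pd.DataFrame:
    """Load raw data and fail fast if the schema is wrong or the file is empty."""
    df = pd.read_csv(path)
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")
    if df.empty:
        raise ValueError("Input file contains no rows")
    return df
```

Raising early here is what makes the pipeline atomic: a malformed input stops the run before any downstream artifact is produced.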

5. Hands-On Project / Exercise

Objective: Build the "One Command" pipeline. Scenario: Predicting Credit Default (The recurring theme of Phase 1).

Step 1: The Configuration (config.yaml)

Stop hardcoding paths.

project_name: "credit_risk_capstone"
data:
  raw_path: "data/raw/loans_2025.csv"
  processed_path: "data/processed/clean_features.parquet"
model:
  type: "xgboost"
  params:
    max_depth: 4
    learning_rate: 0.1
governance:
  protected_attribute: "gender"
  fairness_threshold: 0.8
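A config file only helps if malformed configs fail fast. A small validation helper (hypothetical, not part of the orchestrator below) can check the loaded dictionary before any pipeline stage runs:

```python
def validate_config(cfg: dict) -> dict:
    """Fail fast if the loaded config is missing required top-level keys."""
    required = {
        "project_name": str,
        "data": dict,
        "model": dict,
        "governance": dict,
    }
    for key, typ in required.items():
        if key not in cfg:
            raise KeyError(f"config missing required key: {key}")
        if not isinstance(cfg[key], typ):
            raise TypeError(f"config key {key!r} must be a {typ.__name__}")
    return cfg
```

Calling this right after `yaml.safe_load` turns a cryptic mid-run `KeyError` into a clear error at startup.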

Step 2: The Orchestrator (run_pipeline.py)

import mlflow
import mlflow.sklearn  # explicit import for the sklearn flavor used below
import yaml
import logging
import sys
from src.ingest import load_and_validate
from src.features import clean_and_featurize
from src.train import train_model
from src.audit import audit_fairness, generate_model_card

# Setup Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def load_config(path="config.yaml"):
    with open(path, 'r') as f:
        return yaml.safe_load(f)

def main():
    cfg = load_config()
    mlflow.set_experiment(cfg['project_name'])

    with mlflow.start_run() as run:
        logger.info("Pipeline Started")

        # 1. Ingestion (Day 2 & 3)
        logger.info("Step 1: Ingestion & Validation")
        raw_df = load_and_validate(cfg['data']['raw_path'])
        mlflow.log_param("input_rows", len(raw_df))

        # 2. Cleaning & Privacy (Day 6 & 13)
        logger.info("Step 2: Cleaning & Privacy Firewall")
        clean_df = clean_and_featurize(raw_df, remove_pii=True)
        # Save artifact for lineage
        clean_df.to_parquet(cfg['data']['processed_path'])
        mlflow.log_artifact(cfg['data']['processed_path'])

        # 3. Training (Day 8)
        logger.info(f"Step 3: Training {cfg['model']['type']}")
        model, metrics = train_model(
            clean_df,
            target="default",
            params=cfg['model']['params']
        )

        # 4. Evaluation (Day 9)
        logger.info(f"Step 4: Evaluation Results - AUC: {metrics['auc']:.4f}")
        mlflow.log_metrics(metrics)

        # 5. Bias Audit (Day 11)
        logger.info("Step 5: Fairness Audit")
        fairness_score = audit_fairness(
            model,
            clean_df,
            protected_group=cfg['governance']['protected_attribute']
        )
        mlflow.log_metric("disparate_impact", fairness_score)

        if fairness_score < cfg['governance']['fairness_threshold']:
            logger.warning(f"Fairness Alert: DIR {fairness_score:.2f} is below threshold!")
            mlflow.set_tag("compliance_status", "FAIL")
        else:
            mlflow.set_tag("compliance_status", "PASS")

        # 6. Model Card & Registration (Day 14)
        logger.info("Step 6: Generating Documentation")
        card_path = generate_model_card(model, metrics, fairness_score)
        mlflow.log_artifact(card_path)

        # 7. Serialize
        mlflow.sklearn.log_model(model, "model")
        logger.info("Pipeline Complete. Model registered.")

if __name__ == "__main__":
    try:
        main()
    except Exception:
        logger.exception("Pipeline Failed")  # logs the full traceback
        sys.exit(1)

Why this passes the test:

  • Reproducible: It reads from a config file.
  • Traceable: Every run logs to MLflow.
  • Safe: It includes the Privacy and Bias checks we engineered in Days 11-13.
  • Atomic: If ingestion fails, training never starts.
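The "Atomic" property can be demonstrated in isolation: because stages run sequentially and exceptions propagate, a failure in any stage prevents every downstream stage from executing. The runner below is a generic sketch of that behavior, not the capstone orchestrator.

```python
def run_stages(stages):
    """Run (name, fn) pairs in order; a raised exception halts the rest."""
    completed = []
    for name, fn in stages:
        fn()                     # any exception stops the loop here
        completed.append(name)
    return completed
```

If ingestion raises, training is never even attempted, so you can never end up with a model trained on unvalidated data.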

6. Ethical, Security & Safety Considerations

  • Audit Trails: By forcing all training through this script, we ensure no "rogue models" exist. If a model isn't in MLflow, it doesn't exist.
  • Version Control: This run_pipeline.py must be committed to Git. The data is not committed, but the logic is.
  • Environment Consistency: This script assumes a consistent Python environment. In Phase 2, we will containerize this script (Docker) to ensure it runs identically on your laptop and the cloud.
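Until the script is containerized, one stopgap is to record the environment with each run so a result can at least be traced to an interpreter and package set. A minimal stdlib-only sketch (logging the snapshot to MLflow is left as an exercise):

```python
import platform
from importlib import metadata

def environment_snapshot() -> dict:
    """Capture interpreter and installed-package versions for reproducibility."""
    return {
        "python": platform.python_version(),
        "packages": {
            dist.metadata["Name"]: dist.version
            for dist in metadata.distributions()
        },
    }
```

Saving this dictionary as a run artifact means that even before Docker, "what environment produced this model?" has an answer.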

7. Business & Strategic Implications

The "Bus Factor" Insurance: This pipeline is an institutional asset. It transfers knowledge from the engineer's brain into the codebase.

Operational Velocity: When new data arrives next month, you don't need a PhD to retrain the model. You need a junior engineer to run python run_pipeline.py. This decoupling of designing the system vs. operating the system is crucial for scaling.


8. Phase 1 Review: The Journey So Far

We have completed the Foundations of Responsible AI Engineering.

  • Days 1-5: We learned to handle data, track experiments, and write clean code.
  • Days 6-10: We mastered data cleaning, baseline modeling, and evaluation metrics.
  • Days 11-14: We injected responsibility: Bias auditing, XAI, Privacy, and Documentation.
  • Days 15-19: We bridged the gap to the modern world (LLMs, Cloud, CI/CD) and business strategy.

What Comes Next? (Phase 2 Preview) We are leaving the "Lab." Phase 1 was about building a solid model on a single machine. Phase 2 is about Scale and Real-Time Systems.

  • Streaming Data (Kafka).
  • Vector Databases (RAG at scale).
  • Containerization (Docker/Kubernetes).
  • API Development (FastAPI).

9. Prerequisites & Next Steps

Prerequisites:

  • Completion of Days 1-19 exercises (or at least understanding the concepts).
  • A working Python environment with mlflow, pandas, scikit-learn, pyyaml.

Next Steps:


10. Further Reading & Resources