Data Lineage: The Chain of Custody for AI

The 'Orphan Model' (Provenance Collapse)
Data Lineage
Governance
Amundsen
DataHub
Compliance

Abstract

An AI model is only as legitimate as the data that fed it. In production, a common and catastrophic failure mode is the "Orphan Model", a deployed system where the team cannot definitively trace a specific prediction back to the raw data rows that influenced it. When a regulator asks, "Did this denied loan application use the applicant's prohibited demographic data?", "I don't know" is not an answer; it is an admission of negligence. This article implements Data Lineage not as a documentation task, but as a graph-based engineering requirement, ensuring every artifact has a verifiable chain of custody from SQL source to inference endpoint.


1. Why This Topic Matters

The "move fast and break things" era left us with data swamps: thousands of undocumented S3 buckets and orphaned CSVs.

  • The Regulatory Hammer: Under GDPR and the EU AI Act, you must be able to prove data provenance. If a user exercises their "Right to be Forgotten," you must identify every downstream dataset and model that contains their data. Without lineage, this is impossible.
  • Debugging Distribution Shift: When a model suddenly degrades, lineage allows you to trace upstream. Did the marketing_leads table schema change? Did the currency_conversion ETL job fail?
  • The "Orphan" Risk: If you cannot trace the source, you cannot verify if the data had consent for commercial use. An "Orphan Model" is legally radioactive.

2. Core Concepts & Mental Models

The Lineage Graph

Think of your data ecosystem as a Directed Acyclic Graph (DAG).

  • Nodes: Datasets (SQL Tables, S3 Files), Jobs (ETL scripts), Models.
  • Edges: "Flows into", "Transforms", "Trains".

The Goal: A queryable map where Model_X depends on Feature_Set_Y which depends on Raw_Table_Z.
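That queryable map can be sketched as a plain adjacency dictionary plus a graph walk. The node names below mirror the example above; a production catalog would store this graph in a metadata service, but the query is the same idea.

```python
# Minimal lineage DAG: each node maps to its direct upstream dependencies.
LINEAGE_EDGES = {
    "Model_X": ["Feature_Set_Y"],
    "Feature_Set_Y": ["Raw_Table_Z"],
    "Raw_Table_Z": [],
}

def upstream_sources(node, edges):
    """Walk the DAG to collect every transitive upstream dependency of a node."""
    seen = set()
    stack = [node]
    while stack:
        current = stack.pop()
        for dep in edges.get(current, []):
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return seen

# upstream_sources("Model_X", LINEAGE_EDGES)
# -> {"Feature_Set_Y", "Raw_Table_Z"}
```

The inverse query ("who depends on Raw_Table_Z?") is the same walk over the reversed edge map, which is exactly what impact analysis tools do.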

Active vs. Passive Lineage

  • Passive Lineage: Parsing SQL logs and code to infer relationships after the fact. Easier to start, but brittle.
  • Active Lineage: Jobs explicitly push metadata to a central catalog (like DataHub or Amundsen) during execution. Requires code changes, but robust.
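Active lineage can be as simple as a decorator that registers a job's inputs and outputs on every run. The sketch below uses an in-memory list as a stand-in for a real catalog API (DataHub and Amundsen expose ingestion endpoints for this); the job and table names are illustrative.

```python
import functools
from datetime import datetime, timezone

# Stand-in for a real metadata catalog; in production this would be an
# HTTP push to a service like DataHub, not a module-level list.
METADATA_CATALOG = []

def emits_lineage(inputs, outputs):
    """Decorator: the job actively registers its inputs/outputs each time it runs."""
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            result = fn(*args, **kwargs)
            METADATA_CATALOG.append({
                "job": fn.__name__,
                "inputs": inputs,
                "outputs": outputs,
                "ran_at": datetime.now(timezone.utc).isoformat(),
            })
            return result
        return inner
    return wrap

@emits_lineage(inputs=["warehouse.users"], outputs=["features.account_age"])
def build_account_age():
    # ETL logic would live here.
    return "ok"
```

Because the metadata push happens inside the job wrapper, lineage cannot silently drift out of date: if the job ran, the edge was recorded.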

3. Theoretical Foundations (Trade-offs)

Bureaucracy vs. Agility

This is the central tension.

  • The Bureaucracy Trap: Requiring developers to manually fill out forms for every new CSV slows velocity to a crawl. They will lie or skip it.
  • The Agility Trap: Letting developers create datasets without registration leads to a "Shadow Data" ecosystem that is impossible to audit.

The Engineering Resolution: Automated Metadata Extraction. Do not ask humans to document lineage; write code that extracts lineage from the build pipeline. Documentation should be a side-effect of deployment, not a prerequisite.
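As a taste of what automated extraction looks like, here is a deliberately naive sketch that pulls source tables out of a SQL string with a regex. A real pipeline would use a proper SQL parser (subqueries, CTEs, and aliases break regexes quickly); the point is that the lineage edge is computed from the code, not typed into a wiki.

```python
import re

def extract_source_tables(sql: str):
    """Naively pull table names from FROM/JOIN clauses.
    A real system would use a SQL parser; this regex is only a sketch."""
    pattern = re.compile(r"\b(?:FROM|JOIN)\s+([a-zA-Z_][\w.]*)", re.IGNORECASE)
    return sorted(set(pattern.findall(sql)))

sql = """
SELECT u.user_id, avg(t.amount)
FROM warehouse.users u
JOIN warehouse.transactions t ON u.user_id = t.user_id
GROUP BY u.user_id
"""
# extract_source_tables(sql)
# -> ['warehouse.transactions', 'warehouse.users']
```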


4. Production-Grade Implementation

We adopt a Metadata-First approach. Before a model is trained, its feature definitions, linked to their sources, must be registered.

Key Governance Policy: Data Expiry (TTL)

Data is not an asset that appreciates indefinitely; it is a liability that accumulates risk.

  • Policy: Raw user activity logs have a Time-To-Live (TTL) of 3 years.
  • Implementation: The lineage system must flag models trained on "expired" data, forcing a retrain on fresher, compliant data.
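The expiry check itself is a date comparison against the source's retention policy. A minimal sketch, assuming the retention labels used in the registry below and approximating a year as 365 days:

```python
from datetime import date, timedelta

# Retention labels matching the source registry; year length is approximated.
RETENTION_DAYS = {"3_years": 3 * 365, "7_years": 7 * 365}

def is_expired(snapshot_date: date, retention_policy: str, today: date) -> bool:
    """Flag a training data snapshot that has outlived its source's TTL."""
    ttl = timedelta(days=RETENTION_DAYS[retention_policy])
    return today - snapshot_date > ttl

# A model trained on a 4-year-old snapshot of a 3-year-TTL source is flagged:
# is_expired(date(2022, 1, 1), "3_years", date(2026, 1, 18)) -> True
```

In practice this check runs against the lineage graph: for each model, resolve its upstream sources, look up their retention policies, and compare against the training snapshot date.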

5. Hands-On Project / Exercise

Objective: Build a lightweight "Lineage Resolver" that parses a Model Feature Configuration and generates an auditable JSON Data Dictionary mapping features back to their raw SQL sources.

Constraint: The system must identify the owner and the source table for every input.

Step 1: Define the Metadata Schema

We define our resources in code.

import json
from datetime import datetime
from typing import Dict

# 1. The Raw Source Registry (mocking a Data Catalog)
RAW_SOURCES = {
    "warehouse.users": {
        "owner": "data-eng-team@company.com",
        "description": "Core user table, PII scrubbed",
        "retention_policy": "3_years",
        "columns": ["user_id", "signup_date", "zip_code"]
    },
    "warehouse.transactions": {
        "owner": "payments-team@company.com",
        "description": "Ledger of all credit card txns",
        "retention_policy": "7_years",
        "columns": ["txn_id", "user_id", "amount", "timestamp"]
    }
}

# 2. The Model Feature Config
# This defines WHAT the model needs, and explicitly links to WHERE it comes from.
MODEL_CONFIG = {
    "model_id": "credit_risk_v2",
    "features": [
        {
            "name": "account_age_days",
            "source_table": "warehouse.users",
            "source_column": "signup_date",
            "transformation": "datediff(now(), signup_date)"
        },
        {
            "name": "avg_txn_amount",
            "source_table": "warehouse.transactions",
            "source_column": "amount",
            "transformation": "avg(amount) over 30 days"
        }
    ]
}

Step 2: The Lineage Resolver

This script acts as the "Audit Bot." It validates that sources exist and generates the dictionary.

class LineageTracer:
    def __init__(self, sources: Dict, model_config: Dict):
        self.sources = sources
        self.model = model_config

    def generate_dictionary(self):
        lineage_report = {
            "model_id": self.model["model_id"],
            "generated_at": datetime.now().isoformat(),
            "audit_status": "PASS",
            "lineage": []
        }

        for feature in self.model["features"]:
            source_table = feature["source_table"]

            # Validation: Does the source exist?
            if source_table not in self.sources:
                lineage_report["audit_status"] = "FAIL"
                print(f"CRITICAL ERROR: Feature '{feature['name']}' references unknown source '{source_table}'")
                return lineage_report

            # Extraction: Pull governance metadata
            source_meta = self.sources[source_table]

            entry = {
                "feature_name": feature["name"],
                "upstream_source": source_table,
                "upstream_column": feature["source_column"],
                "data_owner": source_meta["owner"],
                "retention_policy": source_meta["retention_policy"],
                "logic": feature["transformation"]
            }
            lineage_report["lineage"].append(entry)

        return lineage_report

# Execute
tracer = LineageTracer(RAW_SOURCES, MODEL_CONFIG)
dictionary = tracer.generate_dictionary()

print(json.dumps(dictionary, indent=2))

Output (The Artifact):

{
  "model_id": "credit_risk_v2",
  "generated_at": "2026-01-18T10:00:00",
  "audit_status": "PASS",
  "lineage": [
    {
      "feature_name": "account_age_days",
      "upstream_source": "warehouse.users",
      "upstream_column": "signup_date",
      "data_owner": "data-eng-team@company.com",
      "retention_policy": "3_years",
      "logic": "datediff(now(), signup_date)"
    },
    ...
  ]
}

Business Value: You can now query this JSON. "Find all models dependent on warehouse.users owned by data-eng-team." If that team discovers a data bug, they know exactly who to call.
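That query is a simple filter over the generated reports. The sketch below is self-contained, using a sample report shaped like the LineageTracer output above:

```python
# A sample report shaped like the LineageTracer output (fields trimmed for brevity).
REPORTS = [
    {
        "model_id": "credit_risk_v2",
        "lineage": [
            {"upstream_source": "warehouse.users",
             "data_owner": "data-eng-team@company.com"},
            {"upstream_source": "warehouse.transactions",
             "data_owner": "payments-team@company.com"},
        ],
    }
]

def models_depending_on(reports, source_table):
    """Return (model_id, data_owner) for every model consuming source_table."""
    hits = []
    for report in reports:
        for entry in report["lineage"]:
            if entry["upstream_source"] == source_table:
                hits.append((report["model_id"], entry["data_owner"]))
                break  # one hit per model is enough for impact analysis
    return hits

# models_depending_on(REPORTS, "warehouse.users")
# -> [("credit_risk_v2", "data-eng-team@company.com")]
```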


6. Ethical, Security & Safety Considerations

  • The "Ghost Data" Problem:
    • Issue: You delete a user from the database, but their data persists in model_training_set_v1.csv on a developer's laptop.
    • Fix: Lineage systems must track copies (extracts). S3 buckets should have lifecycle policies that auto-delete untagged exports after 30 days.
  • Access Control Propagation:
    • If warehouse.transactions is "Highly Confidential," then any feature derived from it (like avg_txn_amount) must inherit that classification. The lineage graph enforces this security inheritance.
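Security inheritance reduces to a max over an ordered set of classification levels. A minimal sketch, with illustrative level names and classifications:

```python
# Classification levels ordered from least to most sensitive (labels illustrative).
LEVELS = ["public", "internal", "confidential", "highly_confidential"]

SOURCE_CLASSIFICATION = {
    "warehouse.users": "confidential",
    "warehouse.transactions": "highly_confidential",
}

def inherited_classification(source_tables):
    """A derived feature inherits the strictest classification of its sources."""
    return max(
        (SOURCE_CLASSIFICATION[t] for t in source_tables),
        key=LEVELS.index,
    )

# avg_txn_amount is derived from warehouse.transactions, so:
# inherited_classification(["warehouse.users", "warehouse.transactions"])
# -> "highly_confidential"
```

Because the lineage graph already records which sources feed each feature, this check can run automatically at registration time rather than relying on engineers to classify features by hand.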


7. Business & Strategic Implications

Due Diligence & M&A: If your company is acquired, the acquirer will audit your IP.

  • Scenario: "You claimed this model is proprietary. Prove it wasn't trained on open-source datasets with 'Non-Commercial' licenses."
  • Defense: Your lineage graph is the proof of clean IP title.

Root Cause Analysis Speed: When a dashboard breaks, the question "Who changed the data?" usually takes 3 days of emails to answer. With lineage, it takes 3 seconds.


8. Code Examples / Pseudocode

Automated Schema Evolution Check (CI/CD):

class BreakingChangeError(Exception):
    """Raised in CI when a source schema change breaks a downstream model."""

def check_schema_compatibility(new_schema_columns, model_expectations):
    """
    Runs in CI. Fails the build if a column used by a model
    is dropped or renamed in the source DB.
    """
    for feature in model_expectations:
        if feature["column_name"] not in new_schema_columns:
            raise BreakingChangeError(
                f"Schema change breaks Model {feature['model_id']}. "
                f"Column {feature['column_name']} is missing."
            )


9. Common Pitfalls & Misconceptions

  1. "Documentation is Lineage."
  • Correction: Wikis are where knowledge goes to die. If the lineage isn't generated by the code/system itself, it is already outdated.
  2. Tracking Everything.
  • Correction: Tracking every temporary scratchpad table creates noise. Focus on Key Data Elements (KDEs), data that actually feeds production models or regulatory reports.
  3. Ignoring Transformations.
  • Correction: Knowing source -> target isn't enough. You need to know how it changed. Did age get normalized? Did null become -1? The transformation logic is part of the lineage.
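One cheap way to make transformation logic first-class in the lineage record is to fingerprint it: hash the transformation string so the system can detect when the logic changed even though source and target did not. A minimal sketch:

```python
import hashlib

def transformation_fingerprint(logic: str) -> str:
    """Hash the transformation logic so lineage diffs catch logic changes,
    not just changes to the source/target pair."""
    return hashlib.sha256(logic.strip().encode()).hexdigest()[:12]

old = transformation_fingerprint("datediff(now(), signup_date)")
new = transformation_fingerprint("datediff(now(), signup_date) / 365")
# old != new: the lineage edge changed even though source and target did not.
```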

10. Prerequisites & Next Steps

Prerequisites:

  • Understanding of SQL tables and columns.
  • Basic JSON manipulation.

Next Steps:

  • We have secured the code (Day 17) and the data lineage (Day 18).
  • Now we must optimize the process of experimentation itself.
  • Move to Day 19: The ROI of AI: Translating F1 Scores to P&L.

11. Further Reading & Resources