🤖 Data Engineering for ML

Bienvenue dans ce module où tu vas apprendre à construire l’infrastructure data qui alimente les systèmes de Machine Learning. En tant que Data Engineer, tu ne crées pas les modèles, mais tu construis les pipelines, Feature Stores, et systèmes de monitoring qui rendent le ML possible en production.


Prérequis

Niveau Compétence
✅ Requis PySpark DataFrame API (M19)
✅ Requis Delta Lake (M20)
✅ Requis Airflow (M22, M28)
✅ Requis Data Quality avec Great Expectations (M23)
💡 Recommandé Notions de base en ML (features, training, inference)

🎯 Objectifs du module

À la fin de ce module, tu seras capable de :

  • Construire des Feature Pipelines robustes avec Spark
  • Déployer et alimenter un Feature Store (Feast)
  • Créer des Training Datasets sans data leakage
  • Implémenter la Data Validation spécifique au ML
  • Mettre en place le Data Monitoring (drift detection)
  • Comprendre l’intégration avec MLflow (côté data)

1. Le Rôle du Data Engineer dans le ML

1.1 ML Lifecycle vu par le Data Engineer

┌─────────────────────────────────────────────────────────────────────────────┐
│                     DATA ENGINEER SCOPE IN ML                               │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    DATA ENGINEER CONSTRUIT                          │  │
│   │                                                                     │  │
│   │   Raw Data ──▶ Data Pipelines ──▶ Feature Pipelines ──▶ Feature    │  │
│   │                                                          Store     │  │
│   │                                                            │        │  │
│   │   Training Data ◀── Serving Data ◀── Data Validation ◀────┘        │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                          │                                  │
│                                          ▼                                  │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    DATA SCIENTIST UTILISE                           │  │
│   │                                                                     │  │
│   │   Feature Store ──▶ Model Training ──▶ Model Registry ──▶ Serving  │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 Data Engineer vs Data Scientist vs ML Engineer

Rôle Responsabilités
Data Engineer Pipelines de données, Feature Store infra, Data Quality, Monitoring data
Data Scientist Feature design, Model training, Experimentation, Evaluation
ML Engineer Model deployment, Model serving, Model monitoring, MLOps

1.3 Problèmes classiques que le DE doit résoudre

Problème Description Solution DE
Training-Serving Skew Features différentes en training vs production Feature Store unique
Data Leakage Utiliser des données du futur pour prédire le passé Point-in-time joins
Reproducibility Impossible de recréer un training dataset Dataset versioning
Feature Inconsistency Calcul différent selon les équipes Feature pipelines centralisés
Stale Features Features pas à jour en production Refresh pipelines, CDC

2. Feature Pipelines avec Spark

2.1 Qu’est-ce qu’une Feature ?

Une feature est une variable dérivée des données brutes, utilisée comme input pour un modèle ML.

Raw Data Features dérivées
Transactions individuelles total_transactions_30d, avg_amount_30d
Clics sur un site pages_viewed_7d, time_on_site_avg
Historique d’achats days_since_last_purchase, favorite_category

2.2 Transformations courantes

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("FeaturePipeline") \
    .master("local[*]") \
    .getOrCreate()
Voir le code
# Créer des données d'exemple
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("FeaturePipeline") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Données de transactions
transactions_data = [
    ("C001", "TXN001", 150.0, "Electronics", "2024-01-15"),
    ("C001", "TXN002", 25.0, "Food", "2024-01-18"),
    ("C001", "TXN003", 200.0, "Electronics", "2024-01-25"),
    ("C002", "TXN004", 75.0, "Clothing", "2024-01-10"),
    ("C002", "TXN005", 50.0, "Food", "2024-01-20"),
    ("C003", "TXN006", 500.0, "Electronics", "2024-01-05"),
    ("C003", "TXN007", 30.0, "Food", "2024-01-08"),
    ("C003", "TXN008", 120.0, "Clothing", "2024-01-22"),
    ("C001", "TXN009", 80.0, "Food", "2024-02-01"),
    ("C002", "TXN010", 300.0, "Electronics", "2024-02-05"),
]

transactions_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("transaction_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("category", StringType(), False),
    StructField("transaction_date", StringType(), False),
])

transactions_df = spark.createDataFrame(transactions_data, transactions_schema) \
    .withColumn("transaction_date", F.to_date("transaction_date"))

print("📦 Transactions brutes :")
transactions_df.show()
Voir le code
# ═══════════════════════════════════════════════════════════════
# FEATURE 1 : Agrégations simples
# ═══════════════════════════════════════════════════════════════

aggregation_features = transactions_df \
    .groupBy("customer_id") \
    .agg(
        F.count("*").alias("total_transactions"),
        F.sum("amount").alias("total_spent"),
        F.avg("amount").alias("avg_transaction_amount"),
        F.min("amount").alias("min_transaction_amount"),
        F.max("amount").alias("max_transaction_amount"),
        F.stddev("amount").alias("stddev_transaction_amount"),
        F.countDistinct("category").alias("unique_categories"),
        F.max("transaction_date").alias("last_transaction_date"),
        F.min("transaction_date").alias("first_transaction_date"),
    )

print("📊 Features d'agrégation :")
aggregation_features.show(truncate=False)
Voir le code
# ═══════════════════════════════════════════════════════════════
# FEATURE 2 : Window Functions (features temporelles)
# ═══════════════════════════════════════════════════════════════

# Définir les fenêtres
window_30d = Window.partitionBy("customer_id") \
    .orderBy(F.col("transaction_date").cast("long")) \
    .rangeBetween(-30 * 86400, 0)  # 30 jours en secondes

window_7d = Window.partitionBy("customer_id") \
    .orderBy(F.col("transaction_date").cast("long")) \
    .rangeBetween(-7 * 86400, 0)

# Features rolling
rolling_features = transactions_df \
    .withColumn("transaction_ts", F.col("transaction_date").cast("timestamp")) \
    .withColumn("amount_sum_30d", F.sum("amount").over(window_30d)) \
    .withColumn("amount_avg_30d", F.avg("amount").over(window_30d)) \
    .withColumn("txn_count_30d", F.count("*").over(window_30d)) \
    .withColumn("amount_sum_7d", F.sum("amount").over(window_7d)) \
    .withColumn("txn_count_7d", F.count("*").over(window_7d))

print("📈 Features avec Window Functions :")
rolling_features.select(
    "customer_id", "transaction_date", "amount",
    "amount_sum_30d", "txn_count_30d", "amount_sum_7d"
).show()
Voir le code
# ═══════════════════════════════════════════════════════════════
# FEATURE 3 : Encoding catégoriel
# ═══════════════════════════════════════════════════════════════

# One-Hot Encoding manuel (pivot)
category_pivot = transactions_df \
    .groupBy("customer_id") \
    .pivot("category") \
    .agg(F.count("*")) \
    .fillna(0)

print("🏷️ One-Hot Encoding des catégories :")
category_pivot.show()

# Catégorie favorite
favorite_category = transactions_df \
    .groupBy("customer_id", "category") \
    .agg(F.sum("amount").alias("category_total")) \
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("customer_id").orderBy(F.desc("category_total"))
        )
    ) \
    .filter(F.col("rank") == 1) \
    .select("customer_id", F.col("category").alias("favorite_category"))

print("⭐ Catégorie favorite par client :")
favorite_category.show()
Voir le code
# ═══════════════════════════════════════════════════════════════
# FEATURE 4 : Features de récence (RFM)
# ═══════════════════════════════════════════════════════════════

reference_date = "2024-02-10"

rfm_features = transactions_df \
    .groupBy("customer_id") \
    .agg(
        F.max("transaction_date").alias("last_transaction"),
        F.count("*").alias("frequency"),
        F.sum("amount").alias("monetary")
    ) \
    .withColumn(
        "recency_days",
        F.datediff(F.lit(reference_date), F.col("last_transaction"))
    ) \
    .withColumn(
        "is_active_30d",
        F.when(F.col("recency_days") <= 30, 1).otherwise(0)
    )

print("📅 Features RFM (Recency, Frequency, Monetary) :")
rfm_features.show()
Voir le code
# ═══════════════════════════════════════════════════════════════
# PIPELINE COMPLET : Assembler toutes les features
# ═══════════════════════════════════════════════════════════════

def build_customer_features(transactions_df, reference_date):
    """
    Pipeline complet de feature engineering pour les clients.
    
    Args:
        transactions_df: DataFrame des transactions
        reference_date: Date de référence pour les calculs de récence
    
    Returns:
        DataFrame avec toutes les features client
    """
    
    # 1. Agrégations de base
    base_features = transactions_df \
        .groupBy("customer_id") \
        .agg(
            F.count("*").alias("total_transactions"),
            F.sum("amount").alias("total_spent"),
            F.avg("amount").alias("avg_transaction_amount"),
            F.stddev("amount").alias("stddev_amount"),
            F.countDistinct("category").alias("unique_categories"),
            F.max("transaction_date").alias("last_transaction_date"),
            F.min("transaction_date").alias("first_transaction_date"),
        )
    
    # 2. Features de récence
    recency_features = base_features \
        .withColumn(
            "recency_days",
            F.datediff(F.lit(reference_date), F.col("last_transaction_date"))
        ) \
        .withColumn(
            "customer_tenure_days",
            F.datediff(F.lit(reference_date), F.col("first_transaction_date"))
        ) \
        .withColumn(
            "is_active_30d",
            F.when(F.col("recency_days") <= 30, 1).otherwise(0)
        )
    
    # 3. Catégorie favorite
    favorite_cat = transactions_df \
        .groupBy("customer_id", "category") \
        .agg(F.count("*").alias("cat_count")) \
        .withColumn(
            "rank",
            F.row_number().over(
                Window.partitionBy("customer_id").orderBy(F.desc("cat_count"))
            )
        ) \
        .filter(F.col("rank") == 1) \
        .select("customer_id", F.col("category").alias("favorite_category"))
    
    # 4. One-hot des catégories
    category_ohe = transactions_df \
        .groupBy("customer_id") \
        .pivot("category") \
        .agg(F.count("*")) \
        .fillna(0)
    
    # Renommer les colonnes OHE
    for col_name in category_ohe.columns:
        if col_name != "customer_id":
            category_ohe = category_ohe.withColumnRenamed(
                col_name, f"category_{col_name.lower()}_count"
            )
    
    # 5. Joindre toutes les features
    final_features = recency_features \
        .join(favorite_cat, "customer_id", "left") \
        .join(category_ohe, "customer_id", "left") \
        .withColumn("feature_timestamp", F.lit(reference_date).cast("timestamp"))
    
    return final_features

# Exécuter le pipeline
customer_features = build_customer_features(transactions_df, "2024-02-10")

print("🎯 Features client complètes :")
customer_features.show(truncate=False)
customer_features.printSchema()

2.3 Point-in-Time Correctness (Éviter le Data Leakage)

Data Leakage = utiliser des informations du futur pour prédire le passé.

❌ MAUVAIS (Data Leakage) :
   Pour prédire si le client achète le 15 janvier,
   on utilise ses transactions du 20 janvier → TRICHE !

✅ BON (Point-in-Time Correct) :
   Pour prédire si le client achète le 15 janvier,
   on utilise UNIQUEMENT ses transactions AVANT le 15 janvier.
def build_features_as_of(transactions_df, as_of_date):
    """
    Construire les features en utilisant UNIQUEMENT les données
    disponibles AVANT as_of_date.
    """
    # Filtrer les transactions AVANT la date
    filtered = transactions_df.filter(
        F.col("transaction_date") < as_of_date
    )
    
    return build_customer_features(filtered, as_of_date)

3. Infrastructure Feature Store

3.1 Pourquoi un Feature Store ?

Problème sans Feature Store | Solution avec Feature Store |

|—————————–|—————————–|| | Features calculées différemment en training vs serving | Single source of truth | | Duplication du code de features | Réutilisation | | Pas de découverte des features existantes | Feature discovery & catalog | | Point-in-time joins complexes | Built-in time-travel | | Latence élevée en serving | Online store low-latency |

3.2 Architecture Feature Store

┌─────────────────────────────────────────────────────────────────────────────┐
│                         FEATURE STORE ARCHITECTURE                          │
│                                                                             │
│   ┌─────────────────┐                                                       │
│   │  Feature        │     ┌─────────────────────────────────────────────┐  │
│   │  Pipelines      │────▶│           OFFLINE STORE                     │  │
│   │  (Spark/Airflow)│     │  (Data Warehouse / Delta Lake / Parquet)    │  │
│   └─────────────────┘     │  - Historical features                      │  │
│                           │  - Training data generation                 │  │
│                           │  - Backfill support                         │  │
│                           └──────────────────┬──────────────────────────┘  │
│                                              │                              │
│                                    Materialization Job                      │
│                                              │                              │
│                           ┌──────────────────▼──────────────────────────┐  │
│                           │           ONLINE STORE                      │  │
│                           │  (Redis / DynamoDB / Cassandra)             │  │
│                           │  - Latest feature values only               │  │
│                           │  - Low-latency serving (<10ms)              │  │
│                           │  - Real-time inference                      │  │
│                           └─────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

3.3 Feast : Feature Store Open Source

Feast (Feature Store) est le Feature Store open-source le plus populaire.

Installation

pip install feast[redis]

# Créer un projet Feast
feast init my_feature_store
cd my_feature_store

Structure du projet

my_feature_store/
├── feature_store.yaml      # Configuration
├── features.py             # Définition des features
└── data/
    └── customer_features.parquet

Configuration feature_store.yaml

project: my_ml_project
registry: data/registry.db
provider: local

offline_store:
  type: file
  # En production : type: snowflake / bigquery / redshift

online_store:
  type: redis
  connection_string: localhost:6379
  # Alternatives : dynamodb, datastore, sqlite (local)

entity_key_serialization_version: 2

Définition des features features.py

from datetime import timedelta
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.types import Float64, Int64, String

# 1. Définir l'entité (la clé)
customer = Entity(
    name="customer_id",
    value_type=ValueType.STRING,
    description="Unique customer identifier"
)

# 2. Définir la source de données
customer_features_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="feature_timestamp",
)

# 3. Définir la Feature View
customer_features_view = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),  # Time-to-live dans l'online store
    schema=[
        Feature(name="total_transactions", dtype=Int64),
        Feature(name="total_spent", dtype=Float64),
        Feature(name="avg_transaction_amount", dtype=Float64),
        Feature(name="recency_days", dtype=Int64),
        Feature(name="is_active_30d", dtype=Int64),
        Feature(name="favorite_category", dtype=String),
    ],
    source=customer_features_source,
    online=True,  # Matérialiser dans l'online store
)

Commandes Feast

# Appliquer les définitions
feast apply

# Matérialiser dans l'online store
feast materialize 2024-01-01 2024-02-10

# Matérialisation incrémentale
feast materialize-incremental $(date +%Y-%m-%d)

Utilisation Python

from feast import FeatureStore
from datetime import datetime
import pandas as pd

store = FeatureStore(repo_path=".")

# ═══════════════════════════════════════════════════════════════
# OFFLINE : Récupérer des features historiques (training)
# ═══════════════════════════════════════════════════════════════

entity_df = pd.DataFrame({
    "customer_id": ["C001", "C002", "C003"],
    "event_timestamp": [datetime(2024, 2, 1)] * 3
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_transactions",
        "customer_features:total_spent",
        "customer_features:recency_days",
    ]
).to_df()

# ═══════════════════════════════════════════════════════════════
# ONLINE : Récupérer les features en temps réel (serving)
# ═══════════════════════════════════════════════════════════════

online_features = store.get_online_features(
    features=[
        "customer_features:total_spent",
        "customer_features:is_active_30d",
    ],
    entity_rows=[{"customer_id": "C001"}]
).to_dict()

print(online_features)
# {'customer_id': ['C001'], 'total_spent': [375.0], 'is_active_30d': [1]}

3.4 Pipeline d’alimentation du Feature Store

# feature_pipeline_dag.py (Airflow)

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': True,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'feature_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    
    # 1. Calculer les features avec Spark
    compute_features = SparkSubmitOperator(
        task_id='compute_customer_features',
        application='/jobs/compute_features.py',
        application_args=['--date', '{{ ds }}'],
        conf={
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
        },
    )
    
    # 2. Valider les features
    validate_features = PythonOperator(
        task_id='validate_features',
        python_callable=run_feature_validation,
        op_kwargs={'date': '{{ ds }}'},
    )
    
    # 3. Matérialiser dans le Feature Store
    materialize = PythonOperator(
        task_id='materialize_features',
        python_callable=materialize_to_feast,
        op_kwargs={'end_date': '{{ ds }}'},
    )
    
    compute_features >> validate_features >> materialize

4. Training Data Pipelines

4.1 Générer des Datasets Reproductibles

Un bon training dataset doit être : - Reproductible : on peut le recréer exactement - Versionné : on sait quelle version a été utilisée - Point-in-time correct : pas de data leakage

4.2 Point-in-Time Joins

┌─────────────────────────────────────────────────────────────────────────────┐
│                       POINT-IN-TIME JOIN                                    │
│                                                                             │
│   Events (ce qu'on prédit)          Features (inputs du modèle)            │
│   ─────────────────────             ─────────────────────────              │
│                                                                             │
│   customer_id | event_date          customer_id | feature_date | features  │
│   C001        | 2024-02-01          C001        | 2024-01-15   | {...}     │
│   C001        | 2024-02-15          C001        | 2024-02-01   | {...}     │
│                                     C001        | 2024-02-10   | {...}     │
│                                                                             │
│   Pour l'event du 2024-02-01 :                                             │
│   → Utiliser les features du 2024-01-15 (la plus récente AVANT l'event)    │
│                                                                             │
│   Pour l'event du 2024-02-15 :                                             │
│   → Utiliser les features du 2024-02-10 (la plus récente AVANT l'event)    │
└─────────────────────────────────────────────────────────────────────────────┘
Voir le code
# ═══════════════════════════════════════════════════════════════
# POINT-IN-TIME JOIN avec Spark
# ═══════════════════════════════════════════════════════════════

# Créer des données d'exemple

# Events : ce qu'on veut prédire (ex: churn, achat)
events_data = [
    ("C001", "2024-02-01", 1),  # Client C001 a churné le 1er février
    ("C002", "2024-02-05", 0),  # Client C002 n'a pas churné
    ("C003", "2024-02-10", 1),  # Client C003 a churné le 10 février
]
events_df = spark.createDataFrame(
    events_data, 
    ["customer_id", "event_date", "label"]
).withColumn("event_date", F.to_date("event_date"))

# Features avec timestamps (plusieurs versions par client)
features_data = [
    ("C001", "2024-01-15", 5, 500.0),
    ("C001", "2024-01-25", 6, 550.0),
    ("C002", "2024-01-20", 3, 200.0),
    ("C002", "2024-02-01", 4, 280.0),
    ("C003", "2024-01-10", 10, 1000.0),
    ("C003", "2024-02-05", 11, 1100.0),
]
features_df = spark.createDataFrame(
    features_data,
    ["customer_id", "feature_date", "total_transactions", "total_spent"]
).withColumn("feature_date", F.to_date("feature_date"))

print("📅 Events (labels) :")
events_df.show()

print("📊 Features (avec historique) :")
features_df.show()
Voir le code
def point_in_time_join(events_df, features_df, entity_col, event_ts_col, feature_ts_col):
    """
    Join point-in-time correct : pour chaque event, récupérer
    les features les plus récentes AVANT l'event.
    
    Args:
        events_df: DataFrame avec les events et leurs timestamps
        features_df: DataFrame avec les features et leurs timestamps
        entity_col: Colonne de jointure (ex: customer_id)
        event_ts_col: Colonne timestamp dans events_df
        feature_ts_col: Colonne timestamp dans features_df
    
    Returns:
        DataFrame avec events + features point-in-time correct
    """
    
    # 1. Joindre sur l'entité + feature_date < event_date
    joined = events_df.alias("e").join(
        features_df.alias("f"),
        (F.col(f"e.{entity_col}") == F.col(f"f.{entity_col}")) &
        (F.col(f"f.{feature_ts_col}") < F.col(f"e.{event_ts_col}")),
        "left"
    )
    
    # 2. Garder uniquement la feature la plus récente avant l'event
    window = Window.partitionBy(f"e.{entity_col}", f"e.{event_ts_col}") \
                   .orderBy(F.col(f"f.{feature_ts_col}").desc())
    
    result = joined \
        .withColumn("_rank", F.row_number().over(window)) \
        .filter(F.col("_rank") == 1) \
        .drop("_rank", f"f.{entity_col}")
    
    return result

# Exécuter le join point-in-time
training_data = point_in_time_join(
    events_df, 
    features_df,
    entity_col="customer_id",
    event_ts_col="event_date",
    feature_ts_col="feature_date"
)

print("🎯 Training Dataset (Point-in-Time Correct) :")
training_data.select(
    "customer_id", "event_date", "label", 
    "feature_date", "total_transactions", "total_spent"
).show()

# Vérification : feature_date est toujours < event_date ✓

4.3 Dataset Versioning avec Delta Lake

# Sauvegarder le training dataset avec versioning

training_data.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("data/training_datasets/churn_model")

# Ajouter des métadonnées
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "data/training_datasets/churn_model")

# Voir l'historique des versions
delta_table.history().select(
    "version", "timestamp", "operation", "operationMetrics"
).show(truncate=False)

# Time travel : récupérer une version spécifique
training_v2 = spark.read \
    .format("delta") \
    .option("versionAsOf", 2) \
    .load("data/training_datasets/churn_model")

# Ou par timestamp
training_at_date = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-01-15 10:00:00") \
    .load("data/training_datasets/churn_model")

4.4 Data Splits

def create_train_val_test_split(df, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, seed=42):
    """
    Split un dataset en train/validation/test de manière reproductible.
    
    Pour les données temporelles, préférer un split par date !
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 0.001
    
    # Random split
    train_df, val_df, test_df = df.randomSplit(
        [train_ratio, val_ratio, test_ratio], 
        seed=seed
    )
    
    return train_df, val_df, test_df

def create_temporal_split(df, date_col, train_end, val_end):
    """
    Split temporel (recommandé pour éviter le leakage) :
    - Train : données avant train_end
    - Val : données entre train_end et val_end  
    - Test : données après val_end
    """
    train_df = df.filter(F.col(date_col) < train_end)
    val_df = df.filter(
        (F.col(date_col) >= train_end) & 
        (F.col(date_col) < val_end)
    )
    test_df = df.filter(F.col(date_col) >= val_end)
    
    return train_df, val_df, test_df

5. Data Validation pour ML

5.1 Pourquoi la Data Quality est Critique pour ML

"Garbage In, Garbage Out" — mais en pire pour le ML !

Problème de data → Modèle apprend le bruit → Prédictions fausses en production
Problème de données Impact sur le ML
Missing values Modèle biaisé ou crash
Outliers extrêmes Poids aberrants
Data leakage Métriques sur-optimistes, crash en prod
Distribution drift Performance dégradée en prod
Class imbalance non détecté Modèle prédit toujours la classe majoritaire

5.2 Great Expectations pour ML Data

import great_expectations as gx

# Créer le contexte
context = gx.get_context()

# Créer une expectation suite pour les features ML
suite = context.add_expectation_suite("ml_features_validation")
Voir le code
# ═══════════════════════════════════════════════════════════════
# VALIDATIONS ML avec Great Expectations (conceptuel)
# ═══════════════════════════════════════════════════════════════

ml_feature_expectations = {
    "expectations": [
        # ─────────────────────────────────────────────────────────
        # 1. Complétude : pas de valeurs manquantes sur les features critiques
        # ─────────────────────────────────────────────────────────
        {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "customer_id"}
        },
        {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "total_transactions"}
        },
        
        # ─────────────────────────────────────────────────────────
        # 2. Plage de valeurs : détecter les outliers
        # ─────────────────────────────────────────────────────────
        {
            "expectation_type": "expect_column_values_to_be_between",
            "kwargs": {
                "column": "total_spent",
                "min_value": 0,
                "max_value": 100000  # Alerter si > 100k
            }
        },
        {
            "expectation_type": "expect_column_values_to_be_between",
            "kwargs": {
                "column": "recency_days",
                "min_value": 0,
                "max_value": 365
            }
        },
        
        # ─────────────────────────────────────────────────────────
        # 3. Distribution : moyennes et écarts-types attendus
        # ─────────────────────────────────────────────────────────
        {
            "expectation_type": "expect_column_mean_to_be_between",
            "kwargs": {
                "column": "avg_transaction_amount",
                "min_value": 50,
                "max_value": 500
            }
        },
        {
            "expectation_type": "expect_column_stdev_to_be_between",
            "kwargs": {
                "column": "total_spent",
                "min_value": 10,
                "max_value": 5000
            }
        },
        
        # ─────────────────────────────────────────────────────────
        # 4. Cardinalité : vérifier les catégories
        # ─────────────────────────────────────────────────────────
        {
            "expectation_type": "expect_column_values_to_be_in_set",
            "kwargs": {
                "column": "favorite_category",
                "value_set": ["Electronics", "Clothing", "Food", "Other", None]
            }
        },
        
        # ─────────────────────────────────────────────────────────
        # 5. Unicité : pas de doublons
        # ─────────────────────────────────────────────────────────
        {
            "expectation_type": "expect_column_values_to_be_unique",
            "kwargs": {"column": "customer_id"}
        },
        
        # ─────────────────────────────────────────────────────────
        # 6. Volume : nombre de lignes attendu
        # ─────────────────────────────────────────────────────────
        {
            "expectation_type": "expect_table_row_count_to_be_between",
            "kwargs": {
                "min_value": 1000,
                "max_value": 1000000
            }
        },
    ]
}

print("✅ Expectations ML définies :")
for exp in ml_feature_expectations["expectations"]:
    print(f"  - {exp['expectation_type']} sur {exp['kwargs'].get('column', 'table')}")

5.3 Validations Spécifiques ML

# ═══════════════════════════════════════════════════════════════
# Validation du label (target)
# ═══════════════════════════════════════════════════════════════

def validate_classification_labels(df, label_col, expected_classes):
    """
    Valider les labels pour un problème de classification.
    """
    # 1. Pas de nulls dans le label
    null_count = df.filter(F.col(label_col).isNull()).count()
    assert null_count == 0, f"Found {null_count} null labels!"
    
    # 2. Labels dans les classes attendues
    actual_classes = set(df.select(label_col).distinct().toPandas()[label_col].tolist())
    unexpected = actual_classes - set(expected_classes)
    assert len(unexpected) == 0, f"Unexpected labels: {unexpected}"
    
    # 3. Vérifier le class imbalance
    class_counts = df.groupBy(label_col).count().toPandas()
    min_count = class_counts['count'].min()
    max_count = class_counts['count'].max()
    imbalance_ratio = max_count / min_count
    
    if imbalance_ratio > 10:
        print(f"⚠️ WARNING: Class imbalance ratio = {imbalance_ratio:.1f}")
        print(f"   Consider using class weights or resampling.")
    
    return True

# ═══════════════════════════════════════════════════════════════
# Validation anti-leakage
# ═══════════════════════════════════════════════════════════════

def validate_no_leakage(df, event_date_col, feature_date_col):
    """
    Vérifier qu'aucune feature n'est du futur par rapport à l'event.
    """
    leakage_count = df.filter(
        F.col(feature_date_col) >= F.col(event_date_col)
    ).count()
    
    if leakage_count > 0:
        raise ValueError(f"🚨 DATA LEAKAGE DETECTED! {leakage_count} rows have future features!")
    
    print("✅ No data leakage detected")
    return True

6. Serving Data Infrastructure

6.1 Patterns de Serving

Pattern Latence Use Case
Batch precompute Minutes-Hours Scoring quotidien, recommendations
Online store lookup <10ms Personnalisation temps réel
On-demand compute 100ms+ Features complexes à la demande
┌─────────────────────────────────────────────────────────────────────────────┐
│                     SERVING DATA PATTERNS                                   │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │  BATCH PRECOMPUTE                                                   │  │
│   │                                                                     │  │
│   │  Features ──▶ Batch Scoring ──▶ Predictions Table ──▶ Application  │  │
│   │  (Spark)       (Spark MLlib)     (Delta/Postgres)      (lookup)    │  │
│   │                                                                     │  │
│   │  ⏱️ Latency: Minutes/Hours    👍 Simple, scalable                   │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │  ONLINE STORE LOOKUP                                                │  │
│   │                                                                     │  │
│   │  Request ──▶ Feature Store ──▶ ML Model ──▶ Response               │  │
│   │              (Redis <10ms)     (API)        (real-time)            │  │
│   │                                                                     │  │
│   │  ⏱️ Latency: <50ms total      👍 Real-time personalization         │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │  ON-DEMAND COMPUTE                                                  │  │
│   │                                                                     │  │
│   │  Request ──▶ Compute Features ──▶ ML Model ──▶ Response            │  │
│   │              (on-the-fly)         (API)        (computed)          │  │
│   │                                                                     │  │
│   │  ⏱️ Latency: 100ms+           👍 Always fresh, complex features    │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 Batch Scoring Pipeline

# batch_scoring_dag.py (Airflow)

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG('batch_scoring', schedule_interval='@daily') as dag:
    
    # 1. Récupérer les features du jour
    prepare_features = SparkSubmitOperator(
        task_id='prepare_scoring_features',
        application='/jobs/prepare_features.py',
    )
    
    # 2. Scorer avec le modèle
    score = SparkSubmitOperator(
        task_id='batch_score',
        application='/jobs/batch_score.py',
        application_args=['--model-uri', 'models:/churn_model/Production'],
    )
    
    # 3. Écrire les prédictions
    write_predictions = SparkSubmitOperator(
        task_id='write_predictions',
        application='/jobs/write_predictions.py',
    )
    
    prepare_features >> score >> write_predictions

6.3 Online Store avec Redis

import redis
import json

# Connexion Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# ═══════════════════════════════════════════════════════════════
# Écriture : Matérialisation des features
# ═══════════════════════════════════════════════════════════════

def materialize_to_redis(features_df):
    """Écrire les features dans Redis pour serving temps réel."""
    
    for row in features_df.collect():
        customer_id = row['customer_id']
        features = {
            'total_transactions': row['total_transactions'],
            'total_spent': row['total_spent'],
            'recency_days': row['recency_days'],
            'is_active_30d': row['is_active_30d'],
        }
        
        # Clé : customer_features:{customer_id}
        r.hset(f"customer_features:{customer_id}", mapping=features)
        
        # TTL : 24 heures
        r.expire(f"customer_features:{customer_id}", 86400)

# ═══════════════════════════════════════════════════════════════
# Lecture : Serving temps réel
# ═══════════════════════════════════════════════════════════════

def get_features_for_scoring(customer_id):
    """Récupérer les features pour le scoring temps réel."""
    
    features = r.hgetall(f"customer_features:{customer_id}")
    
    if not features:
        return None
    
    # Convertir bytes → types Python
    return {
        k.decode(): float(v.decode())
        for k, v in features.items()
    }

# Usage
# features = get_features_for_scoring("C001")
# prediction = model.predict([features])

7. Data Monitoring pour ML

7.1 Types de Drift

Type Description Ce que le DE monitore
Data Drift Distribution des inputs change ✅ Features distributions
Concept Drift Relation input→output change ⚠️ Alerter le DS
Prediction Drift Distribution des outputs change ⚠️ Alerter le DS
┌─────────────────────────────────────────────────────────────────────────────┐
│                         DATA DRIFT DETECTION                                │
│                                                                             │
│   Training Data Distribution          Production Data Distribution          │
│   ─────────────────────────           ─────────────────────────            │
│                                                                             │
│   amount:                             amount:                               │
│   mean = 150                          mean = 280  ← DRIFT!                 │
│   std = 50                            std = 120   ← DRIFT!                 │
│                                                                             │
│        ▲                                    ▲                               │
│       ╱╲                                  ╱    ╲                            │
│      ╱  ╲                               ╱      ╲                            │
│     ╱    ╲                            ╱         ╲                           │
│   ─╱──────╲───▶                    ──╱───────────╲──▶                       │
│                                                                             │
│   Si drift détecté → Alerter → Potentiellement retrainer le modèle         │
└─────────────────────────────────────────────────────────────────────────────┘

7.2 Evidently AI pour Data Monitoring

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metrics import (
    DataDriftTable,
    DatasetDriftMetric,
    ColumnDriftMetric,
)
from evidently.test_suite import TestSuite
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns,
)

# ═══════════════════════════════════════════════════════════════
# RAPPORT DE DRIFT
# ═══════════════════════════════════════════════════════════════

# reference_data = données de training
# current_data = données de production récentes

column_mapping = ColumnMapping(
    numerical_features=['total_transactions', 'total_spent', 'avg_transaction_amount'],
    categorical_features=['favorite_category'],
)

# Créer le rapport de drift
drift_report = Report(metrics=[
    DatasetDriftMetric(),
    DataDriftTable(),
])

drift_report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

# Sauvegarder en HTML
drift_report.save_html("reports/drift_report.html")

# Ou obtenir les résultats en dict
results = drift_report.as_dict()
dataset_drift = results['metrics'][0]['result']['dataset_drift']
print(f"Dataset drift detected: {dataset_drift}")

# ═══════════════════════════════════════════════════════════════
# TESTS AUTOMATISÉS (pour CI/CD)
# ═══════════════════════════════════════════════════════════════

drift_tests = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),  # Moins de 30% de colonnes en drift
    TestColumnDrift(column_name='total_spent'),
    TestColumnDrift(column_name='recency_days'),
])

drift_tests.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)

# Vérifier si les tests passent
test_results = drift_tests.as_dict()
all_passed = all(t['status'] == 'SUCCESS' for t in test_results['tests'])

if not all_passed:
    print("🚨 DRIFT ALERT: Some tests failed!")
    # Envoyer une alerte (Slack, PagerDuty, etc.)
Voir le code
# ═══════════════════════════════════════════════════════════════
# MONITORING PIPELINE SIMPLIFIÉ (sans Evidently)
# ═══════════════════════════════════════════════════════════════

def compute_feature_stats(df, feature_cols):
    """
    Calculer les statistiques de base pour le monitoring.
    """
    stats = {}
    
    for col in feature_cols:
        col_stats = df.select(
            F.mean(col).alias("mean"),
            F.stddev(col).alias("std"),
            F.min(col).alias("min"),
            F.max(col).alias("max"),
            F.expr(f"percentile_approx({col}, 0.5)").alias("median"),
            (F.count(F.when(F.col(col).isNull(), 1)) / F.count("*")).alias("null_rate"),
        ).collect()[0]
        
        stats[col] = {
            "mean": col_stats["mean"],
            "std": col_stats["std"],
            "min": col_stats["min"],
            "max": col_stats["max"],
            "median": col_stats["median"],
            "null_rate": col_stats["null_rate"],
        }
    
    return stats

def detect_drift(reference_stats, current_stats, threshold=0.2):
    """
    Détecter le drift en comparant les statistiques.
    
    Méthode simple : alerte si la moyenne change de plus de threshold%.
    """
    alerts = []
    
    for col in reference_stats:
        ref_mean = reference_stats[col]["mean"]
        cur_mean = current_stats[col]["mean"]
        
        if ref_mean != 0:
            pct_change = abs(cur_mean - ref_mean) / abs(ref_mean)
            
            if pct_change > threshold:
                alerts.append({
                    "column": col,
                    "reference_mean": ref_mean,
                    "current_mean": cur_mean,
                    "pct_change": pct_change * 100,
                })
    
    return alerts

# Exemple d'utilisation
feature_cols = ["total_transactions", "total_spent", "avg_transaction_amount"]

# Simuler des données de référence et actuelles
reference_stats = compute_feature_stats(customer_features, feature_cols)

# Simuler un drift (en production, ce serait les nouvelles données)
drifted_data = customer_features.withColumn(
    "total_spent", F.col("total_spent") * 1.5  # +50% drift
)
current_stats = compute_feature_stats(drifted_data, feature_cols)

# Détecter le drift
alerts = detect_drift(reference_stats, current_stats, threshold=0.2)

print("📊 Monitoring Results:")
if alerts:
    print("🚨 DRIFT DETECTED:")
    for alert in alerts:
        print(f"   - {alert['column']}: {alert['pct_change']:.1f}% change")
        print(f"     (reference: {alert['reference_mean']:.2f} → current: {alert['current_mean']:.2f})")
else:
    print("✅ No significant drift detected")

8. MLflow pour Data Engineers

En tant que Data Engineer, tu n’as pas besoin de tout connaître de MLflow. Voici ce qui te concerne.

8.1 Ce qu’un DE doit savoir

Composant MLflow Responsabilité DE Responsabilité DS/MLE
Tracking Logger les datasets utilisés Logger les métriques, paramètres
Projects Packager le code
Models Sauvegarder les modèles
Registry Savoir quel modèle est en Production Promouvoir les modèles

8.2 Logger les Datasets avec MLflow

import mlflow

# Dans ton pipeline de feature engineering
def log_dataset_to_mlflow(df, dataset_name, run_id=None):
    """
    Logger les métadonnées du dataset pour traçabilité.
    """
    with mlflow.start_run(run_id=run_id):
        # Logger les infos du dataset
        mlflow.log_param(f"{dataset_name}_rows", df.count())
        mlflow.log_param(f"{dataset_name}_cols", len(df.columns))
        mlflow.log_param(f"{dataset_name}_columns", ",".join(df.columns))
        
        # Logger les statistiques
        stats = df.describe().toPandas()
        stats.to_csv(f"/tmp/{dataset_name}_stats.csv")
        mlflow.log_artifact(f"/tmp/{dataset_name}_stats.csv")
        
        # Logger le chemin du dataset
        mlflow.log_param(f"{dataset_name}_path", f"s3://data/features/{dataset_name}")

8.3 Récupérer le modèle en Production

import mlflow

# Pour le batch scoring, récupérer le modèle en Production
model_name = "churn_model"

# Méthode 1 : par stage
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")

# Méthode 2 : par version
model = mlflow.pyfunc.load_model(f"models:/{model_name}/3")

# Scorer
predictions = model.predict(features_df.toPandas())

8.4 MLflow avec Spark

import mlflow.spark

# Logger un modèle Spark MLlib
mlflow.spark.log_model(spark_model, "model")

# Charger pour batch scoring
loaded_model = mlflow.spark.load_model("models:/my_spark_model/Production")

# Scorer directement sur un DataFrame Spark
predictions_df = loaded_model.transform(features_df)

9. Exercices Pratiques

Exercice 1 : Feature Pipeline avec Window Functions

Créer un pipeline de features pour un modèle de détection de fraude avec : - Montant moyen des 5 dernières transactions - Nombre de transactions dans l’heure précédente - Écart par rapport au montant moyen habituel


Exercice 2 : Setup Feast Local

  1. Installer Feast
  2. Créer un projet avec les features customer
  3. Matérialiser les features
  4. Récupérer des features historiques

Exercice 3 : Training Dataset Point-in-Time

Créer un training dataset pour prédire le churn avec : - Events : clients qui ont churné (label=1) ou non (label=0) - Features : récupérées 7 jours AVANT l’event - Validation : vérifier qu’il n’y a pas de leakage


Exercice 4 : Data Validation Pipeline

Implémenter un pipeline de validation avec : - Vérification des nulls - Vérification des plages de valeurs - Détection d’outliers - Intégration Airflow


Exercice 5 : Data Drift Detection

  1. Créer un dataset de référence
  2. Simuler un drift sur certaines features
  3. Implémenter la détection automatique
  4. Générer une alerte si drift > 20%

10. Mini-Projet : ML Data Platform

Objectif

Construire une plateforme data complète pour alimenter un modèle de prédiction de churn.

┌─────────────────────────────────────────────────────────────────────────────┐
│                      MINI-PROJET : ML DATA PLATFORM                         │
│                                                                             │
│   Raw Data (CSV)                                                            │
│        │                                                                    │
│        ▼                                                                    │
│   ┌─────────────────┐                                                       │
│   │ Data Ingestion  │  Bronze Layer (Delta)                                 │
│   │   (Spark)       │                                                       │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│   ┌─────────────────┐                                                       │
│   │ Data Validation │  Great Expectations                                   │
│   │                 │                                                       │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│   ┌─────────────────┐                                                       │
│   │ Feature Pipeline│  Silver Layer (Features)                              │
│   │   (Spark)       │                                                       │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│   ┌─────────────────┐     ┌─────────────────┐                               │
│   │  Feature Store  │────▶│ Training Dataset│  Point-in-time correct       │
│   │    (Feast)      │     │   Generator     │                               │
│   └────────┬────────┘     └─────────────────┘                               │
│            │                                                                │
│            ▼                                                                │
│   ┌─────────────────┐                                                       │
│   │ Data Monitoring │  Drift detection                                      │
│   └─────────────────┘                                                       │
└─────────────────────────────────────────────────────────────────────────────┘

Livrables

  1. Data Ingestion : Script Spark pour ingérer les CSV → Delta
  2. Validation : Suite Great Expectations pour les données brutes
  3. Feature Pipeline : Job Spark complet avec toutes les features
  4. Feature Store : Configuration Feast + matérialisation
  5. Training Dataset : Générateur avec point-in-time join
  6. Monitoring : Script de détection de drift
  7. Orchestration : DAG Airflow qui orchestre le tout

Données

Utiliser les transactions créées dans ce notebook + générer des events de churn.

Critères de succès


📚 Ressources

Documentation

Articles

Outils

  • Feast — Feature Store open-source
  • Tecton — Feature Store managed
  • DVC — Data versioning

➡️ Prochaine étape

👉 Module suivant : 32_data_mesh_contracts — Data Mesh & Data Contracts


🎉 Félicitations ! Tu maîtrises maintenant l’infrastructure data pour le ML.

Retour au sommet