dbt (Data Build Tool) & Data Quality

Industrialiser les Transformations et Garantir la Qualité

Bienvenue dans ce module où tu vas apprendre à industrialiser tes transformations SQL et à garantir la qualité de tes données avec les outils standards de l’industrie.


Prérequis

Module Compétence Pourquoi ?
✅ 20 Spark SQL SQL avancé
✅ 23 Table Formats Delta Lake comme cible
✅ 24 Streaming Données incrémentales

Objectifs

À la fin de ce module, tu seras capable de :

  • Comprendre le concept de Transformation-as-Code (TaC)
  • Maîtriser dbt : Models, Sources, Tests, Documentation
  • Implémenter des tests de qualité avec dbt et Great Expectations
  • Créer des modèles incrémentaux performants
  • Déployer dbt dans un pipeline CI/CD

1. Introduction — Le Problème à Résoudre

1.1 Le chaos des transformations SQL

Imagine une équipe data typique sans dbt :

SITUATION TYPIQUE SANS DBT :

📁 scripts/
├── transform_users_v2_final_FINAL.sql      ← Quelle version utiliser ?
├── transform_users_v2_final.sql
├── transform_users_v2.sql
├── create_dashboard_table_john.sql         ← Qui a écrit ça ?
├── fix_bug_urgent.sql                      ← Ça fait quoi exactement ?
└── old/
    └── ... 50 fichiers abandonnés

PROBLÈMES :
• Aucune traçabilité (qui a modifié quoi ?)
• Pas de tests (les données sont-elles correctes ?)
• Pas de documentation (c'est quoi cette colonne ?)
• Ordre d'exécution manuel (quelle requête lancer en premier ?)
• Pas de review de code (erreurs passent en production)

1.2 La solution : Transformation-as-Code (TaC)

Transformation-as-Code applique les principes du développement logiciel aux transformations de données :

Principe Dev Application Data
Versioning Transformations dans Git
Tests unitaires Tests sur les données
Documentation Auto-générée depuis le code
Code Review PR avant mise en production
CI/CD Déploiement automatisé

1.3 Qu’est-ce que dbt ?

dbt (Data Build Tool) est un outil open-source qui permet de transformer les données dans ton Data Warehouse/Lakehouse en utilisant uniquement du SQL.

┌─────────────────────────────────────────────────────────────────┐
│                        dbt - Data Build Tool                    │
│                                                                 │
│   ENTRÉE              TRAITEMENT              SORTIE            │
│   ┌──────────┐       ┌──────────────┐       ┌──────────┐       │
│   │  Fichiers│       │              │       │  Tables  │       │
│   │  SQL +   │  ───▶ │  dbt run     │  ───▶ │  ou Vues │       │
│   │  Jinja   │       │              │       │  testées │       │
│   └──────────┘       └──────────────┘       └──────────┘       │
│                                                                 │
│   + Tests automatiques                                          │
│   + Documentation générée                                       │
│   + Lineage (traçabilité)                                       │
│   + Gestion des dépendances                                     │
└─────────────────────────────────────────────────────────────────┘

1.4 Ce que dbt fait et ne fait PAS

dbt FAIT ✅ dbt NE FAIT PAS ❌
Transformer (T de ELT) Extraire les données (E)
Tester les données Charger les données (L)
Documenter Orchestrer (scheduling)
Gérer les dépendances Calcul distribué
Versionner Ingestion temps réel

dbt s’intègre avec : Spark (pour le calcul), Airflow (pour l’orchestration), Delta/Iceberg (pour le stockage).

1.5 dbt Core vs dbt Cloud

Aspect dbt Core dbt Cloud
Prix Gratuit (open-source) Payant (SaaS)
Installation CLI local IDE web
Scheduling Manuel (Airflow, cron) Intégré
CI/CD À configurer Intégré
IDE VS Code, vim, etc. IDE web dédié
Collaboration Via Git Via plateforme

Dans ce module, nous utilisons dbt Core (CLI) car c’est ce que tu utiliseras dans un environnement Spark/Kubernetes.

1.6 dbt dans l’architecture Data

Où se place dbt dans ton architecture ?

┌─────────────────────────────────────────────────────────────────────────┐
│                         ARCHITECTURE MODERNE                            │
│                                                                         │
│   INGESTION          STOCKAGE           TRANSFORMATION      SERVING     │
│   ┌─────────┐       ┌─────────┐        ┌─────────────┐    ┌─────────┐  │
│   │ Kafka   │       │ Bronze  │        │             │    │ BI Tool │  │
│   │ Spark   │  ───▶ │ (Raw)   │  ───▶  │    dbt      │───▶│ ML      │  │
│   │ Airbyte │       │ Silver  │        │             │    │ API     │  │
│   └─────────┘       │ Gold    │        └─────────────┘    └─────────┘  │
│                     └─────────┘               │                        │
│                     Delta Lake            Tests &                      │
│                     Iceberg               Documentation                │
│                                                                         │
│   ┌─────────────────────────────────────────────────────────────────┐  │
│   │                    ORCHESTRATION (Airflow / K8s)                │  │
│   └─────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘

2. Installation et Structure d’un Projet dbt

2.1 Installation

dbt s’installe via pip avec un adapter spécifique à ta base de données :

Voir le code
# Installation de dbt

installation = '''
# Adapter selon ton Data Warehouse/Lakehouse :

pip install dbt-core              # Core (obligatoire)

# + UN adapter selon ta cible :
pip install dbt-postgres          # PostgreSQL
pip install dbt-snowflake         # Snowflake
pip install dbt-bigquery          # BigQuery
pip install dbt-spark             # Spark (Databricks, EMR, local)
pip install dbt-duckdb            # DuckDB (local, léger)
pip install dbt-trino             # Trino/Presto

# Vérifier l'installation
dbt --version
'''

print(installation)
print("\n💡 Pour ce module, on utilise dbt-duckdb (léger, local) ou dbt-spark.")

2.2 Initialisation d’un projet

La commande dbt init crée la structure de base :

Voir le code
# Créer un nouveau projet dbt

init_project = '''
# Créer un nouveau projet
dbt init my_dbt_project

# Structure créée :
my_dbt_project/

├── dbt_project.yml          # Configuration principale du projet
├── profiles.yml             # Connexions aux bases (souvent dans ~/.dbt/)

├── models/                  # ⭐ TES TRANSFORMATIONS SQL
│   └── example/
│       ├── my_first_model.sql
│       └── schema.yml       # Tests et documentation

├── seeds/                   # Données statiques (CSV)

├── snapshots/               # Historisation (SCD Type 2)

├── macros/                  # Fonctions Jinja réutilisables

├── tests/                   # Tests SQL custom

├── analyses/                # Requêtes ad-hoc (non matérialisées)

└── target/                  # Fichiers compilés (généré)
'''

print(init_project)

2.3 Le fichier dbt_project.yml

C’est le fichier de configuration principal de ton projet. Il définit le nom, la version, et les paramètres par défaut.

Voir le code
# dbt_project.yml - Configuration principale

dbt_project_yml = '''
# dbt_project.yml

name: 'ecommerce_analytics'      # Nom du projet (sans espaces, sans tirets)
version: '1.0.0'
config-version: 2

# Profil de connexion (défini dans profiles.yml)
profile: 'ecommerce_analytics'

# Chemins des différents composants
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"           # Où dbt écrit les fichiers compilés
clean-targets: ["target", "dbt_packages"]

# ═══════════════════════════════════════════════════════════════
# Configuration des modèles (TRÈS IMPORTANT)
# ═══════════════════════════════════════════════════════════════

models:
  ecommerce_analytics:           # Doit correspondre au "name" ci-dessus
    
    # Configuration par défaut pour TOUS les modèles
    +materialized: view          # Par défaut : créer des vues
    
    # Configuration par dossier
    staging:                     # Dossier models/staging/
      +materialized: view
      +schema: staging           # Écrire dans le schéma "staging"
    
    silver:
      +materialized: table       # Tables complètes
      +schema: silver
    
    gold:
      +materialized: incremental # Incrémental pour la performance
      +schema: gold
'''

print(dbt_project_yml)

2.4 Le fichier profiles.yml

Ce fichier contient les informations de connexion à ta base de données. Il est généralement stocké dans ~/.dbt/profiles.yml (pas dans le repo Git pour des raisons de sécurité).

Voir le code
# profiles.yml - Connexions aux bases de données

profiles_yml = '''
# ~/.dbt/profiles.yml

ecommerce_analytics:             # Doit correspondre au "profile" dans dbt_project.yml
  
  target: dev                    # Environnement par défaut
  
  outputs:
    # ═══════════════════════════════════════════════════════════
    # Environnement DEV (développement local)
    # ═══════════════════════════════════════════════════════════
    dev:
      type: duckdb               # Adapter DuckDB (léger, local)
      path: /tmp/dev.duckdb
      threads: 4
    
    # ═══════════════════════════════════════════════════════════
    # Environnement PROD (Spark/Databricks)
    # ═══════════════════════════════════════════════════════════
    prod:
      type: spark
      method: thrift             # ou "odbc", "http"
      host: spark-thrift-server.company.com
      port: 10001
      schema: analytics
      threads: 8
    
    # ═══════════════════════════════════════════════════════════
    # Exemple Snowflake
    # ═══════════════════════════════════════════════════════════
    snowflake_prod:
      type: snowflake
      account: xy12345.us-east-1
      user: "{{ env_var('SNOWFLAKE_USER') }}"      # Variable d'environnement
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORM_WH
      schema: GOLD
      threads: 8
'''

print(profiles_yml)
print("\n💡 Utilise env_var() pour les secrets, jamais en dur !")

3. Les Models — Cœur de dbt

3.1 Qu’est-ce qu’un Model ?

Un model dbt est simplement un fichier SQL qui contient une requête SELECT. dbt se charge de créer la table ou vue correspondante.

FICHIER SQL                           CE QUE DBT FAIT
┌─────────────────────────┐          ┌─────────────────────────┐
│ -- models/users.sql    │          │ CREATE TABLE users AS   │
│                        │   ───▶   │ SELECT                  │
│ SELECT                 │          │   id,                   │
│   id,                  │          │   name,                 │
│   name,                │          │   email                 │
│   email                │          │ FROM raw.users          │
│ FROM raw.users         │          │                         │
└─────────────────────────┘          └─────────────────────────┘

Tu écris le SELECT, dbt gère le CREATE/INSERT !
Voir le code
# Exemple de model simple

model_simple = '''
-- models/staging/stg_orders.sql

-- Configuration du model (optionnel, peut être dans dbt_project.yml)
{{ config(
    materialized='view',
    schema='staging'
) }}

-- Le SELECT qui définit le model
SELECT
    order_id,
    customer_id,
    order_date,
    CAST(amount AS DECIMAL(10, 2)) AS amount,
    status,
    created_at
FROM {{ source('raw', 'orders') }}  -- Référence à une source externe
WHERE order_date >= '2024-01-01'    -- Filtrer les données récentes
'''

print(model_simple)
print("\n💡 Points clés :")
print("• {{ config(...) }} : Configuration spécifique au model")
print("• {{ source(...) }} : Référence à une table externe")
print("• Pas de CREATE TABLE : dbt le génère automatiquement")

3.2 La fonction ref() — La clé de dbt

ref() est LA fonction la plus importante de dbt. Elle permet de référencer un autre model, et dbt : 1. Comprend la dépendance entre les models 2. Exécute les models dans le bon ordre 3. Génère le lineage (traçabilité)

SANS ref() (MAUVAIS)                 AVEC ref() (BON)
┌─────────────────────────┐         ┌─────────────────────────┐
│ SELECT *                │         │ SELECT *                │
│ FROM analytics.users    │         │ FROM {{ ref('users') }} │
│                         │         │                         │
│ ❌ Schéma hardcodé      │         │ ✅ dbt résout le schéma │
│ ❌ Pas de dépendance    │         │ ✅ Dépendance trackée   │
│ ❌ Pas de lineage       │         │ ✅ Lineage automatique  │
└─────────────────────────┘         └─────────────────────────┘
Voir le code
# Exemple avec ref()

model_with_ref = '''
-- models/silver/silver_orders.sql

{{ config(materialized='table') }}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_email,
    o.order_date,
    o.amount,
    o.status
FROM {{ ref('stg_orders') }} o           -- Référence au model stg_orders
LEFT JOIN {{ ref('stg_customers') }} c   -- Référence au model stg_customers
    ON o.customer_id = c.customer_id
WHERE o.status != 'cancelled'
'''

print(model_with_ref)
print("\n📊 Ce que dbt comprend :")
print("")
print("   stg_orders ──────┐")
print("                    ├──▶ silver_orders")
print("   stg_customers ───┘")
print("")
print("dbt exécutera stg_orders et stg_customers AVANT silver_orders.")

3.3 Les Sources — Référencer les données externes

Les sources représentent les données qui existent AVANT dbt (tables raw, données ingérées par Kafka/Spark, etc.).

On les définit dans un fichier schema.yml :

Voir le code
# Définition des sources

sources_yml = '''
# models/staging/schema.yml

version: 2

sources:
  - name: raw                      # Nom logique de la source
    description: "Données brutes ingérées par Kafka/Spark"
    database: bronze               # Base de données (optionnel)
    schema: public                 # Schéma
    
    tables:
      - name: orders
        description: "Commandes brutes"
        columns:
          - name: order_id
            description: "Identifiant unique de la commande"
          - name: amount
            description: "Montant en euros"
        
        # ═══════════════════════════════════════════════════════
        # FRESHNESS : Vérifier que les données arrivent
        # ═══════════════════════════════════════════════════════
        loaded_at_field: created_at  # Colonne de timestamp
        freshness:
          warn_after: {count: 12, period: hour}   # Warning si > 12h
          error_after: {count: 24, period: hour}  # Erreur si > 24h
      
      - name: customers
        description: "Clients"
        freshness:
          warn_after: {count: 24, period: hour}
          error_after: {count: 48, period: hour}
'''

print(sources_yml)
print("\n💡 La freshness permet de détecter les problèmes d'ingestion !")
print("   Commande : dbt source freshness")

3.4 source() vs ref()

Fonction Utilisation Exemple
source() Données externes à dbt Tables raw, Kafka, APIs
ref() Données créées par dbt Autres models
Flux de données :

┌─────────────────┐       ┌─────────────────┐       ┌─────────────────┐
│   RAW TABLES    │       │   STAGING       │       │   SILVER/GOLD   │
│   (externes)    │       │   (dbt)         │       │   (dbt)         │
│                 │       │                 │       │                 │
│   raw.orders    │──────▶│   stg_orders    │──────▶│   silver_orders │
│                 │       │                 │       │                 │
└─────────────────┘       └─────────────────┘       └─────────────────┘
        │                         │                         │
   source()                   ref()                     ref()

3.5 Matérialisations

La matérialisation définit COMMENT dbt crée l’objet dans la base de données :

Matérialisation Description Quand l’utiliser
view Crée une vue SQL Staging, peu de données
table Crée une table (full refresh) Silver, taille moyenne
incremental Ajoute seulement les nouvelles lignes Gold, gros volumes
ephemeral CTE (pas de table créée) Sous-requêtes réutilisables
Voir le code
# Comparaison des matérialisations

materializations = '''
# ═══════════════════════════════════════════════════════════════
# VIEW : Pas de stockage, requête à chaque lecture
# ═══════════════════════════════════════════════════════════════
{{ config(materialized='view') }}
SELECT * FROM {{ source('raw', 'events') }}

-- dbt génère : CREATE VIEW ... AS SELECT ...
-- ✅ Toujours à jour
-- ❌ Lent si requête complexe

# ═══════════════════════════════════════════════════════════════
# TABLE : Stockage physique, reconstruction complète
# ═══════════════════════════════════════════════════════════════
{{ config(materialized='table') }}
SELECT * FROM {{ ref('stg_events') }}

-- dbt génère : CREATE TABLE ... AS SELECT ... (ou DROP + CREATE)
-- ✅ Lecture rapide
-- ❌ Rebuild complet à chaque run

# ═══════════════════════════════════════════════════════════════
# INCREMENTAL : Seulement les nouvelles données
# ═══════════════════════════════════════════════════════════════
{{ config(
    materialized='incremental',
    unique_key='event_id'           # Pour le MERGE/upsert
) }}

SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}           -- Condition pour les runs incrémentaux
WHERE event_time > (SELECT MAX(event_time) FROM {{ this }})
{% endif %}

-- dbt génère : INSERT INTO ... SELECT ... WHERE ...
-- ✅ Très rapide pour gros volumes
-- ❌ Plus complexe à maintenir

# ═══════════════════════════════════════════════════════════════
# EPHEMERAL : CTE, pas de table
# ═══════════════════════════════════════════════════════════════
{{ config(materialized='ephemeral') }}
SELECT DISTINCT customer_id FROM {{ ref('orders') }}

-- dbt l'injecte comme CTE dans les models qui le ref()
-- ✅ Pas de table supplémentaire
-- ❌ Recalculé à chaque utilisation
'''

print(materializations)

Exercice 1 : Créer un projet dbt avec 3 models

Objectif : Créer une structure dbt avec staging → silver.

Structure à créer :

ecommerce_dbt/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   ├── schema.yml          # Sources + tests
│   │   ├── stg_orders.sql      # Nettoyage orders
│   │   └── stg_customers.sql   # Nettoyage customers
│   └── silver/
│       └── silver_orders.sql   # Join orders + customers
💡 Solution
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}
SELECT
    order_id,
    customer_id,
    CAST(amount AS DECIMAL(10,2)) AS amount,
    order_date
FROM {{ source('raw', 'orders') }}

-- models/staging/stg_customers.sql
{{ config(materialized='view') }}
SELECT
    customer_id,
    TRIM(name) AS customer_name,
    LOWER(email) AS email
FROM {{ source('raw', 'customers') }}

-- models/silver/silver_orders.sql
{{ config(materialized='table') }}
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.email,
    o.amount,
    o.order_date
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('stg_customers') }} c USING (customer_id)

4. Tests de Qualité dans dbt

4.1 Pourquoi tester les données ?

Les données, comme le code, peuvent avoir des bugs :

Problème Conséquence Test dbt
Doublons sur order_id Revenus gonflés unique
customer_id manquant Joins échouent not_null
Status invalide Dashboard cassé accepted_values
FK inexistante Données orphelines relationships

4.2 Tests Génériques (built-in)

dbt fournit 4 tests de base, définis dans schema.yml :

Voir le code
# Tests génériques dans schema.yml

generic_tests = '''
# models/silver/schema.yml

version: 2

models:
  - name: silver_orders
    description: "Table des commandes enrichies avec infos clients"
    
    columns:
      - name: order_id
        description: "Identifiant unique de la commande"
        tests:
          - unique              # Pas de doublons
          - not_null            # Jamais NULL
      
      - name: customer_id
        description: "Référence au client"
        tests:
          - not_null
          - relationships:      # Clé étrangère
              to: ref('stg_customers')
              field: customer_id
      
      - name: status
        description: "Statut de la commande"
        tests:
          - accepted_values:    # Valeurs autorisées
              values: ['pending', 'shipped', 'delivered', 'cancelled']
      
      - name: amount
        description: "Montant en euros"
        tests:
          - not_null
'''

print(generic_tests)
print("\n# Exécuter les tests :")
print("dbt test                    # Tous les tests")
print("dbt test --select silver_orders  # Tests d'un model")

4.3 Tests Singuliers (custom)

Pour des validations complexes, tu peux écrire des tests SQL custom. Un test réussit s’il retourne 0 lignes.

Voir le code
# Tests singuliers (custom SQL)

singular_tests = '''
-- tests/assert_positive_amounts.sql

-- Ce test ÉCHOUE si la requête retourne des lignes
-- (on cherche les cas problématiques)

SELECT
    order_id,
    amount
FROM {{ ref('silver_orders') }}
WHERE amount <= 0

-- Si cette requête retourne des lignes = montants négatifs ou nuls = ÉCHEC

-- ═══════════════════════════════════════════════════════════════

-- tests/assert_orders_have_recent_dates.sql

SELECT
    order_id,
    order_date
FROM {{ ref('silver_orders') }}
WHERE order_date < '2020-01-01'
   OR order_date > CURRENT_DATE

-- Détecte les dates suspectes (trop anciennes ou dans le futur)

-- ═══════════════════════════════════════════════════════════════

-- tests/assert_revenue_consistency.sql

-- Vérifier que le total des commandes = total du dashboard
SELECT 1
FROM (
    SELECT SUM(amount) AS orders_total FROM {{ ref('silver_orders') }}
) orders
JOIN (
    SELECT SUM(revenue) AS dashboard_total FROM {{ ref('gold_dashboard') }}
) dashboard
ON 1=1
WHERE ABS(orders_total - dashboard_total) > 0.01  -- Tolérance
'''

print(singular_tests)

4.4 Packages dbt : Tests avancés

Le dbt Hub contient des packages avec des tests supplémentaires. Les plus utiles :

Package Tests fournis
dbt-utils surrogate_key, not_null_proportion, at_least_one
dbt-expectations Tests style Great Expectations dans dbt
dbt-audit-helper compare_relations pour les migrations
Voir le code
# Installation et utilisation des packages dbt

packages = '''
# packages.yml (à la racine du projet)

packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  
  - package: calogica/dbt_expectations
    version: 0.10.1
  
  - package: dbt-labs/audit_helper
    version: 0.9.0

# Installer les packages :
# dbt deps
'''

usage_examples = '''
# Utilisation dans schema.yml

models:
  - name: silver_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      
      - name: amount
        tests:
          # Test dbt-expectations : valeur entre 0 et 10000
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 10000
          
          # Test dbt-utils : au moins 95% non-null
          - dbt_utils.not_null_proportion:
              at_least: 0.95
      
      - name: email
        tests:
          # Test dbt-expectations : format email
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
'''

print("# packages.yml")
print(packages)
print("\n# Utilisation des tests avancés")
print(usage_examples)

Exercice 2 : Ajouter des tests de qualité

Objectif : Ajouter des tests génériques et un test singulier.

# 1. Dans schema.yml, ajouter pour silver_orders :
#    - order_id : unique, not_null
#    - amount : not_null
#    - status : accepted_values

# 2. Créer un test singulier qui vérifie :
#    - Aucune commande avec amount > 100000 (fraude potentielle)
💡 Solution
# schema.yml
models:
  - name: silver_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: amount
        tests: [not_null]
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']
-- tests/assert_no_suspicious_amounts.sql
SELECT order_id, amount
FROM {{ ref('silver_orders') }}
WHERE amount > 100000

5. Documentation et Lineage

5.1 Documentation auto-générée

dbt génère automatiquement un site de documentation à partir des fichiers schema.yml :

Voir le code
# Documentation complète dans schema.yml

documentation = '''
# models/silver/schema.yml

version: 2

models:
  - name: silver_orders
    description: |
      Table des commandes enrichies avec les informations clients.
      
      **Source** : Kafka topic `orders` via Spark Streaming
      **Refresh** : Incrémental toutes les heures
      **Owner** : data-engineering@company.com
      
      ## Règles métier
      - Les commandes annulées sont exclues
      - Les montants sont en EUR
    
    columns:
      - name: order_id
        description: "Identifiant unique de la commande (UUID)"
        tests: [unique, not_null]
      
      - name: customer_id
        description: "Référence au client. Voir `stg_customers`."
      
      - name: amount
        description: |
          Montant total de la commande en EUR.
          Inclut les taxes, exclut les frais de livraison.
      
      - name: order_date
        description: "Date de création de la commande (timezone UTC)"
'''

print(documentation)
print("\n# Générer et servir la documentation :")
print("dbt docs generate    # Génère le site dans target/")
print("dbt docs serve       # Lance un serveur local (http://localhost:8080)")

5.2 Le Lineage Graph

Le lineage (traçabilité) montre d’où viennent les données et où elles vont :

LINEAGE GRAPH GÉNÉRÉ PAR DBT :

┌─────────────┐     ┌─────────────┐     ┌─────────────────┐     ┌──────────────┐
│   SOURCE    │     │   STAGING   │     │     SILVER      │     │     GOLD     │
│             │     │             │     │                 │     │              │
│ raw.orders  │────▶│ stg_orders  │────▶│                 │     │              │
│             │     │             │     │  silver_orders  │────▶│ gold_revenue │
│ raw.customers────▶│stg_customers│────▶│                 │     │              │
│             │     │             │     │                 │     │              │
└─────────────┘     └─────────────┘     └─────────────────┘     └──────────────┘

Tu peux voir :
• D'où vient chaque colonne (upstream)
• Qui utilise cette table (downstream)
• L'impact d'un changement

6. Great Expectations — Tests Avancés

6.1 Qu’est-ce que Great Expectations ?

Great Expectations (GE) est un framework Python pour la validation et le profiling des données. Il complète dbt :

Aspect dbt Great Expectations
Focus Transformation + Tests simples Tests avancés + Profiling
Langage SQL/YAML Python
Output Pass/Fail Rapport HTML détaillé
Profiling Non Oui (auto-génération)

6.2 Concepts clés de GE

┌─────────────────────────────────────────────────────────────────┐
│                    GREAT EXPECTATIONS                           │
│                                                                 │
│   DATASOURCE          EXPECTATION SUITE        CHECKPOINT       │
│   ┌──────────┐       ┌──────────────────┐     ┌──────────┐     │
│   │ Connexion│       │ Liste de règles  │     │ Exécution│     │
│   │ aux data │  ───▶ │ (expectations)   │ ───▶│ + Rapport│     │
│   │          │       │                  │     │          │     │
│   │ - Spark  │       │ - not_null       │     │ - Valide │     │
│   │ - Pandas │       │ - between        │     │ - HTML   │     │
│   │ - SQL    │       │ - regex          │     │ - Slack  │     │
│   └──────────┘       └──────────────────┘     └──────────┘     │
│                                                                 │
│   DATA DOCS : Site web avec tous les rapports de validation    │
└─────────────────────────────────────────────────────────────────┘
Voir le code
# Installation et setup de Great Expectations

ge_setup = '''
# Installation
pip install great-expectations

# Initialiser un projet GE
great_expectations init

# Structure créée :
great_expectations/
├── great_expectations.yml      # Configuration principale
├── expectations/               # Suites d'expectations
├── checkpoints/                # Définitions des checkpoints
├── plugins/                    # Extensions custom
└── uncommitted/
    └── data_docs/              # Rapports HTML générés
'''

print(ge_setup)
Voir le code
# Utilisation de Great Expectations avec Python

ge_example = '''
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration

# ═══════════════════════════════════════════════════════════════
# 1. Créer un contexte GE
# ═══════════════════════════════════════════════════════════════

context = gx.get_context()

# ═══════════════════════════════════════════════════════════════
# 2. Connecter une datasource (Pandas, Spark, SQL)
# ═══════════════════════════════════════════════════════════════

# Exemple avec Pandas
datasource = context.sources.add_pandas(name="my_pandas_ds")
data_asset = datasource.add_dataframe_asset(name="orders_df")

# Charger les données
import pandas as pd
df = pd.read_parquet("s3://gold/orders/")
batch_request = data_asset.build_batch_request(dataframe=df)

# ═══════════════════════════════════════════════════════════════
# 3. Créer une Expectation Suite
# ═══════════════════════════════════════════════════════════════

suite = context.add_expectation_suite(expectation_suite_name="orders_suite")

# Ajouter des expectations
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    )
)

suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 50000,
            "mostly": 0.99  # 99% des valeurs doivent être dans cette plage
        }
    )
)

suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "email",
            "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
        }
    )
)

context.save_expectation_suite(suite)

# ═══════════════════════════════════════════════════════════════
# 4. Créer et exécuter un Checkpoint
# ═══════════════════════════════════════════════════════════════

checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_suite"
        }
    ]
)

# Exécuter la validation
result = checkpoint.run()

print(f"Validation réussie : {result.success}")

# ═══════════════════════════════════════════════════════════════
# 5. Générer les Data Docs (rapport HTML)
# ═══════════════════════════════════════════════════════════════

context.build_data_docs()
context.open_data_docs()  # Ouvre dans le navigateur
'''

print(ge_example)

6.3 Data Profiling automatique

GE peut analyser tes données et générer automatiquement des expectations :

Voir le code
# Profiling automatique avec Great Expectations

profiling = '''
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler

# Créer un profiler
profiler = UserConfigurableProfiler(
    profile_dataset=validator,  # Validator avec les données
    excluded_expectations=None,
    ignored_columns=[],
    not_null_only=False,
    primary_or_compound_key=["order_id"],
    semantic_types_dict=None,
    table_expectations_only=False,
    value_set_threshold="many"  # "many", "few", ou un nombre
)

# Générer la suite d'expectations basée sur les données actuelles
suite = profiler.build_suite()

# Afficher les expectations générées
print(f"Nombre d'expectations générées : {len(suite.expectations)}")
for exp in suite.expectations[:5]:
    print(f"  - {exp.expectation_type}")

# Sauvegarder
context.save_expectation_suite(suite, expectation_suite_name="orders_profiled")
'''

print(profiling)
print("\n💡 Le profiling auto-génère des expectations comme :")
print("• expect_column_values_to_be_in_set (pour les catégories)")
print("• expect_column_mean_to_be_between (pour les numériques)")
print("• expect_column_proportion_of_unique_values_to_be_between")

Exercice 3 : Profiler une table Gold avec Great Expectations

Objectif : Créer une suite d’expectations pour valider une table Gold.

# 1. Charger la table gold_orders dans un DataFrame Pandas
# 2. Créer une expectation suite avec :
#    - order_id : not_null, unique
#    - amount : between 0 and 100000, mostly 0.99
#    - status : in_set ['pending', 'shipped', 'delivered']
# 3. Exécuter la validation
# 4. Générer le rapport HTML
💡 Solution
import great_expectations as gx

context = gx.get_context()

# Créer suite
suite = context.add_expectation_suite("gold_orders_suite")

suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "order_id"}
))
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_unique",
    kwargs={"column": "order_id"}
))
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={"column": "amount", "min_value": 0, "max_value": 100000, "mostly": 0.99}
))
suite.add_expectation(ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_in_set",
    kwargs={"column": "status", "value_set": ["pending", "shipped", "delivered"]}
))

context.save_expectation_suite(suite)

7. dbt avec Spark/Lakehouse

7.1 Configuration dbt-spark

Pour utiliser dbt avec Spark (Databricks, EMR, local) :

Voir le code
# Configuration dbt-spark

spark_config = '''
# ~/.dbt/profiles.yml pour Spark

ecommerce_analytics:
  target: dev
  outputs:
    
    # ═══════════════════════════════════════════════════════════
    # Databricks
    # ═══════════════════════════════════════════════════════════
    databricks:
      type: databricks
      catalog: main              # Unity Catalog
      schema: analytics
      host: adb-xxx.azuredatabricks.net
      http_path: /sql/1.0/warehouses/xxx
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      threads: 4
    
    # ═══════════════════════════════════════════════════════════
    # Spark Thrift Server (EMR, local)
    # ═══════════════════════════════════════════════════════════
    spark_thrift:
      type: spark
      method: thrift
      host: spark-thrift.company.com
      port: 10001
      user: dbt_user
      schema: analytics
      threads: 4
    
    # ═══════════════════════════════════════════════════════════
    # Spark local (pour dev)
    # ═══════════════════════════════════════════════════════════
    spark_local:
      type: spark
      method: session
      schema: default
      threads: 1
'''

print(spark_config)

7.2 Matérialisation incrémentale avec Delta Lake

L’incrémental est crucial pour les gros volumes. Voici comment l’implémenter proprement :

Voir le code
# Model incrémental avec Delta Lake

incremental_model = '''
-- models/gold/gold_daily_revenue.sql

{{ config(
    materialized='incremental',
    unique_key='date_customer_key',       -- Pour le MERGE
    incremental_strategy='merge',         -- MERGE INTO (vs append, delete+insert)
    file_format='delta',                  -- Format Delta Lake
    partition_by=['order_date']           -- Partitionnement
) }}

WITH source_data AS (
    SELECT
        order_date,
        customer_id,
        SUM(amount) AS daily_revenue,
        COUNT(*) AS order_count,
        -- Clé unique pour le MERGE
        CONCAT(order_date, '-', customer_id) AS date_customer_key
    FROM {{ ref('silver_orders') }}
    
    -- ═══════════════════════════════════════════════════════════
    -- FILTRE INCRÉMENTAL : Seulement les nouvelles données
    -- ═══════════════════════════════════════════════════════════
    {% if is_incremental() %}
    WHERE order_date > (
        SELECT MAX(order_date) - INTERVAL 1 DAY  -- Marge de sécurité
        FROM {{ this }}
    )
    {% endif %}
    
    GROUP BY order_date, customer_id
)

SELECT * FROM source_data
'''

print(incremental_model)
print("\n💡 Explication :")
print("• is_incremental() : True si la table existe et ce n'est pas un --full-refresh")
print("• {{ this }} : Référence à la table actuelle")
print("• INTERVAL 1 DAY : Marge pour les données en retard (late data)")
print("")
print("# Commandes :")
print("dbt run --select gold_daily_revenue          # Run incrémental")
print("dbt run --select gold_daily_revenue --full-refresh  # Rebuild complet")

7.3 Macros Jinja

Les macros sont des fonctions réutilisables écrites en Jinja. Très utiles pour éviter la duplication de code.

Voir le code
# Macros Jinja réutilisables

macros = '''
-- macros/deduplicate.sql

{% macro deduplicate(relation, partition_by, order_by) %}
    {#
        Déduplique une table en gardant la ligne la plus récente.
        
        Args:
            relation: La table source
            partition_by: Colonne(s) pour identifier les doublons
            order_by: Colonne pour déterminer quelle ligne garder
    #}
    
    SELECT * FROM (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY {{ partition_by }}
                ORDER BY {{ order_by }} DESC
            ) AS _row_num
        FROM {{ relation }}
    )
    WHERE _row_num = 1
{% endmacro %}

-- ═══════════════════════════════════════════════════════════════

-- macros/generate_surrogate_key.sql

{% macro generate_surrogate_key(columns) %}
    {#
        Génère une clé surrogate à partir de plusieurs colonnes.
    #}
    
    MD5(CONCAT(
        {% for col in columns %}
            COALESCE(CAST({{ col }} AS STRING), '_null_')
            {% if not loop.last %}, '-', {% endif %}
        {% endfor %}
    ))
{% endmacro %}

-- ═══════════════════════════════════════════════════════════════

-- macros/cents_to_euros.sql

{% macro cents_to_euros(column_name) %}
    ROUND({{ column_name }} / 100.0, 2)
{% endmacro %}
'''

usage = '''
-- Utilisation dans un model :

-- models/silver/silver_orders.sql

{{ config(materialized='table') }}

WITH deduplicated AS (
    {{ deduplicate(
        relation=ref('stg_orders'),
        partition_by='order_id',
        order_by='updated_at'
    ) }}
)

SELECT
    {{ generate_surrogate_key(['order_id', 'customer_id']) }} AS order_sk,
    order_id,
    customer_id,
    {{ cents_to_euros('amount_cents') }} AS amount,
    order_date
FROM deduplicated
'''

print("# Macros")
print(macros)
print("\n# Utilisation")
print(usage)

Exercice 4 : Créer un model incrémental

Objectif : Créer un model Gold incrémental pour les métriques journalières.

-- Créer gold_daily_metrics qui :
-- 1. Agrège par date : total_orders, total_revenue, avg_order_value
-- 2. Est incrémental sur order_date
-- 3. Utilise une marge de 2 jours pour les late data
💡 Solution
{{ config(
    materialized='incremental',
    unique_key='order_date',
    incremental_strategy='merge'
) }}

SELECT
    order_date,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value
FROM {{ ref('silver_orders') }}
{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(order_date) - INTERVAL 2 DAY FROM {{ this }})
{% endif %}
GROUP BY order_date

8. CI/CD et Orchestration

8.1 Workflow Git avec dbt

WORKFLOW RECOMMANDÉ :

1. DEVELOP (feature branch)
   └─▶ Modifier les models
   └─▶ dbt run --select +modified_model+  (test local)
   └─▶ dbt test --select +modified_model+

2. PULL REQUEST
   └─▶ CI automatique : dbt build (run + test)
   └─▶ Review par un collègue
   └─▶ Merge si tout est vert ✅

3. DEPLOY (main branch)
   └─▶ CD automatique : dbt run --target prod
   └─▶ dbt test --target prod
Voir le code
# GitHub Actions pour dbt CI/CD

github_actions = '''
# .github/workflows/dbt-ci.yml

name: dbt CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  # ═══════════════════════════════════════════════════════════════
  # CI : Tests sur Pull Request
  # ═══════════════════════════════════════════════════════════════
  dbt-test:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dbt
        run: pip install dbt-core dbt-snowflake
      
      - name: Setup dbt profile
        run: |
          mkdir -p ~/.dbt
          echo "${{ secrets.DBT_PROFILES }}" > ~/.dbt/profiles.yml
      
      - name: dbt deps
        run: dbt deps
      
      - name: dbt build (run + test)
        run: dbt build --target ci
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

  # ═══════════════════════════════════════════════════════════════
  # CD : Déploiement sur main
  # ═══════════════════════════════════════════════════════════════
  dbt-deploy:
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main' && github.event_name == 'push'
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dbt
        run: pip install dbt-core dbt-snowflake
      
      - name: Setup dbt profile
        run: |
          mkdir -p ~/.dbt
          echo "${{ secrets.DBT_PROFILES }}" > ~/.dbt/profiles.yml
      
      - name: dbt deps
        run: dbt deps
      
      - name: dbt run (production)
        run: dbt run --target prod
      
      - name: dbt test (production)
        run: dbt test --target prod
'''

print(github_actions)

8.2 Orchestration avec Airflow

Voir le code
# DAG Airflow pour dbt

airflow_dag = '''
# dags/dbt_daily_run.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Configuration
DBT_PROJECT_DIR = "/opt/dbt/ecommerce_analytics"
DBT_PROFILES_DIR = "/opt/dbt/profiles"

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "retries": 1,
}

with DAG(
    dag_id="dbt_daily_run",
    default_args=default_args,
    description="Exécution quotidienne de dbt",
    schedule_interval="0 6 * * *",  # Tous les jours à 6h
    start_date=days_ago(1),
    catchup=False,
    tags=["dbt", "analytics"],
) as dag:
    
    # Vérifier la fraîcheur des sources
    dbt_source_freshness = BashOperator(
        task_id="dbt_source_freshness",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt source freshness --profiles-dir {DBT_PROFILES_DIR}"
    )
    
    # Exécuter les models
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt run --profiles-dir {DBT_PROFILES_DIR}"
    )
    
    # Exécuter les tests
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt test --profiles-dir {DBT_PROFILES_DIR}"
    )
    
    # Générer la documentation
    dbt_docs = BashOperator(
        task_id="dbt_docs_generate",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt docs generate --profiles-dir {DBT_PROFILES_DIR}"
    )
    
    # Ordre d'exécution
    dbt_source_freshness >> dbt_run >> dbt_test >> dbt_docs
'''

print(airflow_dag)

9. Mini-Projet : Couche Gold Industrialisée

Objectif

Créer une couche Gold complète avec dbt : models, tests, documentation, incrémental.

ARCHITECTURE DU PROJET :

┌──────────────────────────────────────────────────────────────────┐
│                         dbt Project                              │
│                                                                  │
│  STAGING                 SILVER                    GOLD          │
│  ┌──────────┐           ┌──────────┐           ┌──────────────┐ │
│  │stg_orders│──────────▶│silver_   │──────────▶│gold_daily_   │ │
│  │          │           │orders    │           │revenue       │ │
│  └──────────┘           └──────────┘           │(incremental) │ │
│  ┌──────────┐                │                 └──────────────┘ │
│  │stg_      │────────────────┘                                  │
│  │customers │                                  ┌──────────────┐ │
│  └──────────┘                                  │gold_customer │ │
│                                                │_summary      │ │
│                                                └──────────────┘ │
│                                                                  │
│  + Tests sur chaque model                                        │
│  + Documentation complète                                        │
│  + Great Expectations pour validation finale                     │
└──────────────────────────────────────────────────────────────────┘
Voir le code
# Structure complète du mini-projet

project_structure = '''
ecommerce_dbt/
├── dbt_project.yml
├── packages.yml

├── models/
│   ├── staging/
│   │   ├── _sources.yml         # Sources raw
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   │
│   ├── silver/
│   │   ├── _silver__models.yml  # Tests + docs
│   │   └── silver_orders.sql
│   │
│   └── gold/
│       ├── _gold__models.yml    # Tests + docs
│       ├── gold_daily_revenue.sql      # Incrémental
│       └── gold_customer_summary.sql   # Table

├── macros/
│   └── deduplicate.sql

├── tests/
│   ├── assert_positive_revenue.sql
│   └── assert_no_future_dates.sql

└── great_expectations/
    └── expectations/
        └── gold_orders_suite.json
'''

print(project_structure)
Voir le code
# Models du mini-projet

models = '''
-- ═══════════════════════════════════════════════════════════════
-- models/gold/gold_daily_revenue.sql
-- ═══════════════════════════════════════════════════════════════

{{ config(
    materialized='incremental',
    unique_key='revenue_date',
    incremental_strategy='merge'
) }}

SELECT
    order_date AS revenue_date,
    COUNT(DISTINCT order_id) AS total_orders,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    CURRENT_TIMESTAMP() AS updated_at
FROM {{ ref('silver_orders') }}
{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(revenue_date) - INTERVAL 2 DAY FROM {{ this }})
{% endif %}
GROUP BY order_date

-- ═══════════════════════════════════════════════════════════════
-- models/gold/gold_customer_summary.sql
-- ═══════════════════════════════════════════════════════════════

{{ config(materialized='table') }}

SELECT
    customer_id,
    customer_name,
    email,
    COUNT(DISTINCT order_id) AS lifetime_orders,
    SUM(amount) AS lifetime_revenue,
    AVG(amount) AS avg_order_value,
    MIN(order_date) AS first_order_date,
    MAX(order_date) AS last_order_date,
    DATEDIFF(day, MIN(order_date), MAX(order_date)) AS customer_tenure_days
FROM {{ ref('silver_orders') }}
GROUP BY customer_id, customer_name, email
'''

print(models)

Quiz

Q1. Qu’est-ce que la Transformation-as-Code ?
R Appliquer les principes du développement logiciel (versioning, tests, CI/CD) aux transformations de données.
Q2. Quel est le rôle de ref() dans dbt ?
R Référencer un autre model dbt pour créer une dépendance et permettre le lineage automatique.
Q3. Différence entre table et incremental ?
R table = rebuild complet à chaque run. incremental = ajoute seulement les nouvelles lignes.
Q4. Test générique vs test singulier ?
R Générique = built-in (unique, not_null). Singulier = SQL custom dans tests/.
Q5. Comment dbt gère l’ordre d’exécution ?
R Via le DAG des dépendances créé par les appels à ref() et source().
Q6. Comment générer la documentation dbt ?
R dbt docs generate puis dbt docs serve.
Q7. Qu’est-ce qu’un Checkpoint dans Great Expectations ?
R Exécution d’une suite d’expectations sur une datasource avec génération de rapport.
Q8. Comment intégrer dbt dans CI/CD ?
R GitHub Actions : dbt build sur PR, dbt run --target prod sur merge.
Q9. Avantage de l’incrémental vs full-refresh ?
R Performance : traite seulement les nouvelles données, pas tout recalculer.
Q10. Rôle de Jinja dans dbt ?
R Templating SQL : variables, conditions, boucles, macros réutilisables.

📚 Ressources


➡️ Prochaine étape

👉 Module 26 : 26_projet_integrateur.ipynb — Projet Final

Synthèse de toutes les compétences : Kafka → Spark on K8s → Delta Lake → dbt


📝 Récapitulatif

Concept Appris
dbt Models, ref(), source(), matérialisations
Tests Génériques, singuliers, dbt-expectations
Documentation schema.yml, dbt docs, lineage
Great Expectations Expectations, Checkpoints, Data Docs
Incrémental is_incremental(), unique_key, merge
CI/CD GitHub Actions, Airflow

🎉 Félicitations ! Tu maîtrises maintenant dbt et la Data Quality.

Retour au sommet