Industrialiser les Transformations et Garantir la Qualité
Bienvenue dans ce module où tu vas apprendre à industrialiser tes transformations SQL et à garantir la qualité de tes données avec les outils standards de l’industrie.
Prérequis
Module
Compétence
Pourquoi ?
✅ 20
Spark SQL
SQL avancé
✅ 23
Table Formats
Delta Lake comme cible
✅ 24
Streaming
Données incrémentales
Objectifs
À la fin de ce module, tu seras capable de :
Comprendre le concept de Transformation-as-Code (TaC)
Implémenter des tests de qualité avec dbt et Great Expectations
Créer des modèles incrémentaux performants
Déployer dbt dans un pipeline CI/CD
1. Introduction — Le Problème à Résoudre
1.1 Le chaos des transformations SQL
Imagine une équipe data typique sans dbt :
SITUATION TYPIQUE SANS DBT :
📁 scripts/
├── transform_users_v2_final_FINAL.sql ← Quelle version utiliser ?
├── transform_users_v2_final.sql
├── transform_users_v2.sql
├── create_dashboard_table_john.sql ← Qui a écrit ça ?
├── fix_bug_urgent.sql ← Ça fait quoi exactement ?
└── old/
└── ... 50 fichiers abandonnés
PROBLÈMES :
• Aucune traçabilité (qui a modifié quoi ?)
• Pas de tests (les données sont-elles correctes ?)
• Pas de documentation (c'est quoi cette colonne ?)
• Ordre d'exécution manuel (quelle requête lancer en premier ?)
• Pas de review de code (erreurs passent en production)
1.2 La solution : Transformation-as-Code (TaC)
Transformation-as-Code applique les principes du développement logiciel aux transformations de données :
Principe Dev
Application Data
Versioning
Transformations dans Git
Tests unitaires
Tests sur les données
Documentation
Auto-générée depuis le code
Code Review
PR avant mise en production
CI/CD
Déploiement automatisé
1.3 Qu’est-ce que dbt ?
dbt (Data Build Tool) est un outil open-source qui permet de transformer les données dans ton Data Warehouse/Lakehouse en utilisant uniquement du SQL.
dbt s’installe via pip avec un adapter spécifique à ta base de données :
Voir le code
# Installation de dbtinstallation ='''# Adapter selon ton Data Warehouse/Lakehouse :pip install dbt-core # Core (obligatoire)# + UN adapter selon ta cible :pip install dbt-postgres # PostgreSQLpip install dbt-snowflake # Snowflakepip install dbt-bigquery # BigQuerypip install dbt-spark # Spark (Databricks, EMR, local)pip install dbt-duckdb # DuckDB (local, léger)pip install dbt-trino # Trino/Presto# Vérifier l'installationdbt --version'''print(installation)print("\n💡 Pour ce module, on utilise dbt-duckdb (léger, local) ou dbt-spark.")
2.2 Initialisation d’un projet
La commande dbt init crée la structure de base :
Voir le code
# Créer un nouveau projet dbtinit_project ='''# Créer un nouveau projetdbt init my_dbt_project# Structure créée :my_dbt_project/│├── dbt_project.yml # Configuration principale du projet├── profiles.yml # Connexions aux bases (souvent dans ~/.dbt/)│├── models/ # ⭐ TES TRANSFORMATIONS SQL│ └── example/│ ├── my_first_model.sql│ └── schema.yml # Tests et documentation│├── seeds/ # Données statiques (CSV)│├── snapshots/ # Historisation (SCD Type 2)│├── macros/ # Fonctions Jinja réutilisables│├── tests/ # Tests SQL custom│├── analyses/ # Requêtes ad-hoc (non matérialisées)│└── target/ # Fichiers compilés (généré)'''print(init_project)
2.3 Le fichier dbt_project.yml
C’est le fichier de configuration principal de ton projet. Il définit le nom, la version, et les paramètres par défaut.
Voir le code
# dbt_project.yml - Configuration principaledbt_project_yml ='''# dbt_project.ymlname: 'ecommerce_analytics' # Nom du projet (sans espaces, sans tirets)version: '1.0.0'config-version: 2# Profil de connexion (défini dans profiles.yml)profile: 'ecommerce_analytics'# Chemins des différents composantsmodel-paths: ["models"]analysis-paths: ["analyses"]test-paths: ["tests"]seed-paths: ["seeds"]macro-paths: ["macros"]snapshot-paths: ["snapshots"]target-path: "target" # Où dbt écrit les fichiers compilésclean-targets: ["target", "dbt_packages"]# ═══════════════════════════════════════════════════════════════# Configuration des modèles (TRÈS IMPORTANT)# ═══════════════════════════════════════════════════════════════models: ecommerce_analytics: # Doit correspondre au "name" ci-dessus # Configuration par défaut pour TOUS les modèles +materialized: view # Par défaut : créer des vues # Configuration par dossier staging: # Dossier models/staging/ +materialized: view +schema: staging # Écrire dans le schéma "staging" silver: +materialized: table # Tables complètes +schema: silver gold: +materialized: incremental # Incrémental pour la performance +schema: gold'''print(dbt_project_yml)
2.4 Le fichier profiles.yml
Ce fichier contient les informations de connexion à ta base de données. Il est généralement stocké dans ~/.dbt/profiles.yml (pas dans le repo Git pour des raisons de sécurité).
Voir le code
# profiles.yml - Connexions aux bases de donnéesprofiles_yml ='''# ~/.dbt/profiles.ymlecommerce_analytics: # Doit correspondre au "profile" dans dbt_project.yml target: dev # Environnement par défaut outputs: # ═══════════════════════════════════════════════════════════ # Environnement DEV (développement local) # ═══════════════════════════════════════════════════════════ dev: type: duckdb # Adapter DuckDB (léger, local) path: /tmp/dev.duckdb threads: 4 # ═══════════════════════════════════════════════════════════ # Environnement PROD (Spark/Databricks) # ═══════════════════════════════════════════════════════════ prod: type: spark method: thrift # ou "odbc", "http" host: spark-thrift-server.company.com port: 10001 schema: analytics threads: 8 # ═══════════════════════════════════════════════════════════ # Exemple Snowflake # ═══════════════════════════════════════════════════════════ snowflake_prod: type: snowflake account: xy12345.us-east-1 user: "{{ env_var('SNOWFLAKE_USER') }}" # Variable d'environnement password: "{{ env_var('SNOWFLAKE_PASSWORD') }}" role: TRANSFORMER database: ANALYTICS warehouse: TRANSFORM_WH schema: GOLD threads: 8'''print(profiles_yml)print("\n💡 Utilise env_var() pour les secrets, jamais en dur !")
3. Les Models — Cœur de dbt
3.1 Qu’est-ce qu’un Model ?
Un model dbt est simplement un fichier SQL qui contient une requête SELECT. dbt se charge de créer la table ou vue correspondante.
FICHIER SQL CE QUE DBT FAIT
┌─────────────────────────┐ ┌─────────────────────────┐
│ -- models/users.sql │ │ CREATE TABLE users AS │
│ │ ───▶ │ SELECT │
│ SELECT │ │ id, │
│ id, │ │ name, │
│ name, │ │ email │
│ email │ │ FROM raw.users │
│ FROM raw.users │ │ │
└─────────────────────────┘ └─────────────────────────┘
Tu écris le SELECT, dbt gère le CREATE/INSERT !
Voir le code
# Exemple de model simplemodel_simple ='''-- models/staging/stg_orders.sql-- Configuration du model (optionnel, peut être dans dbt_project.yml){{ config( materialized='view', schema='staging') }}-- Le SELECT qui définit le modelSELECT order_id, customer_id, order_date, CAST(amount AS DECIMAL(10, 2)) AS amount, status, created_atFROM {{ source('raw', 'orders') }} -- Référence à une source externeWHERE order_date >= '2024-01-01' -- Filtrer les données récentes'''print(model_simple)print("\n💡 Points clés :")print("• {{ config(...) }} : Configuration spécifique au model")print("• {{ source(...) }} : Référence à une table externe")print("• Pas de CREATE TABLE : dbt le génère automatiquement")
3.2 La fonction ref() — La clé de dbt
ref() est LA fonction la plus importante de dbt. Elle permet de référencer un autre model, et dbt : 1. Comprend la dépendance entre les models 2. Exécute les models dans le bon ordre 3. Génère le lineage (traçabilité)
SANS ref() (MAUVAIS) AVEC ref() (BON)
┌─────────────────────────┐ ┌─────────────────────────┐
│ SELECT * │ │ SELECT * │
│ FROM analytics.users │ │ FROM {{ ref('users') }} │
│ │ │ │
│ ❌ Schéma hardcodé │ │ ✅ dbt résout le schéma │
│ ❌ Pas de dépendance │ │ ✅ Dépendance trackée │
│ ❌ Pas de lineage │ │ ✅ Lineage automatique │
└─────────────────────────┘ └─────────────────────────┘
Voir le code
# Exemple avec ref()model_with_ref ='''-- models/silver/silver_orders.sql{{ config(materialized='table') }}SELECT o.order_id, o.customer_id, c.customer_name, c.customer_email, o.order_date, o.amount, o.statusFROM {{ ref('stg_orders') }} o -- Référence au model stg_ordersLEFT JOIN {{ ref('stg_customers') }} c -- Référence au model stg_customers ON o.customer_id = c.customer_idWHERE o.status != 'cancelled''''print(model_with_ref)print("\n📊 Ce que dbt comprend :")print("")print(" stg_orders ──────┐")print(" ├──▶ silver_orders")print(" stg_customers ───┘")print("")print("dbt exécutera stg_orders et stg_customers AVANT silver_orders.")
3.3 Les Sources — Référencer les données externes
Les sources représentent les données qui existent AVANT dbt (tables raw, données ingérées par Kafka/Spark, etc.).
On les définit dans un fichier schema.yml :
Voir le code
# Définition des sourcessources_yml ='''# models/staging/schema.ymlversion: 2sources: - name: raw # Nom logique de la source description: "Données brutes ingérées par Kafka/Spark" database: bronze # Base de données (optionnel) schema: public # Schéma tables: - name: orders description: "Commandes brutes" columns: - name: order_id description: "Identifiant unique de la commande" - name: amount description: "Montant en euros" # ═══════════════════════════════════════════════════════ # FRESHNESS : Vérifier que les données arrivent # ═══════════════════════════════════════════════════════ loaded_at_field: created_at # Colonne de timestamp freshness: warn_after: {count: 12, period: hour} # Warning si > 12h error_after: {count: 24, period: hour} # Erreur si > 24h - name: customers description: "Clients" freshness: warn_after: {count: 24, period: hour} error_after: {count: 48, period: hour}'''print(sources_yml)print("\n💡 La freshness permet de détecter les problèmes d'ingestion !")print(" Commande : dbt source freshness")
La matérialisation définit COMMENT dbt crée l’objet dans la base de données :
Matérialisation
Description
Quand l’utiliser
view
Crée une vue SQL
Staging, peu de données
table
Crée une table (full refresh)
Silver, taille moyenne
incremental
Ajoute seulement les nouvelles lignes
Gold, gros volumes
ephemeral
CTE (pas de table créée)
Sous-requêtes réutilisables
Voir le code
# Comparaison des matérialisationsmaterializations ='''# ═══════════════════════════════════════════════════════════════# VIEW : Pas de stockage, requête à chaque lecture# ═══════════════════════════════════════════════════════════════{{ config(materialized='view') }}SELECT * FROM {{ source('raw', 'events') }}-- dbt génère : CREATE VIEW ... AS SELECT ...-- ✅ Toujours à jour-- ❌ Lent si requête complexe# ═══════════════════════════════════════════════════════════════# TABLE : Stockage physique, reconstruction complète# ═══════════════════════════════════════════════════════════════{{ config(materialized='table') }}SELECT * FROM {{ ref('stg_events') }}-- dbt génère : CREATE TABLE ... AS SELECT ... (ou DROP + CREATE)-- ✅ Lecture rapide-- ❌ Rebuild complet à chaque run# ═══════════════════════════════════════════════════════════════# INCREMENTAL : Seulement les nouvelles données# ═══════════════════════════════════════════════════════════════{{ config( materialized='incremental', unique_key='event_id' # Pour le MERGE/upsert) }}SELECT * FROM {{ ref('stg_events') }}{% if is_incremental() %} -- Condition pour les runs incrémentauxWHERE event_time > (SELECT MAX(event_time) FROM {{ this }}){% endif %}-- dbt génère : INSERT INTO ... SELECT ... WHERE ...-- ✅ Très rapide pour gros volumes-- ❌ Plus complexe à maintenir# ═══════════════════════════════════════════════════════════════# EPHEMERAL : CTE, pas de table# ═══════════════════════════════════════════════════════════════{{ config(materialized='ephemeral') }}SELECT DISTINCT customer_id FROM {{ ref('orders') }}-- dbt l'injecte comme CTE dans les models qui le ref()-- ✅ Pas de table supplémentaire-- ❌ Recalculé à chaque utilisation'''print(materializations)
Exercice 1 : Créer un projet dbt avec 3 models
Objectif : Créer une structure dbt avec staging → silver.
-- models/staging/stg_orders.sql{{ config(materialized='view') }}SELECT order_id, customer_id,CAST(amount ASDECIMAL(10,2)) AS amount, order_dateFROM {{ source('raw', 'orders') }}-- models/staging/stg_customers.sql{{ config(materialized='view') }}SELECT customer_id,TRIM(name) AS customer_name,LOWER(email) AS emailFROM {{ source('raw', 'customers') }}-- models/silver/silver_orders.sql{{ config(materialized='table') }}SELECT o.order_id, o.customer_id, c.customer_name, c.email, o.amount, o.order_dateFROM {{ ref('stg_orders') }} oLEFTJOIN {{ ref('stg_customers') }} c USING (customer_id)
4. Tests de Qualité dans dbt
4.1 Pourquoi tester les données ?
Les données, comme le code, peuvent avoir des bugs :
Problème
Conséquence
Test dbt
Doublons sur order_id
Revenus gonflés
unique
customer_id manquant
Joins échouent
not_null
Status invalide
Dashboard cassé
accepted_values
FK inexistante
Données orphelines
relationships
4.2 Tests Génériques (built-in)
dbt fournit 4 tests de base, définis dans schema.yml :
Voir le code
# Tests génériques dans schema.ymlgeneric_tests ='''# models/silver/schema.ymlversion: 2models: - name: silver_orders description: "Table des commandes enrichies avec infos clients" columns: - name: order_id description: "Identifiant unique de la commande" tests: - unique # Pas de doublons - not_null # Jamais NULL - name: customer_id description: "Référence au client" tests: - not_null - relationships: # Clé étrangère to: ref('stg_customers') field: customer_id - name: status description: "Statut de la commande" tests: - accepted_values: # Valeurs autorisées values: ['pending', 'shipped', 'delivered', 'cancelled'] - name: amount description: "Montant en euros" tests: - not_null'''print(generic_tests)print("\n# Exécuter les tests :")print("dbt test # Tous les tests")print("dbt test --select silver_orders # Tests d'un model")
4.3 Tests Singuliers (custom)
Pour des validations complexes, tu peux écrire des tests SQL custom. Un test réussit s’il retourne 0 lignes.
Voir le code
# Tests singuliers (custom SQL)singular_tests ='''-- tests/assert_positive_amounts.sql-- Ce test ÉCHOUE si la requête retourne des lignes-- (on cherche les cas problématiques)SELECT order_id, amountFROM {{ ref('silver_orders') }}WHERE amount <= 0-- Si cette requête retourne des lignes = montants négatifs ou nuls = ÉCHEC-- ═══════════════════════════════════════════════════════════════-- tests/assert_orders_have_recent_dates.sqlSELECT order_id, order_dateFROM {{ ref('silver_orders') }}WHERE order_date < '2020-01-01' OR order_date > CURRENT_DATE-- Détecte les dates suspectes (trop anciennes ou dans le futur)-- ═══════════════════════════════════════════════════════════════-- tests/assert_revenue_consistency.sql-- Vérifier que le total des commandes = total du dashboardSELECT 1FROM ( SELECT SUM(amount) AS orders_total FROM {{ ref('silver_orders') }}) ordersJOIN ( SELECT SUM(revenue) AS dashboard_total FROM {{ ref('gold_dashboard') }}) dashboardON 1=1WHERE ABS(orders_total - dashboard_total) > 0.01 -- Tolérance'''print(singular_tests)
4.4 Packages dbt : Tests avancés
Le dbt Hub contient des packages avec des tests supplémentaires. Les plus utiles :
Package
Tests fournis
dbt-utils
surrogate_key, not_null_proportion, at_least_one
dbt-expectations
Tests style Great Expectations dans dbt
dbt-audit-helper
compare_relations pour les migrations
Voir le code
# Installation et utilisation des packages dbtpackages ='''# packages.yml (à la racine du projet)packages: - package: dbt-labs/dbt_utils version: 1.1.1 - package: calogica/dbt_expectations version: 0.10.1 - package: dbt-labs/audit_helper version: 0.9.0# Installer les packages :# dbt deps'''usage_examples ='''# Utilisation dans schema.ymlmodels: - name: silver_orders columns: - name: order_id tests: - unique - not_null - name: amount tests: # Test dbt-expectations : valeur entre 0 et 10000 - dbt_expectations.expect_column_values_to_be_between: min_value: 0 max_value: 10000 # Test dbt-utils : au moins 95% non-null - dbt_utils.not_null_proportion: at_least: 0.95 - name: email tests: # Test dbt-expectations : format email - dbt_expectations.expect_column_values_to_match_regex: regex: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"'''print("# packages.yml")print(packages)print("\n# Utilisation des tests avancés")print(usage_examples)
Exercice 2 : Ajouter des tests de qualité
Objectif : Ajouter des tests génériques et un test singulier.
# 1. Dans schema.yml, ajouter pour silver_orders :# - order_id : unique, not_null# - amount : not_null# - status : accepted_values# 2. Créer un test singulier qui vérifie :# - Aucune commande avec amount > 100000 (fraude potentielle)
dbt génère automatiquement un site de documentation à partir des fichiers schema.yml :
Voir le code
# Documentation complète dans schema.ymldocumentation ='''# models/silver/schema.ymlversion: 2models: - name: silver_orders description: | Table des commandes enrichies avec les informations clients. **Source** : Kafka topic `orders` via Spark Streaming **Refresh** : Incrémental toutes les heures **Owner** : data-engineering@company.com ## Règles métier - Les commandes annulées sont exclues - Les montants sont en EUR columns: - name: order_id description: "Identifiant unique de la commande (UUID)" tests: [unique, not_null] - name: customer_id description: "Référence au client. Voir `stg_customers`." - name: amount description: | Montant total de la commande en EUR. Inclut les taxes, exclut les frais de livraison. - name: order_date description: "Date de création de la commande (timezone UTC)"'''print(documentation)print("\n# Générer et servir la documentation :")print("dbt docs generate # Génère le site dans target/")print("dbt docs serve # Lance un serveur local (http://localhost:8080)")
5.2 Le Lineage Graph
Le lineage (traçabilité) montre d’où viennent les données et où elles vont :
Great Expectations (GE) est un framework Python pour la validation et le profiling des données. Il complète dbt :
Aspect
dbt
Great Expectations
Focus
Transformation + Tests simples
Tests avancés + Profiling
Langage
SQL/YAML
Python
Output
Pass/Fail
Rapport HTML détaillé
Profiling
Non
Oui (auto-génération)
6.2 Concepts clés de GE
┌─────────────────────────────────────────────────────────────────┐
│ GREAT EXPECTATIONS │
│ │
│ DATASOURCE EXPECTATION SUITE CHECKPOINT │
│ ┌──────────┐ ┌──────────────────┐ ┌──────────┐ │
│ │ Connexion│ │ Liste de règles │ │ Exécution│ │
│ │ aux data │ ───▶ │ (expectations) │ ───▶│ + Rapport│ │
│ │ │ │ │ │ │ │
│ │ - Spark │ │ - not_null │ │ - Valide │ │
│ │ - Pandas │ │ - between │ │ - HTML │ │
│ │ - SQL │ │ - regex │ │ - Slack │ │
│ └──────────┘ └──────────────────┘ └──────────┘ │
│ │
│ DATA DOCS : Site web avec tous les rapports de validation │
└─────────────────────────────────────────────────────────────────┘
Voir le code
# Installation et setup de Great Expectationsge_setup ='''# Installationpip install great-expectations# Initialiser un projet GEgreat_expectations init# Structure créée :great_expectations/├── great_expectations.yml # Configuration principale├── expectations/ # Suites d'expectations├── checkpoints/ # Définitions des checkpoints├── plugins/ # Extensions custom└── uncommitted/ └── data_docs/ # Rapports HTML générés'''print(ge_setup)
Voir le code
# Utilisation de Great Expectations avec Pythonge_example ='''import great_expectations as gxfrom great_expectations.core.expectation_configuration import ExpectationConfiguration# ═══════════════════════════════════════════════════════════════# 1. Créer un contexte GE# ═══════════════════════════════════════════════════════════════context = gx.get_context()# ═══════════════════════════════════════════════════════════════# 2. Connecter une datasource (Pandas, Spark, SQL)# ═══════════════════════════════════════════════════════════════# Exemple avec Pandasdatasource = context.sources.add_pandas(name="my_pandas_ds")data_asset = datasource.add_dataframe_asset(name="orders_df")# Charger les donnéesimport pandas as pddf = pd.read_parquet("s3://gold/orders/")batch_request = data_asset.build_batch_request(dataframe=df)# ═══════════════════════════════════════════════════════════════# 3. Créer une Expectation Suite# ═══════════════════════════════════════════════════════════════suite = context.add_expectation_suite(expectation_suite_name="orders_suite")# Ajouter des expectationssuite.add_expectation( ExpectationConfiguration( expectation_type="expect_column_values_to_not_be_null", kwargs={"column": "order_id"} ))suite.add_expectation( ExpectationConfiguration( expectation_type="expect_column_values_to_be_between", kwargs={ "column": "amount", "min_value": 0, "max_value": 50000, "mostly": 0.99 # 99% des valeurs doivent être dans cette plage } ))suite.add_expectation( ExpectationConfiguration( expectation_type="expect_column_values_to_match_regex", kwargs={ "column": "email", "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$" } ))context.save_expectation_suite(suite)# ═══════════════════════════════════════════════════════════════# 4. Créer et exécuter un Checkpoint# ═══════════════════════════════════════════════════════════════checkpoint = context.add_or_update_checkpoint( name="orders_checkpoint", validations=[ { "batch_request": batch_request, "expectation_suite_name": "orders_suite" } ])# Exécuter la validationresult = checkpoint.run()print(f"Validation réussie : {result.success}")# ═══════════════════════════════════════════════════════════════# 5. Générer les Data Docs (rapport HTML)# ═══════════════════════════════════════════════════════════════context.build_data_docs()context.open_data_docs() # Ouvre dans le navigateur'''print(ge_example)
6.3 Data Profiling automatique
GE peut analyser tes données et générer automatiquement des expectations :
Voir le code
# Profiling automatique avec Great Expectationsprofiling ='''from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler# Créer un profilerprofiler = UserConfigurableProfiler( profile_dataset=validator, # Validator avec les données excluded_expectations=None, ignored_columns=[], not_null_only=False, primary_or_compound_key=["order_id"], semantic_types_dict=None, table_expectations_only=False, value_set_threshold="many" # "many", "few", ou un nombre)# Générer la suite d'expectations basée sur les données actuellessuite = profiler.build_suite()# Afficher les expectations généréesprint(f"Nombre d'expectations générées : {len(suite.expectations)}")for exp in suite.expectations[:5]: print(f" - {exp.expectation_type}")# Sauvegardercontext.save_expectation_suite(suite, expectation_suite_name="orders_profiled")'''print(profiling)print("\n💡 Le profiling auto-génère des expectations comme :")print("• expect_column_values_to_be_in_set (pour les catégories)")print("• expect_column_mean_to_be_between (pour les numériques)")print("• expect_column_proportion_of_unique_values_to_be_between")
Exercice 3 : Profiler une table Gold avec Great Expectations
Objectif : Créer une suite d’expectations pour valider une table Gold.
# 1. Charger la table gold_orders dans un DataFrame Pandas# 2. Créer une expectation suite avec :# - order_id : not_null, unique# - amount : between 0 and 100000, mostly 0.99# - status : in_set ['pending', 'shipped', 'delivered']# 3. Exécuter la validation# 4. Générer le rapport HTML
L’incrémental est crucial pour les gros volumes. Voici comment l’implémenter proprement :
Voir le code
# Model incrémental avec Delta Lakeincremental_model ='''-- models/gold/gold_daily_revenue.sql{{ config( materialized='incremental', unique_key='date_customer_key', -- Pour le MERGE incremental_strategy='merge', -- MERGE INTO (vs append, delete+insert) file_format='delta', -- Format Delta Lake partition_by=['order_date'] -- Partitionnement) }}WITH source_data AS ( SELECT order_date, customer_id, SUM(amount) AS daily_revenue, COUNT(*) AS order_count, -- Clé unique pour le MERGE CONCAT(order_date, '-', customer_id) AS date_customer_key FROM {{ ref('silver_orders') }} -- ═══════════════════════════════════════════════════════════ -- FILTRE INCRÉMENTAL : Seulement les nouvelles données -- ═══════════════════════════════════════════════════════════ {% if is_incremental() %} WHERE order_date > ( SELECT MAX(order_date) - INTERVAL 1 DAY -- Marge de sécurité FROM {{ this }} ) {% endif %} GROUP BY order_date, customer_id)SELECT * FROM source_data'''print(incremental_model)print("\n💡 Explication :")print("• is_incremental() : True si la table existe et ce n'est pas un --full-refresh")print("• {{ this }} : Référence à la table actuelle")print("• INTERVAL 1 DAY : Marge pour les données en retard (late data)")print("")print("# Commandes :")print("dbt run --select gold_daily_revenue # Run incrémental")print("dbt run --select gold_daily_revenue --full-refresh # Rebuild complet")
7.3 Macros Jinja
Les macros sont des fonctions réutilisables écrites en Jinja. Très utiles pour éviter la duplication de code.
Voir le code
# Macros Jinja réutilisablesmacros ='''-- macros/deduplicate.sql{% macro deduplicate(relation, partition_by, order_by) %} {# Déduplique une table en gardant la ligne la plus récente. Args: relation: La table source partition_by: Colonne(s) pour identifier les doublons order_by: Colonne pour déterminer quelle ligne garder #} SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY {{ partition_by }} ORDER BY {{ order_by }} DESC ) AS _row_num FROM {{ relation }} ) WHERE _row_num = 1{% endmacro %}-- ═══════════════════════════════════════════════════════════════-- macros/generate_surrogate_key.sql{% macro generate_surrogate_key(columns) %} {# Génère une clé surrogate à partir de plusieurs colonnes. #} MD5(CONCAT( {% for col in columns %} COALESCE(CAST({{ col }} AS STRING), '_null_') {% if not loop.last %}, '-', {% endif %} {% endfor %} )){% endmacro %}-- ═══════════════════════════════════════════════════════════════-- macros/cents_to_euros.sql{% macro cents_to_euros(column_name) %} ROUND({{ column_name }} / 100.0, 2){% endmacro %}'''usage ='''-- Utilisation dans un model :-- models/silver/silver_orders.sql{{ config(materialized='table') }}WITH deduplicated AS ({{ deduplicate( relation=ref('stg_orders'), partition_by='order_id', order_by='updated_at' ) }})SELECT{{ generate_surrogate_key(['order_id', 'customer_id']) }} AS order_sk, order_id, customer_id,{{ cents_to_euros('amount_cents') }} AS amount, order_dateFROM deduplicated'''print("# Macros")print(macros)print("\n# Utilisation")print(usage)
Exercice 4 : Créer un model incrémental
Objectif : Créer un model Gold incrémental pour les métriques journalières.
-- Créer gold_daily_metrics qui :-- 1. Agrège par date : total_orders, total_revenue, avg_order_value-- 2. Est incrémental sur order_date-- 3. Utilise une marge de 2 jours pour les late data
💡 Solution
{{ config(materialized='incremental', unique_key='order_date', incremental_strategy='merge') }}SELECT order_date,COUNT(*) AS total_orders,SUM(amount) AS total_revenue,AVG(amount) AS avg_order_valueFROM {{ ref('silver_orders') }}{% if is_incremental() %}WHERE order_date >= (SELECTMAX(order_date) -INTERVAL2DAYFROM {{ this }}){% endif %}GROUPBY order_date
8. CI/CD et Orchestration
8.1 Workflow Git avec dbt
WORKFLOW RECOMMANDÉ :
1. DEVELOP (feature branch)
└─▶ Modifier les models
└─▶ dbt run --select +modified_model+ (test local)
└─▶ dbt test --select +modified_model+
2. PULL REQUEST
└─▶ CI automatique : dbt build (run + test)
└─▶ Review par un collègue
└─▶ Merge si tout est vert ✅
3. DEPLOY (main branch)
└─▶ CD automatique : dbt run --target prod
└─▶ dbt test --target prod
# Models du mini-projetmodels ='''-- ═══════════════════════════════════════════════════════════════-- models/gold/gold_daily_revenue.sql-- ═══════════════════════════════════════════════════════════════{{ config( materialized='incremental', unique_key='revenue_date', incremental_strategy='merge') }}SELECT order_date AS revenue_date, COUNT(DISTINCT order_id) AS total_orders, COUNT(DISTINCT customer_id) AS unique_customers, SUM(amount) AS total_revenue, AVG(amount) AS avg_order_value, CURRENT_TIMESTAMP() AS updated_atFROM {{ ref('silver_orders') }}{% if is_incremental() %}WHERE order_date >= (SELECT MAX(revenue_date) - INTERVAL 2 DAY FROM {{ this }}){% endif %}GROUP BY order_date-- ═══════════════════════════════════════════════════════════════-- models/gold/gold_customer_summary.sql-- ═══════════════════════════════════════════════════════════════{{ config(materialized='table') }}SELECT customer_id, customer_name, email, COUNT(DISTINCT order_id) AS lifetime_orders, SUM(amount) AS lifetime_revenue, AVG(amount) AS avg_order_value, MIN(order_date) AS first_order_date, MAX(order_date) AS last_order_date, DATEDIFF(day, MIN(order_date), MAX(order_date)) AS customer_tenure_daysFROM {{ ref('silver_orders') }}GROUP BY customer_id, customer_name, email'''print(models)
Quiz
Q1. Qu’est-ce que la Transformation-as-Code ?
R
Appliquer les principes du développement logiciel (versioning, tests, CI/CD) aux transformations de données.
Q2. Quel est le rôle de ref() dans dbt ?
R
Référencer un autre model dbt pour créer une dépendance et permettre le lineage automatique.
Q3. Différence entre table et incremental ?
R
table = rebuild complet à chaque run. incremental = ajoute seulement les nouvelles lignes.
Q4. Test générique vs test singulier ?
R
Générique = built-in (unique, not_null). Singulier = SQL custom dans tests/.
Q5. Comment dbt gère l’ordre d’exécution ?
R
Via le DAG des dépendances créé par les appels à ref() et source().
Q6. Comment générer la documentation dbt ?
R
dbt docs generate puis dbt docs serve.
Q7. Qu’est-ce qu’un Checkpoint dans Great Expectations ?
R
Exécution d’une suite d’expectations sur une datasource avec génération de rapport.
Q8. Comment intégrer dbt dans CI/CD ?
R
GitHub Actions : dbt build sur PR, dbt run --target prod sur merge.
Q9. Avantage de l’incrémental vs full-refresh ?
R
Performance : traite seulement les nouvelles données, pas tout recalculer.
Q10. Rôle de Jinja dans dbt ?
R
Templating SQL : variables, conditions, boucles, macros réutilisables.