Spark SQL Deep Dive & Optimisation

Bienvenue dans ce module où tu vas maîtriser Spark SQL en profondeur. Tu découvriras les Window Functions, les agrégations avancées, le reshaping de données, et comment optimiser tes requêtes SQL.


Prérequis

Niveau Compétence
✅ Requis Module 19 : PySpark Advanced
✅ Requis Connaissances SQL de base
💡 Recommandé Expérience avec des requêtes analytiques

Objectifs du module

À la fin de ce module, tu seras capable de :

  • Maîtriser les Window Functions (ranking, lag/lead, frames)
  • Utiliser PIVOT/UNPIVOT pour reshaper les données
  • Appliquer GROUPING SETS, CUBE, ROLLUP pour des agrégations multidimensionnelles
  • Manipuler les données semi-structurées avec EXPLODE
  • Structurer des requêtes complexes avec CTEs
  • Optimiser les requêtes SQL (hints, statistiques)
  • Construire un datamart analytique complet
Voir le code
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Créer une SparkSession
spark = SparkSession.builder \
    .appName("Spark SQL Deep Dive") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"✅ Spark {spark.version} initialisé")
print(f"🔍 Spark UI : {spark.sparkContext.uiWebUrl}")

1. SQL dans Spark : Rappels & Fondamentaux

Rappel rapide — Les détails sur Catalyst et l’architecture sont dans le module 19.

1.1 Créer et utiliser des vues

Voir le code
# Créer des données de test
sales_data = [
    (1, "Electronics", "North", 1200, "2024-01-15"),
    (2, "Electronics", "South", 800, "2024-01-16"),
    (3, "Clothing", "North", 450, "2024-01-15"),
    (4, "Clothing", "South", 650, "2024-01-17"),
    (5, "Electronics", "North", 950, "2024-01-18"),
    (6, "Food", "South", 320, "2024-01-15"),
]

sales_df = spark.createDataFrame(sales_data, 
    ["id", "category", "region", "amount", "date"])

# Créer une vue temporaire
sales_df.createOrReplaceTempView("sales")

# Utiliser SQL
result = spark.sql("""
    SELECT category, SUM(amount) as total_sales
    FROM sales
    WHERE amount > 500
    GROUP BY category
    ORDER BY total_sales DESC
""")

result.show()

1.2 DataFrame API vs SQL

Aspect DataFrame API SQL
Typage Compile-time Runtime
Lisibilité Code Python Familier aux analystes
Réutilisabilité Fonctions Python CTEs, Vues
Performance Identique Identique
Debug Stack trace Python Erreurs SQL
IDE Support Autocomplétion Variable

💡 Règle : Utilise ce qui est le plus lisible pour ton équipe. Les deux sont optimisés par Catalyst.

Voir le code
# Équivalence DataFrame API vs SQL

# === DataFrame API ===
result_df = sales_df \
    .filter(col("amount") > 500) \
    .groupBy("category") \
    .agg(sum("amount").alias("total_sales")) \
    .orderBy(desc("total_sales"))

# === SQL ===
result_sql = spark.sql("""
    SELECT category, SUM(amount) as total_sales
    FROM sales
    WHERE amount > 500
    GROUP BY category
    ORDER BY total_sales DESC
""")

print("DataFrame API:")
result_df.show()

print("SQL:")
result_sql.show()

# Les plans sont identiques !
print("\n✅ Les deux approches génèrent le même plan d'exécution")

1.3 Lire un plan d’exécution SQL

Voir le code
# Créer des données pour démontrer les joins
products_data = [
    (1, "Laptop", "Electronics"),
    (2, "T-Shirt", "Clothing"),
    (3, "Apple", "Food"),
]
products_df = spark.createDataFrame(products_data, ["product_id", "name", "category"])
products_df.createOrReplaceTempView("products")

# Voir le plan d'exécution
spark.sql("""
    EXPLAIN FORMATTED
    SELECT s.*, p.name
    FROM sales s
    JOIN products p ON s.category = p.category
    WHERE s.amount > 500
""").show(truncate=False)

Ce qu’il faut repérer dans le plan :

Élément Signification Bon/Mauvais
BroadcastHashJoin Petite table broadcastée ✅ Bon
SortMergeJoin Shuffle des deux tables ⚠️ Coûteux
PushedFilters Filtre appliqué à la source ✅ Bon
Exchange Shuffle (redistribution) ⚠️ À surveiller

2. Window Functions — Le cœur analytique

🔥 Section la plus importante du module. Les Window Functions sont essentielles pour l’analytics.

2.1 Syntaxe et concepts

FUNCTION() OVER (
  PARTITION BY column1, column2   -- Grouper les données
  ORDER BY column3                -- Ordonner dans chaque groupe
  ROWS BETWEEN start AND end      -- Définir la fenêtre de calcul
)
🖼️ Comment fonctionne une Window Function :

Données originales          PARTITION BY category    ORDER BY date       Calcul sur la fenêtre
┌─────────────────┐         ┌─────────────────┐     ┌─────────────────┐  ┌─────────────────┐
│ cat │ date │ amt│         │ Electronics     │     │ 01-15 │ 1200   │  │ ROW_NUMBER = 1  │
│ Elec│ 01-15│1200│   →     │ ├─ 01-15 │ 1200│  →  │ 01-16 │  800   │  │ ROW_NUMBER = 2  │
│ Elec│ 01-16│ 800│         │ ├─ 01-16 │  800│     │ 01-18 │  950   │  │ ROW_NUMBER = 3  │
│ Elec│ 01-18│ 950│         │ └─ 01-18 │  950│     └─────────────────┘  └─────────────────┘
│ Clth│ 01-15│ 450│         │                 │
│ Clth│ 01-17│ 650│         │ Clothing        │
└─────────────────┘         │ ├─ 01-15 │  450│
                            │ └─ 01-17 │  650│
                            └─────────────────┘
Voir le code
# Créer des données plus riches pour les window functions
orders_data = [
    (1, 101, "2024-01-01", 150.0, "Premium"),
    (2, 101, "2024-01-15", 200.0, "Premium"),
    (3, 101, "2024-02-01", 180.0, "Premium"),
    (4, 102, "2024-01-05", 300.0, "Standard"),
    (5, 102, "2024-01-20", 250.0, "Standard"),
    (6, 103, "2024-01-10", 400.0, "Premium"),
    (7, 103, "2024-01-25", 400.0, "Premium"),  # Ex-aequo intentionnel
    (8, 103, "2024-02-10", 350.0, "Premium"),
    (9, 104, "2024-01-03", 100.0, "Standard"),
    (10, 104, "2024-02-15", 120.0, "Standard"),
]

orders_df = spark.createDataFrame(orders_data,
    ["order_id", "customer_id", "order_date", "amount", "segment"])
orders_df = orders_df.withColumn("order_date", to_date(col("order_date")))

orders_df.createOrReplaceTempView("orders")
orders_df.show()

2.2 Fonctions de Ranking

Fonction Description Gère les ex-aequo
ROW_NUMBER() Numéro unique séquentiel Non (arbitraire)
RANK() Rang avec gaps après ex-aequo Oui (1,1,3)
DENSE_RANK() Rang sans gaps Oui (1,1,2)
NTILE(n) Divise en n groupes égaux -
PERCENT_RANK() Rang en percentile (0-1) Oui
CUME_DIST() Distribution cumulative Oui
Voir le code
# Démonstration des fonctions de ranking
ranking_result = spark.sql("""
    SELECT 
        customer_id,
        order_date,
        amount,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY amount DESC) as row_num,
        RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) as rank,
        DENSE_RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) as dense_rank,
        NTILE(2) OVER (PARTITION BY customer_id ORDER BY amount DESC) as ntile_2
    FROM orders
    WHERE customer_id = 103
    ORDER BY customer_id, amount DESC
""")

print("📊 Comparaison ROW_NUMBER vs RANK vs DENSE_RANK (customer 103 avec ex-aequo):")
ranking_result.show()
Voir le code
# Cas d'usage : Top 2 commandes par client
top_orders = spark.sql("""
    SELECT * FROM (
        SELECT 
            customer_id,
            order_date,
            amount,
            ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY amount DESC) as rn
        FROM orders
    )
    WHERE rn <= 2
    ORDER BY customer_id, rn
""")

print("🏆 Top 2 commandes par client:")
top_orders.show()

2.3 Fonctions de décalage (LAG/LEAD)

Fonction Description
LAG(col, n, default) Valeur n lignes AVANT
LEAD(col, n, default) Valeur n lignes APRÈS
FIRST_VALUE(col) Première valeur de la fenêtre
LAST_VALUE(col) Dernière valeur de la fenêtre
NTH_VALUE(col, n) N-ième valeur de la fenêtre
Voir le code
# Cas d'usage : Délai entre commandes
order_gaps = spark.sql("""
    SELECT 
        customer_id,
        order_date,
        amount,
        LAG(order_date, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_date,
        LEAD(order_date, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as next_order_date,
        DATEDIFF(
            order_date, 
            LAG(order_date, 1) OVER (PARTITION BY customer_id ORDER BY order_date)
        ) as days_since_last_order
    FROM orders
    ORDER BY customer_id, order_date
""")

print("📅 Analyse du délai entre commandes:")
order_gaps.show()
Voir le code
# FIRST_VALUE et LAST_VALUE
first_last = spark.sql("""
    SELECT 
        customer_id,
        order_date,
        amount,
        FIRST_VALUE(order_date) OVER (
            PARTITION BY customer_id ORDER BY order_date
        ) as first_order_date,
        FIRST_VALUE(amount) OVER (
            PARTITION BY customer_id ORDER BY order_date
        ) as first_order_amount
    FROM orders
    ORDER BY customer_id, order_date
""")

print("📊 FIRST_VALUE - Date et montant de la première commande:")
first_last.show()

2.4 Agrégations dans une fenêtre

Voir le code
# Running total, Running average, % du total
window_agg = spark.sql("""
    SELECT 
        customer_id,
        order_date,
        amount,
        
        -- Somme cumulative
        SUM(amount) OVER (
            PARTITION BY customer_id 
            ORDER BY order_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) as running_total,
        
        -- Total du client (pour calculer %)
        SUM(amount) OVER (PARTITION BY customer_id) as customer_total,
        
        -- % de chaque commande
        ROUND(amount / SUM(amount) OVER (PARTITION BY customer_id) * 100, 1) as pct_of_customer_total
        
    FROM orders
    ORDER BY customer_id, order_date
""")

print("📊 Agrégations fenêtrées:")
window_agg.show()

2.5 Window Frames (Pièges Spark)

ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW  -- Du début jusqu'à maintenant
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW          -- 7 dernières lignes (rolling)
ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING  -- De maintenant jusqu'à la fin
ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING          -- 3 lignes (précédente, courante, suivante)
Clause Basée sur Comportement
ROWS Position physique Prévisible, recommandé
RANGE Valeur logique ⚠️ Peut inclure plus de lignes si doublons

💡 En cas de doute, utilise ROWS (plus prévisible)

Voir le code
# Rolling average (moyenne mobile)
# Créer plus de données pour démontrer
daily_sales = spark.createDataFrame([
    ("2024-01-01", 100), ("2024-01-02", 150), ("2024-01-03", 120),
    ("2024-01-04", 180), ("2024-01-05", 90), ("2024-01-06", 200),
    ("2024-01-07", 170), ("2024-01-08", 140), ("2024-01-09", 160),
    ("2024-01-10", 190),
], ["date", "sales"])
daily_sales = daily_sales.withColumn("date", to_date(col("date")))
daily_sales.createOrReplaceTempView("daily_sales")

# Moyenne mobile sur 3 jours
rolling = spark.sql("""
    SELECT 
        date,
        sales,
        ROUND(AVG(sales) OVER (
            ORDER BY date
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ), 1) as rolling_avg_3d,
        SUM(sales) OVER (
            ORDER BY date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) as cumulative_sum
    FROM daily_sales
""")

print("📈 Moyenne mobile 3 jours et somme cumulative:")
rolling.show()

3. PIVOT & UNPIVOT — Reshaping des données

3.1 PIVOT : lignes → colonnes

Transforme les valeurs d’une colonne en colonnes distinctes.

Voir le code
# Données mensuelles
monthly_data = spark.createDataFrame([
    ("Alice", "Jan", 1000), ("Alice", "Feb", 1200), ("Alice", "Mar", 1100),
    ("Bob", "Jan", 800), ("Bob", "Feb", 900), ("Bob", "Mar", 950),
    ("Charlie", "Jan", 1500), ("Charlie", "Feb", 1400), ("Charlie", "Mar", 1600),
], ["salesperson", "month", "revenue"])

monthly_data.createOrReplaceTempView("monthly_sales")

print("Données originales (format long):")
monthly_data.show()

# PIVOT : transformer les mois en colonnes
pivoted = spark.sql("""
    SELECT * FROM monthly_sales
    PIVOT (
        SUM(revenue)
        FOR month IN ('Jan', 'Feb', 'Mar')
    )
""")

print("Après PIVOT (format large):")
pivoted.show()
Voir le code
# PIVOT avec DataFrame API
pivoted_df = monthly_data.groupBy("salesperson").pivot("month", ["Jan", "Feb", "Mar"]).sum("revenue")

print("PIVOT avec DataFrame API:")
pivoted_df.show()

3.2 UNPIVOT : colonnes → lignes

L’opération inverse de PIVOT.

Voir le code
# Créer une vue du DataFrame pivoté
pivoted_df.createOrReplaceTempView("pivoted_sales")

# UNPIVOT (Spark 3.4+)
# Pour les versions antérieures, utiliser stack()
unpivoted = spark.sql("""
    SELECT 
        salesperson,
        stack(3, 
            'Jan', Jan, 
            'Feb', Feb, 
            'Mar', Mar
        ) as (month, revenue)
    FROM pivoted_sales
""")

print("Après UNPIVOT (retour au format long):")
unpivoted.show()

4. Agrégations Avancées — GROUPING SETS, CUBE, ROLLUP

Ces fonctions permettent de calculer plusieurs niveaux d’agrégation en une seule requête.

4.1 GROUPING SETS : agrégations multiples

Voir le code
# GROUPING SETS : plusieurs agrégations en une requête
grouping_result = spark.sql("""
    SELECT 
        category,
        region,
        SUM(amount) as total_sales,
        COUNT(*) as num_transactions
    FROM sales
    GROUP BY GROUPING SETS (
        (category, region),  -- Par catégorie ET région
        (category),          -- Par catégorie seulement
        (region),            -- Par région seulement
        ()                   -- Total global
    )
    ORDER BY category NULLS LAST, region NULLS LAST
""")

print("📊 GROUPING SETS - Agrégations multiples:")
grouping_result.show()

4.2 ROLLUP : hiérarchie d’agrégations

ROLLUP(a, b, c) = GROUPING SETS ((a,b,c), (a,b), (a), ())

Voir le code
# ROLLUP : hiérarchie (catégorie → région → total)
rollup_result = spark.sql("""
    SELECT 
        category,
        region,
        SUM(amount) as total_sales
    FROM sales
    GROUP BY ROLLUP (category, region)
    ORDER BY category NULLS LAST, region NULLS LAST
""")

print("📊 ROLLUP - Hiérarchie d'agrégations:")
rollup_result.show()

4.3 CUBE : toutes les combinaisons

CUBE(a, b) = GROUPING SETS ((a,b), (a), (b), ())

Voir le code
# CUBE : toutes les combinaisons possibles
cube_result = spark.sql("""
    SELECT 
        category,
        region,
        SUM(amount) as total_sales
    FROM sales
    GROUP BY CUBE (category, region)
    ORDER BY category NULLS LAST, region NULLS LAST
""")

print("📊 CUBE - Toutes les combinaisons:")
cube_result.show()
Voir le code
# GROUPING() : identifier les sous-totaux
grouping_with_flags = spark.sql("""
    SELECT 
        category,
        region,
        SUM(amount) as total_sales,
        GROUPING(category) as is_category_subtotal,
        GROUPING(region) as is_region_subtotal,
        CASE 
            WHEN GROUPING(category) = 1 AND GROUPING(region) = 1 THEN 'GRAND TOTAL'
            WHEN GROUPING(category) = 1 THEN 'Region Subtotal'
            WHEN GROUPING(region) = 1 THEN 'Category Subtotal'
            ELSE 'Detail'
        END as row_type
    FROM sales
    GROUP BY CUBE (category, region)
    ORDER BY GROUPING(category), GROUPING(region), category, region
""")

print("📊 GROUPING() pour identifier les sous-totaux:")
grouping_with_flags.show()

5. Données Semi-Structurées — EXPLODE & JSON

5.1 EXPLODE : éclater arrays et maps

Voir le code
# Créer des données avec arrays
customers_with_tags = spark.createDataFrame([
    (1, "Alice", ["premium", "loyal", "newsletter"]),
    (2, "Bob", ["new", "newsletter"]),
    (3, "Charlie", ["premium", "vip"]),
], ["id", "name", "tags"])

customers_with_tags.createOrReplaceTempView("customers_tags")

print("Données avec arrays:")
customers_with_tags.show(truncate=False)

# EXPLODE : une ligne par tag
exploded = spark.sql("""
    SELECT id, name, tag
    FROM customers_tags
    LATERAL VIEW EXPLODE(tags) t AS tag
""")

print("Après EXPLODE:")
exploded.show()
Voir le code
# POSEXPLODE : avec position
posexploded = spark.sql("""
    SELECT id, name, pos, tag
    FROM customers_tags
    LATERAL VIEW POSEXPLODE(tags) t AS pos, tag
""")

print("POSEXPLODE (avec index):")
posexploded.show()
Voir le code
# EXPLODE avec Map
customers_with_attrs = spark.createDataFrame([
    (1, "Alice", {"city": "Paris", "country": "France"}),
    (2, "Bob", {"city": "London", "country": "UK", "postal": "SW1"}),
], ["id", "name", "attributes"])

customers_with_attrs.createOrReplaceTempView("customers_attrs")

print("Données avec Map:")
customers_with_attrs.show(truncate=False)

# EXPLODE sur Map → (key, value)
exploded_map = spark.sql("""
    SELECT id, name, key, value
    FROM customers_attrs
    LATERAL VIEW EXPLODE(attributes) t AS key, value
""")

print("EXPLODE sur Map:")
exploded_map.show()

5.2 Accès JSON

Voir le code
# Données JSON
events = spark.createDataFrame([
    (1, '{"user": "alice", "action": "click", "details": {"page": "home", "duration": 5}}'),
    (2, '{"user": "bob", "action": "purchase", "details": {"page": "cart", "amount": 99.99}}'),
], ["id", "json_data"])

events.createOrReplaceTempView("events")

# Extraire des champs JSON
json_extracted = spark.sql("""
    SELECT 
        id,
        get_json_object(json_data, '$.user') as user,
        get_json_object(json_data, '$.action') as action,
        get_json_object(json_data, '$.details.page') as page
    FROM events
""")

print("Extraction JSON avec get_json_object:")
json_extracted.show()
Voir le code
# Parser JSON complet avec schema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

details_schema = StructType([
    StructField("page", StringType()),
    StructField("duration", DoubleType()),
    StructField("amount", DoubleType())
])

event_schema = StructType([
    StructField("user", StringType()),
    StructField("action", StringType()),
    StructField("details", details_schema)
])

parsed = events.withColumn("parsed", from_json(col("json_data"), event_schema))
parsed.select("id", "parsed.user", "parsed.action", "parsed.details.page").show()

6. CTEs & Subqueries — Structurer ses requêtes

6.1 Common Table Expressions (CTEs)

Voir le code
# CTE : structurer une requête complexe
cte_result = spark.sql("""
    WITH 
    -- Étape 1 : Agrégation par client
    customer_stats AS (
        SELECT 
            customer_id,
            COUNT(*) as num_orders,
            SUM(amount) as total_spent,
            AVG(amount) as avg_order
        FROM orders
        GROUP BY customer_id
    ),
    
    -- Étape 2 : Moyenne globale
    global_avg AS (
        SELECT AVG(total_spent) as avg_total_spent
        FROM customer_stats
    )
    
    -- Requête finale : clients au-dessus de la moyenne
    SELECT 
        c.*,
        g.avg_total_spent,
        CASE WHEN c.total_spent > g.avg_total_spent THEN 'Above Average' ELSE 'Below Average' END as status
    FROM customer_stats c
    CROSS JOIN global_avg g
    ORDER BY total_spent DESC
""")

print("📊 Analyse client avec CTEs:")
cte_result.show()

6.2 CTEs vs Subqueries

Aspect CTE Subquery
Lisibilité ✅ Excellent ⚠️ Peut être confus
Réutilisabilité ✅ Peut être référencé plusieurs fois ❌ Répétition nécessaire
Performance ⚠️ Pas toujours optimisé ⚠️ Variable

⚠️ Mythe : Les CTEs ne sont PAS toujours plus rapides. Catalyst les traite comme des subqueries inline.

Voir le code
# ❌ Subquery corrélée (potentiellement lent)
# SELECT * FROM orders o
# WHERE amount > (SELECT AVG(amount) FROM orders WHERE customer_id = o.customer_id)

# ✅ Window function (plus efficace)
efficient_query = spark.sql("""
    SELECT * FROM (
        SELECT 
            *,
            AVG(amount) OVER (PARTITION BY customer_id) as avg_customer_amount
        FROM orders
    )
    WHERE amount > avg_customer_amount
""")

print("✅ Commandes au-dessus de la moyenne du client (window function):")
efficient_query.show()

7. Optimisation SQL dans Spark

7.1 Join Hints

Voir le code
# Créer des tables pour démontrer les hints
spark.sql("DROP TABLE IF EXISTS dim_segment")
segments = spark.createDataFrame([
    ("Premium", 1.2), ("Standard", 1.0)
], ["segment", "multiplier"])
segments.createOrReplaceTempView("dim_segment")

# BROADCAST hint
broadcast_join = spark.sql("""
    SELECT /*+ BROADCAST(dim_segment) */ 
        o.*,
        s.multiplier,
        o.amount * s.multiplier as adjusted_amount
    FROM orders o
    JOIN dim_segment s ON o.segment = s.segment
""")

print("Plan avec BROADCAST hint:")
broadcast_join.explain()
print("\nRésultat:")
broadcast_join.show(5)

Hints disponibles :

Hint Usage Quand l’utiliser
/*+ BROADCAST(table) */ Force broadcast Petite table (< 100 MB)
/*+ MERGE(t1, t2) */ Force sort-merge join Grandes tables triées
/*+ SHUFFLE_HASH(t1) */ Force shuffle hash Tables moyennes
/*+ COALESCE(n) */ Réduit les partitions Avant écriture
Voir le code
# Collecter les statistiques (améliore l'optimiseur)
# Note : fonctionne sur des tables persistées, pas des vues temporaires

print("💡 Pour collecter des statistiques sur une table persistée :")
print("""
-- Statistiques de base
ANALYZE TABLE my_table COMPUTE STATISTICS

-- Statistiques sur colonnes spécifiques
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1, col2

-- Voir les statistiques
DESCRIBE EXTENDED my_table
""")

7.2 Anti-patterns SQL

Anti-pattern Problème Solution
SELECT * Lit toutes les colonnes SELECT col1, col2
ORDER BY sans LIMIT Tri global coûteux Ajouter LIMIT
UDF dans WHERE Pas de pushdown Expression native
COUNT(DISTINCT) haute cardinalité Très lent APPROX_COUNT_DISTINCT
NOT IN avec NULL Résultats inattendus NOT EXISTS ou LEFT JOIN
Voir le code
# Approximation vs COUNT DISTINCT
comparison = spark.sql("""
    SELECT 
        COUNT(DISTINCT customer_id) as exact_count,
        APPROX_COUNT_DISTINCT(customer_id) as approx_count
    FROM orders
""")

print("COUNT DISTINCT vs APPROX_COUNT_DISTINCT:")
comparison.show()
print("💡 APPROX_COUNT_DISTINCT est ~10x plus rapide sur de grandes tables")

Mini-Projet : Datamart Customer Analytics

Objectif

Construire un datamart analytique client complet en utilisant toutes les techniques apprises.

Métriques à calculer

  • Date première commande (FIRST_VALUE)
  • Nombre de commandes (COUNT OVER)
  • Revenu total client (SUM OVER)
  • Délai moyen entre commandes (LAG + AVG)
  • Rang du client par segment (RANK)
  • % du revenu du segment (SUM OVER partition)

Architecture

┌─────────────┐     ┌─────────────┐
│   Orders    │     │  Segments   │
│   (fact)    │     │   (dim)     │
└──────┬──────┘     └──────┬──────┘
       │                   │
       └─────────┬─────────┘
                 │
       ┌─────────▼─────────┐
       │  CTE: enriched    │
       │  - joins          │
       │  - window funcs   │
       └─────────┬─────────┘
                 │
       ┌─────────▼─────────┐
       │  CTE: customer    │
       │      stats        │
       └─────────┬─────────┘
                 │
       ┌─────────▼─────────┐
       │    Datamart       │
       │   [Parquet]       │
       └───────────────────┘
Voir le code
import os
import shutil

# Créer des données plus riches
orders_extended = spark.createDataFrame([
    # Customer 101 - Premium, 3 commandes
    (1, 101, "2024-01-01", 150.0, "Premium"),
    (2, 101, "2024-01-15", 200.0, "Premium"),
    (3, 101, "2024-02-01", 180.0, "Premium"),
    # Customer 102 - Standard, 4 commandes
    (4, 102, "2024-01-05", 300.0, "Standard"),
    (5, 102, "2024-01-20", 250.0, "Standard"),
    (6, 102, "2024-02-10", 275.0, "Standard"),
    (7, 102, "2024-03-01", 320.0, "Standard"),
    # Customer 103 - Premium, 2 commandes
    (8, 103, "2024-01-10", 400.0, "Premium"),
    (9, 103, "2024-02-15", 450.0, "Premium"),
    # Customer 104 - Standard, 3 commandes
    (10, 104, "2024-01-03", 100.0, "Standard"),
    (11, 104, "2024-01-25", 120.0, "Standard"),
    (12, 104, "2024-02-20", 90.0, "Standard"),
    # Customer 105 - Premium, 1 commande
    (13, 105, "2024-02-01", 500.0, "Premium"),
], ["order_id", "customer_id", "order_date", "amount", "segment"])

orders_extended = orders_extended.withColumn("order_date", to_date(col("order_date")))
orders_extended.createOrReplaceTempView("orders_ext")

print("📦 Données source:")
orders_extended.show()
Voir le code
# Construction du Datamart avec CTEs et Window Functions
datamart = spark.sql("""
    WITH 
    -- Étape 1 : Enrichir les commandes avec métriques temporelles
    orders_enriched AS (
        SELECT 
            customer_id,
            segment,
            order_date,
            amount,
            
            -- Numéro de commande du client
            ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_number,
            
            -- Date de la commande précédente
            LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_date,
            
            -- Délai depuis la dernière commande
            DATEDIFF(
                order_date,
                LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date)
            ) as days_since_last
        FROM orders_ext
    ),
    
    -- Étape 2 : Agréger par client
    customer_stats AS (
        SELECT 
            customer_id,
            segment,
            
            -- Première commande
            MIN(order_date) as first_order_date,
            
            -- Dernière commande
            MAX(order_date) as last_order_date,
            
            -- Nombre de commandes
            COUNT(*) as total_orders,
            
            -- Revenus
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value,
            
            -- Délai moyen entre commandes (exclut NULL de la première commande)
            AVG(days_since_last) as avg_days_between_orders
        FROM orders_enriched
        GROUP BY customer_id, segment
    ),
    
    -- Étape 3 : Ajouter des métriques de segment
    customer_with_segment_stats AS (
        SELECT 
            *,
            
            -- Total du segment
            SUM(total_revenue) OVER (PARTITION BY segment) as segment_total_revenue,
            
            -- % du revenu du segment
            ROUND(total_revenue / SUM(total_revenue) OVER (PARTITION BY segment) * 100, 2) as pct_of_segment,
            
            -- Rang dans le segment
            RANK() OVER (PARTITION BY segment ORDER BY total_revenue DESC) as rank_in_segment,
            
            -- Nombre de clients dans le segment
            COUNT(*) OVER (PARTITION BY segment) as customers_in_segment
        FROM customer_stats
    )
    
    -- Résultat final
    SELECT 
        customer_id,
        segment,
        first_order_date,
        last_order_date,
        total_orders,
        ROUND(total_revenue, 2) as total_revenue,
        ROUND(avg_order_value, 2) as avg_order_value,
        ROUND(avg_days_between_orders, 1) as avg_days_between_orders,
        rank_in_segment,
        pct_of_segment,
        customers_in_segment
    FROM customer_with_segment_stats
    ORDER BY segment, rank_in_segment
""")

print("📊 DATAMART CUSTOMER ANALYTICS:")
datamart.show(truncate=False)
Voir le code
# Exporter le datamart
output_path = "/tmp/customer_datamart"
if os.path.exists(output_path):
    shutil.rmtree(output_path)

datamart.write.partitionBy("segment").parquet(output_path)

print(f"✅ Datamart exporté : {output_path}")
print(f"📁 Partitionné par segment")

# Vérifier la structure
for root, dirs, files in os.walk(output_path):
    level = root.replace(output_path, '').count(os.sep)
    indent = ' ' * 2 * level
    print(f"{indent}{os.path.basename(root)}/")
Voir le code
# Résumé du mini-projet
print("""
╔══════════════════════════════════════════════════════════════╗
║           📊 RÉSUMÉ DU DATAMART                              ║
╠══════════════════════════════════════════════════════════════╣
║                                                              ║
║  Techniques utilisées :                                      ║
║  ✅ CTEs pour structurer la transformation                   ║
║  ✅ ROW_NUMBER() pour numéroter les commandes                ║
║  ✅ LAG() pour calculer le délai entre commandes             ║
║  ✅ RANK() pour classer les clients par segment              ║
║  ✅ SUM() OVER pour calculer les totaux de segment           ║
║  ✅ Partitionnement Parquet par segment                      ║
║                                                              ║
║  Métriques calculées :                                       ║
║  • Date première/dernière commande                           ║
║  • Nombre total de commandes                                 ║
║  • Revenu total et moyen                                     ║
║  • Délai moyen entre commandes                               ║
║  • Rang dans le segment                                      ║
║  • % du revenu du segment                                    ║
║                                                              ║
║  💡 Ce type de transformation sera automatisé                ║
║     avec dbt dans le module 26                               ║
║                                                              ║
╚══════════════════════════════════════════════════════════════╝
""")

Quiz de fin de module


❓ Q1. Quelle est la différence entre ROW_NUMBER() et RANK() ?

  1. ROW_NUMBER() est plus rapide
  2. RANK() gère les ex-aequo avec des gaps, ROW_NUMBER() donne des numéros uniques
  3. ROW_NUMBER() nécessite ORDER BY, pas RANK()
  4. Aucune différence
💡 Voir la réponse

Réponse : b — RANK() donne le même rang aux ex-aequo (1,1,3), ROW_NUMBER() donne toujours des numéros uniques (1,2,3).


❓ Q2. Quelle est la différence entre ROWS et RANGE dans une window frame ?

  1. ROWS est plus rapide
  2. RANGE supporte plus de fonctions
  3. ROWS se base sur la position physique, RANGE sur la valeur logique
  4. RANGE ne fonctionne qu’avec des dates
💡 Voir la réponse

Réponse : c — ROWS compte les lignes physiquement, RANGE peut inclure plusieurs lignes si elles ont la même valeur.


❓ Q3. Que fait la fonction GROUPING() ?

  1. Groupe les données
  2. Identifie si une colonne est agrégée (sous-total)
  3. Compte le nombre de groupes
  4. Trie les groupes
💡 Voir la réponse

Réponse : b — GROUPING() retourne 1 si la colonne est agrégée (NULL dans un sous-total), 0 sinon.


❓ Q4. Comment forcer un broadcast join en SQL Spark ?

  1. FORCE BROADCAST
  2. /*+ BROADCAST(table) */
  3. BROADCAST JOIN
  4. SET spark.broadcast = true
💡 Voir la réponse

Réponse : b — On utilise le hint SQL /*+ BROADCAST(table) */ après SELECT.


❓ Q5. PIVOT transforme… ?

  1. Colonnes en lignes
  2. Lignes en colonnes
  3. JSON en colonnes
  4. Arrays en lignes
💡 Voir la réponse

Réponse : b — PIVOT transforme les valeurs distinctes d’une colonne en colonnes séparées.


❓ Q6. EXPLODE est utilisé pour… ?

  1. Compresser les données
  2. Supprimer les doublons
  3. Transformer un array en plusieurs lignes
  4. Joindre des tables
💡 Voir la réponse

Réponse : c — EXPLODE éclate un array (ou map) en créant une ligne pour chaque élément.


❓ Q7. Les CTEs sont-ils toujours plus performants que les subqueries ?

  1. Oui, toujours
  2. Non, Catalyst les traite de manière similaire
  3. Oui, car ils sont matérialisés
  4. Non, ils sont toujours plus lents
💡 Voir la réponse

Réponse : b — C’est un mythe ! Catalyst inline les CTEs comme des subqueries. L’avantage est la lisibilité.


❓ Q8. Quel hint utiliser pour éviter un shuffle lors d’un join avec une petite table ?

  1. /*+ MERGE */
  2. /*+ SHUFFLE_HASH */
  3. /*+ BROADCAST */
  4. /*+ COALESCE */
💡 Voir la réponse

Réponse : c — BROADCAST envoie la petite table à tous les executors, évitant le shuffle.


❓ Q9. UNBOUNDED PRECEDING signifie… ?

  1. La ligne précédente
  2. Toutes les lignes depuis le début de la partition
  3. La première ligne de la table
  4. Aucune limite de mémoire
💡 Voir la réponse

Réponse : b — UNBOUNDED PRECEDING inclut toutes les lignes depuis le début de la partition jusqu’à la position actuelle.


❓ Q10. Comment collecter des statistiques sur une table pour améliorer l’optimiseur ?

  1. COMPUTE STATS table
  2. ANALYZE TABLE table COMPUTE STATISTICS
  3. COLLECT STATISTICS table
  4. DESCRIBE STATISTICS table
💡 Voir la réponse

Réponse : bANALYZE TABLE table COMPUTE STATISTICS collecte les statistiques pour l’optimiseur Catalyst.


📚 Ressources pour aller plus loin

🌐 Documentation officielle

📖 Articles & Tutoriels


➡️ Prochaine étape

Maintenant que tu maîtrises Spark SQL, passons à spark sur K8S !

👉 Module suivant : 21_spark_on_kubernetes - Spark & Kubernetes


📝 Récapitulatif de ce module

Concept Ce que tu as appris
Window Functions ROW_NUMBER, RANK, LAG/LEAD, FIRST_VALUE, frames
PIVOT/UNPIVOT Reshaping des données
GROUPING SETS CUBE, ROLLUP, agrégations multidimensionnelles
EXPLODE Données semi-structurées, JSON
CTEs Structurer les requêtes complexes
Optimisation Hints, statistiques, anti-patterns

🎉 Félicitations ! Tu as terminé le module Spark SQL Deep Dive.

Voir le code
# Nettoyage
spark.stop()
print("✅ SparkSession arrêtée")

# Nettoyage des fichiers temporaires (optionnel)
# import shutil
# if os.path.exists("/tmp/customer_datamart"):
#     shutil.rmtree("/tmp/customer_datamart")
# print("🧹 Fichiers temporaires supprimés")
Retour au sommet