PySpark Advanced

Bienvenue dans ce module avancé où tu vas apprendre à optimiser Spark comme un expert. Tu découvriras l’architecture interne, les techniques d’optimisation, et comment diagnostiquer et résoudre les problèmes de performance.


Prérequis

Niveau Compétence
✅ Requis Module 11 : PySpark for Data Engineering (bases Spark)
✅ Requis Module 18 : High Performance Python
💡 Recommandé Expérience avec des datasets > 1 Go

Objectifs du module

À la fin de ce module, tu seras capable de :

  • Comprendre l’architecture interne de Spark (Catalyst, Tungsten)
  • Exécuter Spark en production avec spark-submit
  • Optimiser les partitions, shuffles et joins
  • Diagnostiquer un job lent avec Spark UI
  • Réduire le temps d’exécution de 80-90%

Objectif concret

Transformer un pipeline de 20 minutes en 2 minutes.

C’est ce qui distingue un Data Engineer junior d’un senior sur Spark.


1. Rappels Spark Essentiels

💡 Si tu as suivi le module 11 (PySpark for Data Engineering), cette section est un rappel rapide.

Sinon, commence par ce module avant de continuer — les concepts de base sont indispensables.

1.1 SparkSession

Voir le code
from pyspark.sql import SparkSession

# Créer une SparkSession (point d'entrée unique)
spark = SparkSession.builder \
    .appName("PySpark Advanced") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"App name: {spark.sparkContext.appName}")

1.2 DataFrame vs RDD

Aspect RDD DataFrame
API Bas niveau Haut niveau
Optimisation Manuelle Catalyst (automatique)
Performance Baseline 10-100x plus rapide
Usage Legacy, cas spéciaux Standard

👉 Règle : Toujours utiliser DataFrame/Dataset, jamais RDD (sauf cas très spécifiques).

1.3 Transformations vs Actions

Type Exemples Exécution
Transformation filter, select, join, groupBy Lazy (différée)
Action count, collect, write, show Immédiate

1.4 Lazy Evaluation

df.filter(...)     # Rien ne s'exécute
  .select(...)     # Rien ne s'exécute
  .groupBy(...)    # Rien ne s'exécute
  .count()         # MAINTENANT tout s'exécute !

Avantage : Spark peut optimiser l’ensemble du pipeline avant exécution.


2. Architecture Interne — Ce que les débutants ne savent pas

La vraie maîtrise de Spark commence ici. Comprendre l’architecture interne te permet de prédire et résoudre les problèmes de performance.

2.1 Le cycle de vie d’un Job Spark

Code Python/SQL
      │
      ▼
┌─────────────────────┐
│    Logical Plan     │  Arbre d'opérations (ce que tu veux faire)
│    (non optimisé)   │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│      Catalyst       │  🧠 Optimiseur de requêtes
│      Optimizer      │  - Predicate pushdown
└──────────┬──────────┘  - Projection pruning
           │             - Join reordering
           ▼
┌─────────────────────┐
│   Physical Plan     │  Comment exécuter (stratégie)
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│        DAG          │  Directed Acyclic Graph
│   (Stages + Tasks)  │  - Stages = unités de travail
└──────────┬──────────┘  - Tasks = exécution par partition
           │
           ▼
┌─────────────────────┐
│      Tungsten       │  ⚡ Exécution optimisée
│       Engine        │  - Code generation
└─────────────────────┘  - Off-heap memory

2.2 Catalyst Optimizer — L’arme secrète

Optimisation Description Gain
Predicate pushdown Filtre appliqué à la source (Parquet, DB) I/O réduit drastiquement
Projection pruning Colonnes inutiles non lues I/O réduit
Constant folding Calculs constants pré-calculés CPU réduit
Join reordering Ordre optimal des joins Shuffle réduit

2.3 Tungsten Engine

  • Off-heap memory : stockage hors JVM → évite le Garbage Collector
  • Whole-stage code generation : génère du bytecode optimisé à la volée
  • Vectorized execution : traitement par batch (comme Polars !)
Voir le code
from pyspark.sql.functions import col, sum as spark_sum

# Créer des données de test
data = [(i, f"cat_{i % 5}", float(i * 10)) for i in range(1000)]
df = spark.createDataFrame(data, ["id", "category", "amount"])

# Pipeline avec transformations
result = (
    df
    .filter(col("amount") > 100)
    .select("category", "amount")
    .groupBy("category")
    .agg(spark_sum("amount").alias("total"))
)

# Voir le plan d'exécution
print("=" * 50)
print("PLAN D'EXÉCUTION (explain)")
print("=" * 50)
result.explain("formatted")
Voir le code
# Plan complet avec toutes les étapes
print("PLAN COMPLET (Parsed → Analyzed → Optimized → Physical)")
print("=" * 60)
result.explain(True)

3. spark-submit — Exécuter Spark comme un pro

Compétence indispensable en entreprise. Très peu de formations l’enseignent correctement.

3.1 Pourquoi spark-submit ?

Contexte Outil Usage
Exploration, développement Notebooks (Jupyter, Databricks) Dev, prototypage
Production, CI/CD, scheduling spark-submit Déploiement réel

💡 En entreprise, spark-submit est rarement lancé manuellement. Il est appelé par : - Airflow (orchestration) → Module 25 - CI/CD pipelines (GitLab CI, GitHub Actions) - Schedulers (cron, Kubernetes CronJobs)

3.2 Deploy Modes

CLIENT MODE                          CLUSTER MODE
═══════════                          ════════════

┌─────────────┐                      ┌─────────────┐
│   Client    │                      │   Client    │
│   Machine   │                      │   Machine   │
│  ┌───────┐  │                      │             │
│  │Driver │  │ ◄── Driver ici       │  (submit)   │
│  └───────┘  │                      └──────┬──────┘
└──────┬──────┘                             │
       │                                    │
       ▼                                    ▼
┌─────────────┐                      ┌─────────────┐
│   Cluster   │                      │   Cluster   │
│  ┌───────┐  │                      │  ┌───────┐  │
│  │Exec 1 │  │                      │  │Driver │  │ ◄── Driver ici
│  ├───────┤  │                      │  ├───────┤  │
│  │Exec 2 │  │                      │  │Exec 1 │  │
│  ├───────┤  │                      │  ├───────┤  │
│  │Exec 3 │  │                      │  │Exec 2 │  │
│  └───────┘  │                      │  └───────┘  │
└─────────────┘                      └─────────────┘

Usage: Debug, interactif            Usage: Production
Mode Driver tourne sur Usage
client Machine qui soumet Debug, logs visibles
cluster Worker du cluster Production

3.3 Resource Managers (Masters)

Master Commande Usage
Local --master local[*] Dev/test (tous les cores)
Local (N cores) --master local[4] Dev/test (4 cores)
Standalone --master spark://host:7077 Cluster Spark simple
YARN --master yarn Clusters Hadoop
Kubernetes --master k8s://https://... Cloud native

3.4 Syntaxe complète spark-submit

spark-submit \
  # === Resource Manager ===
  --master local[4] \
  --deploy-mode client \
  
  # === Ressources ===
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  
  # === Configuration Spark ===
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.executor.memoryOverhead=2g \
  
  # === Dépendances ===
  --packages io.delta:delta-spark_2.12:3.2.0 \
  --jars /path/to/postgres-42.7.jar \
  --py-files utils.zip \
  
  # === Application ===
  main.py \
  
  # === Arguments application ===
  --date 2024-01-01 \
  --env prod

3.5 Structure projet production

spark_project/
├── src/
│   ├── __init__.py
│   ├── main.py              # Point d'entrée
│   ├── etl/
│   │   ├── __init__.py
│   │   ├── extract.py
│   │   ├── transform.py
│   │   └── load.py
│   └── utils/
│       ├── __init__.py
│       ├── config.py
│       └── logger.py
├── config/
│   ├── dev.yaml
│   └── prod.yaml
├── jars/
│   └── postgres-42.7.jar
├── scripts/
│   ├── run_local.sh
│   └── run_cluster.sh
├── tests/
│   └── test_transform.py
├── requirements.txt
├── setup.py                 # Pour créer .whl
└── README.md

3.6 Packaging pour production

# ❌ MAUVAIS : liste de fichiers (difficile à maintenir)
--py-files utils.py,config.py,helpers.py

# ✅ MIEUX : Package .zip
cd src/ && zip -r ../app.zip . && cd ..
spark-submit --py-files app.zip main.py

# ✅ MEILLEUR : Wheel (.whl) - le plus propre
pip wheel . -w dist/
spark-submit --py-files dist/myproject-1.0.0-py3-none-any.whl main.py

💡 En production, préfère .zip ou .whl pour un déploiement propre et versionné.

Voir le code
# Pattern : arguments en ligne de commande (main.py)

# === Exemple de main.py pour spark-submit ===
example_main = '''
#!/usr/bin/env python3
"""Point d'entrée du job Spark."""

import argparse
from pyspark.sql import SparkSession

def parse_args():
    parser = argparse.ArgumentParser(description="ETL Pipeline")
    parser.add_argument("--date", required=True, help="Date de traitement (YYYY-MM-DD)")
    parser.add_argument("--env", default="dev", choices=["dev", "prod"])
    parser.add_argument("--input", required=True, help="Chemin input")
    parser.add_argument("--output", required=True, help="Chemin output")
    return parser.parse_args()

def main():
    args = parse_args()
    
    spark = SparkSession.builder \
        .appName(f"ETL-{args.env}-{args.date}") \
        .getOrCreate()
    
    # Charger les données
    df = spark.read.parquet(f"{args.input}/date={args.date}")
    
    # Transformations...
    result = df.filter(df.amount > 0)
    
    # Écrire
    result.write.mode("overwrite").parquet(f"{args.output}/date={args.date}")
    
    spark.stop()

if __name__ == "__main__":
    main()
'''

print(example_main)

4. Partitionnement & Shuffle — LA source de lenteur

80% des problèmes de performance Spark viennent du shuffle et du partitionnement.

4.1 Qu’est-ce qu’un Shuffle ?

Le shuffle est la redistribution des données entre partitions. Il est nécessaire pour :

  • groupBy, reduceByKey
  • join (sauf broadcast)
  • distinct, repartition
  • orderBy (tri global)
SHUFFLE = GOULOT D'ÉTRANGLEMENT
══════════════════════════════

Stage 1                              Stage 2
┌─────────┐                          ┌─────────┐
│Partition│                          │Partition│
│    1    │───────┐          ┌──────▶│   1'    │
└─────────┘       │          │       └─────────┘
┌─────────┐       │          │       ┌─────────┐
│Partition│───────┼──────────┼──────▶│Partition│
│    2    │       │  SHUFFLE │       │   2'    │
└─────────┘       │ (réseau) │       └─────────┘
┌─────────┐       │          │       ┌─────────┐
│Partition│───────┘          └──────▶│Partition│
│    3    │                          │   3'    │
└─────────┘                          └─────────┘

Coût du Shuffle :
1. Sérialisation des données
2. Écriture sur disque (shuffle write)
3. Transfert réseau
4. Lecture depuis disque (shuffle read)
5. Désérialisation

4.2 repartition vs coalesce

Méthode Shuffle Usage
repartition(n) Toujours Augmenter partitions, rééquilibrer
coalesce(n) ❌ Non (si réduction) Réduire partitions avant write
Voir le code
from pyspark.sql.functions import spark_partition_id

# Créer un DataFrame
df = spark.range(0, 1000000)
print(f"Partitions initiales : {df.rdd.getNumPartitions()}")

# repartition = SHUFFLE (redistribue les données)
df_repart = df.repartition(10)
print(f"Après repartition(10) : {df_repart.rdd.getNumPartitions()}")

# coalesce = PAS DE SHUFFLE (combine les partitions existantes)
df_coal = df_repart.coalesce(3)
print(f"Après coalesce(3) : {df_coal.rdd.getNumPartitions()}")

# Voir la distribution des données par partition
print("\nDistribution après repartition(10):")
df_repart.groupBy(spark_partition_id().alias("partition")).count().orderBy("partition").show()

4.3 Taille optimale des partitions

Taille partition Problème
Trop petit (< 10 MB) Overhead de scheduling, trop de tâches
Optimal (128-256 MB) ✅ Sweet spot
Trop grand (> 1 GB) OOM, mauvaise parallélisation

Formule :

num_partitions = data_size_mb / target_partition_size_mb

Exemple : 50 GB de données
num_partitions = 50000 MB / 200 MB = 250 partitions

4.4 Data Skew — Le tueur de performance

ÉQUILIBRÉ (bon)                     SKEWED (problème !)
═══════════════                     ══════════════════

┌────┐┌────┐┌────┐┌────┐            ┌────┐┌────┐┌────┐┌───────────────────┐
│ 1M ││ 1M ││ 1M ││ 1M │            │100K││100K││100K││        10M        │
└────┘└────┘└────┘└────┘            └────┘└────┘└────┘└───────────────────┘

Temps: ████                         Temps: ████████████████████████████████
       4 tâches parallèles                 ↑
       = temps minimal                     Un seul executor bloqué !
                                           Les autres attendent...

Techniques anti-skew :

  1. Broadcast join : si une table est petite (< 100 MB)
  2. Salting : ajouter une clé aléatoire pour distribuer
  3. AQE (Adaptive Query Execution) : Spark 3.0+ gère automatiquement
Voir le code
# Activer AQE (recommandé Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Vérifier la configuration
print("AQE enabled:", spark.conf.get("spark.sql.adaptive.enabled"))
print("Skew Join enabled:", spark.conf.get("spark.sql.adaptive.skewJoin.enabled"))

4.5 Caching & Persistence

Quand utiliser le cache ?

  • DataFrame réutilisé dans plusieurs actions
  • Calcul coûteux (join, aggregation) réutilisé
  • Itérations (ML training)

cache() vs persist()

# cache() = persist(StorageLevel.MEMORY_AND_DISK)
df.cache()

# persist() = contrôle fin du niveau de stockage
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
Niveau RAM Disque Sérialisé Usage
MEMORY_ONLY Petit DF, RAM suffisante
MEMORY_AND_DISK Défaut (cache())
MEMORY_ONLY_SER Économie RAM
DISK_ONLY Très gros DF
OFF_HEAP Éviter GC Java
Voir le code
from pyspark import StorageLevel
import time

# Créer un DataFrame avec calculs
df = spark.range(0, 5000000).withColumn("squared", col("id") ** 2)

# Sans cache : chaque action recalcule tout
start = time.time()
count1 = df.filter(col("squared") > 1000000).count()
count2 = df.filter(col("squared") < 100).count()
print(f"Sans cache : {time.time() - start:.2f}s")

# Avec cache : calcul une seule fois
df_cached = df.cache()

start = time.time()
count1 = df_cached.filter(col("squared") > 1000000).count()
count2 = df_cached.filter(col("squared") < 100).count()
print(f"Avec cache : {time.time() - start:.2f}s")

# ⚠️ IMPORTANT : libérer la mémoire !
df_cached.unpersist()
print("\n✅ Cache libéré avec unpersist()")

⚠️ Quand NE PAS cacher :

Situation Pourquoi
DF utilisé une seule fois Gaspillage mémoire
DF très volumineux (> RAM) Débordement disque lent
Avant un shuffle Inutile, données redistribuées
Pipeline simple et rapide Overhead du cache > gain

5. Joins Avancés — La compétence qui change tout

Un excellent Data Engineer sait optimiser ses joins. C’est souvent là que se gagne (ou se perd) le plus de temps.

5.1 Types de Joins internes Spark

Type Quand Spark l’utilise Performance
Broadcast Hash Join Petite table (< seuil) ⭐⭐⭐ Meilleur
Sort Merge Join Grandes tables ⭐⭐ Standard
Shuffle Hash Join Tables moyennes ⭐ Éviter si possible

5.2 Broadcast Join — Ton meilleur ami

SORT MERGE JOIN (shuffle)           BROADCAST JOIN (pas de shuffle)
═════════════════════════           ══════════════════════════════

   Big Table      Small Table          Big Table      Small Table
   ┌──────┐       ┌──────┐             ┌──────┐       ┌──────┐
   │  P1  │       │  P1  │             │  P1  │◄──────│      │
   │  P2  │       │  P2  │             │  P2  │◄──────│ COPY │
   │  P3  │       │  P3  │             │  P3  │◄──────│      │
   └──┬───┘       └──┬───┘             └──────┘       └──────┘
      │   SHUFFLE    │                    │
      └──────┬───────┘               Broadcast to
             │                       all executors
         ┌───▼───┐                   (pas de shuffle !)
         │ JOIN  │
         └───────┘
Voir le code
from pyspark.sql.functions import broadcast

# Grande table (10M lignes simulées)
big_df = spark.range(0, 1000000).withColumn("category_id", (col("id") % 100).cast("int"))

# Petite table (100 lignes)
small_df = spark.createDataFrame(
    [(i, f"Category {i}") for i in range(100)],
    ["category_id", "category_name"]
)

# ❌ SANS broadcast hint (Spark peut ou non broadcaster)
result_no_hint = big_df.join(small_df, "category_id")
print("Sans broadcast hint:")
result_no_hint.explain()

print("\n" + "="*50 + "\n")

# ✅ AVEC broadcast hint (force le broadcast)
result_broadcast = big_df.join(broadcast(small_df), "category_id")
print("Avec broadcast():")
result_broadcast.explain()

5.3 Configurer le seuil de broadcast

# Défaut : 10 MB
# Si une table < 10 MB → broadcast automatique

# Augmenter pour broadcaster des tables plus grandes
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100 MB

# Désactiver le broadcast automatique (forcer shuffle)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

5.4 Join Hints (Spark 3.0+)

# DataFrame API
big_df.join(small_df.hint("broadcast"), "key")

# SQL
spark.sql("""
    SELECT /*+ BROADCAST(small_table) */ *
    FROM big_table
    JOIN small_table ON big_table.key = small_table.key
""")

5.5 Anti-pattern : Join sur clé skewed

Voir le code
# Configuration pour gérer le skew automatiquement (AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

print("AQE Skew Join activé")
print("Spark va automatiquement détecter et gérer les partitions skewed")

6. Lecture/Écriture Optimisées

6.1 Formats : Parquet toujours !

Format Lecture Écriture Compression Predicate Pushdown
CSV 🐢 Lent 🐢 Lent
JSON 🐢 Lent 🐢 Lent
Parquet 🚀 Rapide 🚀 Rapide
ORC 🚀 Rapide 🚀 Rapide
Delta 🚀 Rapide 🚀 Rapide ✅ + ACID

🔭 Les formats modernes (Delta, Iceberg, Hudi) seront approfondis dans le module 21 (Lakehouse) :

  • Transactions ACID
  • Time Travel
  • Vacuum, Compaction
  • Z-Ordering

6.2 Predicate Pushdown

Voir le code
import os
import shutil

# Créer des données de test
test_data = spark.range(0, 100000).withColumn("category", (col("id") % 10).cast("string"))

# Sauvegarder en Parquet
output_path = "/tmp/test_parquet"
if os.path.exists(output_path):
    shutil.rmtree(output_path)
test_data.write.parquet(output_path)

# Lecture avec filtre → predicate pushdown
df_filtered = spark.read.parquet(output_path).filter(col("id") < 100)

print("Plan d'exécution avec Predicate Pushdown:")
df_filtered.explain()
# Observe "PushedFilters" dans le plan !

6.3 Partitionnement sur disque

Voir le code
from pyspark.sql.functions import year, month, dayofmonth, lit
from datetime import datetime, timedelta
import random

# Créer des données avec dates
dates = [(datetime(2024, 1, 1) + timedelta(days=random.randint(0, 90))).strftime("%Y-%m-%d") 
         for _ in range(10000)]
data = [(i, dates[i % len(dates)], float(random.randint(10, 1000))) 
        for i in range(10000)]

df = spark.createDataFrame(data, ["id", "date", "amount"])
df = df.withColumn("date", col("date").cast("date"))
df = df.withColumn("year", year("date")).withColumn("month", month("date"))

# Écriture partitionnée
output_partitioned = "/tmp/partitioned_data"
if os.path.exists(output_partitioned):
    shutil.rmtree(output_partitioned)

df.write.partitionBy("year", "month").parquet(output_partitioned)

print("Structure créée:")
for root, dirs, files in os.walk(output_partitioned):
    level = root.replace(output_partitioned, '').count(os.sep)
    indent = ' ' * 2 * level
    print(f"{indent}{os.path.basename(root)}/")
    if level < 2:  # Limiter la profondeur
        for d in sorted(dirs)[:3]:
            print(f"{indent}  {d}/")

6.4 Schémas explicites

Pourquoi définir un schéma ?

  • Évite l’inférence (coûteuse sur gros fichiers)
  • Garantit les types attendus
  • Détecte les erreurs tôt
Voir le code
from pyspark.sql.types import StructType, StructField, LongType, StringType, DecimalType, DateType

# Définir un schéma explicite
schema = StructType([
    StructField("id", LongType(), nullable=False),
    StructField("customer_name", StringType(), nullable=True),
    StructField("amount", DecimalType(10, 2), nullable=True),  # Précision pour montants !
    StructField("transaction_date", DateType(), nullable=True)
])

print("Schéma défini:")
print(schema.simpleString())

# Utilisation
# df = spark.read.schema(schema).parquet("data/transactions/")

print("\n💡 Conseil : Utiliser DecimalType pour les montants financiers")
print("   Évite les erreurs d'arrondi des float/double")

7. UDFs : Le piège de performance

7.1 Pourquoi les Python UDFs sont toxiques

┌─────────────────┐                    ┌─────────────────┐
│                 │   Sérialisation    │                 │
│       JVM       │ ────────────────▶  │     Python      │
│     (Spark)     │                    │   (UDF lente)   │
│                 │ ◀────────────────  │                 │
└─────────────────┘   Désérialisation  └─────────────────┘
                            │
                            │
                    TRÈS COÛTEUX !
                    (par ligne)

7.2 Hiérarchie de performance

Type Performance Quand l’utiliser
Expressions Spark natives ⭐⭐⭐⭐⭐ Toujours si possible
Pandas UDF (vectorized) ⭐⭐⭐ Si besoin Python
Python UDF Dernier recours

7.3 Remplacer UDF par expressions natives

Voir le code
from pyspark.sql.functions import udf, when, col
from pyspark.sql.types import StringType
import time

# Créer des données de test
df = spark.range(0, 500000).withColumn("amount", (col("id") % 2000).cast("double"))

# ❌ MAUVAIS : Python UDF
@udf(StringType())
def categorize_udf(amount):
    if amount > 1000:
        return "high"
    elif amount > 100:
        return "medium"
    return "low"

start = time.time()
result_udf = df.withColumn("category", categorize_udf(col("amount")))
result_udf.write.mode("overwrite").format("noop").save()  # Force l'exécution
udf_time = time.time() - start
print(f"❌ Python UDF : {udf_time:.2f}s")

# ✅ BON : Expression Spark native
start = time.time()
result_native = df.withColumn("category",
    when(col("amount") > 1000, "high")
    .when(col("amount") > 100, "medium")
    .otherwise("low")
)
result_native.write.mode("overwrite").format("noop").save()
native_time = time.time() - start
print(f"✅ Expression native : {native_time:.2f}s")

print(f"\n📊 Speedup : {udf_time/native_time:.1f}x plus rapide avec expression native !")

7.4 Pandas UDF (si Python nécessaire)

Voir le code
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np

# Pandas UDF = vectorisé (traite des Series, pas des scalaires)
@pandas_udf("double")
def log_transform(s: pd.Series) -> pd.Series:
    return np.log1p(s)

# Test
df = spark.range(0, 100000).withColumn("value", col("id").cast("double"))

start = time.time()
result = df.withColumn("log_value", log_transform(col("value")))
result.write.mode("overwrite").format("noop").save()
print(f"Pandas UDF : {time.time() - start:.2f}s")

result.show(5)

8. Configuration & Tuning

8.1 Paramètres essentiels

Paramètre Défaut Recommandation
spark.sql.shuffle.partitions 200 Adapter à la taille des données
spark.default.parallelism Selon cluster 2-3x num_cores
spark.sql.autoBroadcastJoinThreshold 10MB 50-100MB
spark.executor.memory 1g 4-16g selon cluster
spark.driver.memory 1g 2-8g
spark.executor.memoryOverhead 10% 15-20% pour PySpark

8.2 Dimensionnement : Executors vs Cores

❌ MAUVAIS : 2 executors × 10 cores chacun
   - GC Java doit gérer énorme heap (~50 GB)
   - Si 1 executor crash → 50% de perte
   - Parallélisme moins granulaire

✅ BON : 10 executors × 4 cores chacun  
   - GC plus efficace (heap ~10 GB)
   - Meilleure isolation des erreurs
   - Parallélisme plus granulaire

Règles de dimensionnement :

executor_cores = 4-5 max (sweet spot pour GC)
executor_memory = 4-16g (selon données)
num_executors = (total_cores / executor_cores) - 1

# Réserver pour le Driver et l'OS
driver_memory = 2-8g
memoryOverhead = 15-20% pour PySpark (sérialisation Python)

Exemple concret (cluster 100 cores, 400 GB RAM) :

spark-submit \
  --executor-cores 4 \
  --executor-memory 12g \
  --num-executors 20 \
  --driver-memory 4g \
  --conf spark.executor.memoryOverhead=2g \
  main.py

8.3 Adaptive Query Execution (AQE)

L’AQE (Spark 3.0+) rend certaines optimisations manuelles obsolètes :

Avant AQE (manuel) Avec AQE (automatique)
coalesce(n) après filter Auto-coalesce des partitions
Calculer shuffle.partitions Auto-optimize partitions
Détecter skew manuellement Auto-skew handling
Broadcast threshold fixe Runtime broadcast decisions
Voir le code
# Configuration AQE complète (recommandée)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# Avec AQE, tu peux laisser shuffle.partitions élevé
# Spark optimisera automatiquement
spark.conf.set("spark.sql.shuffle.partitions", "1000")

print("✅ Configuration AQE optimale :")
print(f"   adaptive.enabled = {spark.conf.get('spark.sql.adaptive.enabled')}")
print(f"   coalescePartitions = {spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled')}")
print(f"   skewJoin = {spark.conf.get('spark.sql.adaptive.skewJoin.enabled')}")
print(f"   shuffle.partitions = {spark.conf.get('spark.sql.shuffle.partitions')}")
print("\n💡 Avec AQE, Spark ajuste automatiquement le nombre de partitions !")

9. Spark UI & Diagnostic

Savoir lire Spark UI = savoir debugger. C’est la compétence qui fait la différence.

9.1 Accéder à Spark UI

  • Local : http://localhost:4040
  • Cluster : via le Resource Manager (YARN, K8s dashboard)
  • Databricks : intégré dans l’interface

9.2 Les onglets importants

Onglet Information
Jobs Vue d’ensemble des jobs, durée
Stages Détail des stages, shuffle read/write
Storage DataFrames en cache
Environment Configuration Spark
Executors Ressources, GC, mémoire
SQL Plans d’exécution des requêtes

9.3 Métriques à surveiller

Métrique Signification 🚨 Problème si…
Shuffle Read/Write Données échangées Très élevé (> 10 GB)
Spill (Memory/Disk) Débordement mémoire > 0
Task Duration Temps par tâche Très variable (skew !)
GC Time Garbage Collection > 10% du temps total
Input/Output Données lues/écrites Beaucoup plus que prévu

9.4 Patterns de problèmes

Symptôme dans Spark UI Diagnostic Solution
1 tâche 10x plus longue Data skew Salting, broadcast, AQE
Shuffle > 50 GB Join non optimisé Broadcast join
Spill to disk Mémoire insuffisante Plus de RAM, moins de partitions
GC Time > 20% Trop d’objets Java Tungsten, plus de memoryOverhead
Beaucoup de petites tâches Trop de partitions Coalesce, AQE
Voir le code
# URL de Spark UI pour cette session
print(f"🔍 Spark UI disponible sur : {spark.sparkContext.uiWebUrl}")
print("\n📊 Pour voir les métriques d'un job, lance une action puis consulte l'UI")

# Exemple : déclencher un job pour voir dans l'UI
df = spark.range(0, 1000000)
result = df.groupBy((col("id") % 100).alias("group")).count()
result.collect()  # Action qui déclenche le job

print("\n✅ Job exécuté - consulte Spark UI pour voir les stages et métriques")

10. Bonnes pratiques & Anti-patterns

❌ Anti-patterns (à éviter absolument)

Anti-pattern Pourquoi c’est mal Solution
collect() sur 100M lignes OOM Driver garanti write() vers fichier
CSV en production Lent, pas de schema Parquet/Delta
UDF Python partout 10-100x plus lent Expressions natives
shuffle.partitions=200 toujours Pas adapté aux données Ajuster ou AQE
Pas de cache() sur DF réutilisé Recalcul inutile cache() + unpersist()
Ignorer Spark UI Debug à l’aveugle Toujours vérifier
Join sans broadcast Shuffle énorme broadcast() sur petites tables
repartition() avant write() Shuffle inutile coalesce() pour réduire

✅ Bonnes pratiques

Pratique Bénéfice
Toujours Parquet I/O optimisé, predicate pushdown
Broadcast petites tables Évite shuffle
AQE activé Optimisation runtime
Partitionner par date Partition pruning
Éviter UDFs Performance native
Monitorer Spark UI Debug efficace
Cache + unpersist Évite recalculs
Schema explicite Évite inférence

Mini-Projet : Optimisation d’un pipeline

Objectif

Réduire un pipeline de 20 minutes à < 3 minutes en appliquant les techniques apprises.

Scénario : E-commerce Analytics

Table Lignes Format initial
Transactions 5M Parquet
Produits 10K CSV
Clients 500K Parquet

Architecture cible

┌────────────────┐    ┌────────────────┐    ┌────────────────┐
│  Transactions  │    │    Produits    │    │    Clients     │
│    (5M rows)   │    │   (10K rows)   │    │   (500K rows)  │
│   [Parquet]    │    │ [CSV→Parquet]  │    │   [Parquet]    │
└───────┬────────┘    └───────┬────────┘    └───────┬────────┘
        │                     │ broadcast           │
        │                     ▼                     │
        └─────────────────────┼─────────────────────┘
                              │
                              ▼
                 ┌───────────────────────────────┐
                 │      Join + Aggregation       │
                 │   - Broadcast products        │
                 │   - AQE enabled               │
                 │   - Native expressions        │
                 └───────────────┬───────────────┘
                                 │
                                 ▼
                 ┌───────────────────────────────┐
                 │      Partitioned Output       │
                 │   partitionBy("year","month") │
                 │         [Parquet]             │
                 └───────────────────────────────┘
Voir le code
import os
import shutil
from pyspark.sql.functions import *
from datetime import datetime, timedelta
import random
import time

# Setup : créer les données de test
print("📦 Création des données de test...")

# Configuration optimale
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Nettoyer les répertoires
for path in ["/tmp/transactions", "/tmp/products", "/tmp/customers", "/tmp/output"]:
    if os.path.exists(path):
        shutil.rmtree(path)

# 1. Transactions (5M lignes)
transactions = spark.range(0, 500000).select(
    col("id").alias("transaction_id"),
    (col("id") % 10000).alias("product_id"),
    (col("id") % 50000).alias("customer_id"),
    (rand() * 1000).alias("amount"),
    date_add(lit("2024-01-01"), (rand() * 90).cast("int")).alias("date")
)
transactions.write.parquet("/tmp/transactions")
print(f"Transactions : {transactions.count()} lignes")

# 2. Produits (10K lignes) - CSV intentionnellement
products_data = [(i, f"Product_{i}", f"Category_{i % 50}", float(10 + i % 100)) 
                 for i in range(10000)]
products = spark.createDataFrame(products_data, 
    ["product_id", "product_name", "category", "base_price"])
products.write.mode("overwrite").option("header", True).csv("/tmp/products")
print(f"Produits : {products.count()} lignes (CSV)")

# 3. Clients (500K lignes)
customers = spark.range(0, 50000).select(
    col("id").alias("customer_id"),
    concat(lit("Customer_"), col("id")).alias("customer_name"),
    (col("id") % 5).alias("segment")
)
customers.write.parquet("/tmp/customers")
print(f"Clients : {customers.count()} lignes")

print("\n Données créées avec succès !")
Voir le code
# ❌ VERSION NON OPTIMISÉE (baseline)
print("="*50)
print("❌ PIPELINE NON OPTIMISÉ")
print("="*50)

# Désactiver AQE pour le baseline
spark.conf.set("spark.sql.adaptive.enabled", "false")

start = time.time()

# Lecture
transactions_df = spark.read.parquet("/tmp/transactions")
products_df = spark.read.option("header", True).option("inferSchema", True).csv("/tmp/products")  # ❌ Inférence
customers_df = spark.read.parquet("/tmp/customers")

# UDF non optimisé
@udf(StringType())
def get_amount_category(amount):
    if amount > 500: return "high"
    elif amount > 100: return "medium"
    return "low"

# Joins sans broadcast
result = transactions_df \
    .join(products_df, "product_id") \
    .join(customers_df, "customer_id") \
    .withColumn("amount_category", get_amount_category(col("amount"))) \
    .groupBy("category", "segment", "amount_category") \
    .agg(
        count("*").alias("num_transactions"),
        sum("amount").alias("total_amount")
    )

# Écriture non partitionnée
if os.path.exists("/tmp/output"):
    shutil.rmtree("/tmp/output")
result.write.parquet("/tmp/output")

baseline_time = time.time() - start
print(f"\n⏱️ Temps baseline : {baseline_time:.2f}s")
Voir le code
# ✅ VERSION OPTIMISÉE
print("="*50)
print("✅ PIPELINE OPTIMISÉ")
print("="*50)

# Activer AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

start = time.time()

# 1. Lecture avec schema explicite pour CSV
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

products_schema = StructType([
    StructField("product_id", IntegerType()),
    StructField("product_name", StringType()),
    StructField("category", StringType()),
    StructField("base_price", DoubleType())
])

transactions_df = spark.read.parquet("/tmp/transactions")
products_df = spark.read.option("header", True).schema(products_schema).csv("/tmp/products")  # ✅ Schema explicite
customers_df = spark.read.parquet("/tmp/customers")

# 2. Broadcast les petites tables
products_df = broadcast(products_df)

# 3. Expression native au lieu de UDF
amount_category_expr = when(col("amount") > 500, "high") \
    .when(col("amount") > 100, "medium") \
    .otherwise("low")

# 4. Pipeline optimisé
result_optimized = transactions_df \
    .join(products_df, "product_id") \
    .join(customers_df, "customer_id") \
    .withColumn("amount_category", amount_category_expr) \
    .withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .groupBy("category", "segment", "amount_category", "year", "month") \
    .agg(
        count("*").alias("num_transactions"),
        sum("amount").alias("total_amount")
    )

# 5. Écriture partitionnée
if os.path.exists("/tmp/output_optimized"):
    shutil.rmtree("/tmp/output_optimized")
result_optimized.write.partitionBy("year", "month").parquet("/tmp/output_optimized")

optimized_time = time.time() - start
print(f"\n⏱️ Temps optimisé : {optimized_time:.2f}s")
Voir le code
# Résumé des optimisations
print("\n" + "="*60)
print("📊 RÉSUMÉ DES OPTIMISATIONS")
print("="*60)

speedup = baseline_time / optimized_time if optimized_time > 0 else 0
reduction = (1 - optimized_time / baseline_time) * 100 if baseline_time > 0 else 0

print(f"""
⏱️ Temps baseline    : {baseline_time:.2f}s
⏱️ Temps optimisé    : {optimized_time:.2f}s
🚀 Speedup           : {speedup:.1f}x
📉 Réduction         : {reduction:.0f}%

Optimisations appliquées :
  ✅ AQE activé (Adaptive Query Execution)
  ✅ Schema explicite pour CSV (pas d'inférence)
  ✅ Broadcast join pour products (10K lignes)
  ✅ Expression native au lieu de UDF Python
  ✅ Écriture partitionnée par year/month
""")

Quiz de fin de module


❓ Q1. Quel composant de Spark optimise automatiquement le plan de requête ?

  1. Tungsten
  2. Catalyst
  3. Driver
  4. Executor
💡 Voir la réponse

Réponse : b — Catalyst est l’optimiseur de requêtes qui applique predicate pushdown, projection pruning, et join reordering.


❓ Q2. Quelle opération cause un shuffle ?

  1. filter()
  2. select()
  3. groupBy()
  4. withColumn()
💡 Voir la réponse

Réponse : cgroupBy() nécessite un shuffle pour regrouper les données par clé. Les autres sont des transformations narrow.


❓ Q3. Quelle méthode utiliser pour réduire le nombre de partitions SANS shuffle ?

  1. repartition()
  2. coalesce()
  3. partitionBy()
  4. bucketBy()
💡 Voir la réponse

Réponse : bcoalesce() combine les partitions existantes sans shuffle (si réduction). repartition() cause toujours un shuffle.


❓ Q4. Quelle est la taille optimale d’une partition Spark ?

  1. 1-10 MB
  2. 128-256 MB
  3. 1-2 GB
  4. 10-50 GB
💡 Voir la réponse

Réponse : b — 128-256 MB est le sweet spot. Trop petit = overhead, trop grand = OOM et mauvaise parallélisation.


❓ Q5. Pour joindre une table de 10 GB avec une table de 50 MB, quelle stratégie utiliser ?

  1. Sort Merge Join
  2. Shuffle Hash Join
  3. Broadcast Join
  4. Nested Loop Join
💡 Voir la réponse

Réponse : c — Broadcast Join envoie la petite table (50 MB) à tous les executors, évitant le shuffle de la grande table.


❓ Q6. Pourquoi les Python UDFs sont-ils lents ?

  1. Python est un langage interprété
  2. Sérialisation JVM ↔︎ Python pour chaque ligne
  3. Le GIL de Python
  4. Manque de mémoire
💡 Voir la réponse

Réponse : b — Chaque ligne nécessite une sérialisation JVM→Python et désérialisation Python→JVM, ce qui est très coûteux.


❓ Q7. Que signifie “Spill to disk” dans Spark UI ?

  1. Les données sont écrites en Parquet
  2. La mémoire est insuffisante, données écrites sur disque
  3. Le cache est activé
  4. Le shuffle est terminé
💡 Voir la réponse

Réponse : b — Spill indique que la mémoire est insuffisante et les données débordent sur le disque, ce qui ralentit l’exécution.


❓ Q8. Quel deploy mode utiliser en production ?

  1. client
  2. cluster
  3. local
  4. standalone
💡 Voir la réponse

Réponse : b — En mode cluster, le Driver tourne sur un worker du cluster, ce qui est plus robuste pour la production.


❓ Q9. Que fait l’AQE (Adaptive Query Execution) ?

  1. Compile le code Python
  2. Optimise le plan d’exécution au runtime
  3. Compresse les données
  4. Gère l’authentification
💡 Voir la réponse

Réponse : b — AQE optimise dynamiquement le plan d’exécution pendant l’exécution (coalesce, skew handling, broadcast).


❓ Q10. Combien de cores par executor est recommandé ?

  1. 1 core
  2. 4-5 cores
  3. 10-15 cores
  4. Tous les cores disponibles
💡 Voir la réponse

Réponse : b — 4-5 cores est le sweet spot. Plus de cores = heap plus grand = GC moins efficace.


📚 Ressources pour aller plus loin

🌐 Documentation officielle

📖 Articles & Tutoriels

🔧 Outils


➡️ Prochaine étape

Maintenant que tu maîtrises l’optimisation Spark, passons aux fonctionnalités SQL avancées !

👉 Module suivant : 20_spark_sql_deep_dive — Spark SQL Deep Dive

Tu vas apprendre :

  • Window functions avancées
  • CTEs et subqueries
  • Optimisation SQL
  • Spark SQL vs DataFrame API

⚠️ Note : Spark Streaming sera couvert dans le module 24 Kafka.


📝 Récapitulatif de ce module

Concept Ce que tu as appris
Architecture Catalyst, Tungsten, DAG
spark-submit Deploy modes, packaging, structure projet
Partitionnement Shuffle, repartition vs coalesce, skew
Caching cache() vs persist(), storage levels
Joins Broadcast, Sort Merge, hints
I/O Parquet, partitionnement disque, schemas
UDFs Éviter Python UDF, expressions natives
Tuning AQE, executors/cores, configuration
Diagnostic Spark UI, métriques

🎉 Félicitations ! Tu as terminé le module PySpark Advanced.

Voir le code
# Nettoyage
spark.stop()
print("✅ SparkSession arrêtée")

# Nettoyage des fichiers temporaires (optionnel)
# import shutil
# for path in ["/tmp/transactions", "/tmp/products", "/tmp/customers", 
#              "/tmp/output", "/tmp/output_optimized", "/tmp/test_parquet", "/tmp/partitioned_data"]:
#     if os.path.exists(path):
#         shutil.rmtree(path)
# print("🧹 Fichiers temporaires supprimés")
Retour au sommet