Bienvenue dans ce module avancé où tu vas apprendre à optimiser Spark comme un expert. Tu découvriras l’architecture interne, les techniques d’optimisation, et comment diagnostiquer et résoudre les problèmes de performance.
Prérequis
Niveau
Compétence
✅ Requis
Module 11 : PySpark for Data Engineering (bases Spark)
✅ Requis
Module 18 : High Performance Python
💡 Recommandé
Expérience avec des datasets > 1 Go
Objectifs du module
À la fin de ce module, tu seras capable de :
Comprendre l’architecture interne de Spark (Catalyst, Tungsten)
Exécuter Spark en production avec spark-submit
Optimiser les partitions, shuffles et joins
Diagnostiquer un job lent avec Spark UI
Réduire le temps d’exécution de 80-90%
Objectif concret
Transformer un pipeline de 20 minutes en 2 minutes.
C’est ce qui distingue un Data Engineer junior d’un senior sur Spark.
1. Rappels Spark Essentiels
💡 Si tu as suivi le module 11 (PySpark for Data Engineering), cette section est un rappel rapide.
Sinon, commence par ce module avant de continuer — les concepts de base sont indispensables.
from pyspark.sql.functions import spark_partition_id# Créer un DataFramedf = spark.range(0, 1000000)print(f"Partitions initiales : {df.rdd.getNumPartitions()}")# repartition = SHUFFLE (redistribue les données)df_repart = df.repartition(10)print(f"Après repartition(10) : {df_repart.rdd.getNumPartitions()}")# coalesce = PAS DE SHUFFLE (combine les partitions existantes)df_coal = df_repart.coalesce(3)print(f"Après coalesce(3) : {df_coal.rdd.getNumPartitions()}")# Voir la distribution des données par partitionprint("\nDistribution après repartition(10):")df_repart.groupBy(spark_partition_id().alias("partition")).count().orderBy("partition").show()
# cache() = persist(StorageLevel.MEMORY_AND_DISK)df.cache()# persist() = contrôle fin du niveau de stockagefrom pyspark import StorageLeveldf.persist(StorageLevel.MEMORY_ONLY)
Niveau
RAM
Disque
Sérialisé
Usage
MEMORY_ONLY
✅
❌
❌
Petit DF, RAM suffisante
MEMORY_AND_DISK
✅
✅
❌
Défaut (cache())
MEMORY_ONLY_SER
✅
❌
✅
Économie RAM
DISK_ONLY
❌
✅
✅
Très gros DF
OFF_HEAP
✅
❌
✅
Éviter GC Java
Voir le code
from pyspark import StorageLevelimport time# Créer un DataFrame avec calculsdf = spark.range(0, 5000000).withColumn("squared", col("id") **2)# Sans cache : chaque action recalcule toutstart = time.time()count1 = df.filter(col("squared") >1000000).count()count2 = df.filter(col("squared") <100).count()print(f"Sans cache : {time.time() - start:.2f}s")# Avec cache : calcul une seule foisdf_cached = df.cache()start = time.time()count1 = df_cached.filter(col("squared") >1000000).count()count2 = df_cached.filter(col("squared") <100).count()print(f"Avec cache : {time.time() - start:.2f}s")# ⚠️ IMPORTANT : libérer la mémoire !df_cached.unpersist()print("\n✅ Cache libéré avec unpersist()")
⚠️ Quand NE PAS cacher :
Situation
Pourquoi
DF utilisé une seule fois
Gaspillage mémoire
DF très volumineux (> RAM)
Débordement disque lent
Avant un shuffle
Inutile, données redistribuées
Pipeline simple et rapide
Overhead du cache > gain
5. Joins Avancés — La compétence qui change tout
Un excellent Data Engineer sait optimiser ses joins. C’est souvent là que se gagne (ou se perd) le plus de temps.
5.1 Types de Joins internes Spark
Type
Quand Spark l’utilise
Performance
Broadcast Hash Join
Petite table (< seuil)
⭐⭐⭐ Meilleur
Sort Merge Join
Grandes tables
⭐⭐ Standard
Shuffle Hash Join
Tables moyennes
⭐ Éviter si possible
5.2 Broadcast Join — Ton meilleur ami
SORT MERGE JOIN (shuffle) BROADCAST JOIN (pas de shuffle)
═════════════════════════ ══════════════════════════════
Big Table Small Table Big Table Small Table
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│ P1 │ │ P1 │ │ P1 │◄──────│ │
│ P2 │ │ P2 │ │ P2 │◄──────│ COPY │
│ P3 │ │ P3 │ │ P3 │◄──────│ │
└──┬───┘ └──┬───┘ └──────┘ └──────┘
│ SHUFFLE │ │
└──────┬───────┘ Broadcast to
│ all executors
┌───▼───┐ (pas de shuffle !)
│ JOIN │
└───────┘
Voir le code
from pyspark.sql.functions import broadcast# Grande table (10M lignes simulées)big_df = spark.range(0, 1000000).withColumn("category_id", (col("id") %100).cast("int"))# Petite table (100 lignes)small_df = spark.createDataFrame( [(i, f"Category {i}") for i inrange(100)], ["category_id", "category_name"])# ❌ SANS broadcast hint (Spark peut ou non broadcaster)result_no_hint = big_df.join(small_df, "category_id")print("Sans broadcast hint:")result_no_hint.explain()print("\n"+"="*50+"\n")# ✅ AVEC broadcast hint (force le broadcast)result_broadcast = big_df.join(broadcast(small_df), "category_id")print("Avec broadcast():")result_broadcast.explain()
5.3 Configurer le seuil de broadcast
# Défaut : 10 MB# Si une table < 10 MB → broadcast automatique# Augmenter pour broadcaster des tables plus grandesspark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100*1024*1024) # 100 MB# Désactiver le broadcast automatique (forcer shuffle)spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
5.4 Join Hints (Spark 3.0+)
# DataFrame APIbig_df.join(small_df.hint("broadcast"), "key")# SQLspark.sql(""" SELECT /*+ BROADCAST(small_table) */ * FROM big_table JOIN small_table ON big_table.key = small_table.key""")
5.5 Anti-pattern : Join sur clé skewed
Voir le code
# Configuration pour gérer le skew automatiquement (AQE)spark.conf.set("spark.sql.adaptive.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")print("AQE Skew Join activé")print("Spark va automatiquement détecter et gérer les partitions skewed")
6. Lecture/Écriture Optimisées
6.1 Formats : Parquet toujours !
Format
Lecture
Écriture
Compression
Predicate Pushdown
CSV
🐢 Lent
🐢 Lent
❌
❌
JSON
🐢 Lent
🐢 Lent
❌
❌
Parquet
🚀 Rapide
🚀 Rapide
✅
✅
ORC
🚀 Rapide
🚀 Rapide
✅
✅
Delta
🚀 Rapide
🚀 Rapide
✅
✅ + ACID
🔭 Les formats modernes (Delta, Iceberg, Hudi) seront approfondis dans le module 21 (Lakehouse) :
Transactions ACID
Time Travel
Vacuum, Compaction
Z-Ordering
6.2 Predicate Pushdown
Voir le code
import osimport shutil# Créer des données de testtest_data = spark.range(0, 100000).withColumn("category", (col("id") %10).cast("string"))# Sauvegarder en Parquetoutput_path ="/tmp/test_parquet"if os.path.exists(output_path): shutil.rmtree(output_path)test_data.write.parquet(output_path)# Lecture avec filtre → predicate pushdowndf_filtered = spark.read.parquet(output_path).filter(col("id") <100)print("Plan d'exécution avec Predicate Pushdown:")df_filtered.explain()# Observe "PushedFilters" dans le plan !
6.3 Partitionnement sur disque
Voir le code
from pyspark.sql.functions import year, month, dayofmonth, litfrom datetime import datetime, timedeltaimport random# Créer des données avec datesdates = [(datetime(2024, 1, 1) + timedelta(days=random.randint(0, 90))).strftime("%Y-%m-%d") for _ inrange(10000)]data = [(i, dates[i %len(dates)], float(random.randint(10, 1000))) for i inrange(10000)]df = spark.createDataFrame(data, ["id", "date", "amount"])df = df.withColumn("date", col("date").cast("date"))df = df.withColumn("year", year("date")).withColumn("month", month("date"))# Écriture partitionnéeoutput_partitioned ="/tmp/partitioned_data"if os.path.exists(output_partitioned): shutil.rmtree(output_partitioned)df.write.partitionBy("year", "month").parquet(output_partitioned)print("Structure créée:")for root, dirs, files in os.walk(output_partitioned): level = root.replace(output_partitioned, '').count(os.sep) indent =' '*2* levelprint(f"{indent}{os.path.basename(root)}/")if level <2: # Limiter la profondeurfor d insorted(dirs)[:3]:print(f"{indent}{d}/")
6.4 Schémas explicites
Pourquoi définir un schéma ?
Évite l’inférence (coûteuse sur gros fichiers)
Garantit les types attendus
Détecte les erreurs tôt
Voir le code
from pyspark.sql.types import StructType, StructField, LongType, StringType, DecimalType, DateType# Définir un schéma expliciteschema = StructType([ StructField("id", LongType(), nullable=False), StructField("customer_name", StringType(), nullable=True), StructField("amount", DecimalType(10, 2), nullable=True), # Précision pour montants ! StructField("transaction_date", DateType(), nullable=True)])print("Schéma défini:")print(schema.simpleString())# Utilisation# df = spark.read.schema(schema).parquet("data/transactions/")print("\n💡 Conseil : Utiliser DecimalType pour les montants financiers")print(" Évite les erreurs d'arrondi des float/double")
L’AQE (Spark 3.0+) rend certaines optimisations manuelles obsolètes :
Avant AQE (manuel)
Avec AQE (automatique)
coalesce(n) après filter
Auto-coalesce des partitions
Calculer shuffle.partitions
Auto-optimize partitions
Détecter skew manuellement
Auto-skew handling
Broadcast threshold fixe
Runtime broadcast decisions
Voir le code
# Configuration AQE complète (recommandée)spark.conf.set("spark.sql.adaptive.enabled", "true")spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")# Avec AQE, tu peux laisser shuffle.partitions élevé# Spark optimisera automatiquementspark.conf.set("spark.sql.shuffle.partitions", "1000")print("✅ Configuration AQE optimale :")print(f" adaptive.enabled = {spark.conf.get('spark.sql.adaptive.enabled')}")print(f" coalescePartitions = {spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled')}")print(f" skewJoin = {spark.conf.get('spark.sql.adaptive.skewJoin.enabled')}")print(f" shuffle.partitions = {spark.conf.get('spark.sql.shuffle.partitions')}")print("\n💡 Avec AQE, Spark ajuste automatiquement le nombre de partitions !")
9. Spark UI & Diagnostic
Savoir lire Spark UI = savoir debugger. C’est la compétence qui fait la différence.
9.1 Accéder à Spark UI
Local : http://localhost:4040
Cluster : via le Resource Manager (YARN, K8s dashboard)
Databricks : intégré dans l’interface
9.2 Les onglets importants
Onglet
Information
Jobs
Vue d’ensemble des jobs, durée
Stages
Détail des stages, shuffle read/write
Storage
DataFrames en cache
Environment
Configuration Spark
Executors
Ressources, GC, mémoire
SQL
Plans d’exécution des requêtes
9.3 Métriques à surveiller
Métrique
Signification
🚨 Problème si…
Shuffle Read/Write
Données échangées
Très élevé (> 10 GB)
Spill (Memory/Disk)
Débordement mémoire
> 0
Task Duration
Temps par tâche
Très variable (skew !)
GC Time
Garbage Collection
> 10% du temps total
Input/Output
Données lues/écrites
Beaucoup plus que prévu
9.4 Patterns de problèmes
Symptôme dans Spark UI
Diagnostic
Solution
1 tâche 10x plus longue
Data skew
Salting, broadcast, AQE
Shuffle > 50 GB
Join non optimisé
Broadcast join
Spill to disk
Mémoire insuffisante
Plus de RAM, moins de partitions
GC Time > 20%
Trop d’objets Java
Tungsten, plus de memoryOverhead
Beaucoup de petites tâches
Trop de partitions
Coalesce, AQE
Voir le code
# URL de Spark UI pour cette sessionprint(f"🔍 Spark UI disponible sur : {spark.sparkContext.uiWebUrl}")print("\n📊 Pour voir les métriques d'un job, lance une action puis consulte l'UI")# Exemple : déclencher un job pour voir dans l'UIdf = spark.range(0, 1000000)result = df.groupBy((col("id") %100).alias("group")).count()result.collect() # Action qui déclenche le jobprint("\n✅ Job exécuté - consulte Spark UI pour voir les stages et métriques")
10. Bonnes pratiques & Anti-patterns
❌ Anti-patterns (à éviter absolument)
Anti-pattern
Pourquoi c’est mal
Solution
collect() sur 100M lignes
OOM Driver garanti
write() vers fichier
CSV en production
Lent, pas de schema
Parquet/Delta
UDF Python partout
10-100x plus lent
Expressions natives
shuffle.partitions=200 toujours
Pas adapté aux données
Ajuster ou AQE
Pas de cache() sur DF réutilisé
Recalcul inutile
cache() + unpersist()
Ignorer Spark UI
Debug à l’aveugle
Toujours vérifier
Join sans broadcast
Shuffle énorme
broadcast() sur petites tables
repartition() avant write()
Shuffle inutile
coalesce() pour réduire
✅ Bonnes pratiques
Pratique
Bénéfice
Toujours Parquet
I/O optimisé, predicate pushdown
Broadcast petites tables
Évite shuffle
AQE activé
Optimisation runtime
Partitionner par date
Partition pruning
Éviter UDFs
Performance native
Monitorer Spark UI
Debug efficace
Cache + unpersist
Évite recalculs
Schema explicite
Évite inférence
Mini-Projet : Optimisation d’un pipeline
Objectif
Réduire un pipeline de 20 minutes à < 3 minutes en appliquant les techniques apprises.