PySpark for Data Engineering

Ce module présente PySpark, l’API Python pour Apache Spark — le moteur de traitement distribué le plus utilisé en Big Data.


Prérequis

Niveau Compétence
✅ Requis Avoir suivi le module 08_intro_big_data_distributed
✅ Requis Comprendre les 5V du Big Data
✅ Requis Comprendre MapReduce et ses limites
✅ Requis Maîtriser Python (modules 04-05)
✅ Requis Maîtriser SQL (module 07)

Objectifs du module

À la fin de ce notebook, tu seras capable de :

  • Comprendre l’architecture Spark (Driver, Executors, Cluster Manager)
  • Créer et manipuler des DataFrames distribués
  • Écrire des transformations et actions
  • Utiliser Spark SQL
  • Optimiser les performances (partitioning, caching, broadcast)
  • Lire/écrire des fichiers (CSV, JSON, Parquet)
  • Découvrir le streaming temps réel

PySpark dans l’écosystème Big Data

Tu as vu dans le module 08 que Spark a remplacé MapReduce comme moteur de traitement Big Data. Voici pourquoi :

Rappel : MapReduce vs Spark

MapReduce :  DISQUE → Map → DISQUE → Shuffle → DISQUE → Reduce → DISQUE
                  ↑           ↑              ↑              ↑
                  └───────────┴──────────────┴──────────────┘
                               LENT ! (I/O disque)

Spark :      DISQUE → Transformations → MÉMOIRE → ... → MÉMOIRE → Action
                                          ↑                ↑
                                          └────────────────┘
                                           RAPIDE ! (in-memory)

Rappel : Les 5V et Spark

V Comment Spark répond
Volume Traitement distribué sur cluster (To → Po)
Velocity Spark Streaming pour le temps réel
Variety Lit CSV, JSON, Parquet, JDBC, Avro…
Veracity Transformations pour nettoyer les données
Value Spark SQL, MLlib pour extraire de la valeur

Position dans l’écosystème

┌─────────────────────────────────────────────────────────────────┐
│                     ÉCOSYSTÈME BIG DATA                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Sources           Traitement              Stockage            │
│   ────────          ──────────              ────────            │
│                                                                 │
│   Kafka    ─┐                         ┌─►  Data Lake (S3)       │
│   Fichiers ─┼──►  ⚡ SPARK ⚡  ──────┼─►  Data Warehouse          │
│   JDBC     ─┤     (PySpark)          ├─►  NoSQL (MongoDB)       │
│   APIs     ─┘                         └─►  Elasticsearch        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

💡 Ce notebook est interactif : tu peux exécuter toutes les cellules de code !

Installation et Setup

PySpark nécessite Java. Vérifions d’abord l’installation.

Voir le code
# Installation de PySpark
!pip install pyspark pandas numpy pyarrow
Voir le code
# Vérifier Java
!java -version
Voir le code
# Imports de base
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

print("✅ Imports réussis !")

1️⃣ Introduction à Spark

Qu’est-ce que Spark ?

Apache Spark est un moteur de traitement distribué ultra-rapide pour le Big Data.

Concepts clés

  • SparkSession : Point d’entrée de toute application Spark
  • DataFrame : Collection distribuée de données organisées en colonnes
  • RDD : Resilient Distributed Dataset (bas niveau)
  • Transformations : Opérations lazy (map, filter, select, etc.)
  • Actions : Déclenchent l’exécution (count, collect, show, etc.)

Avantages de Spark

  • Vitesse : 100x plus rapide que MapReduce
  • Scalabilité : De quelques MB à plusieurs PB
  • Simplicité : API unifiée (Python, Scala, Java, R)
  • Versatilité : Batch, Streaming, ML, Graph processing

1.1 Créer une SparkSession

Voir le code
# Créer une SparkSession
spark = SparkSession.builder \
    .appName("PySpark Data Engineering Tutorial") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

print(" SparkSession créée")
print(f"Version Spark : {spark.version}")
print(f"Application : {spark.sparkContext.appName}")
print(f"Master : {spark.sparkContext.master}")
Voir le code
# Configuration du logging
spark.sparkContext.setLogLevel("ERROR")
print(" Logging configuré sur ERROR")

1.2 Premiers DataFrames

Voir le code
# Méthode 1 : Depuis une liste Python
data = [
    (1, "Alice", 25, "Paris", 45000),
    (2, "Bob", 30, "Lyon", 55000),
    (3, "Charlie", 35, "Paris", 60000),
    (4, "David", 28, "Marseille", 50000),
    (5, "Eve", 32, "Lyon", 58000)
]

columns = ["id", "nom", "age", "ville", "salaire"]

df = spark.createDataFrame(data, columns)

print(" Premier DataFrame créé :")
df.show()
Voir le code
# Méthode 2 : Depuis un Pandas DataFrame
pandas_df = pd.DataFrame({
    'produit': ['A', 'B', 'C', 'D'],
    'prix': [10.5, 20.0, 15.75, 30.0],
    'quantite': [100, 50, 75, 25]
})

spark_df = spark.createDataFrame(pandas_df)

print(" DataFrame depuis Pandas :")
spark_df.show()
Voir le code
# Méthode 3 : Avec un schéma explicite
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("nom", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("ville", StringType(), True),
    StructField("salaire", IntegerType(), True)
])

df_with_schema = spark.createDataFrame(data, schema)

print(" DataFrame avec schéma explicite :")
df_with_schema.printSchema()

1.3 Explorer un DataFrame

Voir le code
# Afficher le schéma
print(" Schéma du DataFrame :")
df.printSchema()

# Afficher les premières lignes
print("\n Premières lignes :")
df.show(3)

# Compter les lignes
print(f"\n Nombre de lignes : {df.count()}")

# Colonnes
print(f"\n Colonnes : {df.columns}")

# Types de données
print("\n Types de données :")
print(df.dtypes)
Voir le code
# Statistiques descriptives
print(" Statistiques descriptives :")
df.describe().show()

# Statistiques sur colonnes spécifiques
print("\n Statistiques sur 'age' et 'salaire' :")
df.select('age', 'salaire').describe().show()

2️⃣ Transformations de base

Les transformations sont lazy : elles ne s’exécutent que lorsqu’une action est appelée.

2.1 Sélection de colonnes

Voir le code
# Sélectionner des colonnes
print(" Sélection de colonnes :")
df.select("nom", "ville").show()

# Avec alias
print("\n Avec alias :")
df.select(
    F.col("nom").alias("employee_name"),
    F.col("salaire").alias("salary")
).show()

# Sélectionner avec expressions
print("\n Avec expressions :")
df.select(
    "nom",
    (F.col("salaire") * 12).alias("salaire_annuel")
).show()

2.2 Filtrage

Voir le code
# Filtrer les lignes
print(" Employés de Paris :")
df.filter(F.col("ville") == "Paris").show()

# Filtres multiples avec AND
print("\n Employés de Paris avec salaire > 50000 :")
df.filter(
    (F.col("ville") == "Paris") & 
    (F.col("salaire") > 50000)
).show()

# Filtres avec OR
print("\n Employés de Paris OU Lyon :")
df.filter(
    (F.col("ville") == "Paris") | 
    (F.col("ville") == "Lyon")
).show()

# Filtrer avec IN
print("\n Villes avec IN :")
df.filter(F.col("ville").isin(["Paris", "Lyon"])).show()
Voir le code
# Filtres avancés
print(" Noms commençant par 'A' :")
df.filter(F.col("nom").startswith("A")).show()

print("\n Noms contenant 'li' :")
df.filter(F.col("nom").contains("li")).show()

print("\n Age entre 25 et 30 :")
df.filter(F.col("age").between(25, 30)).show()

2.3 Ajouter et modifier des colonnes

Voir le code
# Ajouter une nouvelle colonne
df_with_bonus = df.withColumn(
    "bonus",
    F.col("salaire") * 0.1
)

print(" Ajout de la colonne 'bonus' :")
df_with_bonus.show()

# Modifier une colonne existante
df_modified = df.withColumn(
    "salaire",
    F.col("salaire") * 1.05  # Augmentation de 5%
)

print("\n Salaire augmenté de 5% :")
df_modified.show()
Voir le code
# Ajouter plusieurs colonnes
df_enriched = df \
    .withColumn("salaire_mensuel", F.col("salaire")) \
    .withColumn("salaire_annuel", F.col("salaire") * 12) \
    .withColumn("bonus", F.col("salaire") * 0.1) \
    .withColumn("total_annuel", F.col("salaire_annuel") + F.col("bonus"))

print(" DataFrame enrichi :")
df_enriched.select("nom", "salaire_mensuel", "salaire_annuel", "bonus", "total_annuel").show()

2.4 Renommer et supprimer des colonnes

Voir le code
# Renommer une colonne
df_renamed = df.withColumnRenamed("nom", "employee_name")
print(" Colonne renommée :")
df_renamed.show(3)

# Supprimer des colonnes
df_dropped = df.drop("age", "ville")
print("\n Colonnes supprimées :")
df_dropped.show(3)

2.5 Tri

Voir le code
# Trier par salaire (ascendant)
print(" Tri par salaire (croissant) :")
df.orderBy("salaire").show()

# Trier par salaire (descendant)
print("\n Tri par salaire (décroissant) :")
df.orderBy(F.col("salaire").desc()).show()

# Tri multiple
print("\n Tri par ville puis salaire :")
df.orderBy("ville", F.col("salaire").desc()).show()

3️⃣ Agrégations et GroupBy

Les agrégations permettent de calculer des statistiques sur les données.

3.1 Agrégations simples

Voir le code
# Statistiques de base
print(" Statistiques simples :")
df.select(
    F.count("*").alias("total"),
    F.avg("salaire").alias("salaire_moyen"),
    F.min("salaire").alias("salaire_min"),
    F.max("salaire").alias("salaire_max"),
    F.sum("salaire").alias("salaire_total")
).show()
Voir le code
# Agrégations multiples
from pyspark.sql.functions import stddev, variance

print(" Statistiques avancées :")
df.agg(
    F.count("*").alias("count"),
    F.avg("age").alias("age_moyen"),
    F.stddev("salaire").alias("salaire_stddev"),
    F.variance("salaire").alias("salaire_variance")
).show()

3.2 GroupBy

Voir le code
# Grouper par ville
print(" Statistiques par ville :")
df.groupBy("ville").agg(
    F.count("*").alias("nb_employes"),
    F.avg("salaire").alias("salaire_moyen"),
    F.min("salaire").alias("salaire_min"),
    F.max("salaire").alias("salaire_max")
).orderBy("ville").show()
Voir le code
# Créer un DataFrame plus complexe pour les exemples
data_ventes = [
    ("2024-01", "Paris", "Produit A", 100, 1500),
    ("2024-01", "Paris", "Produit B", 50, 2000),
    ("2024-01", "Lyon", "Produit A", 75, 1200),
    ("2024-02", "Paris", "Produit A", 120, 1800),
    ("2024-02", "Lyon", "Produit B", 60, 2400),
    ("2024-02", "Marseille", "Produit A", 90, 1350),
]

columns_ventes = ["mois", "ville", "produit", "quantite", "montant"]
df_ventes = spark.createDataFrame(data_ventes, columns_ventes)

print(" Données de ventes :")
df_ventes.show()
Voir le code
# GroupBy multiple
print(" Ventes par mois et ville :")
df_ventes.groupBy("mois", "ville").agg(
    F.sum("quantite").alias("total_quantite"),
    F.sum("montant").alias("total_montant"),
    F.count("*").alias("nb_transactions")
).orderBy("mois", "ville").show()
Voir le code
# Agrégations conditionnelles
print(" Agrégations conditionnelles :")
df_ventes.groupBy("ville").agg(
    F.sum("montant").alias("total"),
    F.sum(F.when(F.col("produit") == "Produit A", F.col("montant")).otherwise(0)).alias("total_produit_a"),
    F.sum(F.when(F.col("produit") == "Produit B", F.col("montant")).otherwise(0)).alias("total_produit_b")
).show()

3.3 Window Functions

Voir le code
# Ranking dans chaque ville
from pyspark.sql.window import Window

window_spec = Window.partitionBy("ville").orderBy(F.col("salaire").desc())

df_ranked = df.withColumn(
    "rank",
    F.row_number().over(window_spec)
)

print("🏆 Ranking des salaires par ville :")
df_ranked.orderBy("ville", "rank").show()
Voir le code
# Calculs cumulatifs
window_cumul = Window.partitionBy("ville").orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_cumul = df.withColumn(
    "salaire_cumul",
    F.sum("salaire").over(window_cumul)
)

print(" Salaire cumulé par ville :")
df_cumul.select("id", "nom", "ville", "salaire", "salaire_cumul").orderBy("ville", "id").show()
Voir le code
# Calcul de moyennes mobiles
window_rolling = Window.partitionBy("ville").orderBy("id").rowsBetween(-1, 1)

df_rolling = df.withColumn(
    "salaire_avg_3",
    F.avg("salaire").over(window_rolling)
)

print(" Moyenne mobile sur 3 lignes :")
df_rolling.select("id", "nom", "ville", "salaire", "salaire_avg_3").orderBy("ville", "id").show()

4️⃣ Jointures

Les jointures permettent de combiner plusieurs DataFrames.

4.1 Créer des DataFrames pour les exemples

Voir le code
# DataFrame employés
employes = spark.createDataFrame([
    (1, "Alice", "IT"),
    (2, "Bob", "Finance"),
    (3, "Charlie", "IT"),
    (4, "David", "HR")
], ["emp_id", "nom", "dept_id"])

# DataFrame départements
departements = spark.createDataFrame([
    ("IT", "Information Technology", "Paris"),
    ("Finance", "Finance Department", "Lyon"),
    ("HR", "Human Resources", "Marseille"),
    ("Marketing", "Marketing Department", "Paris")
], ["dept_id", "dept_name", "location"])

print("👥 Employés :")
employes.show()

print("\n🏢 Départements :")
departements.show()

4.2 Types de jointures

Voir le code
# INNER JOIN (par défaut)
print("🔗 INNER JOIN :")
employes.join(departements, "dept_id", "inner").show()

# LEFT JOIN
print("\n🔗 LEFT JOIN :")
employes.join(departements, "dept_id", "left").show()

# RIGHT JOIN
print("\n🔗 RIGHT JOIN :")
employes.join(departements, "dept_id", "right").show()

# FULL OUTER JOIN
print("\n🔗 FULL OUTER JOIN :")
employes.join(departements, "dept_id", "outer").show()
Voir le code
# Jointure avec colonnes différentes
employes_alt = employes.withColumnRenamed("dept_id", "department")

print(" Jointure avec colonnes différentes :")
employes_alt.join(
    departements,
    employes_alt.department == departements.dept_id,
    "inner"
).select(
    employes_alt["*"],
    departements.dept_name,
    departements.location
).show()
Voir le code
# Jointures multiples
salaires = spark.createDataFrame([
    (1, 45000),
    (2, 55000),
    (3, 50000),
    (4, 48000)
], ["emp_id", "salaire"])

print("🔗 Jointures multiples :")
result = employes \
    .join(departements, "dept_id", "inner") \
    .join(salaires, "emp_id", "inner")

result.select("nom", "dept_name", "location", "salaire").show()

5️⃣ Lecture et écriture de fichiers

Spark supporte de nombreux formats de fichiers.

5.1 CSV

Voir le code
# Créer des données de test
import os
os.makedirs('data', exist_ok=True)

# Écrire en CSV
df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("data/employes.csv")

print(" CSV écrit")

# Lire le CSV
df_from_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/employes.csv")

print("\n CSV lu :")
df_from_csv.show(3)
df_from_csv.printSchema()

5.2 JSON

Voir le code
# Écrire en JSON
df.write \
    .mode("overwrite") \
    .json("data/employes.json")

print(" JSON écrit")

# Lire le JSON
df_from_json = spark.read.json("data/employes.json")

print("\n JSON lu :")
df_from_json.show(3)
df_from_json.printSchema()

5.3 Parquet (Format recommandé)

Voir le code
# Écrire en Parquet
df.write \
    .mode("overwrite") \
    .parquet("data/employes.parquet")

print(" Parquet écrit")

# Lire le Parquet
df_from_parquet = spark.read.parquet("data/employes.parquet")

print("\n Parquet lu :")
df_from_parquet.show(3)
df_from_parquet.printSchema()
Voir le code
# Parquet avec partitionnement
df.write \
    .mode("overwrite") \
    .partitionBy("ville") \
    .parquet("data/employes_partitioned.parquet")

print(" Parquet partitionné écrit")

# Lire avec filtre de partition (très performant)
df_paris = spark.read \
    .parquet("data/employes_partitioned.parquet") \
    .filter(F.col("ville") == "Paris")

print("\n Parquet partitionné lu (ville=Paris) :")
df_paris.show()

5.4 Options d’écriture

Voir le code
# Mode d'écriture
# - "overwrite" : Écrase les données existantes
# - "append" : Ajoute aux données existantes
# - "ignore" : Ne fait rien si le fichier existe
# - "error" (default) : Erreur si le fichier existe

# Compression
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("data/employes_compressed.parquet")

print(" Parquet compressé écrit")

# Contrôler le nombre de fichiers
df.coalesce(1).write \
    .mode("overwrite") \
    .csv("data/employes_single_file.csv")

print(" CSV en un seul fichier écrit")

6️⃣ Spark SQL

Spark permet d’exécuter des requêtes SQL sur les DataFrames.

6.1 Créer des vues temporaires

Voir le code
# Créer une vue temporaire
df.createOrReplaceTempView("employes")
df_ventes.createOrReplaceTempView("ventes")

print(" Vues temporaires créées")

6.2 Requêtes SQL

Voir le code
# Requête SQL simple
result = spark.sql("""
    SELECT nom, ville, salaire
    FROM employes
    WHERE salaire > 50000
    ORDER BY salaire DESC
""")

print(" Employés avec salaire > 50000 :")
result.show()
Voir le code
# Agrégation avec SQL
result = spark.sql("""
    SELECT 
        ville,
        COUNT(*) as nb_employes,
        AVG(salaire) as salaire_moyen,
        MIN(salaire) as salaire_min,
        MAX(salaire) as salaire_max
    FROM employes
    GROUP BY ville
    ORDER BY salaire_moyen DESC
""")

print(" Statistiques par ville :")
result.show()
Voir le code
# Window functions en SQL
result = spark.sql("""
    SELECT 
        nom,
        ville,
        salaire,
        ROW_NUMBER() OVER (PARTITION BY ville ORDER BY salaire DESC) as rank_ville,
        DENSE_RANK() OVER (ORDER BY salaire DESC) as rank_global
    FROM employes
    ORDER BY ville, rank_ville
""")

print(" Ranking avec SQL :")
result.show()
Voir le code
# CTE (Common Table Expression)
result = spark.sql("""
    WITH stats_ville AS (
        SELECT 
            ville,
            AVG(salaire) as salaire_moyen
        FROM employes
        GROUP BY ville
    )
    SELECT 
        e.nom,
        e.ville,
        e.salaire,
        s.salaire_moyen,
        ROUND(e.salaire - s.salaire_moyen, 2) as diff_moyenne
    FROM employes e
    JOIN stats_ville s ON e.ville = s.ville
    ORDER BY e.ville, e.salaire DESC
""")

print(" Comparaison à la moyenne :")
result.show()

7️⃣ Gestion des données manquantes

PySpark offre plusieurs méthodes pour gérer les nulls.

Voir le code
# Créer un DataFrame avec des nulls
data_with_nulls = [
    (1, "Alice", 25, "Paris", 45000),
    (2, "Bob", None, "Lyon", 55000),
    (3, "Charlie", 35, None, 60000),
    (4, "David", 28, "Marseille", None),
    (5, None, 32, "Lyon", 58000)
]

df_nulls = spark.createDataFrame(data_with_nulls, ["id", "nom", "age", "ville", "salaire"])

print(" DataFrame avec nulls :")
df_nulls.show()
Voir le code
# Supprimer les lignes avec des nulls
print(" Suppression de toutes les lignes avec nulls :")
df_nulls.dropna().show()

# Supprimer seulement si toutes les colonnes sont nulles
print("\n Suppression si toutes les colonnes sont nulles :")
df_nulls.dropna(how='all').show()

# Supprimer les nulls sur des colonnes spécifiques
print("\n Suppression des nulls sur 'nom' et 'ville' :")
df_nulls.dropna(subset=["nom", "ville"]).show()
Voir le code
# Remplir les nulls avec des valeurs par défaut
print("✨ Remplir tous les nulls avec 0 :")
df_nulls.fillna(0).show()

# Remplir avec des valeurs différentes par colonne
print("\n✨ Remplir avec des valeurs spécifiques :")
df_nulls.fillna({
    "nom": "Inconnu",
    "age": 30,
    "ville": "Non spécifié",
    "salaire": 50000
}).show()
Voir le code
# Remplacer les nulls par la moyenne/médiane
from pyspark.sql.functions import mean, median

# Calculer la moyenne
age_moyen = df_nulls.select(mean("age")).first()[0]
salaire_moyen = df_nulls.select(mean("salaire")).first()[0]

print(f"Age moyen : {age_moyen}")
print(f"Salaire moyen : {salaire_moyen}")

# Remplir avec les moyennes
df_filled = df_nulls.fillna({
    "age": age_moyen,
    "salaire": salaire_moyen
})

print("\n✨ Nulls remplacés par les moyennes :")
df_filled.show()

8️⃣ UDFs (User Defined Functions)

Les UDFs permettent d’appliquer des fonctions Python personnalisées.

Voir le code
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# Définir une fonction Python
def categoriser_age(age):
    if age is None:
        return "Inconnu"
    elif age < 30:
        return "Junior"
    elif age < 40:
        return "Senior"
    else:
        return "Expert"

# Enregistrer comme UDF
categoriser_age_udf = udf(categoriser_age, StringType())

# Utiliser l'UDF
df_with_category = df.withColumn(
    "categorie",
    categoriser_age_udf(F.col("age"))
)

print("🔧 DataFrame avec catégorie :")
df_with_category.show()
Voir le code
# UDF avec plusieurs paramètres
def calculer_bonus(salaire, performance):
    if salaire is None or performance is None:
        return 0
    return salaire * performance * 0.1

calculer_bonus_udf = udf(calculer_bonus, IntegerType())

# Ajouter une colonne de performance
df_perf = df.withColumn("performance", F.lit(1.2))

# Appliquer l'UDF
df_with_bonus = df_perf.withColumn(
    "bonus",
    calculer_bonus_udf(F.col("salaire"), F.col("performance"))
)

print("💰 DataFrame avec bonus :")
df_with_bonus.select("nom", "salaire", "performance", "bonus").show()
Voir le code
# ⚠️ Attention : Les UDFs sont moins performantes que les fonctions natives
# Préférer les fonctions natives quand c'est possible

# Avec UDF (plus lent)
double_udf = udf(lambda x: x * 2, IntegerType())
df.withColumn("salaire_double_udf", double_udf(F.col("salaire")))

# Avec fonction native (plus rapide)
df.withColumn("salaire_double", F.col("salaire") * 2)

print("✅ Préférez toujours les fonctions natives !")

9️⃣ Optimisation et Performance

Quelques techniques pour optimiser vos jobs Spark.

9.1 Partitionnement

Voir le code
# Vérifier le nombre de partitions
print(f"Nombre de partitions : {df.rdd.getNumPartitions()}")

# Repartitionner (shuffle)
df_repartitioned = df.repartition(4)
print(f"Après repartition : {df_repartitioned.rdd.getNumPartitions()}")

# Coalesce (pas de shuffle, moins coûteux)
df_coalesced = df.coalesce(2)
print(f"Après coalesce : {df_coalesced.rdd.getNumPartitions()}")
Voir le code
# Repartitionner par colonne (utile avant les groupBy)
df_repartitioned_by_ville = df.repartition("ville")

# Maintenant les groupBy sur 'ville' seront plus efficaces
df_repartitioned_by_ville.groupBy("ville").count().show()

9.2 Caching

Voir le code
# Cache un DataFrame en mémoire
df_cached = df.cache()

# Première action : calcul complet
print("Première action (calcul complet) :")
df_cached.count()

# Deuxième action : utilise le cache (beaucoup plus rapide)
print("\nDeuxième action (utilise le cache) :")
df_cached.show()

# Libérer le cache
df_cached.unpersist()
print("\n Cache libéré")
Voir le code
# Persist avec différents niveaux de stockage
from pyspark import StorageLevel

# MEMORY_ONLY : En mémoire uniquement
df.persist(StorageLevel.MEMORY_ONLY)

# MEMORY_AND_DISK : Mémoire + disque si nécessaire
df.persist(StorageLevel.MEMORY_AND_DISK)

# DISK_ONLY : Disque uniquement
df.persist(StorageLevel.DISK_ONLY)

print(" Différents niveaux de persistance disponibles")

9.3 Broadcast Joins

Voir le code
# Pour les petits DataFrames (< 10MB), utilisez broadcast
from pyspark.sql.functions import broadcast

# departements est petit, on le broadcast
result = employes.join(
    broadcast(departements),
    "dept_id",
    "inner"
)

print(" Broadcast join :")
result.show()

# Évite le shuffle, beaucoup plus rapide !

9.4 Éviter les UDFs quand possible

Voir le code
# ❌ Avec UDF (lent)
def add_ten(x):
    return x + 10

add_ten_udf = udf(add_ten, IntegerType())
df.withColumn("salaire_plus_10_udf", add_ten_udf(F.col("salaire")))

# ✅ Avec fonction native (rapide)
df.withColumn("salaire_plus_10", F.col("salaire") + 10)

print(" Les fonctions natives sont toujours plus rapides !")

9.5 Explain Plans

Voir le code
# Voir le plan d'exécution
print(" Plan d'exécution :")
df.filter(F.col("salaire") > 50000).explain()

# Plan détaillé
print("\n Plan d'exécution détaillé :")
df.filter(F.col("salaire") > 50000).explain(extended=True)

🔟 Structured Streaming (Bonus)

Traitement de données en temps réel avec Spark.

Voir le code
# Créer un streaming DataFrame depuis un dossier
# Les fichiers ajoutés au dossier seront traités automatiquement

schema_stream = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("action", StringType(), True),
    StructField("value", IntegerType(), True)
])

# Lire un stream depuis un dossier
stream_df = spark.readStream \
    .schema(schema_stream) \
    .json("data/stream_input/")

print(" Stream créé")
print(f"Is streaming: {stream_df.isStreaming}")
Voir le code
# Transformations sur le stream (comme un DataFrame normal)
stream_processed = stream_df \
    .filter(F.col("value") > 100) \
    .groupBy("user_id", "action") \
    .count()

print(" Transformations appliquées")
Voir le code
# Écrire le stream vers un sink
# Note: Ce code est un exemple, il nécessite un stream actif pour s'exécuter

# query = stream_processed.writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .start()

# # Attendre la fin du stream
# query.awaitTermination()

print("💡 Exemple de streaming (nécessite des données en entrée pour s'exécuter)")

1️⃣1️⃣ Pipeline ETL Complet avec PySpark

Construisons un pipeline ETL complet.

Voir le code
import os
from datetime import datetime

# Créer la structure de dossiers
os.makedirs('spark_pipeline/raw', exist_ok=True)
os.makedirs('spark_pipeline/processed', exist_ok=True)
os.makedirs('spark_pipeline/output', exist_ok=True)
os.makedirs('spark_pipeline/logs', exist_ok=True)

print("✅ Structure créée")

11.1 Extract

Voir le code
def extract_data(spark, path):
    """Extrait des données depuis plusieurs sources"""
    print(f"📥 Extraction depuis {path}")
    
    # Créer des données de test
    data = [
        (1, "2024-01-15", "Paris", "Produit A", 100, 1500, "online"),
        (2, "2024-01-15", "Lyon", "Produit B", 50, 2000, "store"),
        (3, "2024-01-16", "Paris", "Produit A", 75, 1200, "online"),
        (4, "2024-01-16", "Marseille", "Produit C", 120, 1800, "online"),
        (5, "2024-01-17", "Lyon", "Produit B", 60, 2400, "store"),
        (6, "2024-01-17", None, "Produit A", 90, None, "online"),  # Données sales
    ]
    
    columns = ["id", "date", "ville", "produit", "quantite", "montant", "canal"]
    df = spark.createDataFrame(data, columns)
    
    # Sauvegarder les données brutes
    df.write.mode("overwrite").parquet(f"{path}/ventes_raw.parquet")
    
    print(f" {df.count()} lignes extraites")
    return df

# Test
df_raw = extract_data(spark, "spark_pipeline/raw")
df_raw.show()

11.2 Transform

Voir le code
def transform_data(df):
    """Transforme et nettoie les données"""
    print(" Transformation des données")
    
    # 1. Convertir la date
    df = df.withColumn("date", F.to_date(F.col("date")))
    
    # 2. Gérer les valeurs manquantes
    df = df.fillna({
        "ville": "Inconnu",
        "montant": 0
    })
    
    # 3. Filtrer les données invalides
    df = df.filter(
        (F.col("quantite") > 0) & 
        (F.col("montant") >= 0)
    )
    
    # 4. Créer des colonnes dérivées
    df = df.withColumn(
        "prix_unitaire",
        F.when(F.col("quantite") > 0, F.col("montant") / F.col("quantite")).otherwise(0)
    )
    
    df = df.withColumn(
        "annee",
        F.year(F.col("date"))
    )
    
    df = df.withColumn(
        "mois",
        F.month(F.col("date"))
    )
    
    df = df.withColumn(
        "jour_semaine",
        F.dayofweek(F.col("date"))
    )
    
    # 5. Catégoriser
    df = df.withColumn(
        "categorie_montant",
        F.when(F.col("montant") < 1500, "Faible")
         .when(F.col("montant") < 2000, "Moyen")
         .otherwise("Élevé")
    )
    
    # 6. Ajouter metadata
    df = df.withColumn("processed_at", F.current_timestamp())
    
    print(f" {df.count()} lignes transformées")
    return df

# Test
df_transformed = transform_data(df_raw)
df_transformed.show()

11.3 Aggregate

Voir le code
def aggregate_data(df):
    """Crée des agrégations métier"""
    print(" Agrégation des données")
    
    # Agrégation par ville et produit
    agg_ville_produit = df.groupBy("ville", "produit").agg(
        F.sum("quantite").alias("total_quantite"),
        F.sum("montant").alias("total_montant"),
        F.avg("prix_unitaire").alias("prix_moyen"),
        F.count("*").alias("nb_transactions")
    ).orderBy("ville", "produit")
    
    print("\n Agrégation par ville et produit :")
    agg_ville_produit.show()
    
    # Agrégation par canal
    agg_canal = df.groupBy("canal").agg(
        F.sum("montant").alias("total_montant"),
        F.count("*").alias("nb_transactions"),
        F.avg("montant").alias("montant_moyen")
    )
    
    print("\n Agrégation par canal :")
    agg_canal.show()
    
    # Agrégation temporelle
    agg_temporelle = df.groupBy("annee", "mois").agg(
        F.sum("montant").alias("total_montant"),
        F.count("*").alias("nb_transactions")
    ).orderBy("annee", "mois")
    
    print("\n Agrégation temporelle :")
    agg_temporelle.show()
    
    return {
        "ville_produit": agg_ville_produit,
        "canal": agg_canal,
        "temporelle": agg_temporelle
    }

# Test
aggregations = aggregate_data(df_transformed)

11.4 Load

Voir le code
def load_data(df, aggregations, output_path):
    """Charge les données dans le datalake"""
    print(" Chargement des données")
    
    # 1. Données transformées (partitionnées par date)
    df.write \
        .mode("overwrite") \
        .partitionBy("annee", "mois") \
        .parquet(f"{output_path}/ventes_transformed")
    print(" Données transformées sauvegardées")
    
    # 2. Agrégations
    for name, agg_df in aggregations.items():
        agg_df.write \
            .mode("overwrite") \
            .parquet(f"{output_path}/agg_{name}")
        print(f" Agrégation '{name}' sauvegardée")
    
    # 3. Export CSV pour l'analyse
    df.coalesce(1).write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv(f"{output_path}/ventes_export.csv")
    print(" Export CSV créé")

# Test
load_data(df_transformed, aggregations, "spark_pipeline/output")

11.5 Pipeline complet orchestré

Voir le code
def run_pipeline(spark):
    """Exécute le pipeline ETL complet"""
    import time
    
    start_time = time.time()
    print("="*60)
    print("🚀 DÉMARRAGE DU PIPELINE PYSPARK")
    print("="*60)
    
    try:
        # EXTRACT
        print("\n PHASE 1: EXTRACTION")
        df_raw = extract_data(spark, "spark_pipeline/raw")
        
        # TRANSFORM
        print("\n PHASE 2: TRANSFORMATION")
        df_transformed = transform_data(df_raw)
        
        # Cache pour les performances
        df_transformed.cache()
        
        # AGGREGATE
        print("\n PHASE 3: AGRÉGATION")
        aggregations = aggregate_data(df_transformed)
        
        # LOAD
        print("\n PHASE 4: CHARGEMENT")
        load_data(df_transformed, aggregations, "spark_pipeline/output")
        
        # STATISTICS
        duration = time.time() - start_time
        print("\n" + "="*60)
        print("STATISTIQUES DU PIPELINE")
        print("="*60)
        print(f"Durée totale: {duration:.2f}s")
        print(f"Lignes traitées: {df_transformed.count()}")
        print(f"Partitions: {df_transformed.rdd.getNumPartitions()}")
        print("="*60)
        print("PIPELINE TERMINÉ AVEC SUCCÈS")
        print("="*60)
        
        # Libérer le cache
        df_transformed.unpersist()
        
        return True
        
    except Exception as e:
        print(f"\n ERREUR: {e}")
        import traceback
        traceback.print_exc()
        return False

# Exécuter le pipeline
success = run_pipeline(spark)

Résumé et Prochaines Étapes

Ce que vous avez appris

  1. Fondamentaux Spark : Architecture, concepts, SparkSession
  2. DataFrames : Création, transformations, actions
  3. Transformations : Select, filter, withColumn, orderBy
  4. Agrégations : GroupBy, agrégations complexes, window functions
  5. Jointures : Inner, left, right, outer, broadcast
  6. I/O : CSV, JSON, Parquet avec partitionnement
  7. Spark SQL : Requêtes SQL, CTEs, window functions
  8. Optimisation : Partitionnement, caching, broadcast joins
  9. UDFs : Fonctions personnalisées
  10. Streaming : Traitement temps réel (introduction)
  11. Pipeline ETL : Architecture complète production-ready

Différences clés Pandas vs PySpark

Aspect Pandas PySpark
Exécution Eager (immédiate) Lazy (différée)
Données En mémoire (single machine) Distribuées (cluster)
Scalabilité Limité à la RAM Quasi illimité
API df[df['col'] > 5] df.filter(F.col('col') > 5)
Performances Rapide pour petites données Rapide pour Big Data

Quand utiliser PySpark ?

Utilisez PySpark si : - Données > 10 GB - Besoin de parallélisation - Traitement distribué nécessaire - Streaming en temps réel

Utilisez Pandas si : - Données < 10 GB - Prototypage rapide - Analyses exploratoires - Machine learning local

Prochaines étapes

  1. Pratiquer : Créer des pipelines avec vos propres données
  2. Approfondir :
    • MLlib (Machine Learning)
    • GraphX (Graph processing)
    • Delta Lake (ACID transactions)
  3. Production :
    • Databricks
    • AWS EMR
    • Azure Synapse
    • Google Dataproc
  4. Orchestration :
    • Apache Airflow
    • Prefect
    • Dagster

Ressources 📚


Félicitations ! 🎉 Vous maîtrisez maintenant les fondamentaux de PySpark !


Quiz Final

Testez vos connaissances PySpark !

❓ Q1. Quelle est la différence principale entre une transformation et une action dans Spark ?

  1. Les transformations sont plus rapides que les actions
  2. Les transformations sont lazy (évaluées plus tard), les actions déclenchent l’exécution
  3. Les transformations modifient les données, les actions les affichent seulement
  4. Il n’y a pas de différence
💡 Voir la réponse Réponse : b – Les transformations sont lazy (comme filter, select, groupBy) : elles construisent un plan d’exécution mais ne l’exécutent pas immédiatement. Les actions (comme count, show, collect) déclenchent l’exécution réelle du job Spark.

❓ Q2. Quel format de fichier est recommandé pour stocker des données dans un data lake avec Spark ?

  1. CSV
  2. JSON
  3. Parquet
  4. Excel
💡 Voir la réponse Réponse : cParquet est le format recommandé car il est columnaire (lecture rapide de colonnes spécifiques), compressé (prend moins d’espace), et conserve les types de données et le schéma.

❓ Q3. Comment créer une nouvelle colonne avec PySpark ?

  1. df['new_col'] = df['old_col'] * 2
  2. df.withColumn('new_col', F.col('old_col') * 2)
  3. df.add_column('new_col', 'old_col * 2')
  4. df.new_column('new_col', df['old_col'] * 2)
💡 Voir la réponse Réponse : b – On utilise withColumn('nom', expression) avec F.col() pour référencer les colonnes. Les DataFrames Spark sont immutables, donc withColumn retourne un nouveau DataFrame.

❓ Q4. Quelle commande permet de voir le plan d’exécution d’une requête ?

  1. df.show()
  2. df.explain()
  3. df.describe()
  4. df.plan()
💡 Voir la réponse Réponse : b – La commande df.explain() affiche le plan d’exécution physique de Spark. Utilisez df.explain(True) pour voir le plan détaillé avec toutes les étapes d’optimisation.

❓ Q5. Qu’est-ce qu’un broadcast join ?

  1. Une jointure qui diffuse les résultats à tous les nœuds
  2. Une jointure optimisée où la petite table est copiée sur tous les nœuds
  3. Une jointure qui utilise le réseau pour communiquer
  4. Une jointure qui trie les données avant de les joindre
💡 Voir la réponse Réponse : b – Un broadcast join copie la petite table (< 10 MB) sur tous les nœuds workers pour éviter le shuffle. C’est beaucoup plus rapide ! Utilisez F.broadcast(small_df) pour forcer un broadcast join.

❓ Q6. Pourquoi utilise-t-on cache() ou persist() ?

  1. Pour sauvegarder les données sur disque
  2. Pour accélérer les calculs en gardant le DataFrame en mémoire
  3. Pour compresser les données
  4. Pour créer un backup automatique
💡 Voir la réponse Réponse : bcache() et persist() stockent le DataFrame en mémoire pour éviter de recalculer les mêmes transformations plusieurs fois. Utilisez-les quand vous réutilisez un DataFrame plusieurs fois dans votre code.

❓ Q7. Quelle est la différence entre repartition() et coalesce() ?

  1. Aucune différence
  2. repartition() fait un full shuffle, coalesce() réduit sans shuffle
  3. coalesce() est plus lent que repartition()
  4. repartition() supprime les données, coalesce() les garde
💡 Voir la réponse Réponse : brepartition(n) fait un full shuffle (coûteux) pour redistribuer les données. coalesce(n) réduit le nombre de partitions sans shuffle en combinant les partitions existantes. Utilisez coalesce() pour réduire les partitions, repartition() pour augmenter.

❓ Q8. Comment exécuter une requête SQL dans PySpark ?

  1. df.sql('SELECT * FROM table')
  2. spark.sql('SELECT * FROM table') après avoir créé une vue temporaire
  3. df.query('SELECT * FROM table')
  4. spark.execute('SELECT * FROM table')
💡 Voir la réponse Réponse : b – On doit d’abord créer une vue temporaire avec df.createOrReplaceTempView('table'), puis exécuter spark.sql('SELECT * FROM table').

❓ Q9. Quelle est la meilleure stratégie pour écrire des données Parquet partitionnées ?

  1. df.write.parquet('path')
  2. df.write.partitionBy('date').parquet('path')
  3. df.write.csv('path')
  4. df.write.json('path')
💡 Voir la réponse Réponse : b – Utiliser partitionBy('colonne') crée une hiérarchie de dossiers par valeur de colonne (ex: date=2024-01-01/, date=2024-01-02/). Cela permet de lire uniquement les partitions nécessaires (partition pruning) et améliore considérablement les performances.

❓ Q10. Quand devriez-vous utiliser PySpark plutôt que Pandas ?

  1. Toujours, PySpark est toujours meilleur
  2. Quand les données dépassent la mémoire d’une seule machine (> 100 GB)
  3. Pour les petits datasets (< 1 GB)
  4. Jamais, Pandas est suffisant
💡 Voir la réponse Réponse : b – Utilisez PySpark quand vos données sont trop volumineuses pour tenir en mémoire sur une seule machine (généralement > 100 GB), ou quand vous avez besoin de traitement distribué et de parallélisation. Pour les petits datasets (< 10 GB), Pandas est plus simple et souvent plus rapide.


📚 Ressources

🏭 Plateformes Cloud

Plateforme Service Spark
Databricks Databricks Lakehouse
AWS EMR (Elastic MapReduce)
Azure Synapse Analytics, HDInsight
GCP Dataproc

➡️ Prochaine étape

Tu maîtrises maintenant le traitement Big Data avec PySpark !

Pour continuer ton parcours Data Engineering,

👉 Module suivant : 12_orchestration_pipelines — Orchestration de pipelines


🎉 Félicitations ! Tu as terminé le module PySpark et le parcours sur les bases de données et le Big Data !

Voir le code
# Fermer la SparkSession
spark.stop()
print("✅ SparkSession fermée")
Retour au sommet