Voir le code
# Installation de PySpark
!pip install pyspark pandas numpy pyarrowcache() ou persist() ?repartition() et coalesce() ?Ce module présente PySpark, l’API Python pour Apache Spark — le moteur de traitement distribué le plus utilisé en Big Data.
| Niveau | Compétence |
|---|---|
| ✅ Requis | Avoir suivi le module 08_intro_big_data_distributed |
| ✅ Requis | Comprendre les 5V du Big Data |
| ✅ Requis | Comprendre MapReduce et ses limites |
| ✅ Requis | Maîtriser Python (modules 04-05) |
| ✅ Requis | Maîtriser SQL (module 07) |
À la fin de ce notebook, tu seras capable de :
Tu as vu dans le module 08 que Spark a remplacé MapReduce comme moteur de traitement Big Data. Voici pourquoi :
MapReduce : DISQUE → Map → DISQUE → Shuffle → DISQUE → Reduce → DISQUE
↑ ↑ ↑ ↑
└───────────┴──────────────┴──────────────┘
LENT ! (I/O disque)
Spark : DISQUE → Transformations → MÉMOIRE → ... → MÉMOIRE → Action
↑ ↑
└────────────────┘
RAPIDE ! (in-memory)
| V | Comment Spark répond |
|---|---|
| Volume | Traitement distribué sur cluster (To → Po) |
| Velocity | Spark Streaming pour le temps réel |
| Variety | Lit CSV, JSON, Parquet, JDBC, Avro… |
| Veracity | Transformations pour nettoyer les données |
| Value | Spark SQL, MLlib pour extraire de la valeur |
┌─────────────────────────────────────────────────────────────────┐
│ ÉCOSYSTÈME BIG DATA │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Sources Traitement Stockage │
│ ──────── ────────── ──────── │
│ │
│ Kafka ─┐ ┌─► Data Lake (S3) │
│ Fichiers ─┼──► ⚡ SPARK ⚡ ──────┼─► Data Warehouse │
│ JDBC ─┤ (PySpark) ├─► NoSQL (MongoDB) │
│ APIs ─┘ └─► Elasticsearch │
│ │
└─────────────────────────────────────────────────────────────────┘
💡 Ce notebook est interactif : tu peux exécuter toutes les cellules de code !
PySpark nécessite Java. Vérifions d’abord l’installation.
Apache Spark est un moteur de traitement distribué ultra-rapide pour le Big Data.
# Créer une SparkSession
spark = SparkSession.builder \
.appName("PySpark Data Engineering Tutorial") \
.master("local[*]") \
.config("spark.driver.memory", "4g") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
print(" SparkSession créée")
print(f"Version Spark : {spark.version}")
print(f"Application : {spark.sparkContext.appName}")
print(f"Master : {spark.sparkContext.master}")# Méthode 1 : Depuis une liste Python
data = [
(1, "Alice", 25, "Paris", 45000),
(2, "Bob", 30, "Lyon", 55000),
(3, "Charlie", 35, "Paris", 60000),
(4, "David", 28, "Marseille", 50000),
(5, "Eve", 32, "Lyon", 58000)
]
columns = ["id", "nom", "age", "ville", "salaire"]
df = spark.createDataFrame(data, columns)
print(" Premier DataFrame créé :")
df.show()# Méthode 3 : Avec un schéma explicite
schema = StructType([
StructField("id", IntegerType(), False),
StructField("nom", StringType(), False),
StructField("age", IntegerType(), True),
StructField("ville", StringType(), True),
StructField("salaire", IntegerType(), True)
])
df_with_schema = spark.createDataFrame(data, schema)
print(" DataFrame avec schéma explicite :")
df_with_schema.printSchema()# Afficher le schéma
print(" Schéma du DataFrame :")
df.printSchema()
# Afficher les premières lignes
print("\n Premières lignes :")
df.show(3)
# Compter les lignes
print(f"\n Nombre de lignes : {df.count()}")
# Colonnes
print(f"\n Colonnes : {df.columns}")
# Types de données
print("\n Types de données :")
print(df.dtypes)Les transformations sont lazy : elles ne s’exécutent que lorsqu’une action est appelée.
# Sélectionner des colonnes
print(" Sélection de colonnes :")
df.select("nom", "ville").show()
# Avec alias
print("\n Avec alias :")
df.select(
F.col("nom").alias("employee_name"),
F.col("salaire").alias("salary")
).show()
# Sélectionner avec expressions
print("\n Avec expressions :")
df.select(
"nom",
(F.col("salaire") * 12).alias("salaire_annuel")
).show()# Filtrer les lignes
print(" Employés de Paris :")
df.filter(F.col("ville") == "Paris").show()
# Filtres multiples avec AND
print("\n Employés de Paris avec salaire > 50000 :")
df.filter(
(F.col("ville") == "Paris") &
(F.col("salaire") > 50000)
).show()
# Filtres avec OR
print("\n Employés de Paris OU Lyon :")
df.filter(
(F.col("ville") == "Paris") |
(F.col("ville") == "Lyon")
).show()
# Filtrer avec IN
print("\n Villes avec IN :")
df.filter(F.col("ville").isin(["Paris", "Lyon"])).show()# Ajouter une nouvelle colonne
df_with_bonus = df.withColumn(
"bonus",
F.col("salaire") * 0.1
)
print(" Ajout de la colonne 'bonus' :")
df_with_bonus.show()
# Modifier une colonne existante
df_modified = df.withColumn(
"salaire",
F.col("salaire") * 1.05 # Augmentation de 5%
)
print("\n Salaire augmenté de 5% :")
df_modified.show()# Ajouter plusieurs colonnes
df_enriched = df \
.withColumn("salaire_mensuel", F.col("salaire")) \
.withColumn("salaire_annuel", F.col("salaire") * 12) \
.withColumn("bonus", F.col("salaire") * 0.1) \
.withColumn("total_annuel", F.col("salaire_annuel") + F.col("bonus"))
print(" DataFrame enrichi :")
df_enriched.select("nom", "salaire_mensuel", "salaire_annuel", "bonus", "total_annuel").show()# Trier par salaire (ascendant)
print(" Tri par salaire (croissant) :")
df.orderBy("salaire").show()
# Trier par salaire (descendant)
print("\n Tri par salaire (décroissant) :")
df.orderBy(F.col("salaire").desc()).show()
# Tri multiple
print("\n Tri par ville puis salaire :")
df.orderBy("ville", F.col("salaire").desc()).show()Les agrégations permettent de calculer des statistiques sur les données.
# Créer un DataFrame plus complexe pour les exemples
data_ventes = [
("2024-01", "Paris", "Produit A", 100, 1500),
("2024-01", "Paris", "Produit B", 50, 2000),
("2024-01", "Lyon", "Produit A", 75, 1200),
("2024-02", "Paris", "Produit A", 120, 1800),
("2024-02", "Lyon", "Produit B", 60, 2400),
("2024-02", "Marseille", "Produit A", 90, 1350),
]
columns_ventes = ["mois", "ville", "produit", "quantite", "montant"]
df_ventes = spark.createDataFrame(data_ventes, columns_ventes)
print(" Données de ventes :")
df_ventes.show()# Agrégations conditionnelles
print(" Agrégations conditionnelles :")
df_ventes.groupBy("ville").agg(
F.sum("montant").alias("total"),
F.sum(F.when(F.col("produit") == "Produit A", F.col("montant")).otherwise(0)).alias("total_produit_a"),
F.sum(F.when(F.col("produit") == "Produit B", F.col("montant")).otherwise(0)).alias("total_produit_b")
).show()# Calculs cumulatifs
window_cumul = Window.partitionBy("ville").orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_cumul = df.withColumn(
"salaire_cumul",
F.sum("salaire").over(window_cumul)
)
print(" Salaire cumulé par ville :")
df_cumul.select("id", "nom", "ville", "salaire", "salaire_cumul").orderBy("ville", "id").show()# Calcul de moyennes mobiles
window_rolling = Window.partitionBy("ville").orderBy("id").rowsBetween(-1, 1)
df_rolling = df.withColumn(
"salaire_avg_3",
F.avg("salaire").over(window_rolling)
)
print(" Moyenne mobile sur 3 lignes :")
df_rolling.select("id", "nom", "ville", "salaire", "salaire_avg_3").orderBy("ville", "id").show()Les jointures permettent de combiner plusieurs DataFrames.
# DataFrame employés
employes = spark.createDataFrame([
(1, "Alice", "IT"),
(2, "Bob", "Finance"),
(3, "Charlie", "IT"),
(4, "David", "HR")
], ["emp_id", "nom", "dept_id"])
# DataFrame départements
departements = spark.createDataFrame([
("IT", "Information Technology", "Paris"),
("Finance", "Finance Department", "Lyon"),
("HR", "Human Resources", "Marseille"),
("Marketing", "Marketing Department", "Paris")
], ["dept_id", "dept_name", "location"])
print("👥 Employés :")
employes.show()
print("\n🏢 Départements :")
departements.show()# INNER JOIN (par défaut)
print("🔗 INNER JOIN :")
employes.join(departements, "dept_id", "inner").show()
# LEFT JOIN
print("\n🔗 LEFT JOIN :")
employes.join(departements, "dept_id", "left").show()
# RIGHT JOIN
print("\n🔗 RIGHT JOIN :")
employes.join(departements, "dept_id", "right").show()
# FULL OUTER JOIN
print("\n🔗 FULL OUTER JOIN :")
employes.join(departements, "dept_id", "outer").show()# Jointure avec colonnes différentes
employes_alt = employes.withColumnRenamed("dept_id", "department")
print(" Jointure avec colonnes différentes :")
employes_alt.join(
departements,
employes_alt.department == departements.dept_id,
"inner"
).select(
employes_alt["*"],
departements.dept_name,
departements.location
).show()# Jointures multiples
salaires = spark.createDataFrame([
(1, 45000),
(2, 55000),
(3, 50000),
(4, 48000)
], ["emp_id", "salaire"])
print("🔗 Jointures multiples :")
result = employes \
.join(departements, "dept_id", "inner") \
.join(salaires, "emp_id", "inner")
result.select("nom", "dept_name", "location", "salaire").show()Spark supporte de nombreux formats de fichiers.
# Créer des données de test
import os
os.makedirs('data', exist_ok=True)
# Écrire en CSV
df.write \
.mode("overwrite") \
.option("header", "true") \
.csv("data/employes.csv")
print(" CSV écrit")
# Lire le CSV
df_from_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("data/employes.csv")
print("\n CSV lu :")
df_from_csv.show(3)
df_from_csv.printSchema()# Parquet avec partitionnement
df.write \
.mode("overwrite") \
.partitionBy("ville") \
.parquet("data/employes_partitioned.parquet")
print(" Parquet partitionné écrit")
# Lire avec filtre de partition (très performant)
df_paris = spark.read \
.parquet("data/employes_partitioned.parquet") \
.filter(F.col("ville") == "Paris")
print("\n Parquet partitionné lu (ville=Paris) :")
df_paris.show()# Mode d'écriture
# - "overwrite" : Écrase les données existantes
# - "append" : Ajoute aux données existantes
# - "ignore" : Ne fait rien si le fichier existe
# - "error" (default) : Erreur si le fichier existe
# Compression
df.write \
.mode("overwrite") \
.option("compression", "snappy") \
.parquet("data/employes_compressed.parquet")
print(" Parquet compressé écrit")
# Contrôler le nombre de fichiers
df.coalesce(1).write \
.mode("overwrite") \
.csv("data/employes_single_file.csv")
print(" CSV en un seul fichier écrit")Spark permet d’exécuter des requêtes SQL sur les DataFrames.
# CTE (Common Table Expression)
result = spark.sql("""
WITH stats_ville AS (
SELECT
ville,
AVG(salaire) as salaire_moyen
FROM employes
GROUP BY ville
)
SELECT
e.nom,
e.ville,
e.salaire,
s.salaire_moyen,
ROUND(e.salaire - s.salaire_moyen, 2) as diff_moyenne
FROM employes e
JOIN stats_ville s ON e.ville = s.ville
ORDER BY e.ville, e.salaire DESC
""")
print(" Comparaison à la moyenne :")
result.show()PySpark offre plusieurs méthodes pour gérer les nulls.
# Créer un DataFrame avec des nulls
data_with_nulls = [
(1, "Alice", 25, "Paris", 45000),
(2, "Bob", None, "Lyon", 55000),
(3, "Charlie", 35, None, 60000),
(4, "David", 28, "Marseille", None),
(5, None, 32, "Lyon", 58000)
]
df_nulls = spark.createDataFrame(data_with_nulls, ["id", "nom", "age", "ville", "salaire"])
print(" DataFrame avec nulls :")
df_nulls.show()# Supprimer les lignes avec des nulls
print(" Suppression de toutes les lignes avec nulls :")
df_nulls.dropna().show()
# Supprimer seulement si toutes les colonnes sont nulles
print("\n Suppression si toutes les colonnes sont nulles :")
df_nulls.dropna(how='all').show()
# Supprimer les nulls sur des colonnes spécifiques
print("\n Suppression des nulls sur 'nom' et 'ville' :")
df_nulls.dropna(subset=["nom", "ville"]).show()# Remplir les nulls avec des valeurs par défaut
print("✨ Remplir tous les nulls avec 0 :")
df_nulls.fillna(0).show()
# Remplir avec des valeurs différentes par colonne
print("\n✨ Remplir avec des valeurs spécifiques :")
df_nulls.fillna({
"nom": "Inconnu",
"age": 30,
"ville": "Non spécifié",
"salaire": 50000
}).show()# Remplacer les nulls par la moyenne/médiane
from pyspark.sql.functions import mean, median
# Calculer la moyenne
age_moyen = df_nulls.select(mean("age")).first()[0]
salaire_moyen = df_nulls.select(mean("salaire")).first()[0]
print(f"Age moyen : {age_moyen}")
print(f"Salaire moyen : {salaire_moyen}")
# Remplir avec les moyennes
df_filled = df_nulls.fillna({
"age": age_moyen,
"salaire": salaire_moyen
})
print("\n✨ Nulls remplacés par les moyennes :")
df_filled.show()Les UDFs permettent d’appliquer des fonctions Python personnalisées.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Définir une fonction Python
def categoriser_age(age):
if age is None:
return "Inconnu"
elif age < 30:
return "Junior"
elif age < 40:
return "Senior"
else:
return "Expert"
# Enregistrer comme UDF
categoriser_age_udf = udf(categoriser_age, StringType())
# Utiliser l'UDF
df_with_category = df.withColumn(
"categorie",
categoriser_age_udf(F.col("age"))
)
print("🔧 DataFrame avec catégorie :")
df_with_category.show()# UDF avec plusieurs paramètres
def calculer_bonus(salaire, performance):
if salaire is None or performance is None:
return 0
return salaire * performance * 0.1
calculer_bonus_udf = udf(calculer_bonus, IntegerType())
# Ajouter une colonne de performance
df_perf = df.withColumn("performance", F.lit(1.2))
# Appliquer l'UDF
df_with_bonus = df_perf.withColumn(
"bonus",
calculer_bonus_udf(F.col("salaire"), F.col("performance"))
)
print("💰 DataFrame avec bonus :")
df_with_bonus.select("nom", "salaire", "performance", "bonus").show()# ⚠️ Attention : Les UDFs sont moins performantes que les fonctions natives
# Préférer les fonctions natives quand c'est possible
# Avec UDF (plus lent)
double_udf = udf(lambda x: x * 2, IntegerType())
df.withColumn("salaire_double_udf", double_udf(F.col("salaire")))
# Avec fonction native (plus rapide)
df.withColumn("salaire_double", F.col("salaire") * 2)
print("✅ Préférez toujours les fonctions natives !")Quelques techniques pour optimiser vos jobs Spark.
# Vérifier le nombre de partitions
print(f"Nombre de partitions : {df.rdd.getNumPartitions()}")
# Repartitionner (shuffle)
df_repartitioned = df.repartition(4)
print(f"Après repartition : {df_repartitioned.rdd.getNumPartitions()}")
# Coalesce (pas de shuffle, moins coûteux)
df_coalesced = df.coalesce(2)
print(f"Après coalesce : {df_coalesced.rdd.getNumPartitions()}")# Cache un DataFrame en mémoire
df_cached = df.cache()
# Première action : calcul complet
print("Première action (calcul complet) :")
df_cached.count()
# Deuxième action : utilise le cache (beaucoup plus rapide)
print("\nDeuxième action (utilise le cache) :")
df_cached.show()
# Libérer le cache
df_cached.unpersist()
print("\n Cache libéré")# Persist avec différents niveaux de stockage
from pyspark import StorageLevel
# MEMORY_ONLY : En mémoire uniquement
df.persist(StorageLevel.MEMORY_ONLY)
# MEMORY_AND_DISK : Mémoire + disque si nécessaire
df.persist(StorageLevel.MEMORY_AND_DISK)
# DISK_ONLY : Disque uniquement
df.persist(StorageLevel.DISK_ONLY)
print(" Différents niveaux de persistance disponibles")# ❌ Avec UDF (lent)
def add_ten(x):
return x + 10
add_ten_udf = udf(add_ten, IntegerType())
df.withColumn("salaire_plus_10_udf", add_ten_udf(F.col("salaire")))
# ✅ Avec fonction native (rapide)
df.withColumn("salaire_plus_10", F.col("salaire") + 10)
print(" Les fonctions natives sont toujours plus rapides !")Traitement de données en temps réel avec Spark.
# Créer un streaming DataFrame depuis un dossier
# Les fichiers ajoutés au dossier seront traités automatiquement
schema_stream = StructType([
StructField("timestamp", TimestampType(), True),
StructField("user_id", IntegerType(), True),
StructField("action", StringType(), True),
StructField("value", IntegerType(), True)
])
# Lire un stream depuis un dossier
stream_df = spark.readStream \
.schema(schema_stream) \
.json("data/stream_input/")
print(" Stream créé")
print(f"Is streaming: {stream_df.isStreaming}")# Écrire le stream vers un sink
# Note: Ce code est un exemple, il nécessite un stream actif pour s'exécuter
# query = stream_processed.writeStream \
# .outputMode("complete") \
# .format("console") \
# .start()
# # Attendre la fin du stream
# query.awaitTermination()
print("💡 Exemple de streaming (nécessite des données en entrée pour s'exécuter)")Construisons un pipeline ETL complet.
import os
from datetime import datetime
# Créer la structure de dossiers
os.makedirs('spark_pipeline/raw', exist_ok=True)
os.makedirs('spark_pipeline/processed', exist_ok=True)
os.makedirs('spark_pipeline/output', exist_ok=True)
os.makedirs('spark_pipeline/logs', exist_ok=True)
print("✅ Structure créée")def extract_data(spark, path):
"""Extrait des données depuis plusieurs sources"""
print(f"📥 Extraction depuis {path}")
# Créer des données de test
data = [
(1, "2024-01-15", "Paris", "Produit A", 100, 1500, "online"),
(2, "2024-01-15", "Lyon", "Produit B", 50, 2000, "store"),
(3, "2024-01-16", "Paris", "Produit A", 75, 1200, "online"),
(4, "2024-01-16", "Marseille", "Produit C", 120, 1800, "online"),
(5, "2024-01-17", "Lyon", "Produit B", 60, 2400, "store"),
(6, "2024-01-17", None, "Produit A", 90, None, "online"), # Données sales
]
columns = ["id", "date", "ville", "produit", "quantite", "montant", "canal"]
df = spark.createDataFrame(data, columns)
# Sauvegarder les données brutes
df.write.mode("overwrite").parquet(f"{path}/ventes_raw.parquet")
print(f" {df.count()} lignes extraites")
return df
# Test
df_raw = extract_data(spark, "spark_pipeline/raw")
df_raw.show()def transform_data(df):
"""Transforme et nettoie les données"""
print(" Transformation des données")
# 1. Convertir la date
df = df.withColumn("date", F.to_date(F.col("date")))
# 2. Gérer les valeurs manquantes
df = df.fillna({
"ville": "Inconnu",
"montant": 0
})
# 3. Filtrer les données invalides
df = df.filter(
(F.col("quantite") > 0) &
(F.col("montant") >= 0)
)
# 4. Créer des colonnes dérivées
df = df.withColumn(
"prix_unitaire",
F.when(F.col("quantite") > 0, F.col("montant") / F.col("quantite")).otherwise(0)
)
df = df.withColumn(
"annee",
F.year(F.col("date"))
)
df = df.withColumn(
"mois",
F.month(F.col("date"))
)
df = df.withColumn(
"jour_semaine",
F.dayofweek(F.col("date"))
)
# 5. Catégoriser
df = df.withColumn(
"categorie_montant",
F.when(F.col("montant") < 1500, "Faible")
.when(F.col("montant") < 2000, "Moyen")
.otherwise("Élevé")
)
# 6. Ajouter metadata
df = df.withColumn("processed_at", F.current_timestamp())
print(f" {df.count()} lignes transformées")
return df
# Test
df_transformed = transform_data(df_raw)
df_transformed.show()def aggregate_data(df):
"""Crée des agrégations métier"""
print(" Agrégation des données")
# Agrégation par ville et produit
agg_ville_produit = df.groupBy("ville", "produit").agg(
F.sum("quantite").alias("total_quantite"),
F.sum("montant").alias("total_montant"),
F.avg("prix_unitaire").alias("prix_moyen"),
F.count("*").alias("nb_transactions")
).orderBy("ville", "produit")
print("\n Agrégation par ville et produit :")
agg_ville_produit.show()
# Agrégation par canal
agg_canal = df.groupBy("canal").agg(
F.sum("montant").alias("total_montant"),
F.count("*").alias("nb_transactions"),
F.avg("montant").alias("montant_moyen")
)
print("\n Agrégation par canal :")
agg_canal.show()
# Agrégation temporelle
agg_temporelle = df.groupBy("annee", "mois").agg(
F.sum("montant").alias("total_montant"),
F.count("*").alias("nb_transactions")
).orderBy("annee", "mois")
print("\n Agrégation temporelle :")
agg_temporelle.show()
return {
"ville_produit": agg_ville_produit,
"canal": agg_canal,
"temporelle": agg_temporelle
}
# Test
aggregations = aggregate_data(df_transformed)def load_data(df, aggregations, output_path):
"""Charge les données dans le datalake"""
print(" Chargement des données")
# 1. Données transformées (partitionnées par date)
df.write \
.mode("overwrite") \
.partitionBy("annee", "mois") \
.parquet(f"{output_path}/ventes_transformed")
print(" Données transformées sauvegardées")
# 2. Agrégations
for name, agg_df in aggregations.items():
agg_df.write \
.mode("overwrite") \
.parquet(f"{output_path}/agg_{name}")
print(f" Agrégation '{name}' sauvegardée")
# 3. Export CSV pour l'analyse
df.coalesce(1).write \
.mode("overwrite") \
.option("header", "true") \
.csv(f"{output_path}/ventes_export.csv")
print(" Export CSV créé")
# Test
load_data(df_transformed, aggregations, "spark_pipeline/output")def run_pipeline(spark):
"""Exécute le pipeline ETL complet"""
import time
start_time = time.time()
print("="*60)
print("🚀 DÉMARRAGE DU PIPELINE PYSPARK")
print("="*60)
try:
# EXTRACT
print("\n PHASE 1: EXTRACTION")
df_raw = extract_data(spark, "spark_pipeline/raw")
# TRANSFORM
print("\n PHASE 2: TRANSFORMATION")
df_transformed = transform_data(df_raw)
# Cache pour les performances
df_transformed.cache()
# AGGREGATE
print("\n PHASE 3: AGRÉGATION")
aggregations = aggregate_data(df_transformed)
# LOAD
print("\n PHASE 4: CHARGEMENT")
load_data(df_transformed, aggregations, "spark_pipeline/output")
# STATISTICS
duration = time.time() - start_time
print("\n" + "="*60)
print("STATISTIQUES DU PIPELINE")
print("="*60)
print(f"Durée totale: {duration:.2f}s")
print(f"Lignes traitées: {df_transformed.count()}")
print(f"Partitions: {df_transformed.rdd.getNumPartitions()}")
print("="*60)
print("PIPELINE TERMINÉ AVEC SUCCÈS")
print("="*60)
# Libérer le cache
df_transformed.unpersist()
return True
except Exception as e:
print(f"\n ERREUR: {e}")
import traceback
traceback.print_exc()
return False
# Exécuter le pipeline
success = run_pipeline(spark)| Aspect | Pandas | PySpark |
|---|---|---|
| Exécution | Eager (immédiate) | Lazy (différée) |
| Données | En mémoire (single machine) | Distribuées (cluster) |
| Scalabilité | Limité à la RAM | Quasi illimité |
| API | df[df['col'] > 5] |
df.filter(F.col('col') > 5) |
| Performances | Rapide pour petites données | Rapide pour Big Data |
✅ Utilisez PySpark si : - Données > 10 GB - Besoin de parallélisation - Traitement distribué nécessaire - Streaming en temps réel
❌ Utilisez Pandas si : - Données < 10 GB - Prototypage rapide - Analyses exploratoires - Machine learning local
Félicitations ! 🎉 Vous maîtrisez maintenant les fondamentaux de PySpark !
Testez vos connaissances PySpark !
filter, select, groupBy) : elles construisent un plan d’exécution mais ne l’exécutent pas immédiatement. Les actions (comme count, show, collect) déclenchent l’exécution réelle du job Spark.
df['new_col'] = df['old_col'] * 2df.withColumn('new_col', F.col('old_col') * 2)df.add_column('new_col', 'old_col * 2')df.new_column('new_col', df['old_col'] * 2)withColumn('nom', expression) avec F.col() pour référencer les colonnes. Les DataFrames Spark sont immutables, donc withColumn retourne un nouveau DataFrame.
df.show()df.explain()df.describe()df.plan()df.explain() affiche le plan d’exécution physique de Spark. Utilisez df.explain(True) pour voir le plan détaillé avec toutes les étapes d’optimisation.
F.broadcast(small_df) pour forcer un broadcast join.
cache() ou persist() ?cache() et persist() stockent le DataFrame en mémoire pour éviter de recalculer les mêmes transformations plusieurs fois. Utilisez-les quand vous réutilisez un DataFrame plusieurs fois dans votre code.
repartition() et coalesce() ?repartition() fait un full shuffle, coalesce() réduit sans shufflecoalesce() est plus lent que repartition()repartition() supprime les données, coalesce() les garderepartition(n) fait un full shuffle (coûteux) pour redistribuer les données. coalesce(n) réduit le nombre de partitions sans shuffle en combinant les partitions existantes. Utilisez coalesce() pour réduire les partitions, repartition() pour augmenter.
df.sql('SELECT * FROM table')spark.sql('SELECT * FROM table') après avoir créé une vue temporairedf.query('SELECT * FROM table')spark.execute('SELECT * FROM table')df.createOrReplaceTempView('table'), puis exécuter spark.sql('SELECT * FROM table').
df.write.parquet('path')df.write.partitionBy('date').parquet('path')df.write.csv('path')df.write.json('path')partitionBy('colonne') crée une hiérarchie de dossiers par valeur de colonne (ex: date=2024-01-01/, date=2024-01-02/). Cela permet de lire uniquement les partitions nécessaires (partition pruning) et améliore considérablement les performances.
| Plateforme | Service Spark |
|---|---|
| Databricks | Databricks Lakehouse |
| AWS | EMR (Elastic MapReduce) |
| Azure | Synapse Analytics, HDInsight |
| GCP | Dataproc |
Tu maîtrises maintenant le traitement Big Data avec PySpark !
Pour continuer ton parcours Data Engineering,
👉 Module suivant : 12_orchestration_pipelines — Orchestration de pipelines
🎉 Félicitations ! Tu as terminé le module PySpark et le parcours sur les bases de données et le Big Data !