🚀 Spark & Scala Deep Dive
Bienvenue dans ce module avancé où tu vas maîtriser Scala pour Spark et comprendre les internals de Spark pour écrire des jobs performants. Tu apprendras à développer, builder et déployer des applications Spark professionnelles.
Prérequis
| Niveau | Compétence |
|---|---|
| ✅ Requis | PySpark DataFrame API (M19) |
| ✅ Requis | Spark sur Kubernetes (M21) |
| ✅ Requis | Notions de programmation fonctionnelle |
| 💡 Recommandé | Expérience avec un IDE (VS Code, PyCharm) |
🎯 Objectifs du module
À la fin de ce module, tu seras capable de :
- Écrire du code Scala idiomatique pour Spark
- Utiliser les ADT (Algebraic Data Types) et Either/Try pour des pipelines robustes
- Configurer un environnement de développement complet (Notebook + IntelliJ)
- Builder et déployer des applications Spark avec sbt et spark-submit
- Comprendre Catalyst, AQE et Tungsten pour optimiser tes jobs
- Diagnostiquer et résoudre les problèmes de performance
1. Scala pour Data Engineers
1.1 Pourquoi Scala pour Spark ?
| Aspect | Scala | Python |
|---|---|---|
| Performance | Natif JVM, pas de sérialisation | Sérialisation Python ↔︎ JVM |
| Typage | Statique, erreurs à la compilation | Dynamique, erreurs au runtime |
| API Spark | API native, toutes les features | Wrapper, parfois en retard |
| Écosystème | Kafka, Flink, Akka | ML, Data Science |
| Courbe d’apprentissage | Plus raide | Plus accessible |
Règle pratique : - Python : Exploration, prototypage, Data Science, petits pipelines - Scala : Production, gros volumes, performance critique, équipe backend Java/Scala
1.2 Syntaxe Essentielle
// ═══════════════════════════════════════════════════════════════
// Variables : val (immutable) vs var (mutable)
// ═══════════════════════════════════════════════════════════════
val name: String = "Spark" // Immutable (préféré)
var counter: Int = 0 // Mutable (à éviter)
val inferred = 42 // Type inféré automatiquement
// ═══════════════════════════════════════════════════════════════
// Fonctions
// ═══════════════════════════════════════════════════════════════
// Fonction classique
def add(a: Int, b: Int): Int = {
a + b
}
// Fonction one-liner (return implicite)
def multiply(a: Int, b: Int): Int = a * b
// Fonction anonyme (lambda)
val double = (x: Int) => x * 2
// Paramètres par défaut
def greet(name: String, greeting: String = "Hello"): String =
s"$greeting, $name!"
greet("Alice") // "Hello, Alice!"
greet("Bob", "Bonjour") // "Bonjour, Bob!"
// ═══════════════════════════════════════════════════════════════
// String interpolation
// ═══════════════════════════════════════════════════════════════
val version = 3.5
println(s"Spark version: $version") // Simple
println(s"Next version: ${version + 0.1}") // Expression
println(f"Pi = ${math.Pi}%.2f") // Formaté
// ═══════════════════════════════════════════════════════════════
// Conditions et boucles
// ═══════════════════════════════════════════════════════════════
// if est une expression (retourne une valeur)
val status = if (counter > 0) "positive" else "zero or negative"
// for-comprehension
val squares = for (i <- 1 to 5) yield i * i // Vector(1, 4, 9, 16, 25)
// for avec filtres
val evenSquares = for {
i <- 1 to 10
if i % 2 == 0
} yield i * i // Vector(4, 16, 36, 64, 100)1.3 Collections Fonctionnelles
Les collections Scala sont la base pour comprendre les transformations Spark (RDD, DataFrame).
// ═══════════════════════════════════════════════════════════════
// Types de collections
// ═══════════════════════════════════════════════════════════════
val list = List(1, 2, 3, 4, 5) // Immutable, linked list
val vector = Vector(1, 2, 3, 4, 5) // Immutable, indexed
val set = Set(1, 2, 3, 3, 3) // Immutable, unique: Set(1, 2, 3)
val map = Map("a" -> 1, "b" -> 2) // Immutable, key-value
// ═══════════════════════════════════════════════════════════════
// Transformations (comme Spark !)
// ═══════════════════════════════════════════════════════════════
val numbers = List(1, 2, 3, 4, 5)
// map : transformer chaque élément
numbers.map(x => x * 2) // List(2, 4, 6, 8, 10)
numbers.map(_ * 2) // Syntaxe courte
// filter : garder les éléments qui matchent
numbers.filter(x => x > 2) // List(3, 4, 5)
numbers.filter(_ > 2) // Syntaxe courte
// flatMap : map + flatten
val words = List("hello world", "scala spark")
words.flatMap(_.split(" ")) // List("hello", "world", "scala", "spark")
// reduce : agréger en une valeur
numbers.reduce((a, b) => a + b) // 15
numbers.reduce(_ + _) // Syntaxe courte
// fold : reduce avec valeur initiale
numbers.fold(0)(_ + _) // 15
numbers.fold(10)(_ + _) // 25 (commence à 10)
// ═══════════════════════════════════════════════════════════════
// Chaînage (comme les pipelines Spark)
// ═══════════════════════════════════════════════════════════════
val result = numbers
.filter(_ % 2 == 0) // Garder les pairs
.map(_ * 10) // Multiplier par 10
.sum // Sommer : 60 (20 + 40)
// ═══════════════════════════════════════════════════════════════
// groupBy (comme Spark groupBy !)
// ═══════════════════════════════════════════════════════════════
case class Sale(product: String, amount: Double)
val sales = List(
Sale("laptop", 1000),
Sale("phone", 500),
Sale("laptop", 1200),
Sale("phone", 600)
)
val byProduct = sales.groupBy(_.product)
// Map(
// "laptop" -> List(Sale("laptop", 1000), Sale("laptop", 1200)),
// "phone" -> List(Sale("phone", 500), Sale("phone", 600))
// )
val totalByProduct = sales
.groupBy(_.product)
.map { case (product, sales) =>
(product, sales.map(_.amount).sum)
}
// Map("laptop" -> 2200.0, "phone" -> 1100.0)1.4 Option : Gérer les valeurs absentes
// Option = Some(valeur) ou None (jamais null !)
def findUser(id: Int): Option[String] = {
val users = Map(1 -> "Alice", 2 -> "Bob")
users.get(id) // Retourne Option[String]
}
findUser(1) // Some("Alice")
findUser(99) // None
// Pattern matching
findUser(1) match {
case Some(name) => println(s"Found: $name")
case None => println("User not found")
}
// getOrElse : valeur par défaut
val user = findUser(99).getOrElse("Unknown")
// map sur Option (sûr, pas d'exception)
val upperName = findUser(1).map(_.toUpperCase) // Some("ALICE")
val noName = findUser(99).map(_.toUpperCase) // None (pas d'erreur !)
// flatMap pour chaîner
def findEmail(name: String): Option[String] = {
if (name == "Alice") Some("alice@example.com") else None
}
val email = findUser(1).flatMap(findEmail) // Some("alice@example.com")1.5 Case Classes et Pattern Matching
// ═══════════════════════════════════════════════════════════════
// Case Class = classe de données immuable
// ═══════════════════════════════════════════════════════════════
case class Customer(
id: Long,
name: String,
email: String,
country: String = "France" // Valeur par défaut
)
// Création (pas besoin de "new")
val alice = Customer(1, "Alice", "alice@example.com")
val bob = Customer(2, "Bob", "bob@example.com", "USA")
// Accesseurs automatiques
println(alice.name) // "Alice"
println(alice.country) // "France"
// equals automatique (compare les valeurs)
val alice2 = Customer(1, "Alice", "alice@example.com")
println(alice == alice2) // true !
// copy : créer une copie modifiée (immutabilité)
val aliceUSA = alice.copy(country = "USA")
// toString automatique
println(alice) // Customer(1,Alice,alice@example.com,France)
// ═══════════════════════════════════════════════════════════════
// Pattern Matching (switch++ ultra puissant)
// ═══════════════════════════════════════════════════════════════
def describeCustomer(c: Customer): String = c match {
case Customer(_, "Alice", _, _) => "C'est Alice !"
case Customer(id, _, _, "France") => s"Client français #$id"
case Customer(_, name, _, country) => s"$name de $country"
}
// Pattern matching avec guards
def customerTier(c: Customer, totalSpent: Double): String = c match {
case Customer(_, _, _, "France") if totalSpent > 10000 => "VIP France"
case _ if totalSpent > 5000 => "Gold"
case _ => "Standard"
}
// ═══════════════════════════════════════════════════════════════
// Case classes et Spark
// ═══════════════════════════════════════════════════════════════
// Spark utilise les case classes pour créer des Datasets typés !
import spark.implicits._
case class Sale(product: String, amount: Double, date: String)
val sales = Seq(
Sale("laptop", 1000, "2024-01-15"),
Sale("phone", 500, "2024-01-16")
).toDS() // Dataset[Sale] - typé !
// Accès typé
sales.filter(_.amount > 600).show()1.6 Sealed Traits et ADT (Algebraic Data Types) 🔥
Les ADT permettent de modéliser tous les états possibles d’un système. Le compilateur garantit l’exhaustivité !
// ═══════════════════════════════════════════════════════════════
// sealed trait = toutes les sous-classes dans le même fichier
// Le compilateur CONNAÎT tous les cas possibles
// ═══════════════════════════════════════════════════════════════
// Modéliser le résultat d'un job ETL
sealed trait JobResult
case class Success(recordsProcessed: Long, durationMs: Long) extends JobResult
case class Failure(error: String, stage: String) extends JobResult
case object Skipped extends JobResult // object = singleton
def handleResult(result: JobResult): Unit = result match {
case Success(n, d) => println(s"✅ $n records traités en ${d}ms")
case Failure(e, s) => println(s"❌ Erreur à l'étape $s: $e")
case Skipped => println(s"⏭️ Job ignoré")
}
// Si tu oublies un cas, le compilateur te PRÉVIENT !
// ═══════════════════════════════════════════════════════════════
// Exemple : Sources de données
// ═══════════════════════════════════════════════════════════════
sealed trait DataSource
case class JdbcSource(url: String, table: String, user: String) extends DataSource
case class S3Source(bucket: String, path: String, format: String) extends DataSource
case class KafkaSource(brokers: String, topic: String) extends DataSource
case class LocalSource(path: String) extends DataSource
def readData(source: DataSource)(implicit spark: SparkSession): DataFrame = source match {
case JdbcSource(url, table, user) =>
spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.load()
case S3Source(bucket, path, format) =>
spark.read
.format(format)
.load(s"s3a://$bucket/$path")
case KafkaSource(brokers, topic) =>
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.load()
case LocalSource(path) =>
spark.read.parquet(path)
}
// ═══════════════════════════════════════════════════════════════
// Exemple : Modes de traitement
// ═══════════════════════════════════════════════════════════════
sealed trait WriteMode
case object Overwrite extends WriteMode
case object Append extends WriteMode
case class MergeByKey(keys: Seq[String]) extends WriteMode
def writeData(df: DataFrame, path: String, mode: WriteMode): Unit = mode match {
case Overwrite => df.write.mode("overwrite").parquet(path)
case Append => df.write.mode("append").parquet(path)
case MergeByKey(keys) =>
// Logique Delta Lake MERGE
println(s"Merge on keys: ${keys.mkString(", ")}")
}1.7 Gestion d’Erreurs : Either et Try 🔥
Fini les try/catch partout ! Scala offre des types pour gérer les erreurs proprement.
// ═══════════════════════════════════════════════════════════════
// Try[T] : capturer les exceptions (code legacy, I/O)
// ═══════════════════════════════════════════════════════════════
import scala.util.{Try, Success, Failure}
def parseNumber(s: String): Try[Int] = Try {
s.toInt // Peut lancer NumberFormatException
}
parseNumber("42") // Success(42)
parseNumber("abc") // Failure(NumberFormatException)
// Pattern matching
parseNumber("42") match {
case Success(n) => println(s"Nombre: $n")
case Failure(ex) => println(s"Erreur: ${ex.getMessage}")
}
// getOrElse
val num = parseNumber("abc").getOrElse(0) // 0
// map / flatMap (chaînage sûr)
val doubled = parseNumber("21").map(_ * 2) // Success(42)
// recover : transformer l'erreur
val safe = parseNumber("abc").recover {
case _: NumberFormatException => -1
} // Success(-1)
// ═══════════════════════════════════════════════════════════════
// Either[L, R] : erreurs métier typées (pas d'exceptions)
// Left = erreur, Right = succès
// ═══════════════════════════════════════════════════════════════
// Définir les types d'erreurs
sealed trait ETLError
case class SourceNotFound(path: String) extends ETLError
case class SchemaError(expected: String, actual: String) extends ETLError
case class WriteError(message: String) extends ETLError
case class ConfigError(param: String) extends ETLError
// Fonctions qui retournent Either
def readSource(path: String): Either[ETLError, DataFrame] = {
if (!new java.io.File(path).exists())
Left(SourceNotFound(path))
else
Right(spark.read.parquet(path))
}
def validateSchema(df: DataFrame, expected: Seq[String]): Either[ETLError, DataFrame] = {
val actual = df.columns.toSeq
if (expected.forall(actual.contains))
Right(df)
else
Left(SchemaError(expected.mkString(","), actual.mkString(",")))
}
def writeOutput(df: DataFrame, path: String): Either[ETLError, Long] = {
try {
df.write.parquet(path)
Right(df.count())
} catch {
case e: Exception => Left(WriteError(e.getMessage))
}
}
// ═══════════════════════════════════════════════════════════════
// Chaînage avec for-comprehension (le pattern ultime !)
// ═══════════════════════════════════════════════════════════════
def runETL(inputPath: String, outputPath: String): Either[ETLError, Long] = {
for {
rawData <- readSource(inputPath)
validated <- validateSchema(rawData, Seq("id", "name", "amount"))
transformed = validated.filter(col("amount") > 0) // = pour les transformations pures
recordCount <- writeOutput(transformed, outputPath)
} yield recordCount
}
// Utilisation
runETL("data/input", "data/output") match {
case Right(count) => println(s"✅ ETL terminé : $count records")
case Left(SourceNotFound(p)) => println(s"❌ Source introuvable : $p")
case Left(SchemaError(e, a)) => println(s"❌ Schéma invalide. Attendu: $e, Reçu: $a")
case Left(WriteError(msg)) => println(s"❌ Erreur d'écriture : $msg")
case Left(ConfigError(p)) => println(s"❌ Config manquante : $p")
}
// ═══════════════════════════════════════════════════════════════
// Quand utiliser quoi ?
// ═══════════════════════════════════════════════════════════════| Outil | Quand l’utiliser | Exemple |
|---|---|---|
| Option[T] | Valeur absente (pas une erreur) | findUser(id) |
| Try[T] | Exceptions Java/legacy, I/O | Try { file.read() } |
| Either[E, T] | Erreurs métier typées | Pipeline ETL, validation |
2. Environnements de Développement
2.1 Scala dans Jupyter Notebook avec Almond
Almond est un kernel Scala pour Jupyter, idéal pour l’exploration et la formation.
Installation
# 1. Installer Coursier (gestionnaire de packages Scala)
curl -fL https://github.com/coursier/coursier/releases/latest/download/cs-x86_64-pc-linux.gz | gzip -d > cs
chmod +x cs
./cs setup # Ajoute au PATH
# 2. Installer Almond
cs launch --fork almond -- --install
# 3. Vérifier
jupyter kernelspec list
# Devrait afficher : scala /home/user/.local/share/jupyter/kernels/scala
# 4. Lancer Jupyter
jupyter notebook
# → Nouveau notebook → Kernel "Scala"Configuration Spark dans Almond
// Dans une cellule du notebook Scala
// Importer les dépendances avec Ammonite
import $ivy.`org.apache.spark::spark-sql:3.5.0`
import $ivy.`io.delta::delta-spark:3.1.0`
// Créer la SparkSession
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Almond Spark")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
import spark.implicits._
// Test
val df = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
df.show()Alternatives
| Kernel | Avantages | Inconvénients |
|---|---|---|
| Almond | Moderne, Ammonite, bien maintenu | Setup manuel |
| spylon-kernel | Simple, pip install | Moins de features |
| Apache Toree | Officiel Apache Spark | Plus lourd, moins actif |
2.2 IntelliJ IDEA
Pour les projets de production, IntelliJ IDEA est l’IDE de référence pour Scala.
Installation
# 1. Télécharger IntelliJ IDEA Community Edition (gratuit)
# https://www.jetbrains.com/idea/download/
# 2. Linux : extraire et lancer
tar -xzf ideaIC-*.tar.gz
cd idea-IC-*/bin
./idea.sh
# 3. Installer le plugin Scala
# File → Settings → Plugins → Marketplace → Rechercher "Scala" → Install
# 4. Configurer le JDK
# File → Project Structure → SDKs → + → Download JDK → Version 11 ou 172.3 Quand utiliser quoi ?
| Environnement | Use case |
|---|---|
| Notebook (Almond) | Exploration, prototypage, formation, démos |
| IntelliJ + sbt | Développement, tests, refactoring, projets production |
| spark-submit | Exécution sur cluster (YARN, K8s) |
3. Projet Complet avec IntelliJ (Step-by-Step)
On va créer un projet Spark/Scala complet de A à Z.
3.1 Créer le projet
- File → New → Project
- Sélectionner sbt (à gauche)
- Configurer :
- Name :
spark-etl-project - Location :
/home/user/projects/spark-etl-project - JDK : 11 ou 17
- sbt version : 1.9.x
- Scala version : 2.12.18 (compatible Spark 3.5)
- Name :
- Create
3.2 Structure du projet
spark-etl-project/
├── build.sbt # Dépendances et config
├── project/
│ ├── build.properties # Version sbt
│ └── plugins.sbt # Plugins (sbt-assembly)
├── src/
│ ├── main/
│ │ ├── scala/
│ │ │ └── com/
│ │ │ └── example/
│ │ │ ├── Main.scala
│ │ │ ├── config/
│ │ │ │ └── AppConfig.scala
│ │ │ ├── jobs/
│ │ │ │ └── SalesETL.scala
│ │ │ ├── models/
│ │ │ │ └── Models.scala
│ │ │ └── utils/
│ │ │ └── SparkSessionWrapper.scala
│ │ └── resources/
│ │ ├── application.conf
│ │ └── log4j2.properties
│ └── test/
│ └── scala/
│ └── com/
│ └── example/
│ └── jobs/
│ └── SalesETLSpec.scala
├── data/
│ └── input/
│ └── sales.csv
└── output/ # Généré
3.3 Fichiers de configuration
build.sbt :
name := "spark-etl-project"
version := "1.0.0"
scalaVersion := "2.12.18"
// Spark 3.5 (provided = déjà sur le cluster)
val sparkVersion = "3.5.0"
libraryDependencies ++= Seq(
// Spark (provided pour le cluster, compile pour local)
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
// Delta Lake
"io.delta" %% "delta-spark" % "3.1.0",
// Config
"com.typesafe" % "config" % "1.4.3",
// Tests
"org.scalatest" %% "scalatest" % "3.2.17" % Test
)
// Pour exécuter en local dans IntelliJ (override provided)
Compile / run := Defaults.runTask(
Compile / fullClasspath,
Compile / run / mainClass,
Compile / run / runner
).evaluated
// Assembly config (fat JAR)
assembly / assemblyMergeStrategy := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case x => MergeStrategy.first
}
assembly / assemblyJarName := s"${name.value}-${version.value}.jar"project/build.properties :
sbt.version=1.9.8
project/plugins.sbt :
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")3.4 Code Scala
src/main/scala/com/example/utils/SparkSessionWrapper.scala :
package com.example.utils
import org.apache.spark.sql.SparkSession
trait SparkSessionWrapper {
lazy val spark: SparkSession = SparkSession.builder()
.appName("SparkETL")
.master(sys.env.getOrElse("SPARK_MASTER", "local[*]"))
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.getOrCreate()
// Réduire les logs Spark
spark.sparkContext.setLogLevel("WARN")
}src/main/scala/com/example/models/Models.scala :
package com.example.models
// ═══════════════════════════════════════════════════════════════
// Modèles de données (case classes)
// ═══════════════════════════════════════════════════════════════
case class Sale(
transactionId: String,
productCategory: String,
amount: Double,
quantity: Int,
date: String,
customerId: String
)
case class SalesSummary(
productCategory: String,
totalSales: Double,
totalQuantity: Long,
avgSale: Double,
transactionCount: Long
)
// ═══════════════════════════════════════════════════════════════
// ADT pour les résultats de job
// ═══════════════════════════════════════════════════════════════
sealed trait JobResult
case class JobSuccess(
recordsRead: Long,
recordsWritten: Long,
durationMs: Long
) extends JobResult
case class JobFailure(
stage: String,
error: String
) extends JobResult
// ═══════════════════════════════════════════════════════════════
// ADT pour les erreurs ETL
// ═══════════════════════════════════════════════════════════════
sealed trait ETLError
case class SourceNotFound(path: String) extends ETLError
case class InvalidSchema(message: String) extends ETLError
case class TransformError(message: String) extends ETLError
case class WriteError(message: String) extends ETLErrorsrc/main/scala/com/example/jobs/SalesETL.scala :
package com.example.jobs
import com.example.models._
import com.example.utils.SparkSessionWrapper
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
object SalesETL extends SparkSessionWrapper {
import spark.implicits._
// ═══════════════════════════════════════════════════════════════
// EXTRACT
// ═══════════════════════════════════════════════════════════════
def extract(inputPath: String): Either[ETLError, Dataset[Sale]] = {
try {
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inputPath)
// Renommer les colonnes pour matcher la case class
val cleaned = df
.withColumnRenamed("transaction_id", "transactionId")
.withColumnRenamed("product_category", "productCategory")
.withColumnRenamed("customer_id", "customerId")
Right(cleaned.as[Sale])
} catch {
case e: org.apache.spark.sql.AnalysisException =>
Left(SourceNotFound(inputPath))
case e: Exception =>
Left(InvalidSchema(e.getMessage))
}
}
// ═══════════════════════════════════════════════════════════════
// TRANSFORM
// ═══════════════════════════════════════════════════════════════
def transform(sales: Dataset[Sale]): Either[ETLError, Dataset[SalesSummary]] = {
try {
val summary = sales
.filter(_.amount > 0)
.groupByKey(_.productCategory)
.agg(
sum($"amount").as[Double],
sum($"quantity").as[Long],
avg($"amount").as[Double],
count("*").as[Long]
)
.map { case (category, total, qty, avgSale, count) =>
SalesSummary(category, total, qty, avgSale, count)
}
Right(summary)
} catch {
case e: Exception =>
Left(TransformError(e.getMessage))
}
}
// ═══════════════════════════════════════════════════════════════
// LOAD
// ═══════════════════════════════════════════════════════════════
def load(data: Dataset[SalesSummary], outputPath: String): Either[ETLError, Long] = {
try {
data.write
.format("delta")
.mode("overwrite")
.save(outputPath)
Right(data.count())
} catch {
case e: Exception =>
Left(WriteError(e.getMessage))
}
}
// ═══════════════════════════════════════════════════════════════
// RUN (orchestration avec for-comprehension)
// ═══════════════════════════════════════════════════════════════
def run(inputPath: String, outputPath: String): Either[ETLError, JobSuccess] = {
val startTime = System.currentTimeMillis()
for {
sales <- extract(inputPath)
_ = println(s"📥 Extracted ${sales.count()} records")
summary <- transform(sales)
_ = println(s"🔄 Transformed to ${summary.count()} categories")
count <- load(summary, outputPath)
_ = println(s"📤 Loaded $count records to $outputPath")
} yield JobSuccess(
recordsRead = sales.count(),
recordsWritten = count,
durationMs = System.currentTimeMillis() - startTime
)
}
}src/main/scala/com/example/Main.scala :
package com.example
import com.example.jobs.SalesETL
import com.example.models._
object Main {
def main(args: Array[String]): Unit = {
val inputPath = args.lift(0).getOrElse("data/input/sales.csv")
val outputPath = args.lift(1).getOrElse("output/sales_summary")
println(s"🚀 Starting ETL Job")
println(s" Input: $inputPath")
println(s" Output: $outputPath")
println()
SalesETL.run(inputPath, outputPath) match {
case Right(JobSuccess(read, written, duration)) =>
println()
println(s"✅ Job completed successfully!")
println(s" Records read: $read")
println(s" Records written: $written")
println(s" Duration: ${duration}ms")
case Left(SourceNotFound(path)) =>
System.err.println(s"❌ Source not found: $path")
System.exit(1)
case Left(InvalidSchema(msg)) =>
System.err.println(s"❌ Invalid schema: $msg")
System.exit(1)
case Left(TransformError(msg)) =>
System.err.println(s"❌ Transform error: $msg")
System.exit(1)
case Left(WriteError(msg)) =>
System.err.println(s"❌ Write error: $msg")
System.exit(1)
}
// Arrêter Spark proprement
SalesETL.spark.stop()
}
}3.5 Données de test
data/input/sales.csv :
transaction_id,product_category,amount,quantity,date,customer_id
TXN001,Electronics,1299.99,1,2024-01-15,CUST001
TXN002,Clothing,89.99,3,2024-01-15,CUST002
TXN003,Electronics,599.99,1,2024-01-16,CUST001
TXN004,Food,45.50,10,2024-01-16,CUST003
TXN005,Clothing,129.99,2,2024-01-17,CUST002
TXN006,Electronics,199.99,2,2024-01-17,CUST004
TXN007,Food,78.25,5,2024-01-18,CUST001
TXN008,Clothing,299.99,1,2024-01-18,CUST005
TXN009,Electronics,899.99,1,2024-01-19,CUST003
TXN010,Food,156.00,8,2024-01-19,CUST002
3.6 Exécuter dans IntelliJ
- Clic droit sur
Main.scala→ Run ‘Main’ - Ou créer une Run Configuration :
- Run → Edit Configurations → + → Application
- Main class :
com.example.Main - Program arguments :
data/input/sales.csv output/sales_summary - VM options :
--add-opens java.base/sun.nio.ch=ALL-UNNAMED(Java 17)
3.7 Builder le JAR
# Dans le terminal IntelliJ (View → Tool Windows → Terminal)
# Compiler
sbt compile
# Créer le fat JAR (inclut toutes les dépendances sauf Spark)
sbt assembly
# Le JAR est créé dans :
# target/scala-2.12/spark-etl-project-1.0.0.jar3.8 Exécuter avec spark-submit (local)
spark-submit \
--master local[*] \
--driver-memory 2g \
--class com.example.Main \
--packages io.delta:delta-spark_2.12:3.1.0 \
target/scala-2.12/spark-etl-project-1.0.0.jar \
data/input/sales.csv \
output/sales_summary3.9 Exécuter sur cluster (YARN/K8s)
# YARN
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--class com.example.Main \
--packages io.delta:delta-spark_2.12:3.1.0 \
spark-etl-project-1.0.0.jar \
hdfs:///data/input/sales.csv \
hdfs:///data/output/sales_summary
# Kubernetes (voir M27)
spark-submit \
--master k8s://https://k8s-api:6443 \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=my-spark:3.5.0 \
--class com.example.Main \
local:///opt/spark/jars/spark-etl-project-1.0.0.jar4. Connecteurs : JARs et Configuration
4.1 JARs requis par source de données
| Source | JAR | Maven coordinates |
|---|---|---|
| PostgreSQL | postgresql-42.7.0.jar | org.postgresql:postgresql:42.7.0 |
| MySQL | mysql-connector-j-8.0.33.jar | com.mysql:mysql-connector-j:8.0.33 |
| SQL Server | mssql-jdbc-12.4.0.jar | com.microsoft.sqlserver:mssql-jdbc:12.4.0 |
| Oracle | ojdbc11-23.3.0.0.jar | com.oracle.database.jdbc:ojdbc11:23.3.0.0 |
| Kafka | spark-sql-kafka-0-10_2.12-3.5.0.jar | org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 |
| Delta Lake | delta-spark_2.12-3.1.0.jar | io.delta:delta-spark_2.12:3.1.0 |
| Iceberg | iceberg-spark-runtime-3.5_2.12-1.4.0.jar | org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.0 |
| AWS S3 | hadoop-aws-3.3.4.jar + aws-java-sdk-bundle | org.apache.hadoop:hadoop-aws:3.3.4 |
| GCS | gcs-connector-hadoop3-2.2.17.jar | — (téléchargement manuel) |
4.2 Ajouter dans build.sbt
libraryDependencies ++= Seq(
// Spark (provided = déjà sur le cluster)
"org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
// Connecteurs (compile = inclus dans le JAR)
"org.postgresql" % "postgresql" % "42.7.0",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
"io.delta" %% "delta-spark" % "3.1.0",
)4.3 spark-submit : –jars vs –packages
| Option | Usage |
|---|---|
--jars |
Chemins locaux ou HDFS vers des JARs |
--packages |
Coordonnées Maven (téléchargement automatique) |
--repositories |
Repos Maven custom |
--driver-class-path |
JARs pour le driver uniquement |
# Avec JARs locaux
spark-submit \
--jars /path/to/postgresql-42.7.0.jar,/path/to/delta-spark_2.12-3.1.0.jar \
my-app.jar
# Avec téléchargement Maven
spark-submit \
--packages org.postgresql:postgresql:42.7.0,io.delta:delta-spark_2.12:3.1.0 \
my-app.jar
# Repo custom
spark-submit \
--repositories https://my-company.jfrog.io/artifactory/libs-release \
--packages com.mycompany:my-lib:1.0.0 \
my-app.jar4.4 Exemples de connexion
ℹ️ Note : Ces exemples supposent que vous avez accès aux services correspondants.
PostgreSQL :
val df = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/mydb")
.option("dbtable", "customers")
.option("user", "postgres")
.option("password", "secret")
.option("driver", "org.postgresql.Driver")
.load()Kafka :
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.load()Delta Lake :
// Lecture
val df = spark.read.format("delta").load("/path/to/delta")
// Écriture
df.write.format("delta").mode("overwrite").save("/path/to/delta")
// Time travel
val dfV2 = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta")5. Catalyst Optimizer
Catalyst est le moteur d’optimisation de Spark SQL. Il transforme ton code en un plan d’exécution optimal.
5.1 Phases d’optimisation
┌─────────────────────────────────────────────────────────────────────────────┐
│ CATALYST OPTIMIZER │
│ │
│ Code SQL/DataFrame │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ ANALYSIS │ Résoudre les noms de colonnes, tables │
│ │ │ Vérifier les types │
│ └────────┬────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ LOGICAL OPTIM │ Predicate pushdown, Column pruning │
│ │ │ Constant folding, Filter reordering │
│ └────────┬────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ PHYSICAL PLAN │ Choisir les algorithmes (Sort-Merge vs Broadcast) │
│ │ │ Cost-Based Optimization │
│ └────────┬────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ CODE GENERATION │ Générer du bytecode Java optimisé │
│ │ (Tungsten) │ Whole-Stage Code Generation │
│ └────────┬────────┘ │
│ ▼ │
│ EXECUTION │
└─────────────────────────────────────────────────────────────────────────────┘
5.2 Lire un plan d’exécution
val df = spark.read.parquet("data/sales")
.filter($"amount" > 100)
.groupBy("category")
.agg(sum("amount"))
// Plan simple
df.explain()
// Plan détaillé (Parsed → Analyzed → Optimized → Physical)
df.explain(true)
// Plan formaté (Spark 3.0+)
df.explain("formatted")
// Plan avec coûts (si CBO activé)
df.explain("cost")5.3 Exemple de plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[category#12], functions=[sum(amount#14)])
+- Exchange hashpartitioning(category#12, 200) ← SHUFFLE
+- HashAggregate(keys=[category#12], functions=[partial_sum(amount#14)])
+- Project [category#12, amount#14] ← Colonnes sélectionnées
+- Filter (amount#14 > 100) ← Filtre pushdown
+- FileScan parquet [category#12,amount#14] ← Lecture
Ce qu’il faut regarder : - Exchange = Shuffle (coûteux !) - BroadcastHashJoin vs SortMergeJoin (broadcast = plus rapide si petite table) - FileScan : colonnes pruned, partitions pruned - Filter : poussé au plus près de la source
5.4 Cost-Based Optimization (CBO)
// Activer CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
// Collecter les statistiques (important !)
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS amount, category")6. Adaptive Query Execution (AQE)
AQE (Spark 3.0+) optimise le plan d’exécution pendant l’exécution, en se basant sur les statistiques réelles.
6.1 Fonctionnalités
| Feature | Description |
|---|---|
| Coalesce Partitions | Fusionner les petites partitions après shuffle |
| Broadcast Join Conversion | Convertir Sort-Merge → Broadcast si table petite |
| Skew Join Optimization | Splitter les partitions déséquilibrées |
6.2 Configuration
// Activer AQE (activé par défaut depuis Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Coalesce automatique
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")
// Broadcast dynamique
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")
// Skew join
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")6.3 Voir AQE en action
// Le plan montre "AdaptiveSparkPlan"
df.explain()
// == Physical Plan ==
// AdaptiveSparkPlan isFinalPlan=true ← AQE activé !
// +- ...
// Après exécution, voir le plan final
df.collect() // Exécuter d'abord
df.explain() // Maintenant isFinalPlan=true7. Tungsten Engine
Tungsten est le moteur d’exécution bas-niveau de Spark, optimisé pour le CPU et la mémoire.
7.1 Optimisations Tungsten
| Optimisation | Description |
|---|---|
| Off-heap Memory | Stockage hors JVM heap (évite GC) |
| Binary Format | Données en format binaire compact |
| Cache-aware | Algorithmes optimisés pour le cache CPU |
| Whole-Stage CodeGen | Compile le plan en bytecode Java |
7.2 Whole-Stage Code Generation
Au lieu d’appeler des fonctions virtuelles pour chaque row, Tungsten génère du code spécialisé.
SANS CODEGEN AVEC CODEGEN
for (row in data) { // Code généré automatiquement
filter.eval(row) ← virtual call for (row in data) {
project.eval(row) ← virtual call if (row.amount > 100) {
agg.update(row) ← virtual call sum += row.amount
} }
}
7.3 Voir le code généré
// Activer le debug
spark.conf.set("spark.sql.codegen.wholeStage", "true")
// Voir le code généré
df.queryExecution.debug.codegen()8. Memory Management & Tuning
8.1 Architecture Mémoire Spark
┌─────────────────────────────────────────────────────────────────────────────┐
│ EXECUTOR MEMORY (spark.executor.memory) │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ SPARK MEMORY (spark.memory.fraction = 0.6) │ │
│ │ │ │
│ │ ┌─────────────────────┐ ┌─────────────────────────────────┐ │ │
│ │ │ STORAGE MEMORY │ │ EXECUTION MEMORY │ │ │
│ │ │ │ │ │ │ │
│ │ │ Cache, Broadcast │ ⟷ │ Shuffle, Join, Sort, Agg │ │ │
│ │ │ │ │ │ │ │
│ │ │ (storageFraction │ │ (1 - storageFraction │ │ │
│ │ │ = 0.5) │ │ = 0.5) │ │ │
│ │ └─────────────────────┘ └─────────────────────────────────┘ │ │
│ │ │ │
│ │ ← ──────────── Unified Memory (frontière flexible) ──────────── → │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ USER MEMORY (0.4) │ │
│ │ Structures internes, UDFs, métadonnées │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ RESERVED MEMORY (300MB fixe) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
8.2 Configurations Mémoire
// Mémoire totale executor
spark.conf.set("spark.executor.memory", "8g")
// Fraction pour Spark (vs User)
spark.conf.set("spark.memory.fraction", "0.6") // 60% pour Spark
// Fraction Storage dans Spark Memory
spark.conf.set("spark.memory.storageFraction", "0.5") // 50% Storage, 50% Execution
// Off-heap (Tungsten)
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")8.3 Pourquoi 80% des Jobs Spark sont Lents 🔥
┌─────────────────────────────────────────────────────────────────────────────┐
│ LES 5 CAUSES DE 80% DES JOBS SPARK LENTS │
│ │
│ 1️⃣ DATA SKEW (40%) │
│ └─ 1 partition avec 10M rows, les autres avec 1K │
│ └─ Symptôme : 199 tasks à 2s, 1 task à 45min │
│ └─ Fix : salting, repartition par clé, AQE skew join │
│ │
│ 2️⃣ SHUFFLE EXCESSIF (25%) │
│ └─ Trop de groupBy/join, pas de broadcast │
│ └─ Symptôme : "Shuffle Write" énorme dans Spark UI │
│ └─ Fix : broadcast join, réduire colonnes avant shuffle │
│ │
│ 3️⃣ MAUVAIS PARTITIONING (15%) │
│ └─ 200 partitions par défaut, fichiers trop petits/gros │
│ └─ Symptôme : 10000 tasks de 100KB ou 2 tasks de 50GB │
│ └─ Fix : repartition/coalesce, AQE coalesce, tuning shuffle.partitions│
│ │
│ 4️⃣ SPILL TO DISK (10%) │
│ └─ Mémoire insuffisante, données écrites sur disque │
│ └─ Symptôme : "Spill (Memory)" dans Stage details │
│ └─ Fix : augmenter executor memory, réduire taille partitions │
│ │
│ 5️⃣ SMALL FILES PROBLEM (10%) │
│ └─ Lire 10000 fichiers de 1MB au lieu de 100 de 100MB │
│ └─ Symptôme : temps de listing S3 très long, driver OOM │
│ └─ Fix : compaction Delta, maxPartitionBytes, bin-packing │
└─────────────────────────────────────────────────────────────────────────────┘
8.4 Diagnostic Toolkit
// 1. Vérifier le plan d'exécution
df.explain(true) // Plan complet
df.explain("cost") // Avec coûts estimés
// 2. Voir le nombre de partitions
println(s"Partitions: ${df.rdd.getNumPartitions}")
// 3. Détecter le skew (distribution par partition)
import org.apache.spark.sql.functions._
df.withColumn("partition_id", spark_partition_id())
.groupBy("partition_id")
.count()
.orderBy(desc("count"))
.show()
// 4. Voir la taille des partitions
df.rdd.mapPartitionsWithIndex { case (idx, iter) =>
Iterator((idx, iter.size))
}.toDF("partition", "rows").show()
// 5. Métriques pendant l'exécution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")8.5 Spark UI : Ce qu’il faut regarder
| Onglet | Ce qu’on cherche |
|---|---|
| Jobs | Durée totale, jobs qui traînent |
| Stages | Shuffle Read/Write, Spill, tâches en retard |
| Tasks | Distribution (min/median/max), stragglers |
| Storage | Cache utilisé, mémoire disponible |
| SQL | Plan physique, métriques par opérateur |
| Executors | Mémoire, GC time, shuffle read/write |
8.6 Debugging OOM
java.lang.OutOfMemoryError: Java heap space
CAUSES PROBABLES :
├── collect() sur un gros DataFrame
├── broadcast() d'une table trop grande
├── Trop de partitions small → overhead
├── Skew → 1 executor avec trop de données
└── Driver OOM → trop de metadata / résultats
SOLUTIONS :
├── Augmenter spark.executor.memory
├── Augmenter spark.driver.memory (si driver OOM)
├── Repartitionner les données
├── Éviter collect(), utiliser take() ou write()
└── Réduire spark.sql.autoBroadcastJoinThreshold
9. Exercices Pratiques
Exercice 1 : Pratique Scala — ADT Pipeline
Modéliser un pipeline de traitement avec des ADT.
// TODO: Créer un sealed trait pour les étapes du pipeline
// Étapes : Extract, Validate, Transform, Load
// Chaque étape peut Success ou Fail
// TODO: Implémenter une fonction qui exécute le pipeline
// et retourne un rapport détaillé de chaque étape💡 Solution
sealed trait PipelineStep
case object Extract extends PipelineStep
case object Validate extends PipelineStep
case object Transform extends PipelineStep
case object Load extends PipelineStep
sealed trait StepResult
case class StepSuccess(step: PipelineStep, durationMs: Long, records: Long) extends StepResult
case class StepFailure(step: PipelineStep, error: String) extends StepResult
case class PipelineReport(results: List[StepResult]) {
def isSuccess: Boolean = results.forall(_.isInstanceOf[StepSuccess])
def failedStep: Option[StepFailure] = results.collectFirst { case f: StepFailure => f }
}Exercice 2 : Either pour ETL
Implémenter un mini-ETL avec gestion d’erreurs typée.
// TODO: Créer une fonction qui :
// 1. Lit un fichier CSV (peut échouer : FileNotFound)
// 2. Valide que la colonne "amount" existe (peut échouer : SchemaError)
// 3. Filtre les montants > 0 (peut échouer : DataError si 0 records)
// 4. Écrit en Parquet
//
// Utiliser Either[ETLError, DataFrame] à chaque étape
// Chaîner avec for-comprehensionExercice 3 : Setup Almond + Spark
- Installer Almond (suivre les instructions section 2.1)
- Créer un notebook Scala
- Importer Spark et Delta Lake
- Créer un DataFrame, le transformer, l’écrire en Delta
Exercice 4 : Projet IntelliJ Complet
- Créer le projet selon la structure section 3
- Ajouter les fichiers de config
- Implémenter le code
- Créer le fichier CSV de test
- Exécuter dans IntelliJ
- Builder avec
sbt assembly - Lancer avec
spark-submit
Exercice 5 : Diagnostiquer un Job Lent
// Ce code est intentionnellement lent. Pourquoi ?
val df1 = spark.read.parquet("big_table") // 100M rows
val df2 = spark.read.parquet("small_table") // 1000 rows
val result = df1
.join(df2, "key") // Quel type de join ?
.groupBy("category")
.agg(sum("amount"))
.collect() // Problème ?
// TODO:
// 1. Identifier les problèmes
// 2. Proposer des optimisations
// 3. Réécrire le code optimisé💡 Solution
Problèmes : 1. Join sans broadcast → Sort-Merge Join (shuffle de 100M rows) 2. collect() sur un résultat potentiellement gros
Solution :
import org.apache.spark.sql.functions.broadcast
val result = df1
.join(broadcast(df2), "key") // Broadcast join !
.groupBy("category")
.agg(sum("amount"))
.write.parquet("output") // Écrire au lieu de collect10. Mini-Projet : ETL Scala Bronze → Silver → Gold
Objectif
Construire un pipeline ETL complet en Scala avec architecture Medallion.
┌─────────────────────────────────────────────────────────────────────────────┐
│ MEDALLION ARCHITECTURE │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ BRONZE │ ───▶ │ SILVER │ ───▶ │ GOLD │ │
│ │ (Raw) │ │ (Cleaned) │ │ (Aggregated)│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ CSV/JSON Delta Lake Delta Lake │
│ + metadata + validation + business KPIs │
│ + ingestion_ts + dedup + reporting ready │
└─────────────────────────────────────────────────────────────────────────────┘
Livrables
- Projet IntelliJ complet avec structure
- ADT pour les erreurs et résultats
- 3 jobs : BronzeJob, SilverJob, GoldJob
- Main qui orchestre avec Either
- Tests avec ScalaTest
- Fat JAR et script spark-submit
Données
data/raw/transactions_*.json (plusieurs fichiers) :
{"transaction_id": "TXN001", "customer_id": "C001", "amount": 150.0, "category": "Electronics", "timestamp": "2024-01-15T10:30:00Z"}Jobs à implémenter
BronzeJob : - Lire les JSON - Ajouter _ingestion_timestamp, _source_file - Écrire en Delta (append)
SilverJob : - Lire Bronze - Valider schéma - Dédupliquer par transaction_id - Filtrer amount > 0 - Écrire en Delta avec MERGE
GoldJob : - Lire Silver - Agrégations : ventes par catégorie, par jour - Top customers - Écrire plusieurs tables Gold
📚 Ressources
Documentation
Livres
- Programming in Scala — Martin Odersky
- Functional Programming in Scala — Paul Chiusano, Rúnar Bjarnason
- Spark: The Definitive Guide — Bill Chambers, Matei Zaharia
Outils
- IntelliJ IDEA — IDE
- Almond — Kernel Scala pour Jupyter
- Metals — LSP pour VS Code
➡️ Prochaine étape
👉 Module suivant : 31_ml_engineering — ML Engineering
🎉 Félicitations ! Tu maîtrises maintenant Scala pour Spark et les optimisations avancées.