🚀 Spark & Scala Deep Dive

Bienvenue dans ce module avancé où tu vas maîtriser Scala pour Spark et comprendre les internals de Spark pour écrire des jobs performants. Tu apprendras à développer, builder et déployer des applications Spark professionnelles.


Prérequis

Niveau Compétence
✅ Requis PySpark DataFrame API (M19)
✅ Requis Spark sur Kubernetes (M21)
✅ Requis Notions de programmation fonctionnelle
💡 Recommandé Expérience avec un IDE (VS Code, PyCharm)

🎯 Objectifs du module

À la fin de ce module, tu seras capable de :

  • Écrire du code Scala idiomatique pour Spark
  • Utiliser les ADT (Algebraic Data Types) et Either/Try pour des pipelines robustes
  • Configurer un environnement de développement complet (Notebook + IntelliJ)
  • Builder et déployer des applications Spark avec sbt et spark-submit
  • Comprendre Catalyst, AQE et Tungsten pour optimiser tes jobs
  • Diagnostiquer et résoudre les problèmes de performance

1. Scala pour Data Engineers

1.1 Pourquoi Scala pour Spark ?

Aspect Scala Python
Performance Natif JVM, pas de sérialisation Sérialisation Python ↔︎ JVM
Typage Statique, erreurs à la compilation Dynamique, erreurs au runtime
API Spark API native, toutes les features Wrapper, parfois en retard
Écosystème Kafka, Flink, Akka ML, Data Science
Courbe d’apprentissage Plus raide Plus accessible

Règle pratique : - Python : Exploration, prototypage, Data Science, petits pipelines - Scala : Production, gros volumes, performance critique, équipe backend Java/Scala

1.2 Syntaxe Essentielle

// ═══════════════════════════════════════════════════════════════
// Variables : val (immutable) vs var (mutable)
// ═══════════════════════════════════════════════════════════════

val name: String = "Spark"     // Immutable (préféré)
var counter: Int = 0           // Mutable (à éviter)
val inferred = 42              // Type inféré automatiquement

// ═══════════════════════════════════════════════════════════════
// Fonctions
// ═══════════════════════════════════════════════════════════════

// Fonction classique
def add(a: Int, b: Int): Int = {
  a + b
}

// Fonction one-liner (return implicite)
def multiply(a: Int, b: Int): Int = a * b

// Fonction anonyme (lambda)
val double = (x: Int) => x * 2

// Paramètres par défaut
def greet(name: String, greeting: String = "Hello"): String = 
  s"$greeting, $name!"

greet("Alice")              // "Hello, Alice!"
greet("Bob", "Bonjour")     // "Bonjour, Bob!"

// ═══════════════════════════════════════════════════════════════
// String interpolation
// ═══════════════════════════════════════════════════════════════

val version = 3.5
println(s"Spark version: $version")           // Simple
println(s"Next version: ${version + 0.1}")    // Expression
println(f"Pi = ${math.Pi}%.2f")               // Formaté

// ═══════════════════════════════════════════════════════════════
// Conditions et boucles
// ═══════════════════════════════════════════════════════════════

// if est une expression (retourne une valeur)
val status = if (counter > 0) "positive" else "zero or negative"

// for-comprehension
val squares = for (i <- 1 to 5) yield i * i  // Vector(1, 4, 9, 16, 25)

// for avec filtres
val evenSquares = for {
  i <- 1 to 10
  if i % 2 == 0
} yield i * i  // Vector(4, 16, 36, 64, 100)

1.3 Collections Fonctionnelles

Les collections Scala sont la base pour comprendre les transformations Spark (RDD, DataFrame).

// ═══════════════════════════════════════════════════════════════
// Types de collections
// ═══════════════════════════════════════════════════════════════

val list = List(1, 2, 3, 4, 5)           // Immutable, linked list
val vector = Vector(1, 2, 3, 4, 5)       // Immutable, indexed
val set = Set(1, 2, 3, 3, 3)             // Immutable, unique: Set(1, 2, 3)
val map = Map("a" -> 1, "b" -> 2)        // Immutable, key-value

// ═══════════════════════════════════════════════════════════════
// Transformations (comme Spark !)
// ═══════════════════════════════════════════════════════════════

val numbers = List(1, 2, 3, 4, 5)

// map : transformer chaque élément
numbers.map(x => x * 2)              // List(2, 4, 6, 8, 10)
numbers.map(_ * 2)                   // Syntaxe courte

// filter : garder les éléments qui matchent
numbers.filter(x => x > 2)           // List(3, 4, 5)
numbers.filter(_ > 2)                // Syntaxe courte

// flatMap : map + flatten
val words = List("hello world", "scala spark")
words.flatMap(_.split(" "))          // List("hello", "world", "scala", "spark")

// reduce : agréger en une valeur
numbers.reduce((a, b) => a + b)      // 15
numbers.reduce(_ + _)                // Syntaxe courte

// fold : reduce avec valeur initiale
numbers.fold(0)(_ + _)               // 15
numbers.fold(10)(_ + _)              // 25 (commence à 10)

// ═══════════════════════════════════════════════════════════════
// Chaînage (comme les pipelines Spark)
// ═══════════════════════════════════════════════════════════════

val result = numbers
  .filter(_ % 2 == 0)    // Garder les pairs
  .map(_ * 10)           // Multiplier par 10
  .sum                   // Sommer : 60 (20 + 40)

// ═══════════════════════════════════════════════════════════════
// groupBy (comme Spark groupBy !)
// ═══════════════════════════════════════════════════════════════

case class Sale(product: String, amount: Double)

val sales = List(
  Sale("laptop", 1000),
  Sale("phone", 500),
  Sale("laptop", 1200),
  Sale("phone", 600)
)

val byProduct = sales.groupBy(_.product)
// Map(
//   "laptop" -> List(Sale("laptop", 1000), Sale("laptop", 1200)),
//   "phone"  -> List(Sale("phone", 500), Sale("phone", 600))
// )

val totalByProduct = sales
  .groupBy(_.product)
  .map { case (product, sales) => 
    (product, sales.map(_.amount).sum) 
  }
// Map("laptop" -> 2200.0, "phone" -> 1100.0)

1.4 Option : Gérer les valeurs absentes

// Option = Some(valeur) ou None (jamais null !)

def findUser(id: Int): Option[String] = {
  val users = Map(1 -> "Alice", 2 -> "Bob")
  users.get(id)  // Retourne Option[String]
}

findUser(1)  // Some("Alice")
findUser(99) // None

// Pattern matching
findUser(1) match {
  case Some(name) => println(s"Found: $name")
  case None       => println("User not found")
}

// getOrElse : valeur par défaut
val user = findUser(99).getOrElse("Unknown")

// map sur Option (sûr, pas d'exception)
val upperName = findUser(1).map(_.toUpperCase)  // Some("ALICE")
val noName = findUser(99).map(_.toUpperCase)    // None (pas d'erreur !)

// flatMap pour chaîner
def findEmail(name: String): Option[String] = {
  if (name == "Alice") Some("alice@example.com") else None
}

val email = findUser(1).flatMap(findEmail)  // Some("alice@example.com")

1.5 Case Classes et Pattern Matching

// ═══════════════════════════════════════════════════════════════
// Case Class = classe de données immuable
// ═══════════════════════════════════════════════════════════════

case class Customer(
  id: Long,
  name: String,
  email: String,
  country: String = "France"  // Valeur par défaut
)

// Création (pas besoin de "new")
val alice = Customer(1, "Alice", "alice@example.com")
val bob = Customer(2, "Bob", "bob@example.com", "USA")

// Accesseurs automatiques
println(alice.name)    // "Alice"
println(alice.country) // "France"

// equals automatique (compare les valeurs)
val alice2 = Customer(1, "Alice", "alice@example.com")
println(alice == alice2)  // true !

// copy : créer une copie modifiée (immutabilité)
val aliceUSA = alice.copy(country = "USA")

// toString automatique
println(alice)  // Customer(1,Alice,alice@example.com,France)

// ═══════════════════════════════════════════════════════════════
// Pattern Matching (switch++ ultra puissant)
// ═══════════════════════════════════════════════════════════════

def describeCustomer(c: Customer): String = c match {
  case Customer(_, "Alice", _, _)     => "C'est Alice !"
  case Customer(id, _, _, "France")   => s"Client français #$id"
  case Customer(_, name, _, country)  => s"$name de $country"
}

// Pattern matching avec guards
def customerTier(c: Customer, totalSpent: Double): String = c match {
  case Customer(_, _, _, "France") if totalSpent > 10000 => "VIP France"
  case _ if totalSpent > 5000                             => "Gold"
  case _                                                  => "Standard"
}

// ═══════════════════════════════════════════════════════════════
// Case classes et Spark
// ═══════════════════════════════════════════════════════════════

// Spark utilise les case classes pour créer des Datasets typés !
import spark.implicits._

case class Sale(product: String, amount: Double, date: String)

val sales = Seq(
  Sale("laptop", 1000, "2024-01-15"),
  Sale("phone", 500, "2024-01-16")
).toDS()  // Dataset[Sale] - typé !

// Accès typé
sales.filter(_.amount > 600).show()

1.6 Sealed Traits et ADT (Algebraic Data Types) 🔥

Les ADT permettent de modéliser tous les états possibles d’un système. Le compilateur garantit l’exhaustivité !

// ═══════════════════════════════════════════════════════════════
// sealed trait = toutes les sous-classes dans le même fichier
// Le compilateur CONNAÎT tous les cas possibles
// ═══════════════════════════════════════════════════════════════

// Modéliser le résultat d'un job ETL
sealed trait JobResult
case class Success(recordsProcessed: Long, durationMs: Long) extends JobResult
case class Failure(error: String, stage: String) extends JobResult
case object Skipped extends JobResult  // object = singleton

def handleResult(result: JobResult): Unit = result match {
  case Success(n, d) => println(s"$n records traités en ${d}ms")
  case Failure(e, s) => println(s"❌ Erreur à l'étape $s: $e")
  case Skipped       => println(s"⏭️ Job ignoré")
}
// Si tu oublies un cas, le compilateur te PRÉVIENT !

// ═══════════════════════════════════════════════════════════════
// Exemple : Sources de données
// ═══════════════════════════════════════════════════════════════

sealed trait DataSource
case class JdbcSource(url: String, table: String, user: String) extends DataSource
case class S3Source(bucket: String, path: String, format: String) extends DataSource
case class KafkaSource(brokers: String, topic: String) extends DataSource
case class LocalSource(path: String) extends DataSource

def readData(source: DataSource)(implicit spark: SparkSession): DataFrame = source match {
  case JdbcSource(url, table, user) =>
    spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .load()
      
  case S3Source(bucket, path, format) =>
    spark.read
      .format(format)
      .load(s"s3a://$bucket/$path")
      
  case KafkaSource(brokers, topic) =>
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      
  case LocalSource(path) =>
    spark.read.parquet(path)
}

// ═══════════════════════════════════════════════════════════════
// Exemple : Modes de traitement
// ═══════════════════════════════════════════════════════════════

sealed trait WriteMode
case object Overwrite extends WriteMode
case object Append extends WriteMode
case class MergeByKey(keys: Seq[String]) extends WriteMode

def writeData(df: DataFrame, path: String, mode: WriteMode): Unit = mode match {
  case Overwrite       => df.write.mode("overwrite").parquet(path)
  case Append          => df.write.mode("append").parquet(path)
  case MergeByKey(keys) => 
    // Logique Delta Lake MERGE
    println(s"Merge on keys: ${keys.mkString(", ")}")
}

1.7 Gestion d’Erreurs : Either et Try 🔥

Fini les try/catch partout ! Scala offre des types pour gérer les erreurs proprement.

// ═══════════════════════════════════════════════════════════════
// Try[T] : capturer les exceptions (code legacy, I/O)
// ═══════════════════════════════════════════════════════════════

import scala.util.{Try, Success, Failure}

def parseNumber(s: String): Try[Int] = Try {
  s.toInt  // Peut lancer NumberFormatException
}

parseNumber("42")   // Success(42)
parseNumber("abc")  // Failure(NumberFormatException)

// Pattern matching
parseNumber("42") match {
  case Success(n)  => println(s"Nombre: $n")
  case Failure(ex) => println(s"Erreur: ${ex.getMessage}")
}

// getOrElse
val num = parseNumber("abc").getOrElse(0)  // 0

// map / flatMap (chaînage sûr)
val doubled = parseNumber("21").map(_ * 2)  // Success(42)

// recover : transformer l'erreur
val safe = parseNumber("abc").recover {
  case _: NumberFormatException => -1
}  // Success(-1)

// ═══════════════════════════════════════════════════════════════
// Either[L, R] : erreurs métier typées (pas d'exceptions)
// Left = erreur, Right = succès
// ═══════════════════════════════════════════════════════════════

// Définir les types d'erreurs
sealed trait ETLError
case class SourceNotFound(path: String) extends ETLError
case class SchemaError(expected: String, actual: String) extends ETLError
case class WriteError(message: String) extends ETLError
case class ConfigError(param: String) extends ETLError

// Fonctions qui retournent Either
def readSource(path: String): Either[ETLError, DataFrame] = {
  if (!new java.io.File(path).exists())
    Left(SourceNotFound(path))
  else
    Right(spark.read.parquet(path))
}

def validateSchema(df: DataFrame, expected: Seq[String]): Either[ETLError, DataFrame] = {
  val actual = df.columns.toSeq
  if (expected.forall(actual.contains))
    Right(df)
  else
    Left(SchemaError(expected.mkString(","), actual.mkString(",")))
}

def writeOutput(df: DataFrame, path: String): Either[ETLError, Long] = {
  try {
    df.write.parquet(path)
    Right(df.count())
  } catch {
    case e: Exception => Left(WriteError(e.getMessage))
  }
}

// ═══════════════════════════════════════════════════════════════
// Chaînage avec for-comprehension (le pattern ultime !)
// ═══════════════════════════════════════════════════════════════

def runETL(inputPath: String, outputPath: String): Either[ETLError, Long] = {
  for {
    rawData      <- readSource(inputPath)
    validated    <- validateSchema(rawData, Seq("id", "name", "amount"))
    transformed  = validated.filter(col("amount") > 0)  // = pour les transformations pures
    recordCount  <- writeOutput(transformed, outputPath)
  } yield recordCount
}

// Utilisation
runETL("data/input", "data/output") match {
  case Right(count) => println(s"✅ ETL terminé : $count records")
  case Left(SourceNotFound(p)) => println(s"❌ Source introuvable : $p")
  case Left(SchemaError(e, a)) => println(s"❌ Schéma invalide. Attendu: $e, Reçu: $a")
  case Left(WriteError(msg))   => println(s"❌ Erreur d'écriture : $msg")
  case Left(ConfigError(p))    => println(s"❌ Config manquante : $p")
}

// ═══════════════════════════════════════════════════════════════
// Quand utiliser quoi ?
// ═══════════════════════════════════════════════════════════════
Outil Quand l’utiliser Exemple
Option[T] Valeur absente (pas une erreur) findUser(id)
Try[T] Exceptions Java/legacy, I/O Try { file.read() }
Either[E, T] Erreurs métier typées Pipeline ETL, validation

2. Environnements de Développement

2.1 Scala dans Jupyter Notebook avec Almond

Almond est un kernel Scala pour Jupyter, idéal pour l’exploration et la formation.

Installation

# 1. Installer Coursier (gestionnaire de packages Scala)
curl -fL https://github.com/coursier/coursier/releases/latest/download/cs-x86_64-pc-linux.gz | gzip -d > cs
chmod +x cs
./cs setup  # Ajoute au PATH

# 2. Installer Almond
cs launch --fork almond -- --install

# 3. Vérifier
jupyter kernelspec list
# Devrait afficher : scala  /home/user/.local/share/jupyter/kernels/scala

# 4. Lancer Jupyter
jupyter notebook
# → Nouveau notebook → Kernel "Scala"

Configuration Spark dans Almond

// Dans une cellule du notebook Scala

// Importer les dépendances avec Ammonite
import $ivy.`org.apache.spark::spark-sql:3.5.0`
import $ivy.`io.delta::delta-spark:3.1.0`

// Créer la SparkSession
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Almond Spark")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

import spark.implicits._

// Test
val df = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
df.show()

Alternatives

Kernel Avantages Inconvénients
Almond Moderne, Ammonite, bien maintenu Setup manuel
spylon-kernel Simple, pip install Moins de features
Apache Toree Officiel Apache Spark Plus lourd, moins actif

2.2 IntelliJ IDEA

Pour les projets de production, IntelliJ IDEA est l’IDE de référence pour Scala.

Installation

# 1. Télécharger IntelliJ IDEA Community Edition (gratuit)
# https://www.jetbrains.com/idea/download/

# 2. Linux : extraire et lancer
tar -xzf ideaIC-*.tar.gz
cd idea-IC-*/bin
./idea.sh

# 3. Installer le plugin Scala
# File → Settings → Plugins → Marketplace → Rechercher "Scala" → Install

# 4. Configurer le JDK
# File → Project Structure → SDKs → + → Download JDK → Version 11 ou 17

2.3 Quand utiliser quoi ?

Environnement Use case
Notebook (Almond) Exploration, prototypage, formation, démos
IntelliJ + sbt Développement, tests, refactoring, projets production
spark-submit Exécution sur cluster (YARN, K8s)

3. Projet Complet avec IntelliJ (Step-by-Step)

On va créer un projet Spark/Scala complet de A à Z.

3.1 Créer le projet

  1. File → New → Project
  2. Sélectionner sbt (à gauche)
  3. Configurer :
    • Name : spark-etl-project
    • Location : /home/user/projects/spark-etl-project
    • JDK : 11 ou 17
    • sbt version : 1.9.x
    • Scala version : 2.12.18 (compatible Spark 3.5)
  4. Create

3.2 Structure du projet

spark-etl-project/
├── build.sbt                      # Dépendances et config
├── project/
│   ├── build.properties           # Version sbt
│   └── plugins.sbt                # Plugins (sbt-assembly)
├── src/
│   ├── main/
│   │   ├── scala/
│   │   │   └── com/
│   │   │       └── example/
│   │   │           ├── Main.scala
│   │   │           ├── config/
│   │   │           │   └── AppConfig.scala
│   │   │           ├── jobs/
│   │   │           │   └── SalesETL.scala
│   │   │           ├── models/
│   │   │           │   └── Models.scala
│   │   │           └── utils/
│   │   │               └── SparkSessionWrapper.scala
│   │   └── resources/
│   │       ├── application.conf
│   │       └── log4j2.properties
│   └── test/
│       └── scala/
│           └── com/
│               └── example/
│                   └── jobs/
│                       └── SalesETLSpec.scala
├── data/
│   └── input/
│       └── sales.csv
└── output/                        # Généré

3.3 Fichiers de configuration

build.sbt :

name := "spark-etl-project"
version := "1.0.0"
scalaVersion := "2.12.18"

// Spark 3.5 (provided = déjà sur le cluster)
val sparkVersion = "3.5.0"

libraryDependencies ++= Seq(
  // Spark (provided pour le cluster, compile pour local)
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  
  // Delta Lake
  "io.delta" %% "delta-spark" % "3.1.0",
  
  // Config
  "com.typesafe" % "config" % "1.4.3",
  
  // Tests
  "org.scalatest" %% "scalatest" % "3.2.17" % Test
)

// Pour exécuter en local dans IntelliJ (override provided)
Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

// Assembly config (fat JAR)
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "reference.conf"              => MergeStrategy.concat
  case x                              => MergeStrategy.first
}

assembly / assemblyJarName := s"${name.value}-${version.value}.jar"

project/build.properties :

sbt.version=1.9.8

project/plugins.sbt :

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")

3.4 Code Scala

src/main/scala/com/example/utils/SparkSessionWrapper.scala :

package com.example.utils

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("SparkETL")
    .master(sys.env.getOrElse("SPARK_MASTER", "local[*]"))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
    
  // Réduire les logs Spark
  spark.sparkContext.setLogLevel("WARN")
}

src/main/scala/com/example/models/Models.scala :

package com.example.models

// ═══════════════════════════════════════════════════════════════
// Modèles de données (case classes)
// ═══════════════════════════════════════════════════════════════

case class Sale(
  transactionId: String,
  productCategory: String,
  amount: Double,
  quantity: Int,
  date: String,
  customerId: String
)

case class SalesSummary(
  productCategory: String,
  totalSales: Double,
  totalQuantity: Long,
  avgSale: Double,
  transactionCount: Long
)

// ═══════════════════════════════════════════════════════════════
// ADT pour les résultats de job
// ═══════════════════════════════════════════════════════════════

sealed trait JobResult
case class JobSuccess(
  recordsRead: Long,
  recordsWritten: Long,
  durationMs: Long
) extends JobResult

case class JobFailure(
  stage: String,
  error: String
) extends JobResult

// ═══════════════════════════════════════════════════════════════
// ADT pour les erreurs ETL
// ═══════════════════════════════════════════════════════════════

sealed trait ETLError
case class SourceNotFound(path: String) extends ETLError
case class InvalidSchema(message: String) extends ETLError
case class TransformError(message: String) extends ETLError
case class WriteError(message: String) extends ETLError

src/main/scala/com/example/jobs/SalesETL.scala :

package com.example.jobs

import com.example.models._
import com.example.utils.SparkSessionWrapper
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._

object SalesETL extends SparkSessionWrapper {
  
  import spark.implicits._
  
  // ═══════════════════════════════════════════════════════════════
  // EXTRACT
  // ═══════════════════════════════════════════════════════════════
  
  def extract(inputPath: String): Either[ETLError, Dataset[Sale]] = {
    try {
      val df = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(inputPath)
        
      // Renommer les colonnes pour matcher la case class
      val cleaned = df
        .withColumnRenamed("transaction_id", "transactionId")
        .withColumnRenamed("product_category", "productCategory")
        .withColumnRenamed("customer_id", "customerId")
        
      Right(cleaned.as[Sale])
    } catch {
      case e: org.apache.spark.sql.AnalysisException =>
        Left(SourceNotFound(inputPath))
      case e: Exception =>
        Left(InvalidSchema(e.getMessage))
    }
  }
  
  // ═══════════════════════════════════════════════════════════════
  // TRANSFORM
  // ═══════════════════════════════════════════════════════════════
  
  def transform(sales: Dataset[Sale]): Either[ETLError, Dataset[SalesSummary]] = {
    try {
      val summary = sales
        .filter(_.amount > 0)
        .groupByKey(_.productCategory)
        .agg(
          sum($"amount").as[Double],
          sum($"quantity").as[Long],
          avg($"amount").as[Double],
          count("*").as[Long]
        )
        .map { case (category, total, qty, avgSale, count) =>
          SalesSummary(category, total, qty, avgSale, count)
        }
        
      Right(summary)
    } catch {
      case e: Exception =>
        Left(TransformError(e.getMessage))
    }
  }
  
  // ═══════════════════════════════════════════════════════════════
  // LOAD
  // ═══════════════════════════════════════════════════════════════
  
  def load(data: Dataset[SalesSummary], outputPath: String): Either[ETLError, Long] = {
    try {
      data.write
        .format("delta")
        .mode("overwrite")
        .save(outputPath)
        
      Right(data.count())
    } catch {
      case e: Exception =>
        Left(WriteError(e.getMessage))
    }
  }
  
  // ═══════════════════════════════════════════════════════════════
  // RUN (orchestration avec for-comprehension)
  // ═══════════════════════════════════════════════════════════════
  
  def run(inputPath: String, outputPath: String): Either[ETLError, JobSuccess] = {
    val startTime = System.currentTimeMillis()
    
    for {
      sales   <- extract(inputPath)
      _       = println(s"📥 Extracted ${sales.count()} records")
      
      summary <- transform(sales)
      _       = println(s"🔄 Transformed to ${summary.count()} categories")
      
      count   <- load(summary, outputPath)
      _       = println(s"📤 Loaded $count records to $outputPath")
      
    } yield JobSuccess(
      recordsRead = sales.count(),
      recordsWritten = count,
      durationMs = System.currentTimeMillis() - startTime
    )
  }
}

src/main/scala/com/example/Main.scala :

package com.example

import com.example.jobs.SalesETL
import com.example.models._

object Main {
  
  def main(args: Array[String]): Unit = {
    
    val inputPath = args.lift(0).getOrElse("data/input/sales.csv")
    val outputPath = args.lift(1).getOrElse("output/sales_summary")
    
    println(s"🚀 Starting ETL Job")
    println(s"   Input:  $inputPath")
    println(s"   Output: $outputPath")
    println()
    
    SalesETL.run(inputPath, outputPath) match {
      case Right(JobSuccess(read, written, duration)) =>
        println()
        println(s"✅ Job completed successfully!")
        println(s"   Records read:    $read")
        println(s"   Records written: $written")
        println(s"   Duration:        ${duration}ms")
        
      case Left(SourceNotFound(path)) =>
        System.err.println(s"❌ Source not found: $path")
        System.exit(1)
        
      case Left(InvalidSchema(msg)) =>
        System.err.println(s"❌ Invalid schema: $msg")
        System.exit(1)
        
      case Left(TransformError(msg)) =>
        System.err.println(s"❌ Transform error: $msg")
        System.exit(1)
        
      case Left(WriteError(msg)) =>
        System.err.println(s"❌ Write error: $msg")
        System.exit(1)
    }
    
    // Arrêter Spark proprement
    SalesETL.spark.stop()
  }
}

3.5 Données de test

data/input/sales.csv :

transaction_id,product_category,amount,quantity,date,customer_id
TXN001,Electronics,1299.99,1,2024-01-15,CUST001
TXN002,Clothing,89.99,3,2024-01-15,CUST002
TXN003,Electronics,599.99,1,2024-01-16,CUST001
TXN004,Food,45.50,10,2024-01-16,CUST003
TXN005,Clothing,129.99,2,2024-01-17,CUST002
TXN006,Electronics,199.99,2,2024-01-17,CUST004
TXN007,Food,78.25,5,2024-01-18,CUST001
TXN008,Clothing,299.99,1,2024-01-18,CUST005
TXN009,Electronics,899.99,1,2024-01-19,CUST003
TXN010,Food,156.00,8,2024-01-19,CUST002

3.6 Exécuter dans IntelliJ

  1. Clic droit sur Main.scalaRun ‘Main’
  2. Ou créer une Run Configuration :
    • Run → Edit Configurations → + → Application
    • Main class : com.example.Main
    • Program arguments : data/input/sales.csv output/sales_summary
    • VM options : --add-opens java.base/sun.nio.ch=ALL-UNNAMED (Java 17)

3.7 Builder le JAR

# Dans le terminal IntelliJ (View → Tool Windows → Terminal)

# Compiler
sbt compile

# Créer le fat JAR (inclut toutes les dépendances sauf Spark)
sbt assembly

# Le JAR est créé dans :
# target/scala-2.12/spark-etl-project-1.0.0.jar

3.8 Exécuter avec spark-submit (local)

spark-submit \
  --master local[*] \
  --driver-memory 2g \
  --class com.example.Main \
  --packages io.delta:delta-spark_2.12:3.1.0 \
  target/scala-2.12/spark-etl-project-1.0.0.jar \
  data/input/sales.csv \
  output/sales_summary

3.9 Exécuter sur cluster (YARN/K8s)

# YARN
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --class com.example.Main \
  --packages io.delta:delta-spark_2.12:3.1.0 \
  spark-etl-project-1.0.0.jar \
  hdfs:///data/input/sales.csv \
  hdfs:///data/output/sales_summary

# Kubernetes (voir M27)
spark-submit \
  --master k8s://https://k8s-api:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=my-spark:3.5.0 \
  --class com.example.Main \
  local:///opt/spark/jars/spark-etl-project-1.0.0.jar

4. Connecteurs : JARs et Configuration

4.1 JARs requis par source de données

Source JAR Maven coordinates
PostgreSQL postgresql-42.7.0.jar org.postgresql:postgresql:42.7.0
MySQL mysql-connector-j-8.0.33.jar com.mysql:mysql-connector-j:8.0.33
SQL Server mssql-jdbc-12.4.0.jar com.microsoft.sqlserver:mssql-jdbc:12.4.0
Oracle ojdbc11-23.3.0.0.jar com.oracle.database.jdbc:ojdbc11:23.3.0.0
Kafka spark-sql-kafka-0-10_2.12-3.5.0.jar org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
Delta Lake delta-spark_2.12-3.1.0.jar io.delta:delta-spark_2.12:3.1.0
Iceberg iceberg-spark-runtime-3.5_2.12-1.4.0.jar org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.0
AWS S3 hadoop-aws-3.3.4.jar + aws-java-sdk-bundle org.apache.hadoop:hadoop-aws:3.3.4
GCS gcs-connector-hadoop3-2.2.17.jar — (téléchargement manuel)

4.2 Ajouter dans build.sbt

libraryDependencies ++= Seq(
  // Spark (provided = déjà sur le cluster)
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
  
  // Connecteurs (compile = inclus dans le JAR)
  "org.postgresql" % "postgresql" % "42.7.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
  "io.delta" %% "delta-spark" % "3.1.0",
)

4.3 spark-submit : –jars vs –packages

Option Usage
--jars Chemins locaux ou HDFS vers des JARs
--packages Coordonnées Maven (téléchargement automatique)
--repositories Repos Maven custom
--driver-class-path JARs pour le driver uniquement
# Avec JARs locaux
spark-submit \
  --jars /path/to/postgresql-42.7.0.jar,/path/to/delta-spark_2.12-3.1.0.jar \
  my-app.jar

# Avec téléchargement Maven
spark-submit \
  --packages org.postgresql:postgresql:42.7.0,io.delta:delta-spark_2.12:3.1.0 \
  my-app.jar

# Repo custom
spark-submit \
  --repositories https://my-company.jfrog.io/artifactory/libs-release \
  --packages com.mycompany:my-lib:1.0.0 \
  my-app.jar

4.4 Exemples de connexion

ℹ️ Note : Ces exemples supposent que vous avez accès aux services correspondants.

PostgreSQL :

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "customers")
  .option("user", "postgres")
  .option("password", "secret")
  .option("driver", "org.postgresql.Driver")
  .load()

Kafka :

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

Delta Lake :

// Lecture
val df = spark.read.format("delta").load("/path/to/delta")

// Écriture
df.write.format("delta").mode("overwrite").save("/path/to/delta")

// Time travel
val dfV2 = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta")

5. Catalyst Optimizer

Catalyst est le moteur d’optimisation de Spark SQL. Il transforme ton code en un plan d’exécution optimal.

5.1 Phases d’optimisation

┌─────────────────────────────────────────────────────────────────────────────┐
│                         CATALYST OPTIMIZER                                  │
│                                                                             │
│   Code SQL/DataFrame                                                        │
│         │                                                                   │
│         ▼                                                                   │
│   ┌─────────────────┐                                                       │
│   │    ANALYSIS     │  Résoudre les noms de colonnes, tables               │
│   │                 │  Vérifier les types                                   │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│   ┌─────────────────┐                                                       │
│   │ LOGICAL OPTIM   │  Predicate pushdown, Column pruning                  │
│   │                 │  Constant folding, Filter reordering                 │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│   ┌─────────────────┐                                                       │
│   │ PHYSICAL PLAN   │  Choisir les algorithmes (Sort-Merge vs Broadcast)   │
│   │                 │  Cost-Based Optimization                             │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│   ┌─────────────────┐                                                       │
│   │ CODE GENERATION │  Générer du bytecode Java optimisé                   │
│   │   (Tungsten)    │  Whole-Stage Code Generation                         │
│   └────────┬────────┘                                                       │
│            ▼                                                                │
│        EXECUTION                                                            │
└─────────────────────────────────────────────────────────────────────────────┘

5.2 Lire un plan d’exécution

val df = spark.read.parquet("data/sales")
  .filter($"amount" > 100)
  .groupBy("category")
  .agg(sum("amount"))

// Plan simple
df.explain()

// Plan détaillé (Parsed → Analyzed → Optimized → Physical)
df.explain(true)

// Plan formaté (Spark 3.0+)
df.explain("formatted")

// Plan avec coûts (si CBO activé)
df.explain("cost")

5.3 Exemple de plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[category#12], functions=[sum(amount#14)])
   +- Exchange hashpartitioning(category#12, 200)        ← SHUFFLE
      +- HashAggregate(keys=[category#12], functions=[partial_sum(amount#14)])
         +- Project [category#12, amount#14]             ← Colonnes sélectionnées
            +- Filter (amount#14 > 100)                  ← Filtre pushdown
               +- FileScan parquet [category#12,amount#14]  ← Lecture

Ce qu’il faut regarder : - Exchange = Shuffle (coûteux !) - BroadcastHashJoin vs SortMergeJoin (broadcast = plus rapide si petite table) - FileScan : colonnes pruned, partitions pruned - Filter : poussé au plus près de la source

5.4 Cost-Based Optimization (CBO)

// Activer CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

// Collecter les statistiques (important !)
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS amount, category")

6. Adaptive Query Execution (AQE)

AQE (Spark 3.0+) optimise le plan d’exécution pendant l’exécution, en se basant sur les statistiques réelles.

6.1 Fonctionnalités

Feature Description
Coalesce Partitions Fusionner les petites partitions après shuffle
Broadcast Join Conversion Convertir Sort-Merge → Broadcast si table petite
Skew Join Optimization Splitter les partitions déséquilibrées

6.2 Configuration

// Activer AQE (activé par défaut depuis Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Coalesce automatique
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")

// Broadcast dynamique
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")

// Skew join
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

6.3 Voir AQE en action

// Le plan montre "AdaptiveSparkPlan"
df.explain()

// == Physical Plan ==
// AdaptiveSparkPlan isFinalPlan=true    ← AQE activé !
// +- ...

// Après exécution, voir le plan final
df.collect()  // Exécuter d'abord
df.explain()  // Maintenant isFinalPlan=true

7. Tungsten Engine

Tungsten est le moteur d’exécution bas-niveau de Spark, optimisé pour le CPU et la mémoire.

7.1 Optimisations Tungsten

Optimisation Description
Off-heap Memory Stockage hors JVM heap (évite GC)
Binary Format Données en format binaire compact
Cache-aware Algorithmes optimisés pour le cache CPU
Whole-Stage CodeGen Compile le plan en bytecode Java

7.2 Whole-Stage Code Generation

Au lieu d’appeler des fonctions virtuelles pour chaque row, Tungsten génère du code spécialisé.

SANS CODEGEN                          AVEC CODEGEN
                                      
for (row in data) {                   // Code généré automatiquement
  filter.eval(row)   ← virtual call   for (row in data) {
  project.eval(row)  ← virtual call     if (row.amount > 100) {
  agg.update(row)    ← virtual call       sum += row.amount
}                                       }
                                      }

7.3 Voir le code généré

// Activer le debug
spark.conf.set("spark.sql.codegen.wholeStage", "true")

// Voir le code généré
df.queryExecution.debug.codegen()

8. Memory Management & Tuning

8.1 Architecture Mémoire Spark

┌─────────────────────────────────────────────────────────────────────────────┐
│                     EXECUTOR MEMORY (spark.executor.memory)                 │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                SPARK MEMORY (spark.memory.fraction = 0.6)           │  │
│   │                                                                     │  │
│   │   ┌─────────────────────┐   ┌─────────────────────────────────┐    │  │
│   │   │   STORAGE MEMORY    │   │      EXECUTION MEMORY           │    │  │
│   │   │                     │   │                                 │    │  │
│   │   │   Cache, Broadcast  │ ⟷ │   Shuffle, Join, Sort, Agg     │    │  │
│   │   │                     │   │                                 │    │  │
│   │   │   (storageFraction  │   │   (1 - storageFraction          │    │  │
│   │   │    = 0.5)           │   │    = 0.5)                       │    │  │
│   │   └─────────────────────┘   └─────────────────────────────────┘    │  │
│   │                                                                     │  │
│   │   ← ──────────── Unified Memory (frontière flexible) ──────────── → │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    USER MEMORY (0.4)                                │  │
│   │   Structures internes, UDFs, métadonnées                            │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    RESERVED MEMORY (300MB fixe)                     │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

8.2 Configurations Mémoire

// Mémoire totale executor
spark.conf.set("spark.executor.memory", "8g")

// Fraction pour Spark (vs User)
spark.conf.set("spark.memory.fraction", "0.6")  // 60% pour Spark

// Fraction Storage dans Spark Memory
spark.conf.set("spark.memory.storageFraction", "0.5")  // 50% Storage, 50% Execution

// Off-heap (Tungsten)
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

8.3 Pourquoi 80% des Jobs Spark sont Lents 🔥

┌─────────────────────────────────────────────────────────────────────────────┐
│            LES 5 CAUSES DE 80% DES JOBS SPARK LENTS                         │
│                                                                             │
│   1️⃣ DATA SKEW (40%)                                                        │
│      └─ 1 partition avec 10M rows, les autres avec 1K                       │
│      └─ Symptôme : 199 tasks à 2s, 1 task à 45min                          │
│      └─ Fix : salting, repartition par clé, AQE skew join                  │
│                                                                             │
│   2️⃣ SHUFFLE EXCESSIF (25%)                                                 │
│      └─ Trop de groupBy/join, pas de broadcast                              │
│      └─ Symptôme : "Shuffle Write" énorme dans Spark UI                    │
│      └─ Fix : broadcast join, réduire colonnes avant shuffle               │
│                                                                             │
│   3️⃣ MAUVAIS PARTITIONING (15%)                                             │
│      └─ 200 partitions par défaut, fichiers trop petits/gros                │
│      └─ Symptôme : 10000 tasks de 100KB ou 2 tasks de 50GB                 │
│      └─ Fix : repartition/coalesce, AQE coalesce, tuning shuffle.partitions│
│                                                                             │
│   4️⃣ SPILL TO DISK (10%)                                                    │
│      └─ Mémoire insuffisante, données écrites sur disque                    │
│      └─ Symptôme : "Spill (Memory)" dans Stage details                     │
│      └─ Fix : augmenter executor memory, réduire taille partitions         │
│                                                                             │
│   5️⃣ SMALL FILES PROBLEM (10%)                                              │
│      └─ Lire 10000 fichiers de 1MB au lieu de 100 de 100MB                  │
│      └─ Symptôme : temps de listing S3 très long, driver OOM               │
│      └─ Fix : compaction Delta, maxPartitionBytes, bin-packing             │
└─────────────────────────────────────────────────────────────────────────────┘

8.4 Diagnostic Toolkit

// 1. Vérifier le plan d'exécution
df.explain(true)          // Plan complet
df.explain("cost")        // Avec coûts estimés

// 2. Voir le nombre de partitions
println(s"Partitions: ${df.rdd.getNumPartitions}")

// 3. Détecter le skew (distribution par partition)
import org.apache.spark.sql.functions._
df.withColumn("partition_id", spark_partition_id())
  .groupBy("partition_id")
  .count()
  .orderBy(desc("count"))
  .show()

// 4. Voir la taille des partitions
df.rdd.mapPartitionsWithIndex { case (idx, iter) =>
  Iterator((idx, iter.size))
}.toDF("partition", "rows").show()

// 5. Métriques pendant l'exécution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

8.5 Spark UI : Ce qu’il faut regarder

Onglet Ce qu’on cherche
Jobs Durée totale, jobs qui traînent
Stages Shuffle Read/Write, Spill, tâches en retard
Tasks Distribution (min/median/max), stragglers
Storage Cache utilisé, mémoire disponible
SQL Plan physique, métriques par opérateur
Executors Mémoire, GC time, shuffle read/write

8.6 Debugging OOM

java.lang.OutOfMemoryError: Java heap space

CAUSES PROBABLES :
├── collect() sur un gros DataFrame
├── broadcast() d'une table trop grande
├── Trop de partitions small → overhead
├── Skew → 1 executor avec trop de données
└── Driver OOM → trop de metadata / résultats

SOLUTIONS :
├── Augmenter spark.executor.memory
├── Augmenter spark.driver.memory (si driver OOM)
├── Repartitionner les données
├── Éviter collect(), utiliser take() ou write()
└── Réduire spark.sql.autoBroadcastJoinThreshold

9. Exercices Pratiques

Exercice 1 : Pratique Scala — ADT Pipeline

Modéliser un pipeline de traitement avec des ADT.

// TODO: Créer un sealed trait pour les étapes du pipeline
// Étapes : Extract, Validate, Transform, Load
// Chaque étape peut Success ou Fail

// TODO: Implémenter une fonction qui exécute le pipeline
// et retourne un rapport détaillé de chaque étape
💡 Solution
sealed trait PipelineStep
case object Extract extends PipelineStep
case object Validate extends PipelineStep
case object Transform extends PipelineStep
case object Load extends PipelineStep

sealed trait StepResult
case class StepSuccess(step: PipelineStep, durationMs: Long, records: Long) extends StepResult
case class StepFailure(step: PipelineStep, error: String) extends StepResult

case class PipelineReport(results: List[StepResult]) {
  def isSuccess: Boolean = results.forall(_.isInstanceOf[StepSuccess])
  def failedStep: Option[StepFailure] = results.collectFirst { case f: StepFailure => f }
}

Exercice 2 : Either pour ETL

Implémenter un mini-ETL avec gestion d’erreurs typée.

// TODO: Créer une fonction qui :
// 1. Lit un fichier CSV (peut échouer : FileNotFound)
// 2. Valide que la colonne "amount" existe (peut échouer : SchemaError)
// 3. Filtre les montants > 0 (peut échouer : DataError si 0 records)
// 4. Écrit en Parquet
//
// Utiliser Either[ETLError, DataFrame] à chaque étape
// Chaîner avec for-comprehension

Exercice 3 : Setup Almond + Spark

  1. Installer Almond (suivre les instructions section 2.1)
  2. Créer un notebook Scala
  3. Importer Spark et Delta Lake
  4. Créer un DataFrame, le transformer, l’écrire en Delta

Exercice 4 : Projet IntelliJ Complet

  1. Créer le projet selon la structure section 3
  2. Ajouter les fichiers de config
  3. Implémenter le code
  4. Créer le fichier CSV de test
  5. Exécuter dans IntelliJ
  6. Builder avec sbt assembly
  7. Lancer avec spark-submit

Exercice 5 : Diagnostiquer un Job Lent

// Ce code est intentionnellement lent. Pourquoi ?
val df1 = spark.read.parquet("big_table")  // 100M rows
val df2 = spark.read.parquet("small_table")  // 1000 rows

val result = df1
  .join(df2, "key")  // Quel type de join ?
  .groupBy("category")
  .agg(sum("amount"))
  .collect()  // Problème ?

// TODO: 
// 1. Identifier les problèmes
// 2. Proposer des optimisations
// 3. Réécrire le code optimisé
💡 Solution

Problèmes : 1. Join sans broadcast → Sort-Merge Join (shuffle de 100M rows) 2. collect() sur un résultat potentiellement gros

Solution :

import org.apache.spark.sql.functions.broadcast

val result = df1
  .join(broadcast(df2), "key")  // Broadcast join !
  .groupBy("category")
  .agg(sum("amount"))
  .write.parquet("output")  // Écrire au lieu de collect

10. Mini-Projet : ETL Scala Bronze → Silver → Gold

Objectif

Construire un pipeline ETL complet en Scala avec architecture Medallion.

┌─────────────────────────────────────────────────────────────────────────────┐
│                         MEDALLION ARCHITECTURE                              │
│                                                                             │
│   ┌─────────────┐      ┌─────────────┐      ┌─────────────┐                │
│   │   BRONZE    │ ───▶ │   SILVER    │ ───▶ │    GOLD     │                │
│   │   (Raw)     │      │  (Cleaned)  │      │ (Aggregated)│                │
│   └─────────────┘      └─────────────┘      └─────────────┘                │
│                                                                             │
│   CSV/JSON           Delta Lake           Delta Lake                        │
│   + metadata         + validation         + business KPIs                   │
│   + ingestion_ts     + dedup              + reporting ready                 │
└─────────────────────────────────────────────────────────────────────────────┘

Livrables

  1. Projet IntelliJ complet avec structure
  2. ADT pour les erreurs et résultats
  3. 3 jobs : BronzeJob, SilverJob, GoldJob
  4. Main qui orchestre avec Either
  5. Tests avec ScalaTest
  6. Fat JAR et script spark-submit

Données

data/raw/transactions_*.json (plusieurs fichiers) :

{"transaction_id": "TXN001", "customer_id": "C001", "amount": 150.0, "category": "Electronics", "timestamp": "2024-01-15T10:30:00Z"}

Jobs à implémenter

BronzeJob : - Lire les JSON - Ajouter _ingestion_timestamp, _source_file - Écrire en Delta (append)

SilverJob : - Lire Bronze - Valider schéma - Dédupliquer par transaction_id - Filtrer amount > 0 - Écrire en Delta avec MERGE

GoldJob : - Lire Silver - Agrégations : ventes par catégorie, par jour - Top customers - Écrire plusieurs tables Gold


📚 Ressources

Documentation

Livres

  • Programming in Scala — Martin Odersky
  • Functional Programming in Scala — Paul Chiusano, Rúnar Bjarnason
  • Spark: The Definitive Guide — Bill Chambers, Matei Zaharia

Outils


➡️ Prochaine étape

👉 Module suivant : 31_ml_engineering — ML Engineering


🎉 Félicitations ! Tu maîtrises maintenant Scala pour Spark et les optimisations avancées.

Retour au sommet