Table Formats : Delta Lake & Apache Iceberg

Du Data Lake au Data Lakehouse

Bienvenue dans ce module où tu vas transformer un Data Lake en Data Lakehouse — combinant flexibilité et ACID.


Prérequis

Module Compétence Pourquoi ?
✅ 19 PySpark Advanced DataFrame API
✅ 20 Spark SQL SQL, Catalyst
✅ 22 Cloud Storage MinIO, s3a://

Objectifs

  • Comprendre pourquoi Parquet seul ne suffit pas
  • Maîtriser ACID pour les Data Lakes
  • Utiliser Delta Lake : MERGE, Time Travel, Schema Evolution
  • Comprendre Iceberg : Hidden Partitioning
  • Optimiser avec OPTIMIZE, Z-ORDER, VACUUM

1. Introduction — Pourquoi les Table Formats ?

1.1 L’évolution du stockage

2000s: DATA WAREHOUSE     2010s: DATA LAKE        2020s: LAKEHOUSE
• Oracle, Teradata        • Hadoop, S3+Parquet    • Delta, Iceberg
• ACID ✅                 • Flexible              • ACID ✅
• Coûteux 💰              • Pas ACID ❌           • Flexible
• Rigide                  • Cheap 💵              • Cheap 💵

1.2 Le problème avec Parquet seul

Scénario Problème
Deux jobs écrivent en même temps 💥 Données corrompues
UPDATE 100 lignes sur 10M 😰 Réécrire toute la partition
Job échoue à mi-chemin 🗑️ Fichiers partiels
Voir données d’il y a 3 jours ❌ Impossible
Schéma source change 💔 Erreurs de lecture

Les Table Formats résolvent TOUS ces problèmes.

1.3 C’est quoi un Data Lakehouse ?

┌─────────────────────────────────────────────────────┐
│                  DATA LAKEHOUSE                     │
│                                                     │
│   DATA LAKE          +       DATA WAREHOUSE         │
│   • Stockage cheap           • ACID                 │
│   • Formats ouverts          • Schema enforce       │
│   • Flexibilité              • SQL performant       │
│                                                     │
│   = Le meilleur des deux mondes !                   │
└─────────────────────────────────────────────────────┘

Les Table Formats ajoutent une couche de métadonnées (Transaction Log) au-dessus des fichiers Parquet.

Exercice 1 : Identifier les limites de Parquet

Scénario Problème ?
Job A lit pendant que Job B écrit ?
UPDATE 100 lignes sur 10M ?
Suppression accidentelle d’une partition ?
💡 Réponses
  • Dirty Read : données partielles/incohérentes
  • Réécriture totale : toute la partition
  • Pas de rollback : données perdues

2. Les Limites du Data Lake Classique

2.1 Pas de transactions ACID

Propriété Signification Parquet seul ?
Atomicity Tout-ou-rien ❌ Fichiers partiels
Consistency Contraintes respectées ❌ Pas de validation
Isolation Opérations isolées ❌ Dirty reads
Durability Données persistantes ✅ Sur S3

2.2 Mutations coûteuses

UPDATE 1 ligne = lire 10M lignes → modifier → réécrire 10M lignes = minutes

2.3 Small Files Problem

Streaming 5min/jour = 288 fichiers/jour × 365 = 100K+ fichiers/an

2.4 Schéma rigide

Pas de schema enforcement natif. Changer partitionnement = réécriture totale.


3. Introduction aux Table Formats

3.1 Architecture

┌─────────────────────────────────────────────┐
│           QUERY ENGINE (Spark, Trino)       │
└─────────────────────┬───────────────────────┘
                      ▼
┌─────────────────────────────────────────────┐
│         TABLE FORMAT LAYER                  │
│      (Delta Lake / Iceberg / Hudi)          │
│  • Transaction Log                          │
│  • ACID, Time Travel, Schema Evolution      │
└─────────────────────┬───────────────────────┘
                      ▼
┌─────────────────────────────────────────────┐
│     FILE FORMAT (Parquet) sur STORAGE (S3)  │
└─────────────────────────────────────────────┘

3.2 Le Transaction Log

Version 0 : ADD file_001.parquet
Version 1 : ADD file_002.parquet
Version 2 : ADD file_003.parquet, REMOVE file_001.parquet

État actuel = { file_002, file_003 }
État v1     = { file_001, file_002 }  ← Time Travel !

3.3 Comparaison Delta vs Iceberg vs Hudi

Critère Delta Lake Iceberg Hudi
Créateur Databricks Netflix Uber
Moteurs Spark +++ Multi-engine Spark, Flink
ACID
Time Travel ✅✅ ✅✅
Schema Evolution ✅✅ (best)
Hidden Partitioning ✅ (unique!)
Z-Ordering
Catalog requis Optionnel Obligatoire Obligatoire
CDC natif Change Data Feed ✅✅ (best)

3.4 Lequel choisir ?

  • 🎯 Delta Lake : Spark, Databricks, simplicité, débutants
  • 🎯 Iceberg : Multi-engine, Hidden Partitioning, AWS Athena
  • 🎯 Hudi : CDC massif, near-real-time, AWS EMR

4. Delta Lake — Deep Dive

4.1 Architecture : _delta_log/

s3://bucket/sales/
├── _delta_log/
│   ├── 00000000000000000000.json  ← Version 0
│   ├── 00000000000000000001.json  ← Version 1
│   ├── 00000000000000000010.checkpoint.parquet
│   └── _last_checkpoint
├── part-00000-abc.parquet
└── part-00001-def.parquet

Chaque JSON contient :

  • add : nouveau fichier ajouté
  • remove : fichier retiré
  • commitInfo : métadonnées de l’opération
  • stats : min/max pour Data Skipping
Voir le code
# Configuration Spark avec Delta Lake

config = '''
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Delta Lake Demo") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Pour MinIO
spark.conf.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
spark.conf.set("spark.hadoop.fs.s3a.access.key", "minioadmin")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "minioadmin")
spark.conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
'''
print(config)
Voir le code
# Créer une table Delta

create_examples = '''
# MÉTHODE 1 : Depuis DataFrame
df = spark.createDataFrame([
    (1, "Alice", 1200.0, "2024-01-15"),
    (2, "Bob", 350.0, "2024-01-15"),
], ["id", "customer", "amount", "date"])

df.write.format("delta").mode("overwrite").save("s3a://silver/sales/")

# MÉTHODE 2 : SQL
spark.sql("""
    CREATE TABLE sales (id INT, customer STRING, amount DOUBLE, date DATE)
    USING DELTA
    LOCATION 's3a://silver/sales/'
""")

# MÉTHODE 3 : Convertir Parquet existant (SANS copie !)
spark.sql("CONVERT TO DELTA parquet.`s3a://bronze/old_table/`")
'''
print(create_examples)

Exercice 2 : Créer une table Delta

# 1. Créer DataFrame
data = [(1, "Laptop", 1200.0), (2, "Mouse", 25.0)]
df = spark.createDataFrame(data, ["id", "product", "price"])

# 2. Sauvegarder en Delta
# TODO: df.write.format("delta")...

# 3. Lister _delta_log/
💡 Solution
df.write.format("delta").mode("overwrite").save("/tmp/products")

4.4 MERGE INTO : CDC et Upsert

TARGET               SOURCE CDC             APRÈS MERGE
┌────┬───────┐      ┌────┬───────┬────┐    ┌────┬───────┐
│ id │ name  │      │ id │ name  │ op │    │ id │ name  │
├────┼───────┤      ├────┼───────┼────┤    ├────┼───────┤
│ 1  │ Alice │      │ 1  │ Alice │ U  │ →  │ 1  │ ALICE │ Updated
│ 2  │ Bob   │      │ 3  │ New   │ I  │ →  │ 3  │ New   │ Inserted
└────┴───────┘      │ 2  │ Bob   │ D  │    └────┴───────┘ (Bob deleted)
                    └────┴───────┴────┘
Voir le code
# MERGE INTO - Exemple complet

merge_sql = '''
MERGE INTO target_table AS target
USING source_cdc AS source
ON target.id = source.id

WHEN MATCHED AND source.op = 'D' THEN DELETE

WHEN MATCHED AND source.op = 'U' THEN
    UPDATE SET target.name = source.name, target.amount = source.amount

WHEN NOT MATCHED THEN
    INSERT (id, name, amount) VALUES (source.id, source.name, source.amount)
'''

merge_python = '''
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "s3a://silver/customers/")
source = spark.read.parquet("s3a://bronze/cdc/")

target.alias("t").merge(source.alias("s"), "t.id = s.id") \
    .whenMatchedDelete(condition="s.op = 'D'") \
    .whenMatchedUpdate(condition="s.op = 'U'", set={"name": "s.name"}) \
    .whenNotMatchedInsert(values={"id": "s.id", "name": "s.name"}) \
    .execute()
'''

print("SQL:", merge_sql)
print("\nPython:", merge_python)

Exercice 3 : CDC avec MERGE INTO

# Initial
initial = [(1, "Alice"), (2, "Bob")]

# CDC
cdc = [(1, "ALICE", "U"), (3, "New", "I"), (2, None, "D")]

# TODO: Créer table, appliquer MERGE
💡 Solution
target = DeltaTable.forPath(spark, "/tmp/customers")
target.alias("t").merge(df_cdc.alias("s"), "t.id = s.id") \
    .whenMatchedDelete(condition="s.op = 'D'") \
    .whenMatchedUpdate(condition="s.op = 'U'", set={"name": "s.name"}) \
    .whenNotMatchedInsert(values={"id": "s.id", "name": "s.name"}) \
    .execute()

4.5 Time Travel

Version 0     Version 1     Version 2     Version 3
(Create)      (INSERT)      (UPDATE)      (DELETE)
   │              │              │              │
   ▼              ▼              ▼              ▼
┌────────┐   ┌────────┐   ┌────────┐   ┌────────┐
│ 1 row  │ → │ 3 rows │ → │ 3 rows │ → │ 2 rows │
└────────┘   └────────┘   │modified│   └────────┘

Tu peux lire N'IMPORTE quelle version !
Voir le code
# Time Travel

time_travel = '''
# Voir l'historique
spark.sql("DESCRIBE HISTORY sales").show()

# Lire par version
df_v1 = spark.sql("SELECT * FROM sales VERSION AS OF 1")
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("path/")

# Lire par timestamp
df = spark.sql("SELECT * FROM sales TIMESTAMP AS OF '2024-01-03 12:00:00'")

# RESTORE : Revenir à une version
spark.sql("RESTORE TABLE sales TO VERSION AS OF 5")
'''

print(time_travel)
print("\n💡 Use cases: Audit, rollback, debug, ML reproductibility")

Exercice 4 : Restaurer après erreur

# OUPS ! DELETE sans WHERE
spark.sql("DELETE FROM sales")

# TODO: Utiliser Time Travel pour restaurer
💡 Solution
spark.sql("DESCRIBE HISTORY sales").show()  # Trouver version avant DELETE
spark.sql("RESTORE TABLE sales TO VERSION AS OF 1")

4.6 Schema Management

Mode Comportement Option
Enforcement Rejette schémas différents (défaut)
Evolution Ajoute nouvelles colonnes mergeSchema=true
Overwrite Remplace schéma entier overwriteSchema=true
# Schema Evolution
df_new.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("path/")

5. Apache Iceberg — Deep Dive

5.1 Architecture

s3://bucket/sales/
├── metadata/
│   ├── v1.metadata.json     ← Metadata File
│   ├── snap-xxx.avro        ← Manifest List
│   └── xxx-m0.avro          ← Manifest File
└── data/
    └── part-00000.parquet

Structure arborescente vs log séquentiel = plus rapide pour grandes tables.

5.2 Hidden Partitioning (Killer Feature)

CREATE TABLE events (
    event_id LONG,
    event_ts TIMESTAMP,
    user_id LONG
)
USING iceberg
PARTITIONED BY (
    days(event_ts),      -- Partition par jour (caché !)
    bucket(16, user_id)  -- Bucket hash
)

-- L'utilisateur n'a PAS besoin de connaître le partitionnement !
SELECT * FROM events WHERE event_ts = '2024-01-15 10:00:00'
-- Partition pruning automatique !

Transforms : years(), months(), days(), hours(), bucket(N, col), truncate(L, col)

Exercice 5 : Table Iceberg avec Hidden Partitioning

💡 Solution
CREATE TABLE my_catalog.db.page_views (
    view_id LONG, view_timestamp TIMESTAMP, user_id LONG
)
USING iceberg
PARTITIONED BY (days(view_timestamp), bucket(8, user_id))

5.3 Schema Evolution avancée

ALTER TABLE events ADD COLUMN source STRING
ALTER TABLE events RENAME COLUMN payload TO event_data  -- Sans réécriture !
ALTER TABLE events DROP COLUMN deprecated              -- Sans réécriture !

5.4 Catalog (obligatoire)

Iceberg nécessite un catalog : Hive Metastore, AWS Glue, Nessie, Unity Catalog.


7. Maintenance & Optimisation

7.1 Compaction (OPTIMIZE)

-- Delta
OPTIMIZE sales
OPTIMIZE sales WHERE date >= '2024-01-01'

-- Iceberg
CALL catalog.system.rewrite_data_files(table => 'db.sales')

7.2 Z-Ordering

Organise données par colonnes filtrées → meilleur Data Skipping.

OPTIMIZE sales ZORDER BY (customer_id, product_id)

Règle : Partition par faible cardinalité, Z-ORDER par haute cardinalité.

7.3 VACUUM

-- Supprimer fichiers > 7 jours
VACUUM sales
VACUUM sales RETAIN 168 HOURS

-- Iceberg
CALL catalog.system.expire_snapshots(table => 'db.sales', retain_last => 5)

⚠️ Après VACUUM, Time Travel sur versions supprimées impossible !

Exercice 7 : Compacter une table

# Vérifier fichiers avant
spark.sql("DESCRIBE DETAIL delta.`path`").select("numFiles").show()

# TODO: OPTIMIZE + ZORDER

# Vérifier après
💡 Solution
spark.sql("OPTIMIZE delta.`path` ZORDER BY (id)")

8. Migration Parquet → Delta

8.1 Conversion in-place (SANS copie !)

CONVERT TO DELTA parquet.`s3a://bronze/old_table/`
CONVERT TO DELTA parquet.`s3a://bronze/partitioned/`
    PARTITIONED BY (date STRING)

Crée _delta_log/ sans déplacer les données. Temps : secondes.

8.2 Migration avec CTAS

CREATE TABLE new_delta USING DELTA AS
SELECT * FROM parquet.`s3a://bronze/old/`

9. Apache Hudi — Aperçu

Copy-on-Write vs Merge-on-Read

CoW MoR
Écriture Réécrit fichier entier (🐢) Écrit delta log (🚀)
Lecture Lit fichier (🚀) Merge base + deltas (🐢)
Use case Lectures fréquentes CDC intensif

Quand utiliser Hudi : CDC massif, near-real-time, AWS EMR.


10. Mini-Projet : Lakehouse avec MinIO

┌─────────────────────────────────────────────────────┐
│                    MinIO                            │
│  bronze/          silver/           gold/           │
│  (CSV)     →     (Delta)      →    (Delta)          │
│  orders.csv       orders/           daily_sales/    │
│                   _delta_log/       _delta_log/     │
│      ↓               ↓                  ↓           │
│   Upload       MERGE INTO          OPTIMIZE         │
│               (Dedupe+CDC)         ZORDER BY        │
└─────────────────────────────────────────────────────┘

Étapes :

  1. Démarrer MinIO
  2. Configurer Spark + Delta
  3. Bronze → Silver (MERGE)
  4. Time Travel audit
  5. Silver → Gold (agrégations)
  6. OPTIMIZE + ZORDER
  7. VACUUM

Quiz

Q1. Pourquoi Parquet seul ne garantit pas ACID ?
R Pas de Transaction Log pour atomicité/isolation.
Q2. Différence Schema Enforcement vs Evolution ?
R Enforcement rejette, Evolution adapte.
Q3. Quel format supporte Hidden Partitioning ?
R Apache Iceberg uniquement.
Q4. Rôle du Transaction Log Delta ?
R Enregistre add/remove pour ACID et Time Travel.
Q5. Objectif de ZORDER BY ?
R Organiser données pour meilleur Data Skipping.
Q6. Quelle opération supprime les vieux fichiers ?
R VACUUM (Delta) ou expire_snapshots (Iceberg).

📚 Ressources


➡️ Prochaine étape

👉 Module 24 : 24_kafka_streaming — Kafka & Streaming


📝 Récapitulatif

Concept Appris
Data Lakehouse Lake + Warehouse
ACID Atomicité, Consistance, Isolation, Durabilité
Delta Lake Transaction Log, MERGE, Time Travel
Iceberg Hidden Partitioning, Schema Evolution
Maintenance OPTIMIZE, ZORDER, VACUUM
Migration CONVERT TO DELTA

🎉 Félicitations ! Module Table Formats terminé.

Retour au sommet