TARGET SOURCE CDC APRÈS MERGE
┌────┬───────┐ ┌────┬───────┬────┐ ┌────┬───────┐
│ id │ name │ │ id │ name │ op │ │ id │ name │
├────┼───────┤ ├────┼───────┼────┤ ├────┼───────┤
│ 1 │ Alice │ │ 1 │ Alice │ U │ → │ 1 │ ALICE │ Updated
│ 2 │ Bob │ │ 3 │ New │ I │ → │ 3 │ New │ Inserted
└────┴───────┘ │ 2 │ Bob │ D │ └────┴───────┘ (Bob deleted)
└────┴───────┴────┘
Voir le code
# MERGE INTO - Exemple completmerge_sql ='''MERGE INTO target_table AS targetUSING source_cdc AS sourceON target.id = source.idWHEN MATCHED AND source.op = 'D' THEN DELETEWHEN MATCHED AND source.op = 'U' THEN UPDATE SET target.name = source.name, target.amount = source.amountWHEN NOT MATCHED THEN INSERT (id, name, amount) VALUES (source.id, source.name, source.amount)'''merge_python ='''from delta.tables import DeltaTabletarget = DeltaTable.forPath(spark, "s3a://silver/customers/")source = spark.read.parquet("s3a://bronze/cdc/")target.alias("t").merge(source.alias("s"), "t.id = s.id") \ .whenMatchedDelete(condition="s.op = 'D'") \ .whenMatchedUpdate(condition="s.op = 'U'", set={"name": "s.name"}) \ .whenNotMatchedInsert(values={"id": "s.id", "name": "s.name"}) \ .execute()'''print("SQL:", merge_sql)print("\nPython:", merge_python)
Version 0 Version 1 Version 2 Version 3
(Create) (INSERT) (UPDATE) (DELETE)
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ 1 row │ → │ 3 rows │ → │ 3 rows │ → │ 2 rows │
└────────┘ └────────┘ │modified│ └────────┘
Tu peux lire N'IMPORTE quelle version !
Voir le code
# Time Traveltime_travel ='''# Voir l'historiquespark.sql("DESCRIBE HISTORY sales").show()# Lire par versiondf_v1 = spark.sql("SELECT * FROM sales VERSION AS OF 1")df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("path/")# Lire par timestampdf = spark.sql("SELECT * FROM sales TIMESTAMP AS OF '2024-01-03 12:00:00'")# RESTORE : Revenir à une versionspark.sql("RESTORE TABLE sales TO VERSION AS OF 5")'''print(time_travel)print("\n💡 Use cases: Audit, rollback, debug, ML reproductibility")
Exercice 4 : Restaurer après erreur
# OUPS ! DELETE sans WHEREspark.sql("DELETE FROM sales")# TODO: Utiliser Time Travel pour restaurer
💡 Solution
spark.sql("DESCRIBE HISTORY sales").show() # Trouver version avant DELETEspark.sql("RESTORE TABLE sales TO VERSION AS OF 1")
ALTERTABLEeventsADDCOLUMNsource STRINGALTERTABLEeventsRENAMECOLUMN payload TO event_data -- Sans réécriture !ALTERTABLEeventsDROPCOLUMN deprecated -- Sans réécriture !
Q1. Pourquoi Parquet seul ne garantit pas ACID ?
R
Pas de Transaction Log pour atomicité/isolation.
Q2. Différence Schema Enforcement vs Evolution ?
R
Enforcement rejette, Evolution adapte.
Q3. Quel format supporte Hidden Partitioning ?
R
Apache Iceberg uniquement.
Q4. Rôle du Transaction Log Delta ?
R
Enregistre add/remove pour ACID et Time Travel.
Q5. Objectif de ZORDER BY ?
R
Organiser données pour meilleur Data Skipping.
Q6. Quelle opération supprime les vieux fichiers ?
R
VACUUM (Delta) ou expire_snapshots (Iceberg).