Kafka, Python & Structured Streaming

Le Temps Réel pour le Data Engineering

Bienvenue dans ce module où tu vas maîtriser le streaming de données — la capacité à traiter des flux continus d’événements en temps réel plutôt que des batches périodiques.


Prérequis

Module Compétence Pourquoi ?
✅ 19 PySpark Advanced DataFrame API
✅ 21 Spark on K8s Déploiement
✅ 23 Table Formats Delta Lake comme Sink

Objectifs

À la fin de ce module, tu seras capable de :

  • Comprendre les architectures de streaming (Lambda vs Kappa)
  • Déployer Apache Kafka et créer des topics
  • Écrire des producteurs/consommateurs Python natifs
  • Construire des pipelines Spark Structured Streaming
  • Maîtriser Watermarks et Windowing pour le temps d’événement
  • Utiliser foreachBatch + MERGE INTO pour des sinks transactionnels

1. Introduction — Pourquoi le Temps Réel ?

1.1 L’évolution des architectures de données

Historiquement, le traitement de données était batch : on collecte les données pendant X heures, puis on les traite. Mais les besoins modernes exigent une latence plus faible.

ARCHITECTURE LAMBDA (2010s)           ARCHITECTURE KAPPA (2020s)
┌─────────────────────────────┐      ┌─────────────────────────────┐
│                             │      │                             │
│  Source ──┬── Batch Layer   │      │  Source ─── Stream Layer    │
│           │   (Spark Batch) │      │             (Kafka + SSS)   │
│           │        │        │      │                  │          │
│           └── Speed Layer   │      │                  │          │
│               (Storm)       │      │                  │          │
│                  │          │      │                  │          │
│           Serving Layer     │      │           Data Lake         │
│                             │      │           (Delta)           │
│  ⚠️ 2 codebases à maintenir │      │  ✅ 1 seul pipeline         │
└─────────────────────────────┘      └─────────────────────────────┘

1.2 Cas d’usage du temps réel

Domaine Exemple Latence requise
Fraude Détecter transaction suspecte < 1 seconde
IoT Alerter si capteur anormal < 5 secondes
E-commerce Recommandations live < 100 ms
Monitoring Alerter si service down < 30 secondes
Finance Trading algorithmique < 10 ms

1.3 Batch vs Streaming : Les différences fondamentales

Aspect Batch Streaming
Données Bornées (bounded) Non-bornées (unbounded)
Traitement Périodique (horaire, quotidien) Continu
Latence Minutes à heures Secondes à millisecondes
État Recalculé à chaque run Maintenu entre événements
Complexité Plus simple Plus complexe (temps, état)

1.4 Micro-batch vs Continuous

Il existe deux modèles de traitement streaming :

MICRO-BATCH (Spark SSS défaut)        CONTINUOUS (Flink, Kafka Streams)
┌─────────────────────────────┐      ┌─────────────────────────────┐
│                             │      │                             │
│  ┌───┐ ┌───┐ ┌───┐ ┌───┐  │      │  ─────────────────────────▶ │
│  │ 1 │ │ 2 │ │ 3 │ │ 4 │  │      │  Traitement événement par   │
│  └───┘ └───┘ └───┘ └───┘  │      │  événement                  │
│  Batches de 100ms-1s       │      │                             │
│                             │      │                             │
│  Latence: ~1s              │      │  Latence: ~10ms             │
│  Throughput: Très élevé    │      │  Throughput: Élevé          │
└─────────────────────────────┘      └─────────────────────────────┘

Spark Structured Streaming utilise le micro-batch par défaut (suffisant pour 90% des cas).

1.5 Garanties de livraison

Un concept crucial en streaming : que se passe-t-il si le système plante ?

Garantie Description Quand l’utiliser
At-most-once Message traité 0 ou 1 fois Logs non critiques
At-least-once Message traité 1+ fois (doublons possibles) Compteurs, métriques
Exactly-once Message traité exactement 1 fois Transactions financières
AT-MOST-ONCE              AT-LEAST-ONCE           EXACTLY-ONCE
┌─────────────┐          ┌─────────────┐         ┌─────────────┐
│ Fire & Forget│          │ Retry until │         │ Transactional│
│             │          │ ACK         │         │ + Idempotent │
│ Peut perdre │          │ Peut dupliquer│        │ Parfait     │
│ des messages│          │ des messages │         │             │
└─────────────┘          └─────────────┘         └─────────────┘
     😢                       😐                      🎯

Exactly-once est le Saint Graal, mais plus complexe à implémenter. Spark SSS + Kafka + Delta Lake peuvent l’atteindre !


2. Apache Kafka — Le Bus de Messagerie

2.1 Qu’est-ce que Kafka ?

Apache Kafka est une plateforme de streaming distribuée créée par LinkedIn en 2011. C’est devenu le standard de facto pour le streaming de données.

Kafka n’est PAS une base de données, mais un log distribué où les messages sont :

  • Écrits de manière append-only (jamais modifiés)
  • Persistés sur disque (pas juste en mémoire)
  • Répliqués pour la tolérance aux pannes

2.2 Architecture de Kafka

┌─────────────────────────────────────────────────────────────────┐
│                      KAFKA CLUSTER                              │
│                                                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │  Broker 1   │  │  Broker 2   │  │  Broker 3   │            │
│  │             │  │             │  │             │            │
│  │ Topic A P0  │  │ Topic A P1  │  │ Topic A P2  │  ← Partitions│
│  │ Topic B P1  │  │ Topic B P0  │  │ Topic B P2  │    réparties │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
│         │                │                │                    │
│         └────────────────┼────────────────┘                    │
│                          │                                     │
│                  ┌───────────────┐                             │
│                  │  ZooKeeper/   │  ← Coordination             │
│                  │  KRaft        │    (metadata, leaders)      │
│                  └───────────────┘                             │
└─────────────────────────────────────────────────────────────────┘
         ▲                                          │
         │                                          ▼
   ┌───────────┐                            ┌───────────┐
   │ Producers │                            │ Consumers │
   │ (Python)  │                            │ (Spark)   │
   └───────────┘                            └───────────┘

2.3 Concepts clés

Concept Description
Broker Serveur Kafka qui stocke les messages
Topic Catégorie/flux de messages (comme une table)
Partition Sous-division d’un topic pour le parallélisme
Offset Position d’un message dans une partition
Producer Application qui envoie des messages
Consumer Application qui lit des messages
Consumer Group Groupe de consumers qui se partagent les partitions

2.4 Topics et Partitions

Topic: "orders" avec 3 partitions

Partition 0:  [msg0] [msg3] [msg6] [msg9]  ...  → Offset croissant
Partition 1:  [msg1] [msg4] [msg7] [msg10] ...
Partition 2:  [msg2] [msg5] [msg8] [msg11] ...

Chaque partition :
• Est ordonnée (FIFO dans la partition)
• Peut être lue par UN consumer du groupe
• Est répliquée sur plusieurs brokers

Clé de message : Détermine la partition. Messages avec la même clé → même partition → ordre garanti.

2.5 Installation Kafka avec Docker

Nous allons déployer Kafka en local avec Docker Compose. Deux options :

  • ZooKeeper : Mode classique (stable)
  • KRaft : Nouveau mode sans ZooKeeper (Kafka 3.3+)
Voir le code
# Docker Compose pour Kafka avec ZooKeeper

docker_compose_kafka = '''
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"      # Pour les clients externes
      - "29092:29092"    # Pour les clients Docker internes
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  # Schema Registry (optionnel mais recommandé)
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
'''

print(docker_compose_kafka)
print("\n# Démarrer avec : docker-compose up -d")
print("# Vérifier : docker-compose ps")
Voir le code
# Commandes CLI Kafka essentielles

kafka_cli = '''
# ═══════════════════════════════════════════════════════════════
# Gestion des Topics
# ═══════════════════════════════════════════════════════════════

# Créer un topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --partitions 3 \
  --replication-factor 1

# Lister les topics
docker exec kafka kafka-topics --list \
  --bootstrap-server localhost:9092

# Décrire un topic
docker exec kafka kafka-topics --describe \
  --bootstrap-server localhost:9092 \
  --topic orders

# ═══════════════════════════════════════════════════════════════
# Produire et Consommer (test rapide)
# ═══════════════════════════════════════════════════════════════

# Produire des messages (interactif)
docker exec -it kafka kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic orders

# Consommer des messages (depuis le début)
docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# ═══════════════════════════════════════════════════════════════
# Consumer Groups
# ═══════════════════════════════════════════════════════════════

# Lister les consumer groups
docker exec kafka kafka-consumer-groups --list \
  --bootstrap-server localhost:9092

# Voir le lag d'un group
docker exec kafka kafka-consumer-groups --describe \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group
'''

print(kafka_cli)

Exercice 1 : Déployer Kafka et créer un topic

Objectif : Mettre en place l’infrastructure Kafka.

# 1. Créer docker-compose.yml avec le contenu ci-dessus

# 2. Démarrer Kafka
docker-compose up -d

# 3. Créer un topic "events" avec 3 partitions
# TODO

# 4. Vérifier que le topic existe
# TODO

# 5. Tester avec console-producer et console-consumer
# TODO
💡 Solution
# 3. Créer topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic events --partitions 3 --replication-factor 1

# 4. Vérifier
docker exec kafka kafka-topics --describe \
  --bootstrap-server localhost:9092 --topic events

# 5. Test
# Terminal 1: docker exec -it kafka kafka-console-producer --bootstrap-server localhost:9092 --topic events
# Terminal 2: docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic events --from-beginning

3. Schema Registry & Sérialisation

3.1 Le problème de la sérialisation

Kafka transporte des bytes. Il faut donc sérialiser/désérialiser les données. Trois formats populaires :

Format Avantages Inconvénients
JSON Lisible, flexible Verbeux, pas de schéma
Avro Compact, schéma, évolution Moins lisible
Protobuf Très compact, typage fort Plus complexe

3.2 Pourquoi un Schema Registry ?

Sans Schema Registry, chaque producteur/consommateur doit connaître le schéma. Problèmes :

  • Comment évoluer le schéma sans casser les consumers ?
  • Comment valider que les messages sont conformes ?
SANS SCHEMA REGISTRY              AVEC SCHEMA REGISTRY
┌─────────────────────┐          ┌─────────────────────┐
│ Producer            │          │ Producer            │
│ {schéma hardcodé}   │          │ → Enregistre schéma │
│         │           │          │ → Envoie schema_id  │
│         ▼           │          │         │           │
│    [message]        │          │    [id + message]   │
│         │           │          │         │           │
│         ▼           │          │         ▼           │
│ Consumer            │          │ Consumer            │
│ {schéma hardcodé}   │          │ → Récupère schéma   │
│                     │          │   par id            │
│ 😰 Schéma désync!   │          │ ✅ Toujours à jour  │
└─────────────────────┘          └─────────────────────┘

3.3 Compatibilité de schéma

Le Schema Registry vérifie la compatibilité lors de l’évolution :

Mode Description Exemple autorisé
BACKWARD Nouveau schéma peut lire ancien Ajouter champ optionnel
FORWARD Ancien schéma peut lire nouveau Supprimer champ optionnel
FULL Les deux Ajouter/supprimer champ optionnel
NONE Pas de vérification Tout (dangereux)
Voir le code
# Exemple de schéma Avro

avro_schema = '''
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["PENDING", "COMPLETED", "CANCELLED"]}},
    {"name": "notes", "type": ["null", "string"], "default": null}  // Champ optionnel
  ]
}
'''

print("Schéma Avro pour les commandes :")
print(avro_schema)
print("\n💡 Points clés :")
print("• 'type': 'record' → structure comme une classe")
print("• 'logicalType' → types avancés (timestamp, date, decimal)")
print("• ['null', 'string'] → champ optionnel (union type)")

4. Python Natif pour Kafka

Avant d’utiliser Spark, apprenons à interagir avec Kafka en Python pur. Deux librairies principales :

Librairie Avantages Quand l’utiliser
kafka-python Simple, pur Python Scripts simples, prototypage
confluent-kafka Performant, Schema Registry Production, Avro

4.1 Installation

Voir le code
# Installation des librairies Kafka Python
# !pip install kafka-python confluent-kafka fastavro

print("Librairies à installer :")
print("pip install kafka-python       # Client simple")
print("pip install confluent-kafka    # Client performant + Schema Registry")
print("pip install fastavro           # Sérialisation Avro")

4.2 Producteur Python (kafka-python)

Voir le code
# Producteur Kafka simple avec kafka-python

producer_simple = '''
from kafka import KafkaProducer
import json
import time
from datetime import datetime

# ═══════════════════════════════════════════════════════════════
# Configuration du producteur
# ═══════════════════════════════════════════════════════════════

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    
    # Sérialisation JSON
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    
    # Configuration de fiabilité
    acks='all',              # Attendre confirmation de tous les replicas
    retries=3,               # Réessayer en cas d'erreur
    retry_backoff_ms=100,    # Délai entre retries
)

# ═══════════════════════════════════════════════════════════════
# Envoyer des messages
# ═══════════════════════════════════════════════════════════════

def send_order(order_id, customer_id, amount):
    """Envoyer une commande au topic 'orders'"""
    message = {
        "order_id": order_id,
        "customer_id": customer_id,
        "amount": amount,
        "timestamp": datetime.now().isoformat(),
        "status": "PENDING"
    }
    
    # Envoyer avec une clé (même customer → même partition → ordre garanti)
    future = producer.send(
        topic='orders',
        key=customer_id,    # Clé pour le partitionnement
        value=message
    )
    
    # Attendre confirmation (synchrone)
    try:
        record_metadata = future.get(timeout=10)
        print(f"✅ Message envoyé: partition={record_metadata.partition}, offset={record_metadata.offset}")
    except Exception as e:
        print(f"❌ Erreur: {e}")

# Simuler un flux de commandes
for i in range(10):
    send_order(
        order_id=f"ORD-{i:04d}",
        customer_id=f"CUST-{i % 3:03d}",  # 3 customers
        amount=round(100 + i * 10.5, 2)
    )
    time.sleep(0.5)

# Important : flush avant de quitter
producer.flush()
producer.close()
'''

print(producer_simple)

4.3 Consommateur Python (kafka-python)

Voir le code
# Consommateur Kafka simple avec kafka-python

consumer_simple = '''
from kafka import KafkaConsumer
import json

# ═══════════════════════════════════════════════════════════════
# Configuration du consommateur
# ═══════════════════════════════════════════════════════════════

consumer = KafkaConsumer(
    'orders',                             # Topic(s) à consommer
    bootstrap_servers=['localhost:9092'],
    
    # Désérialisation JSON
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    
    # Consumer Group (pour le parallélisme)
    group_id='order-processor-group',
    
    # Où commencer à lire
    auto_offset_reset='earliest',  # 'earliest' = depuis le début, 'latest' = nouveaux messages seulement
    
    # Commit des offsets
    enable_auto_commit=True,       # Commit automatique
    auto_commit_interval_ms=5000,  # Toutes les 5 secondes
)

# ═══════════════════════════════════════════════════════════════
# Boucle de consommation
# ═══════════════════════════════════════════════════════════════

print("🎧 En attente de messages...")

try:
    for message in consumer:
        # Métadonnées du message
        print(f"\n📩 Message reçu:")
        print(f"   Topic: {message.topic}")
        print(f"   Partition: {message.partition}")
        print(f"   Offset: {message.offset}")
        print(f"   Key: {message.key}")
        print(f"   Timestamp: {message.timestamp}")
        
        # Contenu du message
        order = message.value
        print(f"   Order: {order}")
        
        # Logique métier : alerter si montant élevé
        if order.get('amount', 0) > 500:
            print(f"   ⚠️ ALERTE: Commande de {order['amount']}€ !")

except KeyboardInterrupt:
    print("\n🛑 Arrêt du consommateur")
finally:
    consumer.close()
'''

print(consumer_simple)

4.4 Producteur avec Avro et Schema Registry (confluent-kafka)

Voir le code
# Producteur avec Avro et Schema Registry

producer_avro = '''
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField

# ═══════════════════════════════════════════════════════════════
# Configuration Schema Registry
# ═══════════════════════════════════════════════════════════════

schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Schéma Avro
order_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
"""

# Sérialiseurs
avro_serializer = AvroSerializer(
    schema_registry_client,
    order_schema,
    lambda obj, ctx: obj  # Conversion dict → Avro
)
string_serializer = StringSerializer('utf-8')

# ═══════════════════════════════════════════════════════════════
# Configuration producteur
# ═══════════════════════════════════════════════════════════════

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all'
}
producer = Producer(producer_conf)

# Callback de confirmation
def delivery_report(err, msg):
    if err:
        print(f"❌ Erreur: {err}")
    else:
        print(f"✅ Message livré: {msg.topic()}[{msg.partition()}] @ {msg.offset()}")

# ═══════════════════════════════════════════════════════════════
# Envoyer des messages
# ═══════════════════════════════════════════════════════════════

import time

for i in range(5):
    order = {
        "order_id": f"ORD-{i:04d}",
        "customer_id": f"CUST-{i % 3:03d}",
        "amount": 100.0 + i * 25.5,
        "timestamp": int(time.time() * 1000)
    }
    
    producer.produce(
        topic='orders-avro',
        key=string_serializer(order["customer_id"]),
        value=avro_serializer(order, SerializationContext('orders-avro', MessageField.VALUE)),
        callback=delivery_report
    )
    producer.poll(0)  # Déclencher les callbacks

producer.flush()  # Attendre que tous les messages soient envoyés
'''

print(producer_avro)

Exercice 2 : Producteur et Consommateur Python

Objectif : Créer un système d’alertes en temps réel.

# 1. Créer un producteur qui envoie des logs au format:
# {"level": "INFO|WARN|ERROR|FATAL", "message": "...", "timestamp": "..."}

# 2. Créer un consommateur qui:
#    - Affiche tous les messages
#    - Alerte (print spécial) si level == "FATAL"

# TODO: Implémenter
💡 Solution
# Producer
import random
levels = ["INFO", "INFO", "INFO", "WARN", "ERROR", "FATAL"]
for i in range(20):
    log = {
        "level": random.choice(levels),
        "message": f"Event {i}",
        "timestamp": datetime.now().isoformat()
    }
    producer.send('logs', value=log)

# Consumer
for msg in consumer:
    log = msg.value
    if log['level'] == 'FATAL':
        print(f"🚨 FATAL ALERT: {log['message']}")
    else:
        print(f"[{log['level']}] {log['message']}")

4.5 Aperçu de Faust (Stream Processing Python)

Faust est un framework Python pour le traitement de streams, inspiré de Kafka Streams (Java). Idéal pour du traitement léger sans Spark.

Voir le code
# Exemple Faust (aperçu)

faust_example = '''
import faust

# Créer l'application Faust
app = faust.App(
    'order-processor',
    broker='kafka://localhost:9092',
    value_serializer='json'
)

# Définir le schéma du message
class Order(faust.Record):
    order_id: str
    customer_id: str
    amount: float

# Topic source
orders_topic = app.topic('orders', value_type=Order)

# Agent de traitement (comme un consumer intelligent)
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f"Processing: {order.order_id}")
        
        # Logique métier
        if order.amount > 1000:
            print(f"⚠️ High-value order: {order.amount}")
            # Envoyer vers un autre topic
            await high_value_topic.send(value=order)

# Table pour agrégation (state)
order_counts = app.Table('order-counts', default=int)

@app.agent(orders_topic)
async def count_by_customer(orders):
    async for order in orders:
        order_counts[order.customer_id] += 1
        print(f"{order.customer_id}: {order_counts[order.customer_id]} orders")

# Lancer avec: faust -A myapp worker -l info
'''

print(faust_example)
print("\n💡 Quand utiliser Faust vs Spark SSS :")
print("• Faust : Traitement léger, alertes, routing, < 100K events/s")
print("• Spark SSS : Agrégations complexes, ML, joins, gros volumes")

5. Spark Structured Streaming (SSS)

5.1 Le modèle de programmation

L’idée géniale de Spark Structured Streaming : traiter un stream comme un DataFrame qui grandit à l’infini.

                     Temps →
                     
Stream d'événements: [e1] [e2] [e3] [e4] [e5] [e6] ...
                      │    │    │    │    │    │
                      ▼    ▼    ▼    ▼    ▼    ▼
┌─────────────────────────────────────────────────────┐
│              DataFrame "illimité"                   │
│  ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐      │
│  │ e1  │ e2  │ e3  │ e4  │ e5  │ e6  │ ... │      │
│  └─────┴─────┴─────┴─────┴─────┴─────┴─────┘      │
│                                                     │
│  Tu écris le même code que pour un batch !          │
│  df.filter().groupBy().agg()                        │
└─────────────────────────────────────────────────────┘

Avantage : Tu utilises la même API DataFrame que tu connais déjà !

Voir le code
# Configuration Spark pour le streaming avec Kafka

spark_streaming_config = '''
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Kafka Streaming Demo") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Réduire les logs pour plus de clarté
spark.sparkContext.setLogLevel("WARN")
'''

print(spark_streaming_config)

5.2 Lire depuis Kafka avec readStream

Voir le code
# Lire un stream Kafka

read_kafka = '''
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# ═══════════════════════════════════════════════════════════════
# Lecture du stream Kafka
# ═══════════════════════════════════════════════════════════════

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .load()

# Le DataFrame brut contient ces colonnes :
# key (binary), value (binary), topic, partition, offset, timestamp, timestampType

print("Schéma Kafka brut:")
kafka_df.printSchema()

# ═══════════════════════════════════════════════════════════════
# Désérialisation JSON
# ═══════════════════════════════════════════════════════════════

# Définir le schéma du message JSON
order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", StringType(), True),
    StructField("status", StringType(), True)
])

# Parser le JSON
orders_df = kafka_df \
    .selectExpr("CAST(key AS STRING) as customer_key", "CAST(value AS STRING) as json_value") \
    .select(
        col("customer_key"),
        from_json(col("json_value"), order_schema).alias("data")
    ) \
    .select("customer_key", "data.*")

print("Schéma après parsing:")
orders_df.printSchema()
'''

print(read_kafka)

5.3 Output Modes et Sinks

Output Mode Description Quand l’utiliser
Append Seulement les nouvelles lignes Sans agrégation
Complete Toute la table résultat Avec agrégation, petits résultats
Update Seulement les lignes modifiées Avec agrégation, grands résultats
Voir le code
# Écrire le stream (différents sinks)

write_stream = '''
# ═══════════════════════════════════════════════════════════════
# Sink Console (pour debug)
# ═══════════════════════════════════════════════════════════════

query_console = orders_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", False) \
    .start()

# ═══════════════════════════════════════════════════════════════
# Sink Parquet/Delta avec Checkpointing
# ═══════════════════════════════════════════════════════════════

query_delta = orders_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/orders") \
    .option("path", "s3a://silver/orders_streaming/") \
    .start()

# ═══════════════════════════════════════════════════════════════
# Sink Kafka (pour pipeline)
# ═══════════════════════════════════════════════════════════════

query_kafka = orders_df \
    .selectExpr("customer_id AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "orders-processed") \
    .option("checkpointLocation", "/tmp/checkpoints/orders-kafka") \
    .start()

# Attendre la terminaison
query_console.awaitTermination()
'''

print(write_stream)
print("\n💡 Le checkpointLocation est CRUCIAL pour :")
print("• Reprendre après un crash (exactly-once)")
print("• Stocker l'état des agrégations")
print("• Suivre les offsets Kafka")

Exercice 3 : Pipeline Kafka → Spark SSS → Console

# 1. Lire le topic 'events' créé à l'exercice 1
# 2. Parser le JSON
# 3. Filtrer les events avec level = 'ERROR' ou 'FATAL'
# 4. Afficher dans la console
💡 Solution
schema = StructType([
    StructField("level", StringType()),
    StructField("message", StringType()),
    StructField("timestamp", StringType())
])

events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .filter(col("level").isin("ERROR", "FATAL"))

query = events_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

6. Gestion du Temps et de l’État

6.1 Le problème du temps d’événement

En streaming, il y a deux temps différents :

Temps Description Exemple
Event Time Quand l’événement s’est produit Timestamp dans le message
Processing Time Quand Spark traite l’événement Heure du serveur
Problème des messages désordonnés :

Event Time:    10:00   10:01   10:02   10:03   10:04
                 │       │       │       │       │
                 ▼       ▼       ▼       ▼       ▼
Arrivée:       [e1]    [e3]    [e2]    [e5]    [e4]   ← Désordonnés !
                 │       │       │       │       │
Processing:   10:05   10:05   10:06   10:06   10:07

Si tu agrèges par Processing Time → résultat faux
Si tu agrèges par Event Time → résultat correct

6.2 Windowing (Fenêtrage temporel)

Pour agréger sur le temps, on utilise des fenêtres :

TUMBLING WINDOW (non-chevauchantes)     SLIDING WINDOW (chevauchantes)

   [────5min────][────5min────]         [────5min────]
                                             [────5min────]
   10:00      10:05      10:10                  [────5min────]
                                        
   Chaque event appartient à            Chaque event peut appartenir
   UNE SEULE fenêtre                    à PLUSIEURS fenêtres
Voir le code
# Windowing avec Spark SSS

windowing_example = '''
from pyspark.sql.functions import window, col, count, sum as spark_sum, to_timestamp

# ═══════════════════════════════════════════════════════════════
# Préparer le timestamp d'événement
# ═══════════════════════════════════════════════════════════════

orders_with_ts = orders_df \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# ═══════════════════════════════════════════════════════════════
# Tumbling Window : Agrégation par fenêtre de 5 minutes
# ═══════════════════════════════════════════════════════════════

tumbling_agg = orders_with_ts \
    .groupBy(
        window(col("event_time"), "5 minutes"),  # Fenêtre de 5 min
        col("customer_id")
    ) \
    .agg(
        count("*").alias("order_count"),
        spark_sum("amount").alias("total_amount")
    )

# ═══════════════════════════════════════════════════════════════
# Sliding Window : Fenêtre de 10 min, glissant toutes les 5 min
# ═══════════════════════════════════════════════════════════════

sliding_agg = orders_with_ts \
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),  # 10 min, slide 5 min
        col("customer_id")
    ) \
    .agg(
        count("*").alias("order_count")
    )

# Le résultat contient une colonne "window" avec start/end
'''

print(windowing_example)

6.3 Watermarks : Gérer les données en retard

Problème : Combien de temps attendre les messages en retard avant de fermer une fenêtre ?

Sans Watermark :                    Avec Watermark (10 min) :
                                    
État infini ! 😱                    État limité ✅
                                    
Fenêtre 10:00-10:05                 Fenêtre 10:00-10:05
  └─ Garde l'état POUR TOUJOURS       └─ Ferme à 10:15 (event time)
     en attendant les retards            Late data après → ignoré
                                    
Mémoire : EXPLOSE 💥               Mémoire : Stable ✅
Voir le code
# Watermarks avec Spark SSS

watermark_example = '''
from pyspark.sql.functions import window, col, count, to_timestamp

# ═══════════════════════════════════════════════════════════════
# Agrégation avec Watermark
# ═══════════════════════════════════════════════════════════════

# Le watermark dit : "Je tolère jusqu'à 10 minutes de retard"
# Après 10 min, les données en retard sont ignorées et l'état nettoyé

windowed_counts = orders_with_ts \
    .withWatermark("event_time", "10 minutes") \  # ← Crucial !
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("customer_id")
    ) \
    .count()

# ═══════════════════════════════════════════════════════════════
# Comment ça marche ?
# ═══════════════════════════════════════════════════════════════
#
# 1. Spark suit le "max event time" vu jusqu'ici
# 2. Watermark = max_event_time - 10 minutes
# 3. Les fenêtres dont window.end < watermark sont finalisées
# 4. Les données avec event_time < watermark sont ignorées
#
# Exemple :
# - Max event time vu : 10:30
# - Watermark : 10:20
# - Fenêtre 10:00-10:05 : FINALISÉE (end 10:05 < 10:20)
# - Message avec event_time 10:18 : ACCEPTÉ
# - Message avec event_time 10:15 : IGNORÉ (< watermark)

# Écrire en mode Update (pour les agrégations)
query = windowed_counts.writeStream \
    .format("console") \
    .outputMode("update") \
    .option("truncate", False) \
    .start()
'''

print(watermark_example)
print("\n💡 Choisir le bon watermark :")
print("• Trop court (1 min) → Perd des données en retard")
print("• Trop long (1 heure) → Trop de mémoire utilisée")
print("• Règle : Analyser le retard typique de tes données")

Exercice 4 : Agrégation avec Watermark

# Calculer le nombre d'erreurs par fenêtre de 5 minutes
# avec un watermark de 10 minutes

# TODO: Lire depuis 'events', filtrer ERROR/FATAL, agréger par window
💡 Solution
error_counts = events_df \
    .filter(col("level").isin("ERROR", "FATAL")) \
    .withColumn("event_time", to_timestamp(col("timestamp"))) \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes")) \
    .count()

query = error_counts.writeStream \
    .format("console") \
    .outputMode("update") \
    .start()

7. Opérations Avancées et Sinks Transactionnelles

7.1 Streaming Joins

Spark SSS supporte plusieurs types de joins :

Type Description Exemple
Stream-Static Stream JOIN table batch Enrichir orders avec customers
Stream-Stream Deux streams JOIN clicks avec impressions
Voir le code
# Streaming Joins

join_examples = '''
# ═══════════════════════════════════════════════════════════════
# Stream-Static Join : Enrichir avec une dimension
# ═══════════════════════════════════════════════════════════════

# Table statique (dimension)
customers_df = spark.read.parquet("s3a://dims/customers/")

# Stream
orders_stream = spark.readStream.format("kafka")...

# Join
enriched_orders = orders_stream.join(
    customers_df,
    orders_stream.customer_id == customers_df.id,
    "left"  # LEFT JOIN pour garder toutes les commandes
)

# ═══════════════════════════════════════════════════════════════
# Stream-Stream Join : Deux flux
# ═══════════════════════════════════════════════════════════════

# Deux streams
clicks_stream = spark.readStream.format("kafka").option("subscribe", "clicks")...
impressions_stream = spark.readStream.format("kafka").option("subscribe", "impressions")...

# Join avec watermark (obligatoire pour stream-stream)
clicks_with_wm = clicks_stream \
    .withWatermark("click_time", "10 minutes")

impressions_with_wm = impressions_stream \
    .withWatermark("impression_time", "10 minutes")

# Join avec condition de temps
matched = clicks_with_wm.join(
    impressions_with_wm,
    expr("""
        clicks.ad_id = impressions.ad_id AND
        click_time >= impression_time AND
        click_time <= impression_time + interval 1 hour
    """),
    "inner"
)
'''

print(join_examples)

7.2 foreachBatch : Le pont entre streaming et batch

foreachBatch permet d’appliquer n’importe quelle logique batch sur chaque micro-batch. C’est crucial pour les upserts avec MERGE INTO.

Voir le code
# foreachBatch avec MERGE INTO Delta

foreach_batch_example = '''
from delta.tables import DeltaTable

# ═══════════════════════════════════════════════════════════════
# Fonction de traitement par micro-batch
# ═══════════════════════════════════════════════════════════════

def upsert_to_delta(micro_batch_df, batch_id):
    """
    Appelée pour chaque micro-batch.
    micro_batch_df : DataFrame Spark normal (pas streaming)
    batch_id : Identifiant unique du batch
    """
    print(f"Processing batch {batch_id} with {micro_batch_df.count()} records")
    
    # Vérifier si la table existe
    if DeltaTable.isDeltaTable(spark, "s3a://silver/customers/"):
        # Table existe → MERGE (upsert)
        target = DeltaTable.forPath(spark, "s3a://silver/customers/")
        
        target.alias("target").merge(
            micro_batch_df.alias("source"),
            "target.customer_id = source.customer_id"
        ).whenMatchedUpdate(
            set={
                "name": "source.name",
                "email": "source.email",
                "updated_at": "current_timestamp()"
            }
        ).whenNotMatchedInsert(
            values={
                "customer_id": "source.customer_id",
                "name": "source.name",
                "email": "source.email",
                "created_at": "current_timestamp()",
                "updated_at": "current_timestamp()"
            }
        ).execute()
    else:
        # Table n'existe pas → CREATE
        micro_batch_df.write \
            .format("delta") \
            .mode("overwrite") \
            .save("s3a://silver/customers/")

# ═══════════════════════════════════════════════════════════════
# Utiliser foreachBatch
# ═══════════════════════════════════════════════════════════════

query = customers_stream.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/tmp/checkpoints/customers-upsert") \
    .outputMode("update") \
    .start()
'''

print(foreach_batch_example)
print("\n💡 Avantages de foreachBatch :")
print("• Utiliser MERGE INTO (upserts)")
print("• Écrire vers plusieurs sinks")
print("• Appliquer n'importe quelle logique batch")
print("• Exactly-once avec checkpointing")

Exercice 5 : Upsert streaming avec foreachBatch

# Créer un pipeline qui :
# 1. Lit des updates de statut de commande depuis Kafka
# 2. Fait un MERGE INTO vers une table Delta 'order_status'
# 3. Met à jour le statut si la commande existe, sinon insère
💡 Solution
def upsert_order_status(df, batch_id):
    if DeltaTable.isDeltaTable(spark, "/tmp/order_status"):
        target = DeltaTable.forPath(spark, "/tmp/order_status")
        target.alias("t").merge(
            df.alias("s"), "t.order_id = s.order_id"
        ).whenMatchedUpdate(
            set={"status": "s.status", "updated_at": "current_timestamp()"}
        ).whenNotMatchedInsertAll().execute()
    else:
        df.write.format("delta").save("/tmp/order_status")

status_stream.writeStream \
    .foreachBatch(upsert_order_status) \
    .option("checkpointLocation", "/tmp/cp/status") \
    .start()

8. Kafka Connect & Debezium (Aperçu)

8.1 Qu’est-ce que Kafka Connect ?

Kafka Connect est un framework pour connecter Kafka à d’autres systèmes sans code :

┌─────────────────────────────────────────────────────────────────┐
│                     KAFKA CONNECT                               │
│                                                                 │
│   SOURCES                      SINKS                           │
│   ┌──────────────┐             ┌──────────────┐                │
│   │ PostgreSQL   │             │ Elasticsearch│                │
│   │ MySQL        │ ──▶ KAFKA ──▶│ S3           │                │
│   │ MongoDB      │             │ Snowflake    │                │
│   │ Files (CSV)  │             │ BigQuery     │                │
│   │ APIs         │             │ Redis        │                │
│   └──────────────┘             └──────────────┘                │
│                                                                 │
│   Configuration JSON, pas de code !                            │
└─────────────────────────────────────────────────────────────────┘

8.2 Debezium : CDC temps réel

Debezium est un connecteur Kafka Connect pour le Change Data Capture :

┌────────────────┐         ┌──────────────┐         ┌────────────┐
│   PostgreSQL   │         │   Debezium   │         │   Kafka    │
│                │         │              │         │            │
│   WAL logs ────┼────────▶│  Connector   │────────▶│  Topic     │
│   (changes)    │         │              │         │  per table │
└────────────────┘         └──────────────┘         └────────────┘

Chaque INSERT/UPDATE/DELETE → Message Kafka automatique !
Voir le code
# Exemple de configuration Debezium

debezium_config = '''
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1",
    "table.include.list": "public.customers,public.orders",
    "plugin.name": "pgoutput",
    
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
'''

print("Configuration Debezium pour PostgreSQL :")
print(debezium_config)
print("\n💡 Quand utiliser Debezium vs code custom :")
print("• Debezium : CDC standard depuis DB, pas de code à maintenir")
print("• Code custom : Logique complexe, sources non supportées")

9. Déploiement & Observabilité

9.1 Déployer SSS sur Kubernetes

Pour la production, on utilise le Spark Operator (voir Module 21) :

Voir le code
# SparkApplication pour un job streaming

spark_streaming_k8s = '''
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: kafka-to-delta-streaming
  namespace: spark
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: my-registry/spark-streaming:latest
  mainApplicationFile: s3a://code/streaming_job.py
  sparkVersion: "3.5.0"
  
  # Important pour le streaming !
  restartPolicy:
    type: Always  # Redémarrer automatiquement si crash
    onFailureRetries: 3
    onFailureRetryInterval: 60
  
  driver:
    cores: 1
    memory: "2g"
  
  executor:
    cores: 2
    instances: 3
    memory: "4g"
  
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
      - io.delta:delta-spark_2.12:3.1.0
'''

print(spark_streaming_k8s)

9.2 Métriques à surveiller

Métrique Description Seuil d’alerte
Input Rate Messages/sec entrants -
Processing Rate Messages/sec traités < Input Rate
Batch Duration Temps de traitement > Trigger interval
Backlog Messages en attente Croissant
Watermark Lag Retard du watermark > Seuil attendu
Voir le code
# Monitoring du streaming

monitoring = '''
# ═══════════════════════════════════════════════════════════════
# Accéder aux métriques dans le code
# ═══════════════════════════════════════════════════════════════

query = df.writeStream...

# Progression du dernier batch
print(query.lastProgress)

# Status actuel
print(query.status)

# Exemple de lastProgress :
# {
#   "id": "abc123",
#   "runId": "def456",
#   "batchId": 42,
#   "numInputRows": 1000,
#   "inputRowsPerSecond": 500.0,
#   "processedRowsPerSecond": 450.0,
#   "durationMs": {
#     "triggerExecution": 2000,
#     "getBatch": 100,
#     "queryPlanning": 50
#   },
#   "eventTime": {
#     "watermark": "2024-01-15T10:20:00.000Z"
#   }
# }

# ═══════════════════════════════════════════════════════════════
# Listener pour métriques custom
# ═══════════════════════════════════════════════════════════════

class MetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")
    
    def onQueryProgress(self, event):
        # Envoyer vers Prometheus/Grafana
        metrics.gauge("streaming_input_rate", event.progress.inputRowsPerSecond)
        metrics.gauge("streaming_batch_duration", event.progress.durationMs["triggerExecution"])
    
    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

spark.streams.addListener(MetricsListener())
'''

print(monitoring)

10. Mini-Projet : Pipeline Temps Réel Complet

Objectif

Construire un pipeline d’ingestion transactionnel : Python Producer → Kafka → Spark SSS → Delta Lake

┌────────────────┐   ┌───────────────┐   ┌──────────────────┐   ┌──────────────┐
│   Producteur   │   │     Kafka     │   │ Spark Structured │   │    Delta     │
│   (Python)     │──▶│   (Docker)    │──▶│    Streaming     │──▶│    Lake      │
│                │   │               │   │                  │   │              │
│ Simule des     │   │ Topic:        │   │ • Watermark      │   │ • MERGE INTO │
│ transactions   │   │ transactions  │   │ • Window 5 min   │   │ • Silver     │
└────────────────┘   └───────────────┘   └──────────────────┘   └──────────────┘
Voir le code
# ÉTAPE 1 : Producteur Python

producer_code = '''
from kafka import KafkaProducer
import json
import time
import random
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

customers = ["CUST-001", "CUST-002", "CUST-003", "CUST-004", "CUST-005"]
products = ["Laptop", "Phone", "Tablet", "Watch", "Headphones"]

print("🚀 Sending transactions...")

for i in range(100):
    transaction = {
        "transaction_id": f"TXN-{i:06d}",
        "customer_id": random.choice(customers),
        "product": random.choice(products),
        "amount": round(random.uniform(10, 500), 2),
        "timestamp": datetime.now().isoformat()
    }
    
    producer.send("transactions", value=transaction)
    print(f"Sent: {transaction[\'transaction_id\']}")
    time.sleep(0.5)  # Simuler un flux

producer.flush()
print("✅ Done!")
'''

print("# producer.py")
print(producer_code)
Voir le code
# ÉTAPE 2 : Job Spark Streaming

streaming_job = '''
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum, count, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from delta.tables import DeltaTable

# Spark Session
spark = SparkSession.builder \
    .appName("Realtime Transactions") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# Schéma
schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", StringType())
])

# Lire Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .load()

# Parser JSON
transactions = raw_stream \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# Agrégation par fenêtre de 5 min avec watermark 10 min
windowed_stats = transactions \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("customer_id")
    ) \
    .agg(
        count("*").alias("transaction_count"),
        spark_sum("amount").alias("total_amount")
    )

# Fonction pour upsert vers Delta
def upsert_to_delta(batch_df, batch_id):
    # Flatten window column
    flat_df = batch_df.selectExpr(
        "window.start as window_start",
        "window.end as window_end",
        "customer_id",
        "transaction_count",
        "total_amount"
    )
    
    if flat_df.count() == 0:
        return
    
    target_path = "/tmp/delta/customer_stats"
    
    if DeltaTable.isDeltaTable(spark, target_path):
        target = DeltaTable.forPath(spark, target_path)
        target.alias("t").merge(
            flat_df.alias("s"),
            "t.window_start = s.window_start AND t.customer_id = s.customer_id"
        ).whenMatchedUpdate(set={
            "transaction_count": "s.transaction_count",
            "total_amount": "s.total_amount"
        }).whenNotMatchedInsertAll().execute()
    else:
        flat_df.write.format("delta").save(target_path)
    
    print(f"Batch {batch_id}: Processed {flat_df.count()} records")

# Lancer le stream
query = windowed_stats.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/tmp/checkpoints/customer_stats") \
    .outputMode("update") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()
'''

print("# streaming_job.py")
print(streaming_job)

Quiz

Q1. Différence entre Event Time et Processing Time ?
R Event Time = quand l’événement s’est produit. Processing Time = quand Spark le traite.
Q2. Rôle du Watermark ?
R Définir la tolérance au retard et permettre le nettoyage de l’état.
Q3. Différence At-least-once vs Exactly-once ?
R At-least-once peut dupliquer. Exactly-once garantit un seul traitement.
Q4. Pourquoi utiliser foreachBatch ?
R Pour appliquer une logique batch (MERGE INTO) sur chaque micro-batch.
Q5. Qu’est-ce qu’un Consumer Group ?
R Groupe de consumers qui se partagent les partitions pour paralléliser.
Q6. Tumbling vs Sliding Window ?
R Tumbling = non-chevauchantes. Sliding = chevauchantes.
Q7. Rôle du checkpointLocation ?
R Stocker les offsets et l’état pour recovery et exactly-once.
Q8. Quand utiliser Debezium ?
R Pour du CDC temps réel depuis une base de données vers Kafka.

📚 Ressources


➡️ Prochaine étape

👉 Module 25 : 25_dbt_data_quality — dbt + Data Quality


📝 Récapitulatif

Concept Appris
Kafka Topics, Partitions, Offsets, Consumer Groups
Python Kafka kafka-python, confluent-kafka, Faust
Spark SSS readStream, writeStream, Output Modes
Temps Event Time, Watermarks, Windowing
Avancé foreachBatch, MERGE INTO, Stream Joins
Ops Checkpointing, Monitoring, K8s

🎉 Félicitations ! Tu maîtrises maintenant le streaming de données.

Retour au sommet