📨 Distributed Messaging pour Data Engineers

Bienvenue dans ce module avancé où tu vas maîtriser les systèmes de messaging distribués. Tu apprendras les fonctionnalités avancées de Kafka, les alternatives comme RabbitMQ et Pulsar, et le Change Data Capture avec Debezium — des compétences essentielles pour construire des architectures data temps réel !


Prérequis

Niveau Compétence
✅ Requis Avoir suivi le module 24_kafka_streaming (Kafka, Spark SSS, Watermarks)
✅ Requis Maîtriser topics, partitions, offsets, consumer groups
✅ Requis Connaître kafka-python et confluent-kafka (producers/consumers)
✅ Requis Maîtriser Spark Structured Streaming (readStream, writeStream, foreachBatch)
✅ Requis Connaissances en Docker et Kubernetes (M14-M16, M27)
✅ Requis Bases de données relationnelles et SQL (pour Debezium CDC)
💡 Recommandé Expérience avec des pipelines streaming en production

🎯 Objectifs du module

À la fin de ce module, tu seras capable de :

  • Configurer les fonctionnalités avancées de Kafka (Quotas, Tiered Storage, Transactions)
  • Comprendre et choisir entre Kafka, RabbitMQ et Pulsar
  • Implémenter le Change Data Capture complet avec Debezium
  • Concevoir des architectures de messaging robustes
  • Gérer les patterns avancés : exactly-once, dead letter queues, event sourcing

Rappel : Ce qu’on a vu en M24 vs Ce qu’on approfondit ici

Module M24 (Intermediate) Ce module M29 (Advanced)
Architecture Lambda vs Kappa
Topics, Partitions, Offsets, Consumer Groups Tiered Storage, Quotas, Monitoring
Producers / Consumers (kafka-python, confluent-kafka) Transactions Kafka, Exactly-Once (EOS)
Schema Registry basics (Avro) Schema Registry avancé (compatibilité, évolution)
Spark Structured Streaming complet
Windowing, Watermarks, foreachBatch
Faust (aperçu)
Debezium (mentionné) Debezium CDC en profondeur
RabbitMQ (alternative queue-based)
Apache Pulsar (alternative multi-tenant)
Patterns : DLQ, Saga, Event Sourcing, CQRS

Schéma : Écosystème Messaging

┌─────────────────────────────────────────────────────────────────────────────┐
│                    DISTRIBUTED MESSAGING LANDSCAPE                          │
│                                                                             │
│   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐          │
│   │     KAFKA       │   │   RABBITMQ      │   │     PULSAR      │          │
│   │  Log-based      │   │  Queue-based    │   │  Multi-tenant   │          │
│   │  High throughput│   │  Flexible routing│  │  Geo-replication│          │
│   └─────────────────┘   └─────────────────┘   └─────────────────┘          │
│            │                    │                     │                     │
│            └────────────────────┼─────────────────────┘                     │
│                                 │                                           │
│                                 ▼                                           │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                        DEBEZIUM (CDC)                                │  │
│   │   Capture changes from databases → Stream to messaging systems      │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

ℹ️ Le savais-tu ?

Kafka traite plus de 7 trillions de messages par jour chez LinkedIn, son créateur.

RabbitMQ a été créé en 2007 et implémente le protocole AMQP, un standard ouvert pour le messaging.

Apache Pulsar a été développé par Yahoo! pour gérer leurs 100 milliards de messages quotidiens avec une architecture multi-tenant native.

Debezium (du latin “from the beginning”) capture chaque changement depuis le début du log de la base de données — c’est la base du Change Data Capture.

📖 Kafka at LinkedIn


1. Kafka Avancé

Cette section couvre les fonctionnalités avancées de Kafka pour la production à grande échelle. Tu connais déjà les bases (M24), on passe directement aux sujets avancés.

1.1 Quotas et Throttling

Les quotas permettent de limiter les ressources consommées par les clients pour éviter qu’un client ne monopolise le cluster.

Types de quotas

Quota Description Unité
producer_byte_rate Débit max en écriture bytes/sec
consumer_byte_rate Débit max en lecture bytes/sec
request_percentage % CPU du broker %
controller_mutation_rate Taux de mutations (create/delete) mutations/sec

Configurer les quotas

# Quota par user
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
  --entity-type users --entity-name data-pipeline-user

# Quota par client-id
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'producer_byte_rate=5242880' \
  --entity-type clients --entity-name etl-producer

# Quota par user + client-id (plus spécifique)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'producer_byte_rate=10485760' \
  --entity-type users --entity-name spark-user \
  --entity-type clients --entity-name spark-producer

# Quota par défaut pour tous les users
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'producer_byte_rate=1048576' \
  --entity-type users --entity-default

# Voir les quotas configurés
kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe --entity-type users --entity-name data-pipeline-user

Quotas dans le code Python

from confluent_kafka import Producer

# Le client doit spécifier son client.id pour être identifié par les quotas
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'etl-producer',  # Identifiant pour les quotas
    'acks': 'all',
})

# Si le quota est dépassé, Kafka throttle automatiquement le client
# Le producer recevra des délais dans les réponses

1.2 Tiered Storage (KIP-405)

Le Tiered Storage permet de stocker les données anciennes sur un stockage moins cher (S3, GCS, Azure Blob) tout en gardant les données récentes sur disque local.

┌─────────────────────────────────────────────────────────────────────────────┐
│                         TIERED STORAGE                                      │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    LOCAL TIER (Hot Data)                             │  │
│   │   ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                           │  │
│   │   │Seg 5│ │Seg 6│ │Seg 7│ │Seg 8│ │Seg 9│  ← Données récentes      │  │
│   │   └─────┘ └─────┘ └─────┘ └─────┘ └─────┘    (SSD local, rapide)    │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                    │                                        │
│                        Offload automatique                                  │
│                                    ▼                                        │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                   REMOTE TIER (Cold Data)                            │  │
│   │   ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                                   │  │
│   │   │Seg 1│ │Seg 2│ │Seg 3│ │Seg 4│  ← Données anciennes             │  │
│   │   └─────┘ └─────┘ └─────┘ └─────┘    (S3/GCS/Azure, économique)    │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

Configuration Tiered Storage

# server.properties (broker)

# Activer le tiered storage
remote.log.storage.system.enable=true

# Plugin de stockage (exemple S3)
remote.log.storage.manager.class.name=org.apache.kafka.tiered.storage.s3.S3RemoteStorageManager
remote.log.storage.manager.class.path=/opt/kafka/plugins/tiered-storage-s3.jar

# Configuration S3
remote.log.storage.s3.bucket=my-kafka-tiered-storage
remote.log.storage.s3.region=eu-west-1

# Rétention locale (données chaudes)
local.retention.ms=86400000  # 1 jour en local

# Rétention totale (incluant remote)
retention.ms=2592000000  # 30 jours au total
# Activer le tiered storage sur un topic existant
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name events \
  --add-config 'remote.storage.enable=true,local.retention.ms=86400000,retention.ms=2592000000'

Avantages du Tiered Storage

Avantage Description
Coût réduit Stockage S3 ~10x moins cher que SSD
Rétention illimitée Garder des années de données
Cluster plus petit Moins de disque local nécessaire
Replay facilité Relire des données anciennes pour reprocessing

1.3 Transactions et Exactly-Once Semantics (EOS)

En M24, tu as vu les garanties de livraison (at-most-once, at-least-once, exactly-once). Ici, on va implémenter exactly-once avec les transactions Kafka.

Rappel des garanties

Niveau Description Risque
At-most-once Fire & forget Perte de messages
At-least-once Retry jusqu’à ACK Doublons possibles
Exactly-once Transactions + idempotence Aucun (mais plus complexe)

Producer Idempotent (pas de doublons)

L’idempotence garantit qu’un message n’est écrit qu’une seule fois même en cas de retry réseau.

from confluent_kafka import Producer

# Producer idempotent — PAS de duplicatas même avec retries
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,  # ← Active l'idempotence
    'acks': 'all',               # Requis pour idempotence
    'retries': 2147483647,       # Retries infinis (best practice)
    'max.in.flight.requests.per.connection': 5,  # Max 5 avec idempotence
})

# Comment ça marche ?
# 1. Le producer assigne un Producer ID (PID) et un Sequence Number à chaque message
# 2. Le broker détecte les doublons en comparant (PID, Sequence)
# 3. Si un retry envoie le même message, le broker le reconnaît et ignore le doublon

Transactions complètes (multi-topics atomique)

Les transactions permettent d’écrire sur plusieurs topics/partitions de manière atomique : tout réussit ou tout échoue.

from confluent_kafka import Producer, KafkaException

# Producer transactionnel
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'etl-pipeline-001',  # ID unique et STABLE
    'enable.idempotence': True,              # Implicitement activé
    'acks': 'all',
})

# Initialiser les transactions (une seule fois au démarrage)
producer.init_transactions()

try:
    # Démarrer une transaction
    producer.begin_transaction()
    
    # Écrire sur PLUSIEURS topics (atomique)
    producer.produce('orders-processed', key='order-1', value='{"status": "done"}')
    producer.produce('audit-log', key='order-1', value='{"action": "order_processed"}')
    producer.produce('metrics', key='counter', value='{"orders_processed": 1}')
    
    # Commit la transaction — TOUT ou RIEN
    producer.commit_transaction()
    print("✅ Transaction committed successfully")
    
except KafkaException as e:
    # Abort en cas d'erreur — aucun message n'est visible
    producer.abort_transaction()
    print(f"❌ Transaction aborted: {e}")

Pattern Read-Process-Write (Exactly-Once complet)

Le pattern le plus puissant : lire, traiter, écrire, et commiter les offsets dans la même transaction.

from confluent_kafka import Consumer, Producer, KafkaException

# Consumer avec isolation transactionnelle
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'etl-group',
    'isolation.level': 'read_committed',  # ← Ne lit que les messages committés
    'enable.auto.commit': False,          # ← Commit manuel dans la transaction
})

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'etl-processor-001',
    'enable.idempotence': True,
})

producer.init_transactions()
consumer.subscribe(['raw-events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    
    try:
        # 1. Démarrer la transaction
        producer.begin_transaction()
        
        # 2. Traiter le message
        processed = process_message(msg.value())
        
        # 3. Écrire le résultat
        producer.produce('processed-events', value=processed)
        
        # 4. Commit les offsets DANS la transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )
        
        # 5. Commit atomique : écriture + offset ensemble
        producer.commit_transaction()
        
    except Exception as e:
        producer.abort_transaction()
        print(f"Transaction failed: {e}")

1.4 Schema Registry Avancé

En M24, tu as vu les bases du Schema Registry avec Avro. Approfondissons les modes de compatibilité et l’évolution de schémas.

Modes de compatibilité détaillés

Mode Nouveau consumer lit ancien Ancien consumer lit nouveau Changements autorisés
BACKWARD ✅ Oui ❌ Non Ajouter champs optionnels, supprimer champs
FORWARD ❌ Non ✅ Oui Ajouter champs, supprimer champs optionnels
FULL ✅ Oui ✅ Oui Ajouter/supprimer champs optionnels uniquement
NONE Tout (⚠️ dangereux en production)
# Configurer la compatibilité globale
curl -X PUT http://localhost:8081/config \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

# Configurer par sujet (override global)
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "FULL"}'

# Tester la compatibilité AVANT de publier un nouveau schéma
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/json" \
  -d '{"schema": "{...}"}'
# Réponse: {"is_compatible": true} ou {"is_compatible": false}

Évolution de schéma — Exemple pratique

# Version 1 du schéma
schema_v1 = '''
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
'''

# Version 2 — Ajouter un champ optionnel (BACKWARD compatible)
schema_v2 = '''
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}
'''
# ✅ Les nouveaux consumers peuvent lire les anciens messages (currency = "EUR" par défaut)

2. RabbitMQ — L’Alternative Queue-Based

RabbitMQ est un message broker traditionnel implémentant le protocole AMQP. Contrairement à Kafka (log-based), RabbitMQ est queue-based avec un routage flexible.

Kafka vs RabbitMQ — Comparaison détaillée

Aspect Kafka RabbitMQ
Modèle Log distribué (append-only) Message queue (FIFO)
Persistance Toujours sur disque Optionnelle (mémoire ou disque)
Ordre Garanti par partition Garanti par queue
Replay ✅ Natif (offsets) ❌ Messages supprimés après ACK
Routage Topics + Partitions Exchanges (fanout, direct, topic, headers)
Throughput Très élevé (millions/sec) Élevé (dizaines de milliers/sec)
Latence Millisecondes Sub-milliseconde
Use case principal Event streaming, analytics Task queues, RPC, notifications

Architecture RabbitMQ

┌─────────────────────────────────────────────────────────────────────────────┐
│                         RABBITMQ ARCHITECTURE                               │
│                                                                             │
│   Producer ────────┐                                                        │
│                    │                                                        │
│                    ▼                                                        │
│            ┌───────────────┐                                                │
│            │   EXCHANGE    │  ← Routing logic (type: fanout/direct/topic)  │
│            └───────┬───────┘                                                │
│                    │ Bindings (routing rules)                               │
│        ┌───────────┼───────────┐                                            │
│        │           │           │                                            │
│        ▼           ▼           ▼                                            │
│   ┌─────────┐ ┌─────────┐ ┌─────────┐                                       │
│   │ Queue 1 │ │ Queue 2 │ │ Queue 3 │  ← Messages stockés ici              │
│   └────┬────┘ └────┬────┘ └────┬────┘                                       │
│        │           │           │                                            │
│        ▼           ▼           ▼                                            │
│   Consumer 1   Consumer 2   Consumer 3                                      │
└─────────────────────────────────────────────────────────────────────────────┘

Types d’Exchanges

Exchange Routage Use case
Direct routing_key exact match Logs par niveau (error→error_queue)
Fanout Broadcast à toutes les queues liées Notifications, cache invalidation
Topic Pattern matching (*.error, logs.#) Logs multi-critères (app.module.level)
Headers Match sur headers du message Routage complexe multi-attributs

Installation

# Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

# Management UI: http://localhost:15672 (guest/guest)

Producer Python (pika)

import pika
import json

# Connexion
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Déclarer un exchange de type topic
channel.exchange_declare(exchange='data_events', exchange_type='topic', durable=True)

# Publier un message
message = {'event_type': 'order_created', 'order_id': 'ORD-001', 'amount': 99.99}

channel.basic_publish(
    exchange='data_events',
    routing_key='orders.created',
    body=json.dumps(message),
    properties=pika.BasicProperties(delivery_mode=2, content_type='application/json')
)

connection.close()

Consumer Python

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_processor', durable=True)
channel.queue_bind(exchange='data_events', queue='order_processor', routing_key='orders.*')

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received: {message}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='order_processor', on_message_callback=callback)
channel.start_consuming()

Quand utiliser RabbitMQ vs Kafka ?

✅ RabbitMQ ✅ Kafka
Task queues (jobs async) Event streaming temps réel
RPC (request/reply) Log aggregation
Routage complexe (exchanges) Replay de données historiques
Faible latence critique (<1ms) Très haut throughput (millions/sec)

3. Apache Pulsar — Le Challenger Multi-Tenant

Apache Pulsar combine les avantages de Kafka (log-based) et RabbitMQ (queuing) avec une architecture cloud-native.

Kafka vs Pulsar

Aspect Kafka Pulsar
Architecture Brokers = Storage + Compute Séparation Brokers / BookKeeper
Multi-tenancy Limité Natif (tenants, namespaces)
Geo-replication MirrorMaker (externe) Natif et synchrone
Queuing Non natif Natif (shared subscriptions)

Types de Subscriptions Pulsar

Type Description Équivalent
Exclusive 1 seul consumer Kafka standard
Failover Failover automatique
Shared Load balanced (round-robin) RabbitMQ
Key_Shared Ordre par clé Kafka partitions

Producer/Consumer Python

import pulsar
import json

client = pulsar.Client('pulsar://localhost:6650')

# Producer
producer = client.create_producer('persistent://public/default/orders')
producer.send(json.dumps({'order_id': 'ORD-001'}).encode('utf-8'))

# Consumer (shared = load balanced)
consumer = client.subscribe(
    'persistent://public/default/orders',
    subscription_name='order-processor',
    consumer_type=pulsar.ConsumerType.Shared
)

msg = consumer.receive()
print(json.loads(msg.data()))
consumer.acknowledge(msg)
client.close()

4. Debezium — Change Data Capture en Profondeur

Debezium est une plateforme open-source de Change Data Capture (CDC) qui capture les changements dans les bases de données et les streame vers Kafka.

En M24, Debezium était mentionné dans le quiz. Ici, on l’implémente en profondeur.

Pourquoi le CDC ?

Approche traditionnelle CDC avec Debezium
Batch ETL (SELECT * toutes les heures) Streaming temps réel
Query la DB source (charge CPU/IO) Lit le transaction log (léger)
Détection des DELETEs difficile Capture TOUS les changements
Latence élevée (heures) Latence sub-seconde

Architecture Debezium

┌─────────────────────────────────────────────────────────────────────────────┐
│                         DEBEZIUM ARCHITECTURE                               │
│                                                                             │
│   ┌─────────────┐                                                           │
│   │  PostgreSQL │     Transaction Log (WAL)                                 │
│   │   (source)  │──────────────────┐                                        │
│   └─────────────┘                  │                                        │
│                                    ▼                                        │
│   ┌─────────────┐    ┌─────────────────────┐    ┌─────────────────────┐    │
│   │    MySQL    │───▶│   KAFKA CONNECT     │───▶│   KAFKA TOPICS      │    │
│   │   (source)  │    │  + Debezium         │    │  (change events)    │    │
│   └─────────────┘    │    Connectors       │    │                     │    │
│                      └─────────────────────┘    │  • dbserver.schema  │    │
│   ┌─────────────┐                               │    .table           │    │
│   │   MongoDB   │───────────────────────────────┘                          │
│   │   (source)  │                                         │                │
│   └─────────────┘                                         ▼                │
│                                                ┌─────────────────────┐     │
│                                                │    Consumers        │     │
│                                                │  • Data Warehouse   │     │
│                                                │  • Elasticsearch    │     │
│                                                │  • Microservices    │     │
│                                                └─────────────────────┘     │
└─────────────────────────────────────────────────────────────────────────────┘

Connecteurs Debezium

Base de données Méthode de capture Maturité
PostgreSQL Logical replication (pgoutput) ⭐⭐⭐⭐⭐
MySQL/MariaDB Binary log (binlog) ⭐⭐⭐⭐⭐
MongoDB Oplog / Change Streams ⭐⭐⭐⭐⭐
SQL Server CDC tables ⭐⭐⭐⭐
Oracle LogMiner ⭐⭐⭐⭐

Docker Compose complet

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

  postgres:
    image: postgres:15
    ports: ["5432:5432"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: source_db
    command: ["postgres", "-c", "wal_level=logical"]  # CRUCIAL

  connect:
    image: debezium/connect:2.5
    depends_on: [kafka, postgres]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: debezium-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Enregistrer le connecteur

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "source_db",
      "topic.prefix": "cdc",
      "table.include.list": "public.orders",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "transforms.unwrap.add.fields": "op,source.ts_ms"
    }
  }'

Format des messages Debezium

Champ Description
before État AVANT le changement (null pour INSERT)
after État APRÈS le changement (null pour DELETE)
op Opération : c=create, u=update, d=delete, r=read (snapshot)
source Métadonnées (table, transaction ID, LSN)

Consumer Python CDC

from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'cdc-processor',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['cdc.public.orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    
    event = json.loads(msg.value())
    op = event.get('__op')
    data = {k: v for k, v in event.items() if not k.startswith('__')}
    
    if op in ('c', 'r'):  # INSERT ou SNAPSHOT
        print(f"INSERT: {data}")
    elif op == 'u':  # UPDATE
        print(f"UPDATE: {data}")
    elif op == 'd':  # DELETE
        print(f"DELETE: id={data.get('id')}")

Outbox Pattern

Le Outbox Pattern garantit la cohérence entre les mises à jour DB et l’envoi d’événements.

-- Table outbox
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Dans une TRANSACTION
BEGIN;
    UPDATE orders SET status = 'shipped' WHERE id = 1;
    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES ('Order', '1', 'OrderShipped', '{"order_id": 1}');
COMMIT;
-- Debezium capture l'INSERT dans outbox → Kafka

Use Cases CDC

Use Case Description
Sync Data Warehouse Réplication temps réel vers Snowflake, BigQuery
Cache invalidation Invalider Redis quand la DB change
Search indexing Sync vers Elasticsearch
Microservices events Outbox pattern
Audit log Compliance, RGPD

5. Patterns de Messaging Distribué

5.1 Dead Letter Queue (DLQ)

MAX_RETRIES = 3

while True:
    msg = consumer.poll(1.0)
    headers = dict(msg.headers() or [])
    retry_count = int(headers.get('retry_count', b'0'))
    
    try:
        process_message(msg.value())
        consumer.commit(msg)
    except Exception as e:
        if retry_count >= MAX_RETRIES:
            # Envoyer vers DLQ
            producer.produce('orders-dlq', key=msg.key(), value=msg.value(),
                           headers=[('error', str(e))])
        else:
            # Retry
            producer.produce('orders-retry', key=msg.key(), value=msg.value(),
                           headers=[('retry_count', str(retry_count + 1))])
        consumer.commit(msg)

5.2 Saga Pattern

Create Order ──▶ Reserve Stock ──▶ Process Payment ──▶ Ship Order
      │               │                  │
      ▼               ▼                  ▼
Cancel Order ◀── Release Stock ◀── Refund Payment  (compensation)

5.3 Event Sourcing

events = [
    {'type': 'OrderCreated', 'order_id': 1, 'amount': 100},
    {'type': 'PaymentReceived', 'order_id': 1},
    {'type': 'OrderShipped', 'order_id': 1},
]

def rebuild_state(events):
    state = {}
    for e in events:
        if e['type'] == 'OrderCreated':
            state = {'id': e['order_id'], 'status': 'created'}
        elif e['type'] == 'PaymentReceived':
            state['status'] = 'paid'
        elif e['type'] == 'OrderShipped':
            state['status'] = 'shipped'
    return state

5.4 CQRS

Commands ──▶ Events (Kafka) ──▶ Projector ──▶ Read Model (optimized)
   │                                              │
   ▼                                              ▼
Write DB                                      Query API
(PostgreSQL)                                  (Elasticsearch)

6. Exercices Pratiques

Exercice 1 : Kafka Transactions

Implémenter un producer transactionnel qui écrit sur 2 topics atomiquement.

Exercice 2 : RabbitMQ Task Queue

Créer une task queue avec priorités et DLQ.

Exercice 3 : Pipeline CDC Complet

Déployer PostgreSQL + Kafka + Debezium et sync vers un data warehouse.

Exercice 4 : Comparatif Performance

Comparer Kafka vs RabbitMQ (100K messages, throughput, latence).

Exercice 5 : Outbox Pattern

Implémenter le pattern Outbox avec Debezium.


📚 Ressources


➡️ Prochaine étape

👉 Module suivant : 30_spark_scala_deep_dive — Spark & Scala Deep Dive


🎉 Félicitations ! Tu as terminé le module Distributed Messaging.

Retour au sommet