📨 Distributed Messaging pour Data Engineers
Bienvenue dans ce module avancé où tu vas maîtriser les systèmes de messaging distribués. Tu apprendras les fonctionnalités avancées de Kafka, les alternatives comme RabbitMQ et Pulsar, et le Change Data Capture avec Debezium — des compétences essentielles pour construire des architectures data temps réel !
Prérequis
| Niveau | Compétence |
|---|---|
| ✅ Requis | Avoir suivi le module 24_kafka_streaming (Kafka, Spark SSS, Watermarks) |
| ✅ Requis | Maîtriser topics, partitions, offsets, consumer groups |
| ✅ Requis | Connaître kafka-python et confluent-kafka (producers/consumers) |
| ✅ Requis | Maîtriser Spark Structured Streaming (readStream, writeStream, foreachBatch) |
| ✅ Requis | Connaissances en Docker et Kubernetes (M14-M16, M27) |
| ✅ Requis | Bases de données relationnelles et SQL (pour Debezium CDC) |
| 💡 Recommandé | Expérience avec des pipelines streaming en production |
🎯 Objectifs du module
À la fin de ce module, tu seras capable de :
- Configurer les fonctionnalités avancées de Kafka (Quotas, Tiered Storage, Transactions)
- Comprendre et choisir entre Kafka, RabbitMQ et Pulsar
- Implémenter le Change Data Capture complet avec Debezium
- Concevoir des architectures de messaging robustes
- Gérer les patterns avancés : exactly-once, dead letter queues, event sourcing
Rappel : Ce qu’on a vu en M24 vs Ce qu’on approfondit ici
| Module M24 (Intermediate) | Ce module M29 (Advanced) |
|---|---|
| Architecture Lambda vs Kappa | — |
| Topics, Partitions, Offsets, Consumer Groups | Tiered Storage, Quotas, Monitoring |
| Producers / Consumers (kafka-python, confluent-kafka) | Transactions Kafka, Exactly-Once (EOS) |
| Schema Registry basics (Avro) | Schema Registry avancé (compatibilité, évolution) |
| Spark Structured Streaming complet | — |
| Windowing, Watermarks, foreachBatch | — |
| Faust (aperçu) | — |
| Debezium (mentionné) | Debezium CDC en profondeur |
| — | RabbitMQ (alternative queue-based) |
| — | Apache Pulsar (alternative multi-tenant) |
| — | Patterns : DLQ, Saga, Event Sourcing, CQRS |
Schéma : Écosystème Messaging
┌─────────────────────────────────────────────────────────────────────────────┐
│ DISTRIBUTED MESSAGING LANDSCAPE │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ KAFKA │ │ RABBITMQ │ │ PULSAR │ │
│ │ Log-based │ │ Queue-based │ │ Multi-tenant │ │
│ │ High throughput│ │ Flexible routing│ │ Geo-replication│ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │ │ │ │
│ └────────────────────┼─────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DEBEZIUM (CDC) │ │
│ │ Capture changes from databases → Stream to messaging systems │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
ℹ️ Le savais-tu ?
Kafka traite plus de 7 trillions de messages par jour chez LinkedIn, son créateur.
RabbitMQ a été créé en 2007 et implémente le protocole AMQP, un standard ouvert pour le messaging.
Apache Pulsar a été développé par Yahoo! pour gérer leurs 100 milliards de messages quotidiens avec une architecture multi-tenant native.
Debezium (du latin “from the beginning”) capture chaque changement depuis le début du log de la base de données — c’est la base du Change Data Capture.
1. Kafka Avancé
Cette section couvre les fonctionnalités avancées de Kafka pour la production à grande échelle. Tu connais déjà les bases (M24), on passe directement aux sujets avancés.
1.1 Quotas et Throttling
Les quotas permettent de limiter les ressources consommées par les clients pour éviter qu’un client ne monopolise le cluster.
Types de quotas
| Quota | Description | Unité |
|---|---|---|
| producer_byte_rate | Débit max en écriture | bytes/sec |
| consumer_byte_rate | Débit max en lecture | bytes/sec |
| request_percentage | % CPU du broker | % |
| controller_mutation_rate | Taux de mutations (create/delete) | mutations/sec |
Configurer les quotas
# Quota par user
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
--entity-type users --entity-name data-pipeline-user
# Quota par client-id
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'producer_byte_rate=5242880' \
--entity-type clients --entity-name etl-producer
# Quota par user + client-id (plus spécifique)
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'producer_byte_rate=10485760' \
--entity-type users --entity-name spark-user \
--entity-type clients --entity-name spark-producer
# Quota par défaut pour tous les users
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'producer_byte_rate=1048576' \
--entity-type users --entity-default
# Voir les quotas configurés
kafka-configs.sh --bootstrap-server localhost:9092 \
--describe --entity-type users --entity-name data-pipeline-userQuotas dans le code Python
from confluent_kafka import Producer
# Le client doit spécifier son client.id pour être identifié par les quotas
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'client.id': 'etl-producer', # Identifiant pour les quotas
'acks': 'all',
})
# Si le quota est dépassé, Kafka throttle automatiquement le client
# Le producer recevra des délais dans les réponses1.2 Tiered Storage (KIP-405)
Le Tiered Storage permet de stocker les données anciennes sur un stockage moins cher (S3, GCS, Azure Blob) tout en gardant les données récentes sur disque local.
┌─────────────────────────────────────────────────────────────────────────────┐
│ TIERED STORAGE │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LOCAL TIER (Hot Data) │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │Seg 5│ │Seg 6│ │Seg 7│ │Seg 8│ │Seg 9│ ← Données récentes │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ (SSD local, rapide) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ Offload automatique │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ REMOTE TIER (Cold Data) │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │Seg 1│ │Seg 2│ │Seg 3│ │Seg 4│ ← Données anciennes │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ (S3/GCS/Azure, économique) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Configuration Tiered Storage
# server.properties (broker)
# Activer le tiered storage
remote.log.storage.system.enable=true
# Plugin de stockage (exemple S3)
remote.log.storage.manager.class.name=org.apache.kafka.tiered.storage.s3.S3RemoteStorageManager
remote.log.storage.manager.class.path=/opt/kafka/plugins/tiered-storage-s3.jar
# Configuration S3
remote.log.storage.s3.bucket=my-kafka-tiered-storage
remote.log.storage.s3.region=eu-west-1
# Rétention locale (données chaudes)
local.retention.ms=86400000 # 1 jour en local
# Rétention totale (incluant remote)
retention.ms=2592000000 # 30 jours au total
# Activer le tiered storage sur un topic existant
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name events \
--add-config 'remote.storage.enable=true,local.retention.ms=86400000,retention.ms=2592000000'Avantages du Tiered Storage
| Avantage | Description |
|---|---|
| Coût réduit | Stockage S3 ~10x moins cher que SSD |
| Rétention illimitée | Garder des années de données |
| Cluster plus petit | Moins de disque local nécessaire |
| Replay facilité | Relire des données anciennes pour reprocessing |
1.3 Transactions et Exactly-Once Semantics (EOS)
En M24, tu as vu les garanties de livraison (at-most-once, at-least-once, exactly-once). Ici, on va implémenter exactly-once avec les transactions Kafka.
Rappel des garanties
| Niveau | Description | Risque |
|---|---|---|
| At-most-once | Fire & forget | Perte de messages |
| At-least-once | Retry jusqu’à ACK | Doublons possibles |
| Exactly-once | Transactions + idempotence | Aucun (mais plus complexe) |
Producer Idempotent (pas de doublons)
L’idempotence garantit qu’un message n’est écrit qu’une seule fois même en cas de retry réseau.
from confluent_kafka import Producer
# Producer idempotent — PAS de duplicatas même avec retries
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'enable.idempotence': True, # ← Active l'idempotence
'acks': 'all', # Requis pour idempotence
'retries': 2147483647, # Retries infinis (best practice)
'max.in.flight.requests.per.connection': 5, # Max 5 avec idempotence
})
# Comment ça marche ?
# 1. Le producer assigne un Producer ID (PID) et un Sequence Number à chaque message
# 2. Le broker détecte les doublons en comparant (PID, Sequence)
# 3. Si un retry envoie le même message, le broker le reconnaît et ignore le doublonTransactions complètes (multi-topics atomique)
Les transactions permettent d’écrire sur plusieurs topics/partitions de manière atomique : tout réussit ou tout échoue.
from confluent_kafka import Producer, KafkaException
# Producer transactionnel
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'etl-pipeline-001', # ID unique et STABLE
'enable.idempotence': True, # Implicitement activé
'acks': 'all',
})
# Initialiser les transactions (une seule fois au démarrage)
producer.init_transactions()
try:
# Démarrer une transaction
producer.begin_transaction()
# Écrire sur PLUSIEURS topics (atomique)
producer.produce('orders-processed', key='order-1', value='{"status": "done"}')
producer.produce('audit-log', key='order-1', value='{"action": "order_processed"}')
producer.produce('metrics', key='counter', value='{"orders_processed": 1}')
# Commit la transaction — TOUT ou RIEN
producer.commit_transaction()
print("✅ Transaction committed successfully")
except KafkaException as e:
# Abort en cas d'erreur — aucun message n'est visible
producer.abort_transaction()
print(f"❌ Transaction aborted: {e}")Pattern Read-Process-Write (Exactly-Once complet)
Le pattern le plus puissant : lire, traiter, écrire, et commiter les offsets dans la même transaction.
from confluent_kafka import Consumer, Producer, KafkaException
# Consumer avec isolation transactionnelle
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'etl-group',
'isolation.level': 'read_committed', # ← Ne lit que les messages committés
'enable.auto.commit': False, # ← Commit manuel dans la transaction
})
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'etl-processor-001',
'enable.idempotence': True,
})
producer.init_transactions()
consumer.subscribe(['raw-events'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
try:
# 1. Démarrer la transaction
producer.begin_transaction()
# 2. Traiter le message
processed = process_message(msg.value())
# 3. Écrire le résultat
producer.produce('processed-events', value=processed)
# 4. Commit les offsets DANS la transaction
producer.send_offsets_to_transaction(
consumer.position(consumer.assignment()),
consumer.consumer_group_metadata()
)
# 5. Commit atomique : écriture + offset ensemble
producer.commit_transaction()
except Exception as e:
producer.abort_transaction()
print(f"Transaction failed: {e}")1.4 Schema Registry Avancé
En M24, tu as vu les bases du Schema Registry avec Avro. Approfondissons les modes de compatibilité et l’évolution de schémas.
Modes de compatibilité détaillés
| Mode | Nouveau consumer lit ancien | Ancien consumer lit nouveau | Changements autorisés |
|---|---|---|---|
| BACKWARD | ✅ Oui | ❌ Non | Ajouter champs optionnels, supprimer champs |
| FORWARD | ❌ Non | ✅ Oui | Ajouter champs, supprimer champs optionnels |
| FULL | ✅ Oui | ✅ Oui | Ajouter/supprimer champs optionnels uniquement |
| NONE | — | — | Tout (⚠️ dangereux en production) |
# Configurer la compatibilité globale
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
# Configurer par sujet (override global)
curl -X PUT http://localhost:8081/config/orders-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "FULL"}'
# Tester la compatibilité AVANT de publier un nouveau schéma
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/json" \
-d '{"schema": "{...}"}'
# Réponse: {"is_compatible": true} ou {"is_compatible": false}Évolution de schéma — Exemple pratique
# Version 1 du schéma
schema_v1 = '''
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
'''
# Version 2 — Ajouter un champ optionnel (BACKWARD compatible)
schema_v2 = '''
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
]
}
'''
# ✅ Les nouveaux consumers peuvent lire les anciens messages (currency = "EUR" par défaut)2. RabbitMQ — L’Alternative Queue-Based
RabbitMQ est un message broker traditionnel implémentant le protocole AMQP. Contrairement à Kafka (log-based), RabbitMQ est queue-based avec un routage flexible.
Kafka vs RabbitMQ — Comparaison détaillée
| Aspect | Kafka | RabbitMQ |
|---|---|---|
| Modèle | Log distribué (append-only) | Message queue (FIFO) |
| Persistance | Toujours sur disque | Optionnelle (mémoire ou disque) |
| Ordre | Garanti par partition | Garanti par queue |
| Replay | ✅ Natif (offsets) | ❌ Messages supprimés après ACK |
| Routage | Topics + Partitions | Exchanges (fanout, direct, topic, headers) |
| Throughput | Très élevé (millions/sec) | Élevé (dizaines de milliers/sec) |
| Latence | Millisecondes | Sub-milliseconde |
| Use case principal | Event streaming, analytics | Task queues, RPC, notifications |
Architecture RabbitMQ
┌─────────────────────────────────────────────────────────────────────────────┐
│ RABBITMQ ARCHITECTURE │
│ │
│ Producer ────────┐ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ EXCHANGE │ ← Routing logic (type: fanout/direct/topic) │
│ └───────┬───────┘ │
│ │ Bindings (routing rules) │
│ ┌───────────┼───────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Queue 1 │ │ Queue 2 │ │ Queue 3 │ ← Messages stockés ici │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Consumer 1 Consumer 2 Consumer 3 │
└─────────────────────────────────────────────────────────────────────────────┘
Types d’Exchanges
| Exchange | Routage | Use case |
|---|---|---|
| Direct | routing_key exact match | Logs par niveau (error→error_queue) |
| Fanout | Broadcast à toutes les queues liées | Notifications, cache invalidation |
| Topic | Pattern matching (*.error, logs.#) | Logs multi-critères (app.module.level) |
| Headers | Match sur headers du message | Routage complexe multi-attributs |
Installation
# Docker
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# Management UI: http://localhost:15672 (guest/guest)Producer Python (pika)
import pika
import json
# Connexion
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Déclarer un exchange de type topic
channel.exchange_declare(exchange='data_events', exchange_type='topic', durable=True)
# Publier un message
message = {'event_type': 'order_created', 'order_id': 'ORD-001', 'amount': 99.99}
channel.basic_publish(
exchange='data_events',
routing_key='orders.created',
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2, content_type='application/json')
)
connection.close()Consumer Python
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_processor', durable=True)
channel.queue_bind(exchange='data_events', queue='order_processor', routing_key='orders.*')
def callback(ch, method, properties, body):
message = json.loads(body)
print(f"Received: {message}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='order_processor', on_message_callback=callback)
channel.start_consuming()Quand utiliser RabbitMQ vs Kafka ?
| ✅ RabbitMQ | ✅ Kafka |
|---|---|
| Task queues (jobs async) | Event streaming temps réel |
| RPC (request/reply) | Log aggregation |
| Routage complexe (exchanges) | Replay de données historiques |
| Faible latence critique (<1ms) | Très haut throughput (millions/sec) |
3. Apache Pulsar — Le Challenger Multi-Tenant
Apache Pulsar combine les avantages de Kafka (log-based) et RabbitMQ (queuing) avec une architecture cloud-native.
Kafka vs Pulsar
| Aspect | Kafka | Pulsar |
|---|---|---|
| Architecture | Brokers = Storage + Compute | Séparation Brokers / BookKeeper |
| Multi-tenancy | Limité | Natif (tenants, namespaces) |
| Geo-replication | MirrorMaker (externe) | Natif et synchrone |
| Queuing | Non natif | Natif (shared subscriptions) |
Types de Subscriptions Pulsar
| Type | Description | Équivalent |
|---|---|---|
| Exclusive | 1 seul consumer | Kafka standard |
| Failover | Failover automatique | — |
| Shared | Load balanced (round-robin) | RabbitMQ |
| Key_Shared | Ordre par clé | Kafka partitions |
Producer/Consumer Python
import pulsar
import json
client = pulsar.Client('pulsar://localhost:6650')
# Producer
producer = client.create_producer('persistent://public/default/orders')
producer.send(json.dumps({'order_id': 'ORD-001'}).encode('utf-8'))
# Consumer (shared = load balanced)
consumer = client.subscribe(
'persistent://public/default/orders',
subscription_name='order-processor',
consumer_type=pulsar.ConsumerType.Shared
)
msg = consumer.receive()
print(json.loads(msg.data()))
consumer.acknowledge(msg)
client.close()4. Debezium — Change Data Capture en Profondeur
Debezium est une plateforme open-source de Change Data Capture (CDC) qui capture les changements dans les bases de données et les streame vers Kafka.
En M24, Debezium était mentionné dans le quiz. Ici, on l’implémente en profondeur.
Pourquoi le CDC ?
| Approche traditionnelle | CDC avec Debezium |
|---|---|
| Batch ETL (SELECT * toutes les heures) | Streaming temps réel |
| Query la DB source (charge CPU/IO) | Lit le transaction log (léger) |
| Détection des DELETEs difficile | Capture TOUS les changements |
| Latence élevée (heures) | Latence sub-seconde |
Architecture Debezium
┌─────────────────────────────────────────────────────────────────────────────┐
│ DEBEZIUM ARCHITECTURE │
│ │
│ ┌─────────────┐ │
│ │ PostgreSQL │ Transaction Log (WAL) │
│ │ (source) │──────────────────┐ │
│ └─────────────┘ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ MySQL │───▶│ KAFKA CONNECT │───▶│ KAFKA TOPICS │ │
│ │ (source) │ │ + Debezium │ │ (change events) │ │
│ └─────────────┘ │ Connectors │ │ │ │
│ └─────────────────────┘ │ • dbserver.schema │ │
│ ┌─────────────┐ │ .table │ │
│ │ MongoDB │───────────────────────────────┘ │
│ │ (source) │ │ │
│ └─────────────┘ ▼ │
│ ┌─────────────────────┐ │
│ │ Consumers │ │
│ │ • Data Warehouse │ │
│ │ • Elasticsearch │ │
│ │ • Microservices │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Connecteurs Debezium
| Base de données | Méthode de capture | Maturité |
|---|---|---|
| PostgreSQL | Logical replication (pgoutput) | ⭐⭐⭐⭐⭐ |
| MySQL/MariaDB | Binary log (binlog) | ⭐⭐⭐⭐⭐ |
| MongoDB | Oplog / Change Streams | ⭐⭐⭐⭐⭐ |
| SQL Server | CDC tables | ⭐⭐⭐⭐ |
| Oracle | LogMiner | ⭐⭐⭐⭐ |
Docker Compose complet
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
postgres:
image: postgres:15
ports: ["5432:5432"]
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: source_db
command: ["postgres", "-c", "wal_level=logical"] # CRUCIAL
connect:
image: debezium/connect:2.5
depends_on: [kafka, postgres]
ports: ["8083:8083"]
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: debezium-connect
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statusesEnregistrer le connecteur
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "source_db",
"topic.prefix": "cdc",
"table.include.list": "public.orders",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,source.ts_ms"
}
}'Format des messages Debezium
| Champ | Description |
|---|---|
| before | État AVANT le changement (null pour INSERT) |
| after | État APRÈS le changement (null pour DELETE) |
| op | Opération : c=create, u=update, d=delete, r=read (snapshot) |
| source | Métadonnées (table, transaction ID, LSN) |
Consumer Python CDC
from confluent_kafka import Consumer
import json
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'cdc-processor',
'auto.offset.reset': 'earliest',
})
consumer.subscribe(['cdc.public.orders'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
event = json.loads(msg.value())
op = event.get('__op')
data = {k: v for k, v in event.items() if not k.startswith('__')}
if op in ('c', 'r'): # INSERT ou SNAPSHOT
print(f"INSERT: {data}")
elif op == 'u': # UPDATE
print(f"UPDATE: {data}")
elif op == 'd': # DELETE
print(f"DELETE: id={data.get('id')}")Outbox Pattern
Le Outbox Pattern garantit la cohérence entre les mises à jour DB et l’envoi d’événements.
-- Table outbox
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- Dans une TRANSACTION
BEGIN;
UPDATE orders SET status = 'shipped' WHERE id = 1;
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '1', 'OrderShipped', '{"order_id": 1}');
COMMIT;
-- Debezium capture l'INSERT dans outbox → KafkaUse Cases CDC
| Use Case | Description |
|---|---|
| Sync Data Warehouse | Réplication temps réel vers Snowflake, BigQuery |
| Cache invalidation | Invalider Redis quand la DB change |
| Search indexing | Sync vers Elasticsearch |
| Microservices events | Outbox pattern |
| Audit log | Compliance, RGPD |
5. Patterns de Messaging Distribué
5.1 Dead Letter Queue (DLQ)
MAX_RETRIES = 3
while True:
msg = consumer.poll(1.0)
headers = dict(msg.headers() or [])
retry_count = int(headers.get('retry_count', b'0'))
try:
process_message(msg.value())
consumer.commit(msg)
except Exception as e:
if retry_count >= MAX_RETRIES:
# Envoyer vers DLQ
producer.produce('orders-dlq', key=msg.key(), value=msg.value(),
headers=[('error', str(e))])
else:
# Retry
producer.produce('orders-retry', key=msg.key(), value=msg.value(),
headers=[('retry_count', str(retry_count + 1))])
consumer.commit(msg)5.2 Saga Pattern
Create Order ──▶ Reserve Stock ──▶ Process Payment ──▶ Ship Order
│ │ │
▼ ▼ ▼
Cancel Order ◀── Release Stock ◀── Refund Payment (compensation)
5.3 Event Sourcing
events = [
{'type': 'OrderCreated', 'order_id': 1, 'amount': 100},
{'type': 'PaymentReceived', 'order_id': 1},
{'type': 'OrderShipped', 'order_id': 1},
]
def rebuild_state(events):
state = {}
for e in events:
if e['type'] == 'OrderCreated':
state = {'id': e['order_id'], 'status': 'created'}
elif e['type'] == 'PaymentReceived':
state['status'] = 'paid'
elif e['type'] == 'OrderShipped':
state['status'] = 'shipped'
return state5.4 CQRS
Commands ──▶ Events (Kafka) ──▶ Projector ──▶ Read Model (optimized)
│ │
▼ ▼
Write DB Query API
(PostgreSQL) (Elasticsearch)
6. Exercices Pratiques
Exercice 1 : Kafka Transactions
Implémenter un producer transactionnel qui écrit sur 2 topics atomiquement.
Exercice 2 : RabbitMQ Task Queue
Créer une task queue avec priorités et DLQ.
Exercice 3 : Pipeline CDC Complet
Déployer PostgreSQL + Kafka + Debezium et sync vers un data warehouse.
Exercice 4 : Comparatif Performance
Comparer Kafka vs RabbitMQ (100K messages, throughput, latence).
Exercice 5 : Outbox Pattern
Implémenter le pattern Outbox avec Debezium.
📚 Ressources
- Apache Kafka Docs
- RabbitMQ Docs
- Apache Pulsar Docs
- Debezium Docs
- Kafka: The Definitive Guide — Neha Narkhede
- Designing Data-Intensive Applications — Martin Kleppmann
➡️ Prochaine étape
👉 Module suivant : 30_spark_scala_deep_dive — Spark & Scala Deep Dive
🎉 Félicitations ! Tu as terminé le module Distributed Messaging.