Capstone Project: Olist E-commerce Pipeline

A Synthesis of All Data Engineering Skills


Context

You have just joined the Data Engineering team at Olist, the largest e-commerce platform in Brazil. Olist connects small merchants to large marketplaces such as Amazon, Mercado Livre, and others.

Today, the data sits in CSV files and is analyzed manually by the BI team. Your manager hands you a critical mission:

"We need a modern Lakehouse architecture. You must build a complete pipeline that ingests our data in real time, transforms it, and makes it available for analytics dashboards."


Your Mission

Build a complete end-to-end data pipeline:

CSV (Kaggle) → Kafka → Spark Structured Streaming (SSS) → Delta Lake (Bronze/Silver) → dbt (Gold) → Dashboard-ready

Dataset

Brazilian E-Commerce Public Dataset by Olist

🔗 Kaggle: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce

The dataset contains ~100,000 real (anonymized) orders placed on Olist between 2016 and 2018.

Available tables

File Description Rows
olist_orders_dataset.csv Orders ~100K
olist_order_items_dataset.csv Order line items ~113K
olist_customers_dataset.csv Customers ~100K
olist_products_dataset.csv Products ~33K
olist_sellers_dataset.csv Sellers ~3K
olist_order_payments_dataset.csv Payments ~104K
olist_order_reviews_dataset.csv Customer reviews ~100K
olist_geolocation_dataset.csv Geolocation ~1M
product_category_name_translation.csv Category translations ~71

Relational schema

                              ┌──────────────────┐
                              │    customers     │
                              │  customer_id (PK)│
                              └────────┬─────────┘
                                       │
                                       │ 1:N
                                       ▼
┌──────────────┐    1:N     ┌──────────────────┐     1:N     ┌──────────────┐
│   payments   │◄───────────│     orders       │────────────▶│   reviews    │
│ order_id (FK)│            │  order_id (PK)   │             │ order_id (FK)│
└──────────────┘            │  customer_id(FK) │             └──────────────┘
                            └────────┬─────────┘
                                     │
                                     │ 1:N
                                     ▼
┌──────────────┐    N:1     ┌──────────────────┐     N:1     ┌──────────────┐
│   products   │◄───────────│   order_items    │────────────▶│   sellers    │
│product_id(PK)│            │ order_id (FK)    │             │ seller_id(PK)│
└──────────────┘            │ product_id (FK)  │             └──────────────┘
                            │ seller_id (FK)   │
                            └──────────────────┘

Target Architecture

┌─────────────────────────────────────────────────────────────────────────────────┐
│                         PIPELINE E-COMMERCE OLIST                               │
│                                                                                 │
│  ┌──────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│  │   CSV Files  │    │    KAFKA    │    │   BRONZE    │    │   SILVER    │    │
│  │   (Kaggle)   │───▶│   Topics    │───▶│  (Delta)    │───▶│  (Delta)    │    │
│  │              │    │             │    │             │    │             │    │
│  │ • orders     │    │ • raw_orders│    │  Append     │    │ MERGE INTO  │    │
│  │ • items      │    │ • raw_items │    │  Raw data   │    │ Deduplicated│    │
│  │ • customers  │    │ • raw_custs │    │  Partitioned│    │ Enriched    │    │
│  │ • products   │    │ • raw_prods │    │             │    │ Validated   │    │
│  │ • sellers    │    │             │    │             │    │             │    │
│  └──────────────┘    └─────────────┘    └─────────────┘    └──────┬──────┘    │
│         │                   │                  │                  │           │
│         │            Spark SSS          Spark SSS          Spark SSS          │
│         │                                                  + foreachBatch     │
│         │                                                                     │
│         ▼                                                        │            │
│  ┌──────────────┐                                               ▼            │
│  │  Producers   │                                    ┌─────────────────┐     │
│  │  (Python)    │                                    │      GOLD       │     │
│  │              │                                    │     (dbt)       │     │
│  │ Simulate     │                                    │                 │     │
│  │ streaming    │                                    │ • daily_sales   │     │
│  └──────────────┘                                    │ • seller_perf   │     │
│                                                      │ • customer_rfm  │     │
│  ┌─────────────────────────────────────────────┐    │ • product_stats │     │
│  │           ORCHESTRATION (Airflow)           │    │ • delivery_perf │     │
│  │                                             │    └─────────────────┘     │
│  │  [Producers] → [Bronze] → [Silver] → [dbt] → [GE Validation]            │
│  └─────────────────────────────────────────────┘                            │
│                                                                              │
│  ┌─────────────────────────────────────────────┐                            │
│  │        DATA QUALITY (Great Expectations)    │                            │
│  │  • Bronze: schema, completeness             │                            │
│  │  • Silver: business rules, uniqueness       │                            │
│  │  • Gold: consistency, freshness             │                            │
│  └─────────────────────────────────────────────┘                            │
└─────────────────────────────────────────────────────────────────────────────────┘

Expected Deliverables

You must produce the following deliverables:

olist_pipeline/
│
├── docker-compose.yml           # 1. Full infrastructure
│
├── producers/                   # 2. Kafka producers
│   ├── orders_producer.py
│   ├── items_producer.py
│   ├── customers_producer.py
│   ├── products_producer.py
│   └── sellers_producer.py
│
├── spark_jobs/                  # 3 & 4. Spark jobs
│   ├── bronze/
│   │   ├── ingest_orders.py
│   │   ├── ingest_items.py
│   │   └── ingest_customers.py
│   └── silver/
│   │   ├── silver_orders.py     # With MERGE INTO
│       ├── silver_customers.py
│       └── silver_order_items.py
│
├── dbt_olist/                   # 5. dbt project
│   ├── dbt_project.yml
│   ├── models/
│   │   ├── staging/
│   │   ├── intermediate/
│   │   └── gold/
│   └── tests/
│
├── great_expectations/          # 6. Data Quality
│   └── expectations/
│
├── dags/                        # 7. Airflow DAGs
│   └── olist_pipeline_dag.py
│
└── README.md                    # 8. Documentation

Expected Gold Tables

You must create 5 Gold models in dbt:

1. gold_daily_sales

Daily revenue.

Column Type Description
order_date DATE Order date
total_orders INT Number of orders
total_revenue DECIMAL Total revenue
avg_order_value DECIMAL Average order value
total_items INT Number of items sold

2. gold_seller_performance

Per-seller metrics.

Column Type Description
seller_id STRING Seller ID
seller_city STRING City
total_orders INT Orders handled
total_revenue DECIMAL Revenue generated
avg_review_score DECIMAL Average review score
avg_delivery_days DECIMAL Average delivery time (days)

3. gold_customer_rfm

RFM segmentation (Recency, Frequency, Monetary).

Column Type Description
customer_unique_id STRING Unique customer ID
recency_days INT Days since last order
frequency INT Number of orders
monetary DECIMAL Total spent
rfm_segment STRING Segment (Champions, At Risk, etc.)
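Before writing the dbt model, the segmentation rules can be prototyped in plain Python. A minimal sketch, assuming the same thresholds as the rfm_segment macro given in the hints (30/90 days, 3 orders, 500 spent); the order-dict shape is illustrative, not a dataset schema:

```python
from datetime import date

def rfm_segment(recency_days: int, frequency: int, monetary: float) -> str:
    """Classify a customer from Recency/Frequency/Monetary values."""
    if recency_days <= 30 and frequency >= 3 and monetary >= 500:
        return "Champions"
    if frequency >= 3:
        return "Loyal Customers"
    if recency_days <= 30 and frequency < 3:
        return "Potential Loyalists"
    if recency_days > 90 and frequency >= 2:
        return "At Risk"
    if recency_days > 90:
        return "Hibernating"
    return "Others"

def compute_rfm(orders: list, as_of: date) -> dict:
    """Aggregate orders (customer_unique_id, order_date, amount) into RFM rows."""
    rfm = {}
    for o in orders:
        row = rfm.setdefault(
            o["customer_unique_id"],
            {"last_order": o["order_date"], "frequency": 0, "monetary": 0.0},
        )
        row["frequency"] += 1
        row["monetary"] += o["amount"]
        row["last_order"] = max(row["last_order"], o["order_date"])
    for row in rfm.values():
        # Recency = days between the reference date and the last order
        row["recency_days"] = (as_of - row.pop("last_order")).days
        row["rfm_segment"] = rfm_segment(
            row["recency_days"], row["frequency"], row["monetary"]
        )
    return rfm
```

Prototyping the CASE logic this way makes it easy to unit-test edge cases (exactly 30 days, exactly 3 orders) before freezing the thresholds in the macro.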

4. gold_product_analytics

Product performance.

Column Type Description
product_category STRING Category (English)
total_sold INT Quantity sold
total_revenue DECIMAL Revenue
avg_price DECIMAL Average price
avg_review_score DECIMAL Average review score

5. gold_delivery_performance

Delivery performance.

Column Type Description
seller_state STRING Seller state
customer_state STRING Customer state
total_deliveries INT Number of deliveries
avg_delivery_days DECIMAL Average delivery time (days)
on_time_rate DECIMAL % delivered on time
late_rate DECIMAL % delivered late
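The on-time logic is the only subtle part: a delivery counts as on time when order_delivered_customer_date falls on or before order_estimated_delivery_date. A minimal pure-Python sketch of these metrics (the dict field names are illustrative shorthand, not the dataset's column names):

```python
from datetime import date

def delivery_metrics(deliveries: list) -> dict:
    """Compute delivery KPIs from delivered orders.

    Each dict carries a purchase date, an actual delivery date, and the
    estimated delivery date promised to the customer.
    """
    total = len(deliveries)
    if total == 0:
        return {"total_deliveries": 0, "avg_delivery_days": 0.0,
                "on_time_rate": 0.0, "late_rate": 0.0}
    # Days elapsed between purchase and actual delivery
    delivery_days = [(d["delivered"] - d["purchased"]).days for d in deliveries]
    # On time = delivered on or before the estimated date
    on_time = sum(1 for d in deliveries if d["delivered"] <= d["estimated"])
    return {
        "total_deliveries": total,
        "avg_delivery_days": sum(delivery_days) / total,
        "on_time_rate": on_time / total,
        "late_rate": 1 - on_time / total,
    }
```

In the dbt model the same predicate becomes a `CASE WHEN order_delivered_customer_date <= order_estimated_delivery_date` aggregated with AVG.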

Technical Specifications

1. Infrastructure (Docker Compose)

Required services:

Service Image Ports Role
zookeeper confluentinc/cp-zookeeper:7.5.0 2181 Kafka coordination
kafka confluentinc/cp-kafka:7.5.0 9092, 29092 Message broker
schema-registry confluentinc/cp-schema-registry:7.5.0 8081 Schema management
minio minio/minio 9000, 9001 Local S3 (Delta Lake)
spark-master bitnami/spark:3.5 8080, 7077 Spark master
spark-worker bitnami/spark:3.5 8083 Spark worker (host port 8083; 8081 is already taken by schema-registry)
postgres postgres:15 5432 Airflow metadata
airflow-webserver apache/airflow:2.8.0 8082 Airflow UI
airflow-scheduler apache/airflow:2.8.0 - Scheduler

2. Kafka Producers

Each producer must:

  • Read the matching CSV file
  • Send rows one by one to Kafka (to simulate streaming)
  • Add a random delay (50-200 ms) between messages
  • Simulate late data: 5% of messages get a timestamp shifted back by 1-5 minutes
  • Simulate duplicates: 2% of messages are sent twice

Kafka topics:
  • raw_orders
  • raw_order_items
  • raw_customers
  • raw_products
  • raw_sellers

Message format: JSON

{
  "order_id": "e481f51cbdc54678b7cc49136f2d6af7",
  "customer_id": "9ef432eb6251297304e76186b10a928d",
  "order_status": "delivered",
  "order_purchase_timestamp": "2017-10-02 10:56:33",
  "_ingestion_timestamp": "2024-01-15T10:30:00Z"
}

3. Bronze Layer (Spark SSS → Delta)

Mode: append (raw data, no transformations)

Each Bronze job must:

  • Read from the matching Kafka topic
  • Parse the JSON
  • Add a _bronze_ingested_at column (ingestion timestamp)
  • Write to Delta Lake in append mode
  • Partition by ingestion date (_ingestion_date)
  • Configure checkpointing

Delta paths:

s3a://lakehouse/bronze/orders/
s3a://lakehouse/bronze/order_items/
s3a://lakehouse/bronze/customers/
s3a://lakehouse/bronze/products/
s3a://lakehouse/bronze/sellers/

4. Silver Layer (Spark SSS + MERGE INTO) ⭐

Mode: foreachBatch + MERGE INTO (upserts, deduplication)

This is the key step of the project. Each Silver job must:

  1. Read from Bronze (streaming or batch)
  2. Deduplicate on the primary key (keep the most recent record)
  3. Validate the data (filter out invalid rows)
  4. Enrich where needed (joins)
  5. MERGE INTO the Silver Delta Lake table

Pattern to use:

def upsert_to_silver(batch_df, batch_id):
    # 1. Deduplicate
    deduped = batch_df.dropDuplicates(["order_id"])
    
    # 2. MERGE INTO
    delta_table = DeltaTable.forPath(spark, "s3a://lakehouse/silver/orders")
    
    delta_table.alias("target").merge(
        deduped.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdate(
        condition="source._bronze_ingested_at > target._bronze_ingested_at",
        set={...}
    ).whenNotMatchedInsertAll().execute()

bronze_stream.writeStream \
    .foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", "/checkpoints/silver_orders") \
    .start()

Silver tables:

Table Primary key Enrichment
silver_orders order_id + customer info
silver_order_items order_id, product_id, seller_id + product info
silver_customers customer_id Dedup on customer_unique_id
silver_products product_id + category translation
silver_sellers seller_id -

5. Gold Layer (dbt)

dbt project structure:

dbt_olist/
├── dbt_project.yml
├── packages.yml                 # dbt-utils, dbt-expectations
│
├── models/
│   ├── staging/                 # Views over Silver
│   │   ├── _sources.yml
│   │   ├── stg_orders.sql
│   │   ├── stg_order_items.sql
│   │   ├── stg_customers.sql
│   │   ├── stg_products.sql
│   │   └── stg_sellers.sql
│   │
│   ├── intermediate/            # Business transformations
│   │   ├── int_orders_enriched.sql
│   │   └── int_order_items_enriched.sql
│   │
│   └── gold/                    # Analytics tables
│       ├── _gold__models.yml    # Tests + docs
│       ├── gold_daily_sales.sql
│       ├── gold_seller_performance.sql
│       ├── gold_customer_rfm.sql
│       ├── gold_product_analytics.sql
│       └── gold_delivery_performance.sql
│
├── macros/
│   └── rfm_segment.sql          # RFM segmentation macro
│
└── tests/
    └── assert_positive_revenue.sql

Materializations:
  • staging/: view
  • intermediate/: ephemeral or view
  • gold/: incremental (with unique_key)

6. Data Quality (Great Expectations)

Create expectation suites for each layer:

Bronze suite:
  • Schema validation (expected columns present)
  • expect_column_values_to_not_be_null on ID columns

Silver suite:
  • expect_column_values_to_be_unique on primary keys
  • expect_column_values_to_be_between on amounts (0 - 100000)
  • expect_column_values_to_be_in_set on statuses

Gold suite:
  • expect_column_values_to_be_between on metrics
  • expect_table_row_count_to_be_between (freshness check)
  • Consistency tests (Gold totals match Silver totals)
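Great Expectations declares these checks for you, but most of the Silver expectations reduce to row-level predicates. A plain-Python sketch of what the suite enforces (this is not the Great Expectations API; the payment_value field name and the status set are illustrative assumptions):

```python
# Illustrative set of order statuses; check against the actual dataset values.
VALID_STATUSES = {"created", "approved", "invoiced", "processing",
                  "shipped", "delivered", "canceled", "unavailable"}

def validate_silver_order(row: dict) -> list:
    """Return the names of the failed checks for one Silver order row.

    Mirrors the Silver suite: non-null primary key, amount within bounds,
    status drawn from the allowed set.
    """
    failures = []
    # expect_column_values_to_not_be_null("order_id")
    if not row.get("order_id"):
        failures.append("order_id_not_null")
    # expect_column_values_to_be_between("payment_value", 0, 100000)
    amount = row.get("payment_value")
    if amount is None or not (0 <= amount <= 100_000):
        failures.append("payment_value_between_0_and_100000")
    # expect_column_values_to_be_in_set("order_status", VALID_STATUSES)
    if row.get("order_status") not in VALID_STATUSES:
        failures.append("order_status_in_set")
    return failures
```

Writing the predicates out once in plain code is a useful way to agree on the business rules before encoding them in an expectation suite.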

7. Orchestration (Airflow)

Main DAG: olist_pipeline

┌─────────────────┐
│ check_freshness │  Check that data is arriving
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ bronze_to_silver│  Spark job (MERGE INTO)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    dbt_run      │  dbt run --select gold
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   dbt_test      │  dbt test
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  ge_validate    │  Great Expectations checkpoint
└────────┬────────┘
         │
    ┌────┴────┐
    ▼         ▼
┌───────┐ ┌───────┐
│notify │ │notify │
│success│ │failure│
└───────┘ └───────┘

Configuration:
  • Schedule: 0 6 * * * (daily at 6:00 AM)
  • Retries: 2
  • Alerts: email or Slack on_failure


💡 Hints & Resources

Key pattern reminders

Module Pattern Where to use it
24 readStream.format("kafka") Bronze ingestion
24 foreachBatch Silver MERGE
23 DeltaTable.forPath().merge() Silver MERGE
23 whenMatchedUpdate / whenNotMatchedInsert Silver MERGE
25 {{ ref('...') }} dbt models
25 {{ config(materialized='incremental') }} Gold models
25 is_incremental() Gold models
22 BashOperator Airflow tasks
Hint 1: Kafka producer structure

from kafka import KafkaProducer
import pandas as pd
import json
import time
import random
from datetime import datetime, timedelta

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

df = pd.read_csv('data/olist_orders_dataset.csv')

for idx, row in df.iterrows():
    message = row.to_dict()

    # Add the ingestion timestamp
    ingestion_ts = datetime.now()

    # Simulate late data (5%)
    if random.random() < 0.05:
        ingestion_ts -= timedelta(minutes=random.randint(1, 5))

    message['_ingestion_timestamp'] = ingestion_ts.isoformat()

    producer.send('raw_orders', value=message)

    # Simulate duplicates (2%)
    if random.random() < 0.02:
        producer.send('raw_orders', value=message)

    # Random delay
    time.sleep(random.uniform(0.05, 0.2))

producer.flush()
Hint 2: MERGE INTO pattern for Silver

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp

def upsert_orders_to_silver(batch_df, batch_id):
    """Upsert orders into Silver with deduplication."""

    if batch_df.count() == 0:
        return

    # 1. Deduplicate (keep the most recent)
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number

    window = Window.partitionBy("order_id").orderBy(col("_ingestion_timestamp").desc())
    deduped = batch_df.withColumn("_row_num", row_number().over(window)) \
                      .filter(col("_row_num") == 1) \
                      .drop("_row_num")

    # 2. Add the Silver timestamp
    enriched = deduped.withColumn("_silver_updated_at", current_timestamp())

    # 3. MERGE INTO (assumes an active SparkSession named `spark`)
    silver_path = "s3a://lakehouse/silver/orders"

    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_table = DeltaTable.forPath(spark, silver_path)

        delta_table.alias("target").merge(
            enriched.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdate(
            condition="source._ingestion_timestamp > target._ingestion_timestamp",
            set={
                "order_status": "source.order_status",
                "order_delivered_customer_date": "source.order_delivered_customer_date",
                "_ingestion_timestamp": "source._ingestion_timestamp",
                "_silver_updated_at": "source._silver_updated_at"
            }
        ).whenNotMatchedInsertAll().execute()
    else:
        # First run: create the table
        enriched.write.format("delta").mode("overwrite").save(silver_path)

    print(f"Batch {batch_id}: {enriched.count()} records merged to Silver")
Hint 3: Incremental dbt model

-- models/gold/gold_daily_sales.sql

{{ config(
    materialized='incremental',
    unique_key='order_date',
    incremental_strategy='merge'
) }}

WITH orders AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}
    {% if is_incremental() %}
    WHERE DATE(order_purchase_timestamp) >= (
        SELECT MAX(order_date) - INTERVAL 2 DAY FROM {{ this }}
    )
    {% endif %}
),

daily_agg AS (
    SELECT
        DATE(order_purchase_timestamp) AS order_date,
        COUNT(DISTINCT order_id) AS total_orders,
        SUM(total_amount) AS total_revenue,
        AVG(total_amount) AS avg_order_value,
        SUM(total_items) AS total_items
    FROM orders
    WHERE order_status = 'delivered'
    GROUP BY DATE(order_purchase_timestamp)
)

SELECT * FROM daily_agg
Hint 4: RFM macro for dbt

-- macros/rfm_segment.sql

{% macro rfm_segment(recency, frequency, monetary) %}
    CASE
        -- Champions: recent, frequent, big spender
        WHEN {{ recency }} <= 30 AND {{ frequency }} >= 3 AND {{ monetary }} >= 500 THEN 'Champions'

        -- Loyal Customers: frequent
        WHEN {{ frequency }} >= 3 THEN 'Loyal Customers'

        -- Potential Loyalists: recent, not yet frequent
        WHEN {{ recency }} <= 30 AND {{ frequency }} < 3 THEN 'Potential Loyalists'

        -- At Risk: not recent, but used to order often
        WHEN {{ recency }} > 90 AND {{ frequency }} >= 2 THEN 'At Risk'

        -- Hibernating: not recent, infrequent
        WHEN {{ recency }} > 90 THEN 'Hibernating'

        -- Others
        ELSE 'Others'
    END
{% endmacro %}

-- Usage in gold_customer_rfm.sql:
-- SELECT
--     customer_unique_id,
--     recency_days,
--     frequency,
--     monetary,
--     {{ rfm_segment('recency_days', 'frequency', 'monetary') }} AS rfm_segment
-- FROM rfm_base
Hint 5: Airflow DAG

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@olist.com'],
}

with DAG(
    dag_id='olist_pipeline',
    default_args=default_args,
    description='Olist e-commerce pipeline',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['olist', 'lakehouse'],
) as dag:

    # 1. Check source freshness
    check_freshness = BashOperator(
        task_id='check_freshness',
        bash_command='cd /opt/dbt && dbt source freshness',
    )

    # 2. Spark: Bronze → Silver
    bronze_to_silver = BashOperator(
        task_id='bronze_to_silver',
        bash_command='spark-submit /opt/spark_jobs/silver/run_all_silver.py',
    )

    # 3. dbt run
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /opt/dbt && dbt run --select gold',
    )

    # 4. dbt test
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --select gold',
    )

    # 5. Great Expectations
    ge_validate = BashOperator(
        task_id='ge_validate',
        bash_command='great_expectations checkpoint run gold_checkpoint',
    )

    # 6. Success / failure notifications
    notify_success = BashOperator(
        task_id='notify_success',
        bash_command='echo "Pipeline completed successfully!"',
        trigger_rule='all_success',
    )

    notify_failure = BashOperator(
        task_id='notify_failure',
        bash_command='echo "Pipeline failed!"',
        trigger_rule='one_failed',
    )

    # Dependencies
    check_freshness >> bronze_to_silver >> dbt_run >> dbt_test >> ge_validate >> [notify_success, notify_failure]

Evaluation Criteria

Criterion Points Details
Infrastructure /10 Docker Compose works, all services up
Kafka producers /10 5 producers, late data + duplicates simulated
Bronze (Append) /10 Data ingested, partitioned, checkpointing configured
Silver (MERGE INTO) /20 ⭐ Correct foreachBatch + MERGE pattern, deduplication
Gold (dbt) /20 5 models, incremental, correct ref() usage
dbt tests /10 Tests pass, sufficient coverage
Great Expectations /10 Suites created, checkpoint works
Airflow DAG /5 DAG works, correct dependencies
Documentation /5 Clear README, diagrams, instructions
TOTAL /100

Skills Validated

By completing this project, you validate the following skills:

Module Skill Applied in
14-16 Python, environments Kafka producers
17 SQL dbt transformations
18-20 PySpark DataFrame Spark jobs
22 Airflow DAG orchestration
23 Delta Lake, MERGE INTO Bronze → Silver
24 Kafka, Spark SSS, foreachBatch Streaming ingestion
25 dbt, Great Expectations Gold + data quality

Possible Extensions (Bonus)

If you finish the main project, here are some extensions to go further:

Extension Description Difficulty
Monitoring Add Prometheus + Grafana to monitor the pipeline ⭐⭐
Kubernetes Deploy on K8s with the Spark Operator ⭐⭐⭐
ML Pipeline Add a predictive model (churn, LTV) ⭐⭐
CDC Use Debezium to capture changes ⭐⭐
Data Catalog Integrate DataHub or Amundsen ⭐⭐⭐
Streamlit Build an interactive dashboard

📝 Complete Solution


docker-compose.yml

version: '3.8'

services:
  # ═══════════════════════════════════════════════════════════════
  # KAFKA
  # ═══════════════════════════════════════════════════════════════
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092

  # ═══════════════════════════════════════════════════════════════
  # STORAGE (MinIO = S3 local)
  # ═══════════════════════════════════════════════════════════════
  minio:
    image: minio/minio
    container_name: minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio_data:/data

  # Create the bucket at startup
  minio-setup:
    image: minio/mc
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      sleep 5;
      mc alias set myminio http://minio:9000 minioadmin minioadmin;
      mc mb myminio/lakehouse --ignore-existing;
      exit 0;
      "

  # ═══════════════════════════════════════════════════════════════
  # SPARK
  # ═══════════════════════════════════════════════════════════════
  spark-master:
    image: bitnami/spark:3.5
    container_name: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - ./spark_jobs:/opt/spark_jobs
      - ./data:/opt/data

  spark-worker:
    image: bitnami/spark:3.5
    container_name: spark-worker
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=2
    volumes:
      - ./spark_jobs:/opt/spark_jobs
      - ./data:/opt/data

  # ═══════════════════════════════════════════════════════════════
  # AIRFLOW
  # ═══════════════════════════════════════════════════════════════
  postgres:
    image: postgres:15
    container_name: postgres
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.8.0
    container_name: airflow-webserver
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    ports:
      - "8082:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./dbt_olist:/opt/dbt
    command: bash -c "airflow db init && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.8.0
    container_name: airflow-scheduler
    depends_on:
      - airflow-webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./dbt_olist:/opt/dbt
    command: airflow scheduler

volumes:
  minio_data:
  postgres_data:

producers/orders_producer.py

from kafka import KafkaProducer
import pandas as pd
import json
import time
import random
from datetime import datetime, timedelta

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
TOPIC = 'raw_orders'
CSV_PATH = 'data/olist_orders_dataset.csv'

# Kafka producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Read the CSV
df = pd.read_csv(CSV_PATH)
print(f"Loaded {len(df)} orders")

# Send the messages
for idx, row in df.iterrows():
    message = row.to_dict()
    
    # Ingestion timestamp
    ingestion_ts = datetime.now()
    
    # Simulate late data (5%)
    if random.random() < 0.05:
        ingestion_ts -= timedelta(minutes=random.randint(1, 5))
    
    message['_ingestion_timestamp'] = ingestion_ts.isoformat()
    
    # Send
    producer.send(
        topic=TOPIC,
        key=message['order_id'],
        value=message
    )
    
    # Simulate duplicates (2%)
    if random.random() < 0.02:
        producer.send(topic=TOPIC, key=message['order_id'], value=message)
    
    # Log progress
    if idx % 1000 == 0:
        print(f"Sent {idx}/{len(df)} messages")
    
    # Random delay (simulate streaming)
    time.sleep(random.uniform(0.05, 0.2))

producer.flush()
print(f"Done! Sent {len(df)} messages to {TOPIC}")

spark_jobs/bronze/ingest_orders.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, to_date
from pyspark.sql.types import StructType, StructField, StringType

# Spark Session
spark = SparkSession.builder \
    .appName("Bronze - Ingest Orders") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

# Orders schema
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("order_status", StringType()),
    StructField("order_purchase_timestamp", StringType()),
    StructField("order_approved_at", StringType()),
    StructField("order_delivered_carrier_date", StringType()),
    StructField("order_delivered_customer_date", StringType()),
    StructField("order_estimated_delivery_date", StringType()),
    StructField("_ingestion_timestamp", StringType())
])

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "raw_orders") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the JSON
parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING) as json_value") \
    .select(from_json(col("json_value"), order_schema).alias("data")) \
    .select("data.*") \
    .withColumn("_bronze_ingested_at", current_timestamp()) \
    .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))

# Write to Bronze (append)
query = parsed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://lakehouse/checkpoints/bronze_orders") \
    .option("path", "s3a://lakehouse/bronze/orders") \
    .partitionBy("_ingestion_date") \
    .start()

query.awaitTermination()

spark_jobs/silver/silver_orders.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Spark session (same config as Bronze)
spark = SparkSession.builder \
    .appName("Silver - Orders MERGE") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

BRONZE_PATH = "s3a://lakehouse/bronze/orders"
SILVER_PATH = "s3a://lakehouse/silver/orders"
CHECKPOINT_PATH = "s3a://lakehouse/checkpoints/silver_orders"

def upsert_to_silver(batch_df, batch_id):
    """Upsert into Silver with deduplication."""
    
    # isEmpty() (Spark 3.3+) avoids a full count() just to detect an empty batch
    if batch_df.isEmpty():
        print(f"Batch {batch_id}: no data")
        return
    
    # 1. Deduplicate (keep the most recent record per order_id)
    window = Window.partitionBy("order_id").orderBy(col("_bronze_ingested_at").desc())
    deduped = batch_df \
        .withColumn("_row_num", row_number().over(window)) \
        .filter(col("_row_num") == 1) \
        .drop("_row_num")
    
    # 2. Add the Silver update timestamp
    enriched = deduped.withColumn("_silver_updated_at", current_timestamp())
    
    # 3. MERGE INTO
    if DeltaTable.isDeltaTable(spark, SILVER_PATH):
        delta_table = DeltaTable.forPath(spark, SILVER_PATH)
        
        delta_table.alias("target").merge(
            enriched.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdate(
            condition="source._bronze_ingested_at > target._bronze_ingested_at",
            set={
                "order_status": "source.order_status",
                "order_delivered_carrier_date": "source.order_delivered_carrier_date",
                "order_delivered_customer_date": "source.order_delivered_customer_date",
                "_bronze_ingested_at": "source._bronze_ingested_at",
                "_silver_updated_at": "source._silver_updated_at"
            }
        ).whenNotMatchedInsertAll().execute()
        
        print(f"Batch {batch_id}: MERGED {enriched.count()} records")
    else:
        # First run: create the Silver table
        enriched.write.format("delta").mode("overwrite").save(SILVER_PATH)
        print(f"Batch {batch_id}: CREATED table with {enriched.count()} records")

# Read the Bronze table as a stream
bronze_stream = spark.readStream \
    .format("delta") \
    .load(BRONZE_PATH)

# Write micro-batches with foreachBatch
query = bronze_stream.writeStream \
    .foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()
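
The window/`row_number` deduplication inside `upsert_to_silver` boils down to "keep the most recent record per `order_id`". A pure-Python sketch of that same logic (no Spark required) can help when reasoning about edge cases such as duplicate keys within one micro-batch:

```python
def dedupe_latest(records, key="order_id", ts="_bronze_ingested_at"):
    """Keep only the most recent record per key, mirroring the
    Window.partitionBy(key).orderBy(ts desc) + row_number() == 1 pattern."""
    latest = {}
    for rec in records:
        current = latest.get(rec[key])
        if current is None or rec[ts] > current[ts]:
            latest[rec[key]] = rec
    return list(latest.values())

batch = [
    {"order_id": "a", "_bronze_ingested_at": 1, "order_status": "created"},
    {"order_id": "a", "_bronze_ingested_at": 2, "order_status": "shipped"},
    {"order_id": "b", "_bronze_ingested_at": 1, "order_status": "created"},
]
print(dedupe_latest(batch))  # "a" keeps only its _bronze_ingested_at == 2 record
```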

dbt_olist/models/gold/gold_daily_sales.sql

{{ config(
    materialized='incremental',
    unique_key='order_date',
    incremental_strategy='merge'
) }}

WITH orders AS (
    SELECT
        o.order_id,
        o.order_status,
        DATE(o.order_purchase_timestamp) AS order_date,
        oi.price,
        oi.freight_value
    FROM {{ ref('stg_orders') }} o
    JOIN {{ ref('stg_order_items') }} oi ON o.order_id = oi.order_id
    WHERE o.order_status = 'delivered'
    {% if is_incremental() %}
    AND DATE(o.order_purchase_timestamp) >= (
        SELECT MAX(order_date) - INTERVAL 2 DAY FROM {{ this }}
    )
    {% endif %}
)

SELECT
    order_date,
    COUNT(DISTINCT order_id) AS total_orders,
    SUM(price + freight_value) AS total_revenue,
    AVG(price + freight_value) AS avg_order_value,
    COUNT(*) AS total_items
FROM orders
GROUP BY order_date
ORDER BY order_date
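
The 2-day lookback combined with `incremental_strategy='merge'` on `unique_key='order_date'` makes this model idempotent for late-arriving orders: the aggregates for recent dates are recomputed and simply replace the existing rows. A toy Python simulation of that merge semantics (assuming a target keyed by `order_date`):

```python
def merge_daily_sales(target: dict, recomputed: dict) -> dict:
    """Upsert recomputed day-level aggregates into the target,
    mimicking dbt's incremental merge on unique_key='order_date'."""
    merged = dict(target)
    merged.update(recomputed)  # matching dates replaced, new dates inserted
    return merged

target = {"2018-01-01": {"total_orders": 10}, "2018-01-02": {"total_orders": 7}}
# The lookback window recomputes the last 2 days, picking up a late order on Jan 2
recomputed = {"2018-01-02": {"total_orders": 8}, "2018-01-03": {"total_orders": 5}}
print(merge_daily_sales(target, recomputed))
```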

dbt_olist/models/gold/gold_customer_rfm.sql

{{ config(materialized='table') }}

WITH customer_orders AS (
    SELECT
        c.customer_unique_id,
        o.order_id,
        o.order_purchase_timestamp,
        oi.price + oi.freight_value AS order_value
    FROM {{ ref('stg_customers') }} c
    JOIN {{ ref('stg_orders') }} o ON c.customer_id = o.customer_id
    JOIN {{ ref('stg_order_items') }} oi ON o.order_id = oi.order_id
    WHERE o.order_status = 'delivered'
),

rfm_base AS (
    SELECT
        customer_unique_id,
        DATEDIFF(CURRENT_DATE, MAX(order_purchase_timestamp)) AS recency_days,
        COUNT(DISTINCT order_id) AS frequency,
        SUM(order_value) AS monetary
    FROM customer_orders
    GROUP BY customer_unique_id
)

SELECT
    customer_unique_id,
    recency_days,
    frequency,
    ROUND(monetary, 2) AS monetary,
    {{ rfm_segment('recency_days', 'frequency', 'monetary') }} AS rfm_segment
FROM rfm_base
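
The `rfm_segment` macro is defined elsewhere in the project (under `dbt_olist/macros/`), and its exact thresholds are not shown here. Purely as an illustration, here is a hypothetical Python version of the kind of rule such a macro might encode, using the segment labels declared in the model's `accepted_values` test (all thresholds are invented):

```python
def rfm_segment(recency_days: int, frequency: int, monetary: float) -> str:
    """Hypothetical RFM segmentation rule; thresholds invented for illustration."""
    if recency_days <= 90 and frequency >= 3 and monetary >= 500:
        return "Champions"
    if recency_days <= 90 and frequency >= 2:
        return "Loyal Customers"
    if recency_days <= 90:
        return "Potential Loyalists"
    if recency_days <= 365 and frequency >= 2:
        return "At Risk"
    if recency_days > 365:
        return "Hibernating"
    return "Others"

print(rfm_segment(30, 5, 1200.0))  # → Champions
```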

dbt_olist/models/gold/_gold__models.yml

version: 2

models:
  - name: gold_daily_sales
    description: "Daily revenue"
    columns:
      - name: order_date
        tests:
          - unique
          - not_null
      - name: total_revenue
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0

  - name: gold_customer_rfm
    description: "Customer RFM segmentation"
    columns:
      - name: customer_unique_id
        tests:
          - unique
          - not_null
      - name: rfm_segment
        tests:
          - accepted_values:
              values: ['Champions', 'Loyal Customers', 'Potential Loyalists', 'At Risk', 'Hibernating', 'Others']

  - name: gold_seller_performance
    description: "Seller performance"
    columns:
      - name: seller_id
        tests:
          - unique
          - not_null

  - name: gold_product_analytics
    description: "Product analysis by category"
    columns:
      - name: product_category
        tests:
          - unique
          - not_null

  - name: gold_delivery_performance
    description: "Delivery performance"
    columns:
      - name: on_time_rate
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1

🎉 Congratulations!

By completing this project, you have built a complete, production-ready data pipeline.

You now have hands-on practice with:

  • ✅ Real-time ingestion with Kafka
  • ✅ Stream processing with Spark Structured Streaming
  • ✅ The Lakehouse architecture with Delta Lake
  • ✅ The MERGE INTO pattern for upserts
  • ✅ Analytical modeling with dbt
  • ✅ Data quality validation with Great Expectations
  • ✅ Orchestration with Airflow

🚀 What's Next?

This project is a solid foundation for your portfolio. You can:

  1. Deploy it to the cloud (AWS, GCP, Azure)
  2. Add monitoring (Prometheus + Grafana)
  3. Integrate ML (churn prediction, recommendations)
  4. Present it in interviews as proof of your skills

Good luck with the rest of your Data Engineering journey! 🎓
