Synthesis of All Data Engineering Skills
Context
You have just joined the Data Engineering team at Olist, Brazil's largest e-commerce platform. Olist connects small merchants to large marketplaces such as Amazon, Mercado Libre, etc.
Today, the data lives in CSV files and is analyzed manually by the BI team. Your manager hands you a critical mission:
"We need a modern Lakehouse architecture. You must build a complete pipeline that ingests our data in real time, transforms it, and makes it available for analytics dashboards."
Your Mission
Build a complete end-to-end data pipeline:
CSV (Kaggle) → Kafka → Spark SSS → Delta Lake (Bronze/Silver) → dbt (Gold) → Dashboard-ready
Dataset
Brazilian E-Commerce Public Dataset by Olist
🔗 Kaggle: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce
This dataset contains ~100,000 real (anonymized) orders placed on Olist between 2016 and 2018.
Available tables

| File | Content | Rows |
|---|---|---|
| olist_orders_dataset.csv | Orders | ~100K |
| olist_order_items_dataset.csv | Order line items | ~113K |
| olist_customers_dataset.csv | Customers | ~100K |
| olist_products_dataset.csv | Products | ~33K |
| olist_sellers_dataset.csv | Sellers | ~3K |
| olist_order_payments_dataset.csv | Payments | ~104K |
| olist_order_reviews_dataset.csv | Customer reviews | ~100K |
| olist_geolocation_dataset.csv | Geolocation | ~1M |
| product_category_name_translation.csv | Category translations | ~71 |
Relational schema
┌──────────────────┐
│ customers │
│ customer_id (PK)│
└────────┬─────────┘
│
│ 1:N
▼
┌──────────────┐ 1:N ┌──────────────────┐ N:1 ┌──────────────┐
│ sellers │◄───────────│ orders │────────────▶│ payments │
│ seller_id(PK)│ │ order_id (PK) │ │ │
└──────────────┘ │ customer_id(FK) │ └──────────────┘
▲ └────────┬─────────┘
│ │
│ N:1 │ 1:N
│ ▼
┌──────────────┐ N:1 ┌──────────────────┐ 1:N ┌──────────────┐
│ products │◄───────────│ order_items │────────────▶│ reviews │
│product_id(PK)│ │ order_id (FK) │ │ │
└──────────────┘ │ product_id (FK) │ └──────────────┘
│ seller_id (FK) │
└──────────────────┘
Target Architecture
┌─────────────────────────────────────────────────────────────────────────────────┐
│ PIPELINE E-COMMERCE OLIST │
│ │
│ ┌──────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ CSV Files │ │ KAFKA │ │ BRONZE │ │ SILVER │ │
│ │ (Kaggle) │───▶│ Topics │───▶│ (Delta) │───▶│ (Delta) │ │
│ │ │ │ │ │ │ │ │ │
│ │ • orders │ │ • raw_orders│ │ Append │ │ MERGE INTO │ │
│ │ • items │ │ • raw_items │ │ Raw data │ │ Deduplicated│ │
│ │ • customers │ │ • raw_custs │ │ Partitioned│ │ Enriched │ │
│ │ • products │ │ • raw_prods │ │ │ │ Validated │ │
│ │ • sellers │ │ │ │ │ │ │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ │ Spark SSS Spark SSS Spark SSS │
│ │ + foreachBatch │
│ │ │
│ ▼ │ │
│ ┌──────────────┐ ▼ │
│ │ Producers │ ┌─────────────────┐ │
│ │ (Python) │ │ GOLD │ │
│ │ │ │ (dbt) │ │
│ │ Simulate │ │ │ │
│ │ streaming │ │ • daily_sales │ │
│ └──────────────┘ │ • seller_perf │ │
│ │ • customer_rfm │ │
│ ┌─────────────────────────────────────────────┐ │ • product_stats │ │
│ │ ORCHESTRATION (Airflow) │ │ • delivery_perf │ │
│ │ │ └─────────────────┘ │
│ │ [Producers] → [Bronze] → [Silver] → [dbt] → [GE Validation] │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ DATA QUALITY (Great Expectations) │ │
│ │ • Bronze: schema, completeness │ │
│ │ • Silver: business rules, uniqueness │ │
│ │ • Gold: consistency, freshness │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘
Expected Deliverables
You must produce the following deliverables:
olist_pipeline/
│
├── docker-compose.yml          # 1. Full infrastructure
│
├── producers/                  # 2. Kafka producers
│   ├── orders_producer.py
│   ├── items_producer.py
│   ├── customers_producer.py
│   ├── products_producer.py
│   └── sellers_producer.py
│
├── spark_jobs/                 # 3 & 4. Spark jobs
│   ├── bronze/
│   │   ├── ingest_orders.py
│   │   ├── ingest_items.py
│   │   └── ingest_customers.py
│   └── silver/
│       ├── silver_orders.py    # With MERGE INTO
│       ├── silver_customers.py
│       └── silver_order_items.py
│
├── dbt_olist/                  # 5. dbt project
│   ├── dbt_project.yml
│   ├── models/
│   │   ├── staging/
│   │   ├── intermediate/
│   │   └── gold/
│   └── tests/
│
├── great_expectations/         # 6. Data quality
│   └── expectations/
│
├── dags/                       # 7. Airflow DAGs
│   └── olist_pipeline_dag.py
│
└── README.md                   # 8. Documentation
Expected Gold Tables
You must create 5 Gold models in dbt:
1. gold_daily_sales
Daily revenue.

| Column | Type | Description |
|---|---|---|
| order_date | DATE | Order date |
| total_orders | INT | Number of orders |
| total_revenue | DECIMAL | Total revenue |
| avg_order_value | DECIMAL | Average basket value |
| total_items | INT | Number of items sold |
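The daily_sales metrics relate simply: avg_order_value is total_revenue divided by total_orders, and total_items counts one row per item sold. A minimal pure-Python sketch of the aggregation (the sample rows are hypothetical):

```python
from collections import defaultdict

# Toy order items: (order_date, order_id, item_amount) — illustrative rows only
rows = [
    ("2017-10-02", "o1", 50.0),
    ("2017-10-02", "o1", 30.0),
    ("2017-10-02", "o2", 20.0),
    ("2017-10-03", "o3", 10.0),
]

daily = defaultdict(lambda: {"orders": set(), "revenue": 0.0, "items": 0})
for order_date, order_id, amount in rows:
    d = daily[order_date]
    d["orders"].add(order_id)   # COUNT(DISTINCT order_id)
    d["revenue"] += amount      # SUM(price + freight)
    d["items"] += 1             # one row per item sold

for order_date, d in sorted(daily.items()):
    total_orders = len(d["orders"])
    avg_order_value = round(d["revenue"] / total_orders, 2)
    print(order_date, total_orders, d["revenue"], avg_order_value, d["items"])
```

The same grouping logic is what the dbt model expresses in SQL with `GROUP BY order_date`.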
3. gold_customer_rfm
RFM segmentation (Recency, Frequency, Monetary).

| Column | Type | Description |
|---|---|---|
| customer_unique_id | STRING | Unique customer ID |
| recency_days | INT | Days since last order |
| frequency | INT | Number of orders |
| monetary | DECIMAL | Total spent |
| rfm_segment | STRING | Segment (Champions, At Risk, etc.) |
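The segmentation rule itself is just a threshold cascade. A Python sketch of one possible mapping (the exact thresholds are illustrative assumptions, to be tuned on the data):

```python
def rfm_segment(recency_days: int, frequency: int, monetary: float) -> str:
    """Map RFM values to a segment label (illustrative thresholds)."""
    if recency_days <= 30 and frequency >= 3 and monetary >= 500:
        return "Champions"           # recent, frequent, big spender
    if frequency >= 3:
        return "Loyal Customers"     # frequent
    if recency_days <= 30 and frequency < 3:
        return "Potential Loyalists" # recent, not yet frequent
    if recency_days > 90 and frequency >= 2:
        return "At Risk"             # used to order, gone quiet
    if recency_days > 90:
        return "Hibernating"         # long inactive, infrequent
    return "Others"

print(rfm_segment(10, 5, 800))  # recent, frequent, high spend
print(rfm_segment(120, 1, 40))  # long inactive, single order
```

In the dbt project this cascade naturally becomes a CASE expression wrapped in a macro so every model applies the same thresholds.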
4. gold_product_analytics
Product performance.

| Column | Type | Description |
|---|---|---|
| product_category | STRING | Category (EN) |
| total_sold | INT | Quantity sold |
| total_revenue | DECIMAL | Revenue |
| avg_price | DECIMAL | Average price |
| avg_review_score | DECIMAL | Average review score |
Technical Specifications
1. Infrastructure (Docker Compose)
Required services:

| Service | Image | Ports | Role |
|---|---|---|---|
| zookeeper | confluentinc/cp-zookeeper:7.5.0 | 2181 | Kafka coordination |
| kafka | confluentinc/cp-kafka:7.5.0 | 9092, 29092 | Message broker |
| schema-registry | confluentinc/cp-schema-registry:7.5.0 | 8081 | Schema management |
| minio | minio/minio | 9000, 9001 | Local S3 (Delta Lake) |
| spark-master | bitnami/spark:3.5 | 8080, 7077 | Spark master |
| spark-worker | bitnami/spark:3.5 | 8081 (internal only, to avoid clashing with schema-registry) | Spark worker |
| postgres | postgres:15 | 5432 | Airflow metadata |
| airflow-webserver | apache/airflow:2.8.0 | 8082 | Airflow UI |
| airflow-scheduler | apache/airflow:2.8.0 | - | Scheduler |
2. Kafka Producers
Each producer must:
- Read the corresponding CSV file
- Send rows one by one to Kafka (to simulate streaming)
- Add a random delay (50-200 ms) between messages
- Simulate late data: 5% of messages with a timestamp shifted back by 1-5 minutes
- Simulate duplicates: 2% of messages sent twice
Kafka topics: raw_orders, raw_order_items, raw_customers, raw_products, raw_sellers
Message format: JSON
{
  "order_id": "e481f51cbdc54678b7cc49136f2d6af7",
  "customer_id": "9ef432eb6251297304e76186b10a928d",
  "order_status": "delivered",
  "order_purchase_timestamp": "2017-10-02 10:56:33",
  "_ingestion_timestamp": "2024-01-15T10:30:00Z"
}
3. Bronze Layer (Spark SSS → Delta)
Mode: Append (raw data, no transformations)
Each Bronze job must:
- Read from the corresponding Kafka topic
- Parse the JSON
- Add a _bronze_ingested_at column (ingestion timestamp)
- Write to Delta Lake in append mode
- Partition by ingestion date (_ingestion_date)
- Configure checkpointing
Delta paths:
s3a://lakehouse/bronze/orders/
s3a://lakehouse/bronze/order_items/
s3a://lakehouse/bronze/customers/
s3a://lakehouse/bronze/products/
s3a://lakehouse/bronze/sellers/
4. Silver Layer (Spark SSS + MERGE INTO) ⭐
Mode: foreachBatch + MERGE INTO (upserts, deduplication)
This is the key step of the project. Each Silver job must:
- Read from Bronze (streaming or batch)
- Deduplicate on the primary key (keeping the most recent record)
- Validate the data (filter out invalid rows)
- Enrich where needed (joins)
- MERGE INTO the Silver Delta Lake table
Pattern to use:
def upsert_to_silver(batch_df, batch_id):
    # 1. Deduplicate
    deduped = batch_df.dropDuplicates(["order_id"])
    # 2. MERGE INTO
    delta_table = DeltaTable.forPath(spark, "s3a://lakehouse/silver/orders")
    delta_table.alias("target").merge(
        deduped.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdate(
        condition="source._bronze_ingested_at > target._bronze_ingested_at",
        set={...}  # columns to update
    ).whenNotMatchedInsertAll().execute()

bronze_stream.writeStream \
    .foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", "/checkpoints/silver_orders") \
    .start()
Silver tables:

| Table | Primary key | Enrichment / notes |
|---|---|---|
| silver_orders | order_id | + customer info |
| silver_order_items | order_id + product_id + seller_id | + product info |
| silver_customers | customer_id | Dedup on customer_unique_id |
| silver_products | product_id | + category translation |
| silver_sellers | seller_id | - |
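The "keep the most recent record per key" rule that every Silver table applies can be sanity-checked in plain Python (toy records; field names follow the orders schema):

```python
# Keep only the most recent record per primary key, as the Silver MERGE does
records = [
    {"order_id": "o1", "order_status": "shipped",   "_bronze_ingested_at": "2024-01-15T10:00:00"},
    {"order_id": "o1", "order_status": "delivered", "_bronze_ingested_at": "2024-01-15T11:00:00"},
    {"order_id": "o2", "order_status": "created",   "_bronze_ingested_at": "2024-01-15T10:30:00"},
]

latest = {}
for rec in records:
    key = rec["order_id"]
    # ISO-8601 strings compare chronologically, so string comparison is safe here
    if key not in latest or rec["_bronze_ingested_at"] > latest[key]["_bronze_ingested_at"]:
        latest[key] = rec

deduped = list(latest.values())
print(deduped)
```

In Spark the same semantics come from a row_number() window ordered by the ingestion timestamp descending, keeping row 1.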
5. Gold Layer (dbt)
dbt project structure:
dbt_olist/
├── dbt_project.yml
├── packages.yml # dbt-utils, dbt-expectations
│
├── models/
│ ├── staging/ # Views over Silver
│ │ ├── _sources.yml
│ │ ├── stg_orders.sql
│ │ ├── stg_order_items.sql
│ │ ├── stg_customers.sql
│ │ ├── stg_products.sql
│ │ └── stg_sellers.sql
│ │
│ ├── intermediate/ # Business transformations
│ │ ├── int_orders_enriched.sql
│ │ └── int_order_items_enriched.sql
│ │
│ └── gold/ # Analytics tables
│ ├── _gold__models.yml # Tests + docs
│ ├── gold_daily_sales.sql
│ ├── gold_seller_performance.sql
│ ├── gold_customer_rfm.sql
│ ├── gold_product_analytics.sql
│ └── gold_delivery_performance.sql
│
├── macros/
│ └── rfm_segment.sql # RFM segmentation macro
│
└── tests/
└── assert_positive_revenue.sql
Materializations:
- staging/: view
- intermediate/: ephemeral or view
- gold/: incremental (with unique_key)
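These folder-level defaults can be declared once in dbt_project.yml rather than per model (a sketch; it assumes the project is named dbt_olist and picks ephemeral for intermediate):

```yaml
# dbt_project.yml — per-folder materialization defaults (illustrative)
models:
  dbt_olist:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    gold:
      +materialized: incremental
```

Individual models can still override this with a `{{ config(...) }}` block at the top of the file.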
6. Data Quality (Great Expectations)
Create expectation suites for each layer:
Bronze suite:
- Schema validation (expected columns present)
- expect_column_values_to_not_be_null on the ID columns
Silver suite:
- expect_column_values_to_be_unique on the primary keys
- expect_column_values_to_be_between on the amounts (0-100000)
- expect_column_values_to_be_in_set on the statuses
Gold suite:
- expect_column_values_to_be_between on the metrics
- expect_table_row_count_to_be_between (freshness check)
- Consistency checks (Gold totals = Silver totals)
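Before wiring the Silver rules into Great Expectations, they can be prototyped as plain pandas checks — a sketch with toy data, not the GE API itself; the column names and the status set are illustrative assumptions:

```python
import pandas as pd

# Toy Silver orders batch (illustrative rows)
df = pd.DataFrame({
    "order_id": ["o1", "o2", "o3"],
    "order_status": ["delivered", "shipped", "canceled"],
    "payment_value": [120.0, 35.5, 0.0],
})

VALID_STATUSES = {"created", "approved", "shipped", "delivered", "canceled", "unavailable"}

# expect_column_values_to_be_unique on the primary key
assert df["order_id"].is_unique

# expect_column_values_to_not_be_null on the ID
assert df["order_id"].notna().all()

# expect_column_values_to_be_between on the amounts (0 - 100000)
assert df["payment_value"].between(0, 100_000).all()

# expect_column_values_to_be_in_set on the status
assert df["order_status"].isin(VALID_STATUSES).all()

print("silver checks passed")
```

Each assertion maps one-to-one to the GE expectation named in its comment, which makes porting the suite mechanical.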
7. Orchestration (Airflow)
Main DAG: olist_pipeline
┌─────────────────┐
│ check_freshness │ Check that data is arriving
└────────┬────────┘
│
▼
┌─────────────────┐
│ bronze_to_silver│ Spark job (MERGE INTO)
└────────┬────────┘
│
▼
┌─────────────────┐
│ dbt_run │ dbt run --select gold
└────────┬────────┘
│
▼
┌─────────────────┐
│ dbt_test │ dbt test
└────────┬────────┘
│
▼
┌─────────────────┐
│ ge_validate │ Great Expectations checkpoint
└────────┬────────┘
│
┌────┴────┐
▼ ▼
┌───────┐ ┌───────┐
│notify │ │notify │
│success│ │failure│
└───────┘ └───────┘
Configuration:
- Schedule: 0 6 * * * (every day at 6:00 AM)
- Retries: 2
- Alerts: email or Slack on_failure
💡 Hints & Resources
Key pattern reminders

| Chapter | Pattern | Used for |
|---|---|---|
| 24 | readStream.format("kafka") | Bronze ingestion |
| 24 | foreachBatch | Silver MERGE |
| 23 | DeltaTable.forPath().merge() | Silver MERGE |
| 23 | whenMatchedUpdate / whenNotMatchedInsert | Silver MERGE |
| 25 | {{ ref('...') }} | dbt models |
| 25 | {{ config(materialized='incremental') }} | Gold models |
| 25 | is_incremental() | Gold models |
| 22 | BashOperator | Airflow tasks |
# Hint 1: Kafka producer structure
from kafka import KafkaProducer
import pandas as pd
import json
import time
import random
from datetime import datetime, timedelta

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # default=str handles NaN / Timestamp values coming out of pandas
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)

df = pd.read_csv('data/olist_orders_dataset.csv')

for idx, row in df.iterrows():
    message = row.to_dict()

    # Add the ingestion timestamp
    ingestion_ts = datetime.now()

    # Simulate late data (5%)
    if random.random() < 0.05:
        ingestion_ts -= timedelta(minutes=random.randint(1, 5))
    message['_ingestion_timestamp'] = ingestion_ts.isoformat()

    producer.send('raw_orders', value=message)

    # Simulate duplicates (2%)
    if random.random() < 0.02:
        producer.send('raw_orders', value=message)

    # Random delay
    time.sleep(random.uniform(0.05, 0.2))

producer.flush()
# Hint 2: MERGE INTO pattern for Silver
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, row_number
from pyspark.sql.window import Window

def upsert_orders_to_silver(batch_df, batch_id):
    """Upsert orders into Silver with deduplication (assumes an active `spark` session)."""
    if batch_df.count() == 0:
        return

    # 1. Deduplicate (keep the most recent record per order_id)
    window = Window.partitionBy("order_id").orderBy(col("_ingestion_timestamp").desc())
    deduped = batch_df.withColumn("_row_num", row_number().over(window)) \
        .filter(col("_row_num") == 1) \
        .drop("_row_num")

    # 2. Add the Silver timestamp
    enriched = deduped.withColumn("_silver_updated_at", current_timestamp())

    # 3. MERGE INTO
    silver_path = "s3a://lakehouse/silver/orders"
    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_table = DeltaTable.forPath(spark, silver_path)
        delta_table.alias("target").merge(
            enriched.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdate(
            condition="source._ingestion_timestamp > target._ingestion_timestamp",
            set={
                "order_status": "source.order_status",
                "order_delivered_customer_date": "source.order_delivered_customer_date",
                "_ingestion_timestamp": "source._ingestion_timestamp",
                "_silver_updated_at": "source._silver_updated_at"
            }
        ).whenNotMatchedInsertAll().execute()
    else:
        # First run: create the table
        enriched.write.format("delta").mode("overwrite").save(silver_path)

    print(f"Batch {batch_id}: {enriched.count()} records merged to Silver")
-- Hint 3: Incremental dbt model
-- models/gold/gold_daily_sales.sql
{{ config(
    materialized='incremental',
    unique_key='order_date',
    incremental_strategy='merge'
) }}

WITH orders AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}
    {% if is_incremental() %}
    WHERE DATE(order_purchase_timestamp) >= (
        SELECT MAX(order_date) - INTERVAL 2 DAY FROM {{ this }}
    )
    {% endif %}
),

daily_agg AS (
    SELECT
        DATE(order_purchase_timestamp) AS order_date,
        COUNT(DISTINCT order_id) AS total_orders,
        SUM(total_amount) AS total_revenue,
        AVG(total_amount) AS avg_order_value,
        SUM(total_items) AS total_items
    FROM orders
    WHERE order_status = 'delivered'
    GROUP BY DATE(order_purchase_timestamp)
)

SELECT * FROM daily_agg
-- Hint 4: RFM macro for dbt
-- macros/rfm_segment.sql
{% macro rfm_segment(recency, frequency, monetary) %}
CASE
    -- Champions: recent, frequent, big spender
    WHEN {{ recency }} <= 30 AND {{ frequency }} >= 3 AND {{ monetary }} >= 500 THEN 'Champions'
    -- Loyal Customers: frequent
    WHEN {{ frequency }} >= 3 THEN 'Loyal Customers'
    -- Potential Loyalists: recent, not yet frequent
    WHEN {{ recency }} <= 30 AND {{ frequency }} < 3 THEN 'Potential Loyalists'
    -- At Risk: not recent but used to order frequently
    WHEN {{ recency }} > 90 AND {{ frequency }} >= 2 THEN 'At Risk'
    -- Hibernating: not recent, infrequent
    WHEN {{ recency }} > 90 THEN 'Hibernating'
    -- Others
    ELSE 'Others'
END
{% endmacro %}

-- Usage in gold_customer_rfm.sql:
-- SELECT
--     customer_unique_id,
--     recency_days,
--     frequency,
--     monetary,
--     {{ rfm_segment('recency_days', 'frequency', 'monetary') }} AS rfm_segment
-- FROM rfm_base
# Hint 5: Airflow DAG
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@olist.com'],
}

with DAG(
    dag_id='olist_pipeline',
    default_args=default_args,
    description='Olist e-commerce pipeline',
    schedule_interval='0 6 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['olist', 'lakehouse'],
) as dag:

    # 1. Check source freshness
    check_freshness = BashOperator(
        task_id='check_freshness',
        bash_command='cd /opt/dbt && dbt source freshness',
    )

    # 2. Spark: Bronze → Silver
    bronze_to_silver = BashOperator(
        task_id='bronze_to_silver',
        bash_command='spark-submit /opt/spark_jobs/silver/run_all_silver.py',
    )

    # 3. dbt run
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /opt/dbt && dbt run --select gold',
    )

    # 4. dbt test
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --select gold',
    )

    # 5. Great Expectations
    ge_validate = BashOperator(
        task_id='ge_validate',
        bash_command='great_expectations checkpoint run gold_checkpoint',
    )

    # 6. Success notification
    notify_success = BashOperator(
        task_id='notify_success',
        bash_command='echo "Pipeline completed successfully!"',
        trigger_rule='all_success',
    )

    # 7. Failure notification (fires when its direct upstream fails)
    notify_failure = BashOperator(
        task_id='notify_failure',
        bash_command='echo "Pipeline failed!"',
        trigger_rule='one_failed',
    )

    # Dependencies
    check_freshness >> bronze_to_silver >> dbt_run >> dbt_test >> ge_validate
    ge_validate >> [notify_success, notify_failure]
Evaluation Criteria

| Criterion | Points | Expectations |
|---|---|---|
| Infrastructure | /10 | Docker Compose works, all services up |
| Kafka producers | /10 | 5 producers, late data + duplicates simulated |
| Bronze (Append) | /10 | Data ingested, partitioned, checkpointing |
| Silver (MERGE INTO) | /20 | ⭐ Correct foreachBatch + MERGE pattern, dedup |
| Gold (dbt) | /20 | 5 models, incremental, correct ref() |
| dbt tests | /10 | Tests pass, sufficient coverage |
| Great Expectations | /10 | Suites created, checkpoint works |
| Airflow DAG | /5 | DAG works, correct dependencies |
| Documentation | /5 | Clear README, diagrams, instructions |
| TOTAL | /100 | |
Validated Skills
By completing this project, you validate the following skills:

| Chapters | Skills | Applied in | Done |
|---|---|---|---|
| 14-16 | Python, environments | Kafka producers | ☐ |
| 17 | SQL | dbt transformations | ☐ |
| 18-20 | PySpark DataFrame | Spark jobs | ☐ |
| 22 | Airflow | DAG orchestration | ☐ |
| 23 | Delta Lake, MERGE INTO | Bronze → Silver | ☐ |
| 24 | Kafka, Spark SSS, foreachBatch | Streaming ingestion | ☐ |
| 25 | dbt, Great Expectations | Gold + quality | ☐ |
Possible Extensions (Bonus)
If you have finished the main project, here are some extensions to go further:

| Extension | Description | Difficulty |
|---|---|---|
| Monitoring | Add Prometheus + Grafana to monitor the pipeline | ⭐⭐ |
| Kubernetes | Deploy on K8s with the Spark Operator | ⭐⭐⭐ |
| ML Pipeline | Add a prediction model (churn, LTV) | ⭐⭐ |
| CDC | Use Debezium to capture changes | ⭐⭐ |
| Data Catalog | Integrate DataHub or Amundsen | ⭐⭐⭐ |
| Streamlit | Build an interactive dashboard | ⭐ |
📝 Complete Solution
docker-compose.yml
version: '3.8'

services:
  # ═══════════════════════════════════════════════════════════════
  # KAFKA
  # ═══════════════════════════════════════════════════════════════
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092

  # ═══════════════════════════════════════════════════════════════
  # STORAGE (MinIO = local S3)
  # ═══════════════════════════════════════════════════════════════
  minio:
    image: minio/minio
    container_name: minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio_data:/data

  # Create the bucket on startup
  minio-setup:
    image: minio/mc
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      sleep 5;
      mc alias set myminio http://minio:9000 minioadmin minioadmin;
      mc mb myminio/lakehouse --ignore-existing;
      exit 0;
      "

  # ═══════════════════════════════════════════════════════════════
  # SPARK
  # ═══════════════════════════════════════════════════════════════
  spark-master:
    image: bitnami/spark:3.5
    container_name: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - ./spark_jobs:/opt/spark_jobs
      - ./data:/opt/data

  spark-worker:
    image: bitnami/spark:3.5
    container_name: spark-worker
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=2
    volumes:
      - ./spark_jobs:/opt/spark_jobs
      - ./data:/opt/data

  # ═══════════════════════════════════════════════════════════════
  # AIRFLOW
  # ═══════════════════════════════════════════════════════════════
  postgres:
    image: postgres:15
    container_name: postgres
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.8.0
    container_name: airflow-webserver
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    ports:
      - "8082:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./dbt_olist:/opt/dbt
    command: bash -c "airflow db init && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.8.0
    container_name: airflow-scheduler
    depends_on:
      - airflow-webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./dbt_olist:/opt/dbt
    command: airflow scheduler

volumes:
  minio_data:
  postgres_data:
producers/orders_producer.py
from kafka import KafkaProducer
import pandas as pd
import json
import time
import random
from datetime import datetime, timedelta

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
TOPIC = 'raw_orders'
CSV_PATH = 'data/olist_orders_dataset.csv'

# Kafka producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Read the CSV
df = pd.read_csv(CSV_PATH)
print(f"Loaded {len(df)} orders")

# Send the messages
for idx, row in df.iterrows():
    message = row.to_dict()

    # Ingestion timestamp
    ingestion_ts = datetime.now()

    # Simulate late data (5%)
    if random.random() < 0.05:
        ingestion_ts -= timedelta(minutes=random.randint(1, 5))
    message['_ingestion_timestamp'] = ingestion_ts.isoformat()

    # Send
    producer.send(
        topic=TOPIC,
        key=message['order_id'],
        value=message
    )

    # Simulate duplicates (2%)
    if random.random() < 0.02:
        producer.send(topic=TOPIC, key=message['order_id'], value=message)

    # Log progress
    if idx % 1000 == 0:
        print(f"Sent {idx}/{len(df)} messages")

    # Random delay (to simulate streaming)
    time.sleep(random.uniform(0.05, 0.2))

producer.flush()
print(f"Done! Sent {len(df)} messages to {TOPIC}")
spark_jobs/bronze/ingest_orders.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, to_date
from pyspark.sql.types import StructType, StructField, StringType

# Spark session
spark = SparkSession.builder \
    .appName("Bronze - Ingest Orders") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

# Orders schema
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("order_status", StringType()),
    StructField("order_purchase_timestamp", StringType()),
    StructField("order_approved_at", StringType()),
    StructField("order_delivered_carrier_date", StringType()),
    StructField("order_delivered_customer_date", StringType()),
    StructField("order_estimated_delivery_date", StringType()),
    StructField("_ingestion_timestamp", StringType())
])

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "raw_orders") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the JSON
parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING) as json_value") \
    .select(from_json(col("json_value"), order_schema).alias("data")) \
    .select("data.*") \
    .withColumn("_bronze_ingested_at", current_timestamp()) \
    .withColumn("_ingestion_date", to_date(col("_ingestion_timestamp")))

# Write to Bronze (append)
query = parsed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://lakehouse/checkpoints/bronze_orders") \
    .option("path", "s3a://lakehouse/bronze/orders") \
    .partitionBy("_ingestion_date") \
    .start()

query.awaitTermination()
spark_jobs/silver/silver_orders.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Spark session (same config as Bronze)
spark = SparkSession.builder \
    .appName("Silver - Orders MERGE") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

BRONZE_PATH = "s3a://lakehouse/bronze/orders"
SILVER_PATH = "s3a://lakehouse/silver/orders"
CHECKPOINT_PATH = "s3a://lakehouse/checkpoints/silver_orders"

def upsert_to_silver(batch_df, batch_id):
    """Upsert into Silver with deduplication."""
    if batch_df.count() == 0:
        print(f"Batch {batch_id}: no data")
        return

    # 1. Deduplicate (keep the most recent record per order_id)
    window = Window.partitionBy("order_id").orderBy(col("_bronze_ingested_at").desc())
    deduped = batch_df \
        .withColumn("_row_num", row_number().over(window)) \
        .filter(col("_row_num") == 1) \
        .drop("_row_num")

    # 2. Add the Silver timestamp
    enriched = deduped.withColumn("_silver_updated_at", current_timestamp())

    # 3. MERGE INTO
    if DeltaTable.isDeltaTable(spark, SILVER_PATH):
        delta_table = DeltaTable.forPath(spark, SILVER_PATH)
        delta_table.alias("target").merge(
            enriched.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdate(
            condition="source._bronze_ingested_at > target._bronze_ingested_at",
            set={
                "order_status": "source.order_status",
                "order_delivered_carrier_date": "source.order_delivered_carrier_date",
                "order_delivered_customer_date": "source.order_delivered_customer_date",
                "_bronze_ingested_at": "source._bronze_ingested_at",
                "_silver_updated_at": "source._silver_updated_at"
            }
        ).whenNotMatchedInsertAll().execute()
        print(f"Batch {batch_id}: MERGED {enriched.count()} records")
    else:
        # First run: create the table
        enriched.write.format("delta").mode("overwrite").save(SILVER_PATH)
        print(f"Batch {batch_id}: CREATED table with {enriched.count()} records")

# Read Bronze as a stream
bronze_stream = spark.readStream \
    .format("delta") \
    .load(BRONZE_PATH)

# Write with foreachBatch
query = bronze_stream.writeStream \
    .foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()
dbt_olist/models/gold/gold_daily_sales.sql
{{ config(
    materialized='incremental',
    unique_key='order_date',
    incremental_strategy='merge'
) }}

WITH orders AS (
    SELECT
        o.order_id,
        o.order_status,
        DATE(o.order_purchase_timestamp) AS order_date,
        oi.price,
        oi.freight_value
    FROM {{ ref('stg_orders') }} o
    JOIN {{ ref('stg_order_items') }} oi ON o.order_id = oi.order_id
    WHERE o.order_status = 'delivered'
    {% if is_incremental() %}
    AND DATE(o.order_purchase_timestamp) >= (
        SELECT MAX(order_date) - INTERVAL 2 DAY FROM {{ this }}
    )
    {% endif %}
)

SELECT
    order_date,
    COUNT(DISTINCT order_id) AS total_orders,
    SUM(price + freight_value) AS total_revenue,
    -- average basket = revenue divided by distinct orders (not by item rows)
    SUM(price + freight_value) / COUNT(DISTINCT order_id) AS avg_order_value,
    COUNT(*) AS total_items
FROM orders
GROUP BY order_date
ORDER BY order_date
dbt_olist/models/gold/gold_customer_rfm.sql
{{ config(materialized='table') }}

WITH customer_orders AS (
    SELECT
        c.customer_unique_id,
        o.order_id,
        o.order_purchase_timestamp,
        oi.price + oi.freight_value AS order_value
    FROM {{ ref('stg_customers') }} c
    JOIN {{ ref('stg_orders') }} o ON c.customer_id = o.customer_id
    JOIN {{ ref('stg_order_items') }} oi ON o.order_id = oi.order_id
    WHERE o.order_status = 'delivered'
),

rfm_base AS (
    SELECT
        customer_unique_id,
        DATEDIFF(day, MAX(order_purchase_timestamp), CURRENT_DATE) AS recency_days,
        COUNT(DISTINCT order_id) AS frequency,
        SUM(order_value) AS monetary
    FROM customer_orders
    GROUP BY customer_unique_id
)

SELECT
    customer_unique_id,
    recency_days,
    frequency,
    ROUND(monetary, 2) AS monetary,
    {{ rfm_segment('recency_days', 'frequency', 'monetary') }} AS rfm_segment
FROM rfm_base
dbt_olist/models/gold/_gold__models.yml
version: 2

models:
  - name: gold_daily_sales
    description: "Daily revenue"
    columns:
      - name: order_date
        tests:
          - unique
          - not_null
      - name: total_revenue
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0

  - name: gold_customer_rfm
    description: "Customer RFM segmentation"
    columns:
      - name: customer_unique_id
        tests:
          - unique
          - not_null
      - name: rfm_segment
        tests:
          - accepted_values:
              values: ['Champions', 'Loyal Customers', 'Potential Loyalists', 'At Risk', 'Hibernating', 'Others']

  - name: gold_seller_performance
    description: "Seller performance"
    columns:
      - name: seller_id
        tests:
          - unique
          - not_null

  - name: gold_product_analytics
    description: "Product analysis by category"
    columns:
      - name: product_category
        tests:
          - unique
          - not_null

  - name: gold_delivery_performance
    description: "Delivery performance"
    columns:
      - name: on_time_rate
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1
🎉 Congratulations!
By completing this project, you have built a complete data pipeline fit for production use.
You now master:
✅ Real-time ingestion with Kafka
✅ Stream processing with Spark Structured Streaming
✅ The Lakehouse architecture with Delta Lake
✅ The MERGE INTO pattern for upserts
✅ Analytics modeling with dbt
✅ Quality validation with Great Expectations
✅ Orchestration with Airflow
🚀 What's Next?
This project is a solid foundation for your portfolio. You can:
- Deploy it to the cloud (AWS, GCP, Azure)
- Add monitoring (Prometheus + Grafana)
- Integrate ML (churn prediction, recommendations)
- Present it in interviews as proof of your skills
Good luck with the rest of your Data Engineering journey! 🎓