🎼 Advanced Orchestration pour Data Engineers
Bienvenue dans ce module avancé où tu vas maîtriser l’orchestration de pipelines data à l’échelle. Tu apprendras à déployer Airflow sur Kubernetes, à exploiter la TaskFlow API, à comparer les orchestrateurs modernes, et à mettre en place le data lineage avec OpenLineage — des compétences essentielles pour un Data Engineer Senior !
Prérequis
| Niveau | Compétence |
|---|---|
| ✅ Requis | Avoir suivi le module 12_orchestration_pipelines (Airflow basics) |
| ✅ Requis | Avoir suivi les modules 15_kubernetes_fundamentals et 27_kubernetes_deep_dive |
| ✅ Requis | Maîtriser les DAGs, Operators, XCom dans Airflow |
| ✅ Requis | Connaissances solides en Python et Docker |
| 💡 Recommandé | Un cluster K8s accessible (Minikube, kind, ou cloud) |
🎯 Objectifs du module
À la fin de ce module, tu seras capable de :
- Déployer et configurer Airflow sur Kubernetes
- Utiliser le KubernetesExecutor et le KubernetesPodOperator
- Écrire des DAGs modernes avec la TaskFlow API
- Implémenter le Dynamic Task Mapping
- Comparer et choisir entre Airflow, Dagster et Prefect
- Mettre en place le data lineage avec OpenLineage
- Utiliser Astronomer (Astro CLI et Astro SDK) pour industrialiser
Rappel : Ce qu’on a vu vs Ce qu’on va approfondir
| Module M12 (Beginner) | Ce module M28 (Advanced) |
|---|---|
| Architecture Airflow basique | Airflow sur Kubernetes |
| DAGs, Operators simples | TaskFlow API, Dynamic Mapping |
| XCom manuel | XCom automatique avec TaskFlow |
| Mention des alternatives | Comparatif détaillé + exemples de code |
| — | OpenLineage (data lineage) |
| — | Astronomer (plateforme enterprise) |
Schéma : De l’orchestration basique à l’orchestration avancée
M12 Orchestration Basics M28 Advanced Orchestration (ce module)
┌─────────────────────────┐ ┌─────────────────────────────────────┐
│ • Cron / Task Scheduler │ │ • Airflow sur Kubernetes │
│ • DAGs basics │ │ • KubernetesExecutor │
│ • Operators simples │ ──────▶ │ • TaskFlow API │
│ • XCom manuel │ │ • Dynamic Task Mapping │
│ • SequentialExecutor │ │ • Dagster, Prefect (comparatif) │
└─────────────────────────┘ │ • OpenLineage (lineage) │
│ • Astronomer (enterprise) │
└─────────────────────────────────────┘
💡 Ce module est orienté “production à l’échelle” — tu vas apprendre à faire tourner des centaines de DAGs avec des milliers de tâches sur Kubernetes.
ℹ️ Le savais-tu ?
Apache Airflow a été créé par Airbnb en 2014 pour orchestrer leurs pipelines data. Il a été donné à la Apache Foundation en 2016.
Aujourd’hui, Airflow est utilisé par des milliers d’entreprises dont Uber, Lyft, Twitter, Slack, Adobe, et bien d’autres.
Astronomer, fondé en 2018, est devenu le leader des solutions Airflow managées, levant plus de 200 millions de dollars et étant le principal contributeur au projet open-source Airflow.
1. Airflow sur Kubernetes
Déployer Airflow sur Kubernetes permet de bénéficier de l’élasticité, de l’isolation et de la scalabilité native de K8s.
Pourquoi Airflow sur K8s ?
| Aspect | Sans K8s (Celery/Local) | Avec K8s |
|---|---|---|
| Scaling | Workers fixes | Pods à la demande |
| Isolation | Dépendances partagées | Chaque tâche dans son pod |
| Ressources | Allocation statique | Requests/Limits par tâche |
| Coût | Serveurs 24/7 | Pay-per-use (pods éphémères) |
| Maintenance | Gérer les workers | K8s gère tout |
Architecture Airflow sur Kubernetes
┌─────────────────────────────────────────────────────────────────────────────┐
│ AIRFLOW ON KUBERNETES │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Namespace: airflow │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ Scheduler │ │ Webserver │ │ Triggerer (Airflow 2) │ │ │
│ │ │ (Pod) │ │ (Pod) │ │ (Pod) │ │ │
│ │ └──────┬──────┘ └─────────────┘ └─────────────────────────┘ │ │
│ │ │ │ │
│ │ │ Crée des pods pour chaque tâche │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Worker Pods (éphémères) │ │ │
│ │ │ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ │ │ │
│ │ │ │Task 1 │ │Task 2 │ │Task 3 │ │Task 4 │ │Task 5 │ │ │ │
│ │ │ │ Pod │ │ Pod │ │ Pod │ │ Pod │ │ Pod │ │ │ │
│ │ │ └───────┘ └───────┘ └───────┘ └───────┘ └───────┘ │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────────────────────────┐ │ │
│ │ │ PostgreSQL │ │ DAGs (PVC/Git-Sync) │ │ │
│ │ │ (Metadata DB) │ │ │ │ │
│ │ └─────────────────┘ └─────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Les Executors Airflow
| Executor | Description | Quand l’utiliser |
|---|---|---|
| SequentialExecutor | 1 tâche à la fois | Dev/test uniquement |
| LocalExecutor | Parallèle sur 1 machine | Petite production |
| CeleryExecutor | Workers Celery distribués | Production classique |
| KubernetesExecutor | 1 pod K8s par tâche | Production K8s |
| CeleryKubernetesExecutor | Hybrid Celery + K8s | Workloads mixtes |
Installation avec le Helm Chart Officiel
# Ajouter le repo Helm officiel Airflow
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Créer le namespace
kubectl create namespace airflow
# Installer Airflow avec KubernetesExecutor
helm install airflow apache-airflow/airflow \
--namespace airflow \
--set executor=KubernetesExecutor \
--set webserver.defaultUser.password=admin \
--set dags.persistence.enabled=true \
--set dags.gitSync.enabled=true \
--set dags.gitSync.repo=https://github.com/myorg/airflow-dags.git \
--set dags.gitSync.branch=main \
--set dags.gitSync.subPath=dags
# Vérifier l'installation
kubectl get pods -n airflow
# Accéder au webserver
kubectl port-forward svc/airflow-webserver -n airflow 8080:8080
# http://localhost:8080 (admin / admin)Configuration values.yaml avancée
# values-production.yaml
executor: KubernetesExecutor
# Webserver
webserver:
replicas: 2
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1000m"
memory: "2Gi"
# Scheduler
scheduler:
replicas: 2 # HA Scheduler (Airflow 2.0+)
resources:
requests:
cpu: "500m"
memory: "1Gi"
# Configuration KubernetesExecutor
config:
kubernetes:
# Namespace pour les worker pods
namespace: airflow
# Supprimer les pods après exécution
delete_worker_pods: "True"
delete_worker_pods_on_failure: "False" # Garder pour debug
# Image par défaut pour les workers
worker_container_repository: apache/airflow
worker_container_tag: 2.8.0-python3.11
# Git-Sync pour les DAGs
dags:
persistence:
enabled: false
gitSync:
enabled: true
repo: git@github.com:myorg/airflow-dags.git
branch: main
subPath: dags
sshKeySecret: airflow-git-ssh-key
wait: 60 # Sync toutes les 60 secondes
# Logs dans un stockage externe
logs:
persistence:
enabled: true
size: 10Gi
# PostgreSQL (ou utiliser un service externe)
postgresql:
enabled: true
auth:
postgresPassword: airflow
username: airflow
password: airflow
database: airflowKubernetesExecutor : Comment ça marche
1. DAG est schedulé
│
▼
2. Scheduler parse le DAG et identifie les tâches à exécuter
│
▼
3. Pour chaque tâche, le Scheduler crée un Pod K8s
│
┌───┴───────────────────────────────────────┐
│ Pod Spec généré automatiquement : │
│ - Image: airflow (ou custom) │
│ - Command: airflow tasks run ... │
│ - Env: connexions, variables │
│ - Resources: requests/limits │
└───────────────────────────────────────────┘
│
▼
4. K8s schedule le pod sur un node
│
▼
5. La tâche s'exécute
│
▼
6. Pod terminé → supprimé (si delete_worker_pods=True)
KubernetesPodOperator
Pour exécuter des tâches avec des images Docker personnalisées :
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime
with DAG(
dag_id="etl_with_custom_image",
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
) as dag:
# Tâche avec image Spark
spark_job = KubernetesPodOperator(
task_id="run_spark_etl",
name="spark-etl-job",
namespace="airflow",
image="my-registry/spark-etl:1.0",
cmds=["spark-submit"],
arguments=[
"--master", "k8s://https://kubernetes.default.svc",
"/app/etl_job.py"
],
# Ressources
container_resources={
"requests": {"cpu": "1", "memory": "2Gi"},
"limits": {"cpu": "2", "memory": "4Gi"},
},
# Variables d'environnement
env_vars={
"SOURCE_PATH": "s3://bucket/raw/",
"DEST_PATH": "s3://bucket/processed/",
},
# Secrets
secrets=[
{"secret": "aws-credentials", "key": "AWS_ACCESS_KEY_ID", "env": "AWS_ACCESS_KEY_ID"},
{"secret": "aws-credentials", "key": "AWS_SECRET_ACCESS_KEY", "env": "AWS_SECRET_ACCESS_KEY"},
],
# Configuration K8s
is_delete_operator_pod=True,
get_logs=True,
startup_timeout_seconds=300,
# Affinity/Tolerations
affinity={
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "node-type",
"operator": "In",
"values": ["compute"]
}]
}]
}
}
},
)
# Tâche avec image Python custom
python_job = KubernetesPodOperator(
task_id="run_python_transform",
name="python-transform",
namespace="airflow",
image="my-registry/python-etl:2.0",
cmds=["python", "/app/transform.py"],
container_resources={
"requests": {"cpu": "500m", "memory": "1Gi"},
"limits": {"cpu": "1", "memory": "2Gi"},
},
is_delete_operator_pod=True,
get_logs=True,
)
spark_job >> python_jobpod_template_file : Personnalisation avancée
Pour des configurations complexes, utiliser un template YAML :
# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
name: airflow-worker
labels:
app: airflow-worker
spec:
serviceAccountName: airflow-worker
containers:
- name: base
image: apache/airflow:2.8.0-python3.11
imagePullPolicy: IfNotPresent
env:
- name: AIRFLOW__CORE__EXECUTOR
value: "LocalExecutor"
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"
volumeMounts:
- name: dags
mountPath: /opt/airflow/dags
readOnly: true
volumes:
- name: dags
persistentVolumeClaim:
claimName: airflow-dags
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000# Dans le DAG, référencer le template
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
task = KubernetesPodOperator(
task_id="task_with_template",
name="custom-task",
namespace="airflow",
pod_template_file="/opt/airflow/pod_templates/pod_template.yaml",
# Override l'image du template
image="my-custom-image:1.0",
)2. TaskFlow API
La TaskFlow API (introduite dans Airflow 2.0) permet d’écrire des DAGs de manière Pythonic avec des décorateurs
@daget@task, en gérant automatiquement les XComs.
Pourquoi TaskFlow ?
| Approche Traditionnelle | TaskFlow API |
|---|---|
PythonOperator(python_callable=fn) |
@task sur la fonction |
xcom_push / xcom_pull manuels |
Passage de données automatique |
| Verbeux | Concis et lisible |
Dépendances explicites >> |
Dépendances implicites par appel |
Exemple comparatif
Avant (approche traditionnelle) :
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract(**context):
data = {"users": 100, "orders": 500}
context['ti'].xcom_push(key='extracted_data', value=data)
return data
def transform(**context):
ti = context['ti']
data = ti.xcom_pull(task_ids='extract_task', key='extracted_data')
transformed = {"total": data['users'] + data['orders']}
ti.xcom_push(key='transformed_data', value=transformed)
return transformed
def load(**context):
ti = context['ti']
data = ti.xcom_pull(task_ids='transform_task', key='transformed_data')
print(f"Loading: {data}")
with DAG(
dag_id='traditional_etl',
start_date=datetime(2024, 1, 1),
schedule='@daily',
) as dag:
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract,
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform,
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load,
)
extract_task >> transform_task >> load_taskAprès (TaskFlow API) :
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id='taskflow_etl',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def taskflow_etl():
@task
def extract() -> dict:
return {"users": 100, "orders": 500}
@task
def transform(data: dict) -> dict:
return {"total": data['users'] + data['orders']}
@task
def load(data: dict):
print(f"Loading: {data}")
# Dépendances implicites par appel de fonction !
raw_data = extract()
transformed_data = transform(raw_data)
load(transformed_data)
# Instancier le DAG
taskflow_etl()Avantages TaskFlow
| Avantage | Description |
|---|---|
| Code Pythonic | Ressemble à du Python standard |
| XCom automatique | Les retours de fonction sont automatiquement passés |
| Type hints | Support des annotations de type |
| Moins de boilerplate | Pas de PythonOperator explicite |
| Dépendances claires | Le flux de données définit les dépendances |
TaskFlow avec plusieurs outputs
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id='taskflow_multiple_outputs',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def etl_pipeline():
@task(multiple_outputs=True)
def extract() -> dict:
"""Retourne plusieurs valeurs nommées"""
return {
"users": [{"id": 1, "name": "Alice"}],
"orders": [{"id": 101, "amount": 99.99}],
"metadata": {"source": "api", "timestamp": "2024-01-01"}
}
@task
def process_users(users: list) -> list:
return [u['name'].upper() for u in users]
@task
def process_orders(orders: list) -> float:
return sum(o['amount'] for o in orders)
@task
def combine(users: list, total: float, metadata: dict):
print(f"Users: {users}")
print(f"Total orders: {total}")
print(f"Source: {metadata['source']}")
# Extraire les données
data = extract()
# Accéder aux outputs individuels
processed_users = process_users(data['users'])
orders_total = process_orders(data['orders'])
# Combiner
combine(processed_users, orders_total, data['metadata'])
etl_pipeline()Mixing TaskFlow avec des Operators classiques
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
@dag(
dag_id='mixed_taskflow',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def mixed_pipeline():
# TaskFlow task
@task
def prepare_config() -> dict:
return {"batch_size": 1000, "date": "2024-01-01"}
# Operator classique
run_spark = KubernetesPodOperator(
task_id="run_spark",
name="spark-job",
namespace="airflow",
image="apache/spark:3.5.0",
cmds=["spark-submit", "/app/job.py"],
)
# TaskFlow task qui dépend d'un Operator
@task
def validate_output():
print("Validating Spark output...")
return True
# Bash operator
notify = BashOperator(
task_id="notify",
bash_command="echo 'Pipeline completed!'",
)
# Définir les dépendances
config = prepare_config()
config >> run_spark >> validate_output() >> notify
mixed_pipeline()3. Dynamic Task Mapping
Le Dynamic Task Mapping (Airflow 2.3+) permet de créer un nombre variable de tâches à runtime, basé sur les données.
Pourquoi Dynamic Mapping ?
| Problème | Solution |
|---|---|
| Nombre de fichiers inconnu à l’avance | .expand() sur la liste de fichiers |
| Traiter N partitions dynamiquement | Map sur les partitions |
| Paralléliser sur une liste variable | Créer N tâches automatiquement |
Schéma Dynamic Mapping
┌─────────────────┐
│ list_files() │
│ [f1, f2, f3, f4]│
└────────┬────────┘
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ process(f1) │ │ process(f2) │ │ process(f3) │ ...
│ [mapped] │ │ [mapped] │ │ [mapped] │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└─────────────────┼─────────────────┘
│
▼
┌─────────────────┐
│ aggregate() │
│ Combine results │
└─────────────────┘
Exemple : Traiter des fichiers dynamiquement
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id='dynamic_file_processing',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def process_files_dynamically():
@task
def list_files() -> list[str]:
"""Liste les fichiers à traiter (nombre variable)"""
# En réalité : lister depuis S3, GCS, etc.
return [
"s3://bucket/data/file1.parquet",
"s3://bucket/data/file2.parquet",
"s3://bucket/data/file3.parquet",
"s3://bucket/data/file4.parquet",
]
@task
def process_file(file_path: str) -> dict:
"""Traite UN fichier — sera mappé dynamiquement"""
print(f"Processing: {file_path}")
# Simuler le traitement
row_count = len(file_path) * 100 # Fake
return {"file": file_path, "rows": row_count}
@task
def aggregate_results(results: list[dict]) -> dict:
"""Agrège tous les résultats"""
total_rows = sum(r['rows'] for r in results)
return {
"files_processed": len(results),
"total_rows": total_rows,
}
# Récupérer la liste de fichiers
files = list_files()
# 🎯 DYNAMIC MAPPING : .expand() crée N tâches
processed = process_file.expand(file_path=files)
# Agréger les résultats
aggregate_results(processed)
process_files_dynamically()expand() avec plusieurs paramètres
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id='dynamic_multi_param',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def multi_param_mapping():
@task
def get_partitions() -> list[dict]:
return [
{"date": "2024-01-01", "region": "EU"},
{"date": "2024-01-01", "region": "US"},
{"date": "2024-01-02", "region": "EU"},
{"date": "2024-01-02", "region": "US"},
]
@task
def process_partition(date: str, region: str) -> dict:
print(f"Processing {region} for {date}")
return {"date": date, "region": region, "status": "done"}
partitions = get_partitions()
# expand_kwargs pour mapper plusieurs paramètres
process_partition.expand_kwargs(partitions)
multi_param_mapping()Limiter le parallélisme
@task(max_active_tis_per_dag=5) # Max 5 instances en parallèle
def process_file(file_path: str) -> dict:
# ...
passMapping sur un Operator (non-TaskFlow)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id='dynamic_bash',
start_date=datetime(2024, 1, 1),
schedule='@daily',
) as dag:
# Liste statique (ou XCom d'une tâche précédente)
files = ["file1.csv", "file2.csv", "file3.csv"]
process = BashOperator.partial(
task_id="process_files",
).expand(
bash_command=[f"python process.py {f}" for f in files]
)4. Airflow vs Dagster vs Prefect
Il existe plusieurs orchestrateurs modernes. Voici une comparaison approfondie.
Vue d’ensemble
| Critère | Airflow | Dagster | Prefect |
|---|---|---|---|
| Créé par | Airbnb (2014) | Elementl (2018) | Prefect (2018) |
| Philosophie | DAG-centric | Asset-centric | Flow-centric |
| Maturité | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Communauté | Très large | En croissance | En croissance |
| Adoption | Standard industrie | Startups, ML | Startups, Data |
| Learning curve | Moyenne | Plus raide | Plus douce |
Comparaison détaillée
| Aspect | Airflow | Dagster | Prefect |
|---|---|---|---|
| Définition | DAGs Python | Assets + Ops | Flows + Tasks |
| Scheduling | Cron-like | Cron + Sensors | Cron + Events |
| Data Lineage | Via OpenLineage | Natif (Assets) | Via intégrations |
| Testing | Difficile | Excellent | Bon |
| Type checking | Non natif | Natif (I/O types) | Pydantic |
| UI | Fonctionnelle | Moderne | Moderne |
| Local dev | Complexe | Excellent | Excellent |
| Cloud offering | MWAA, Composer, Astronomer | Dagster Cloud | Prefect Cloud |
Exemples de code comparés
Même pipeline dans les 3 outils :
Airflow (TaskFlow)
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id='etl_pipeline',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def etl_pipeline():
@task
def extract() -> dict:
return {"data": [1, 2, 3, 4, 5]}
@task
def transform(raw: dict) -> dict:
return {"data": [x * 2 for x in raw['data']]}
@task
def load(transformed: dict):
print(f"Loading: {transformed}")
raw = extract()
transformed = transform(raw)
load(transformed)
etl_pipeline()Dagster (Asset-based)
from dagster import asset, Definitions, define_asset_job
@asset
def raw_data() -> dict:
"""Asset: données brutes"""
return {"data": [1, 2, 3, 4, 5]}
@asset
def transformed_data(raw_data: dict) -> dict:
"""Asset: données transformées (dépend de raw_data)"""
return {"data": [x * 2 for x in raw_data['data']]}
@asset
def loaded_data(transformed_data: dict) -> None:
"""Asset: données chargées"""
print(f"Loading: {transformed_data}")
# Job pour exécuter tous les assets
etl_job = define_asset_job("etl_job", selection="*")
# Définitions Dagster
defs = Definitions(
assets=[raw_data, transformed_data, loaded_data],
jobs=[etl_job],
)Dagster : Concepts clés
| Concept | Description |
|---|---|
| Asset | Objet de données persistant (table, fichier) |
| Op | Unité de calcul (comme un Operator) |
| Graph | Composition d’Ops |
| Job | Graph exécutable avec config |
| Resource | Connexion externe (DB, S3) |
| I/O Manager | Gère la persistance des assets |
Prefect (Flow-based)
from prefect import flow, task
@task
def extract() -> dict:
return {"data": [1, 2, 3, 4, 5]}
@task
def transform(raw: dict) -> dict:
return {"data": [x * 2 for x in raw['data']]}
@task
def load(transformed: dict):
print(f"Loading: {transformed}")
@flow(name="ETL Pipeline")
def etl_pipeline():
raw = extract()
transformed = transform(raw)
load(transformed)
# Exécuter
if __name__ == "__main__":
etl_pipeline()Prefect : Concepts clés
| Concept | Description |
|---|---|
| Flow | Pipeline de tâches |
| Task | Unité de travail |
| Deployment | Flow déployé et schedulable |
| Work Pool | Groupe de workers |
| Block | Credentials et configs réutilisables |
Quand utiliser quoi ?
| Scénario | Recommandation | Pourquoi |
|---|---|---|
| Grande entreprise, équipe mature | Airflow | Standard, très documenté, écosystème large |
| Pipelines ML avec assets | Dagster | Asset-centric, excellent testing, types |
| Startup, itération rapide | Prefect | Simple, moderne, local dev facile |
| Déjà sur Airflow | Rester sur Airflow | Migration coûteuse, TaskFlow moderne |
| Nouveau projet, équipe data | Dagster ou Prefect | Approches modernes |
| K8s natif requis | Airflow ou Dagster | KubernetesExecutor mature |
Tableau de décision
Complexité du pipeline
Faible ◄─────────────────► Élevée
│ │
Taille équipe │ │
│ │ │
Petite ───────────┼──► Prefect ──────────►│ Dagster
│ │ │
│ │ │
Grande ────────────┼──► Prefect ──────────►│ Airflow
│ │ ou Dagster │
▼ │ │
5. OpenLineage — Data Lineage
OpenLineage est un standard ouvert pour collecter et partager les métadonnées de lineage des pipelines data. Il répond à la question : “D’où viennent mes données et où vont-elles ?”
Pourquoi le Data Lineage ?
| Question | Lineage répond |
|---|---|
| D’où viennent les données de ce dashboard ? | ✅ Traçabilité amont |
| Si cette table change, quoi d’autre est impacté ? | ✅ Impact analysis |
| Ce job a échoué, quelles données sont corrompues ? | ✅ Root cause analysis |
| Sommes-nous conformes RGPD ? | ✅ Audit et gouvernance |
Schéma OpenLineage
┌─────────────────────────────────────────────────────────────────────────────┐
│ OPENLINEAGE ECOSYSTEM │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Airflow │ │ Spark │ │ dbt │ │ Flink │ │
│ │ + OL Plugin│ │ + OL Lib │ │ + OL Plugin│ │ + OL Lib │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ │ OpenLineage Events (JSON) │ │ │
│ └─────────────────┴─────────────────┴─────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────┐ │
│ │ OpenLineage Backend │ │
│ │ (Marquez, Atlan, DataHub...) │ │
│ └───────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────┐ │
│ │ Lineage Visualization │ │
│ │ Impact Analysis │ │
│ │ Data Catalog │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Concepts OpenLineage
| Concept | Description | Exemple |
|---|---|---|
| Job | Pipeline ou transformation | DAG Airflow, job Spark |
| Run | Exécution d’un job | DAG run avec ID unique |
| Dataset | Source ou destination de données | Table SQL, fichier S3 |
| Facet | Métadonnées additionnelles | Schema, stats, owner |
Activer OpenLineage dans Airflow
# Installer le provider
pip install apache-airflow-providers-openlineage# airflow.cfg ou variables d'environnement
[openlineage]
transport = '{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
namespace = "my_airflow_instance"Ou via environnement :
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
export AIRFLOW__OPENLINEAGE__NAMESPACE="my_airflow_instance"Marquez : Backend OpenLineage
Marquez est le backend de référence pour OpenLineage (open-source par WeWork/Linux Foundation).
# Déployer Marquez avec Docker Compose
git clone https://github.com/MarquezProject/marquez.git
cd marquez
docker-compose up -d
# UI disponible sur http://localhost:3000
# API sur http://localhost:5000DAG avec lineage explicite
from airflow.decorators import dag, task
from airflow.lineage.entities import Table, File
from datetime import datetime
@dag(
dag_id='etl_with_lineage',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def etl_with_lineage():
@task(
inlets=[File(url="s3://bucket/raw/")],
outlets=[Table(cluster="warehouse", database="analytics", name="staging_orders")],
)
def extract_and_stage():
"""Extract depuis S3 et charge dans staging"""
print("Extracting from S3 to staging...")
@task(
inlets=[Table(cluster="warehouse", database="analytics", name="staging_orders")],
outlets=[Table(cluster="warehouse", database="analytics", name="fact_orders")],
)
def transform_to_fact():
"""Transforme staging en fact table"""
print("Transforming to fact table...")
extract_and_stage() >> transform_to_fact()
etl_with_lineage()Lineage automatique avec certains Operators
Certains operators extraient automatiquement le lineage :
| Operator | Lineage auto |
|---|---|
SnowflakeOperator |
✅ (parse SQL) |
BigQueryOperator |
✅ |
PostgresOperator |
✅ |
SparkSubmitOperator |
✅ (avec Spark OL) |
PythonOperator |
❌ (manuel) |
Alternatives à Marquez
| Outil | Type | Description |
|---|---|---|
| Marquez | Open-source | Backend de référence |
| DataHub | Open-source | Catalogue + lineage (LinkedIn) |
| Atlan | SaaS | Plateforme data catalog complète |
| Collibra | Enterprise | Gouvernance et lineage |
| Alation | Enterprise | Data catalog |
6. Astronomer — Airflow Enterprise
Astronomer est la plateforme enterprise pour Apache Airflow. Elle fournit des outils, un hosting managé, et des SDKs pour industrialiser Airflow.
Pourquoi Astronomer ?
| Problème | Solution Astronomer |
|---|---|
| Installer/maintenir Airflow est complexe | Astro Cloud (fully managed) |
| Dev local difficile | Astro CLI (environment identique) |
| Écrire du code Airflow verbeux | Astro SDK (API simplifiée) |
| Pas de support | Support enterprise 24/7 |
Écosystème Astronomer
┌─────────────────────────────────────────────────────────────────────────────┐
│ ASTRONOMER ECOSYSTEM │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Astro Cloud │ │
│ │ • Airflow fully managed │ │
│ │ • Auto-scaling │ │
│ │ • Observability intégrée │ │
│ │ • CI/CD intégré │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────┐ ┌───────────────────────┐ │
│ │ Astro CLI │ │ Astro SDK │ │
│ │ • Dev local │ │ • API Pythonic │ │
│ │ • Deploy en 1 cmd │ │ • SQL/Python tasks │ │
│ │ • Tests intégrés │ │ • Data quality │ │
│ └───────────────────────┘ └───────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Astronomer Registry │ │
│ │ • Providers certifiés │ │
│ │ • DAGs de référence │ │
│ │ • Documentation enrichie │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Astro CLI
L’outil en ligne de commande pour développer et déployer Airflow.
# Installation
# macOS
brew install astro
# Linux
curl -sSL install.astronomer.io | sudo bash -s
# Windows (WSL)
curl -sSL install.astronomer.io | sudo bash -s# Créer un nouveau projet Airflow
mkdir my-airflow-project && cd my-airflow-project
astro dev init
# Structure générée :
# .
# ├── dags/ # Tes DAGs
# │ └── example_dag.py
# ├── include/ # Fichiers additionnels
# ├── plugins/ # Plugins custom
# ├── tests/ # Tests
# ├── Dockerfile # Image Airflow custom
# ├── packages.txt # Packages système
# ├── requirements.txt # Dépendances Python
# └── airflow_settings.yaml # Variables, connexions
# Démarrer l'environnement local
astro dev start
# UI sur http://localhost:8080 (admin/admin)
# Voir les logs
astro dev logs
# Exécuter les tests
astro dev pytest
# Parser les DAGs (vérifier les erreurs)
astro dev parse
# Arrêter
astro dev stop
# Déployer sur Astro Cloud
astro deployAstro SDK
Une API Python simplifiée pour écrire des DAGs, particulièrement pour les opérations SQL/dataframe.
pip install astro-sdk-pythonExemple : ETL SQL simplifié
from datetime import datetime
from airflow.decorators import dag
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table
@dag(
dag_id='astro_sdk_etl',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
)
def astro_sdk_etl():
# Charger un CSV dans une table
raw_orders = aql.load_file(
input_file=File("s3://bucket/orders.csv"),
output_table=Table(
name="raw_orders",
conn_id="postgres_conn",
),
)
# Transformation SQL (inline)
@aql.transform
def transform_orders(input_table: Table):
return """
SELECT
order_id,
customer_id,
order_date,
amount,
amount * 1.2 as amount_with_tax
FROM {{ input_table }}
WHERE amount > 0
"""
# Appliquer la transformation
transformed = transform_orders(
input_table=raw_orders,
output_table=Table(
name="transformed_orders",
conn_id="postgres_conn",
),
)
# Exporter vers S3
aql.export_to_file(
input_data=transformed,
output_file=File("s3://bucket/processed/orders.parquet"),
if_exists="replace",
)
# Nettoyer les tables temporaires
aql.cleanup()
astro_sdk_etl()Fonctionnalités Astro SDK
| Fonction | Description |
|---|---|
aql.load_file() |
Charger CSV/Parquet/JSON dans une table |
aql.transform() |
SQL transformation avec Jinja |
aql.run_raw_sql() |
Exécuter du SQL brut |
aql.export_to_file() |
Exporter une table vers fichier |
aql.merge() |
Merge/Upsert entre tables |
aql.append() |
Append data to table |
aql.dataframe() |
Transformer avec Pandas |
aql.cleanup() |
Supprimer les tables temporaires |
Astro SDK : Transformation DataFrame
from astro import sql as aql
from astro.sql.table import Table
import pandas as pd
@aql.dataframe
def process_with_pandas(df: pd.DataFrame) -> pd.DataFrame:
"""Transformation avec Pandas"""
df['total'] = df['quantity'] * df['price']
df['processed_at'] = pd.Timestamp.now()
return df
# Dans le DAG :
processed = process_with_pandas(
df=some_table,
output_table=Table(name="processed", conn_id="postgres"),
)Astro Cloud vs Self-Hosted
| Aspect | Astro Cloud | Self-Hosted (Helm) |
|---|---|---|
| Setup | Minutes | Heures/jours |
| Maintenance | Astronomer | Ton équipe |
| Scaling | Automatique | Manuel |
| Updates | Automatiques | Manuels |
| Coût | Par usage | Infrastructure |
| Contrôle | Moyen | Total |
| Sécurité | SOC 2, HIPAA | À implémenter |
7. Exercices Pratiques
Exercice 1 : DAG TaskFlow avec Dynamic Mapping
Objectif : Créer un DAG qui traite dynamiquement une liste de fichiers.
Instructions : 1. Créer une tâche list_files() qui retourne une liste de chemins 2. Créer une tâche process_file(path) mappée dynamiquement 3. Créer une tâche report(results) qui agrège les résultats 4. Limiter à 3 tâches parallèles maximum
💡 Indice
Utilise @task(max_active_tis_per_dag=3) et .expand()
Exercice 2 : KubernetesPodOperator avec Spark
Objectif : Créer un DAG qui exécute un job Spark via KubernetesPodOperator.
Instructions : 1. Utiliser l’image apache/spark:3.5.0 2. Configurer les ressources : 2 CPU, 4Gi RAM 3. Passer des variables d’environnement pour la config 4. Utiliser une node affinity pour cibler les nodes compute
Exercice 3 : Comparatif Airflow/Dagster/Prefect
Objectif : Implémenter le même pipeline simple dans les 3 outils.
Pipeline : 1. Lire un fichier JSON 2. Filtrer les enregistrements (amount > 100) 3. Calculer une somme 4. Écrire le résultat
Comparer : - Nombre de lignes de code - Facilité de test local - Lisibilité
Exercice 4 : OpenLineage avec Marquez
Objectif : Configurer OpenLineage et visualiser le lineage.
Instructions : 1. Déployer Marquez en local (Docker Compose) 2. Configurer Airflow pour envoyer les events à Marquez 3. Créer un DAG avec inlets et outlets explicites 4. Visualiser le lineage dans l’UI Marquez
Exercice 5 : Astro CLI Project
Objectif : Créer un projet Airflow complet avec Astro CLI.
Instructions : 1. Initialiser un projet avec astro dev init 2. Créer un DAG utilisant Astro SDK 3. Ajouter des tests dans /tests 4. Valider avec astro dev pytest
8. Mini-Projet : Pipeline Data Production-Ready
Objectif
Créer un pipeline ETL production-ready déployé sur Kubernetes avec monitoring et lineage.
Architecture cible
┌─────────────────────────────────────────────────────────────────────────────┐
│ MINI-PROJET M28 │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Airflow sur Kubernetes │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ DAG: data_pipeline_advanced │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ │
│ │ │ │ Extract │───▶│ Transform │───▶│ Load │ │ │ │
│ │ │ │ (K8sPod)│ │ (TaskFlow) │ │ (Astro SDK) │ │ │ │
│ │ │ └─────────┘ └──────┬──────┘ └─────────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ Dynamic Mapping (N fichiers) │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ Scheduler │ │ Webserver │ │ Worker Pods (K8s) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ OpenLineage Events │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Marquez │ │
│ │ (Data Lineage Visualization) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Instructions
Étape 1 : Setup Astro Project
mkdir advanced-pipeline && cd advanced-pipeline
astro dev initÉtape 2 : Créer le DAG principal
Le DAG doit : - Lister des fichiers depuis S3 (simulé) - Traiter chaque fichier avec Dynamic Mapping - Utiliser TaskFlow API - Avoir des inlets/outlets pour le lineage
Étape 3 : Configurer OpenLineage
- Ajouter la config dans
airflow_settings.yaml - Vérifier que les events arrivent dans Marquez
Étape 4 : Tests
- Écrire des tests unitaires pour les tâches
- Valider avec
astro dev pytest
Étape 5 : Déployer (optionnel)
- Déployer sur un cluster K8s avec le Helm chart
- Ou utiliser Astro Cloud
✅ Solution du mini-projet
📥 Afficher la solution complète
1. Structure du projet Astro
advanced-pipeline/
├── dags/
│ └── data_pipeline_advanced.py
├── include/
│ └── sql/
│ └── transform.sql
├── tests/
│ └── test_pipeline.py
├── Dockerfile
├── requirements.txt
└── airflow_settings.yaml
2. requirements.txt
astro-sdk-python>=1.5.0
apache-airflow-providers-openlineage>=1.0.0
apache-airflow-providers-cncf-kubernetes>=7.0.0
pandas>=2.0.0
3. airflow_settings.yaml
airflow:
connections:
- conn_id: postgres_conn
conn_type: postgres
host: postgres
login: airflow
password: airflow
schema: airflow
port: 5432
variables:
- variable_name: data_bucket
variable_value: s3://my-data-bucket
# OpenLineage config
openlineage:
transport: '{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
namespace: advanced-pipeline4. dags/data_pipeline_advanced.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.lineage.entities import File, Table
from astro import sql as aql
from astro.sql.table import Table as AstroTable
import pandas as pd
@dag(
dag_id='data_pipeline_advanced',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
tags=['production', 'etl'],
doc_md="""## Pipeline Data Avancé
Ce pipeline:
1. Liste les fichiers à traiter
2. Extrait chaque fichier (Dynamic Mapping)
3. Transforme avec Astro SDK
4. Charge dans PostgreSQL
""",
)
def data_pipeline_advanced():
# --- ÉTAPE 1 : Lister les fichiers ---
@task
def list_files() -> list[str]:
"""Liste les fichiers à traiter (simulé)"""
# En production: lister depuis S3
return [
"orders_2024_01.parquet",
"orders_2024_02.parquet",
"orders_2024_03.parquet",
]
# --- ÉTAPE 2 : Extraire chaque fichier ---
@task(
max_active_tis_per_dag=3,
inlets=[File(url="s3://bucket/raw/")],
)
def extract_file(file_name: str) -> dict:
"""Extrait un fichier (simulé)"""
print(f"Extracting: {file_name}")
# Simuler des données
return {
"file": file_name,
"rows": 1000,
"data": [
{"order_id": 1, "amount": 100},
{"order_id": 2, "amount": 200},
]
}
# --- ÉTAPE 3 : Transformer avec Pandas ---
@task(
outlets=[Table(cluster="warehouse", database="analytics", name="orders_processed")],
)
def transform_data(extracted_list: list[dict]) -> list[dict]:
"""Agrège et transforme les données"""
all_data = []
for extracted in extracted_list:
for record in extracted['data']:
record['source_file'] = extracted['file']
record['amount_with_tax'] = record['amount'] * 1.2
all_data.append(record)
print(f"Transformed {len(all_data)} records")
return all_data
# --- ÉTAPE 4 : Charger dans PostgreSQL ---
@aql.dataframe
def load_to_postgres(data: list[dict]) -> pd.DataFrame:
"""Charge dans PostgreSQL via Astro SDK"""
df = pd.DataFrame(data)
df['loaded_at'] = pd.Timestamp.now()
return df
# --- ÉTAPE 5 : Rapport final ---
@task
def generate_report(row_count: int):
"""Génère un rapport"""
print(f"Pipeline completed. Total rows: {row_count}")
# --- ORCHESTRATION ---
files = list_files()
# Dynamic mapping sur les fichiers
extracted = extract_file.expand(file_name=files)
# Transformer
transformed = transform_data(extracted)
# Charger
loaded = load_to_postgres(
data=transformed,
output_table=AstroTable(
name="orders_processed",
conn_id="postgres_conn",
),
)
# Rapport
generate_report(len(transformed))
# Cleanup Astro SDK
aql.cleanup()
# Instancier le DAG
data_pipeline_advanced()5. tests/test_pipeline.py
import pytest
from dags.data_pipeline_advanced import data_pipeline_advanced
def test_dag_loads():
"""Test que le DAG se charge sans erreur"""
dag = data_pipeline_advanced()
assert dag is not None
assert dag.dag_id == "data_pipeline_advanced"
def test_dag_has_expected_tasks():
"""Test que le DAG a les bonnes tâches"""
dag = data_pipeline_advanced()
task_ids = [t.task_id for t in dag.tasks]
assert "list_files" in task_ids
assert "transform_data" in task_ids
def test_extract_file():
"""Test unitaire de la fonction extract"""
# Import direct de la fonction
from dags.data_pipeline_advanced import extract_file
# Appeler la fonction wrapped
result = extract_file.function("test.parquet")
assert "file" in result
assert "rows" in result
assert result["file"] == "test.parquet"6. Commandes pour tester
# Démarrer l'environnement
astro dev start
# Parser les DAGs
astro dev parse
# Lancer les tests
astro dev pytest
# Voir l'UI
open http://localhost:8080
# Déclencher le DAG
astro dev run dags trigger data_pipeline_advanced7. docker-compose.override.yml (pour Marquez)
version: "3"
services:
marquez:
image: marquezproject/marquez:latest
ports:
- "5000:5000"
- "5001:5001"
environment:
- MARQUEZ_PORT=5000
- MARQUEZ_ADMIN_PORT=5001
marquez-web:
image: marquezproject/marquez-web:latest
ports:
- "3000:3000"
environment:
- MARQUEZ_HOST=marquez
- MARQUEZ_PORT=5000📚 Ressources pour aller plus loin
🌐 Documentation officielle
- Apache Airflow Docs — Documentation Airflow
- Airflow Helm Chart — Déploiement K8s
- Dagster Docs — Documentation Dagster
- Prefect Docs — Documentation Prefect
- OpenLineage — Standard de lineage
- Astronomer Docs — Documentation Astronomer
🎮 Pratique
- Astronomer Academy — Cours gratuits Airflow
- Astro CLI Quickstart — Démarrer avec Astro
- Marquez Demo — Essayer OpenLineage
📖 Livres & Articles
- Data Pipelines with Apache Airflow — Bas Harenslak & Julian de Ruiter
- Fundamentals of Data Engineering — Joe Reis & Matt Housley
- Astronomer Blog — Best practices Airflow
🔧 Outils
- Astronomer Registry — Providers et DAGs
- Marquez — Backend OpenLineage
- DataHub — Data catalog avec lineage
➡️ Prochaine étape
Maintenant que tu maîtrises l’orchestration avancée, passons au messaging distribué !
👉 Module suivant : 29_distributed_messaging — Kafka avancé, RabbitMQ, Pulsar, Debezium
Tu vas apprendre : - Kafka avancé (Quotas, Tiered Storage) - Alternatives : RabbitMQ, Apache Pulsar - Change Data Capture avec Debezium - Patterns de messaging distribué
🎉 Félicitations ! Tu as terminé le module Advanced Orchestration pour Data Engineers.