🎼 Advanced Orchestration pour Data Engineers

Bienvenue dans ce module avancé où tu vas maîtriser l’orchestration de pipelines data à l’échelle. Tu apprendras à déployer Airflow sur Kubernetes, à exploiter la TaskFlow API, à comparer les orchestrateurs modernes, et à mettre en place le data lineage avec OpenLineage — des compétences essentielles pour un Data Engineer Senior !


Prérequis

Niveau Compétence
✅ Requis Avoir suivi le module 12_orchestration_pipelines (Airflow basics)
✅ Requis Avoir suivi les modules 15_kubernetes_fundamentals et 27_kubernetes_deep_dive
✅ Requis Maîtriser les DAGs, Operators, XCom dans Airflow
✅ Requis Connaissances solides en Python et Docker
💡 Recommandé Un cluster K8s accessible (Minikube, kind, ou cloud)

🎯 Objectifs du module

À la fin de ce module, tu seras capable de :

  • Déployer et configurer Airflow sur Kubernetes
  • Utiliser le KubernetesExecutor et le KubernetesPodOperator
  • Écrire des DAGs modernes avec la TaskFlow API
  • Implémenter le Dynamic Task Mapping
  • Comparer et choisir entre Airflow, Dagster et Prefect
  • Mettre en place le data lineage avec OpenLineage
  • Utiliser Astronomer (Astro CLI et Astro SDK) pour industrialiser

Rappel : Ce qu’on a vu vs Ce qu’on va approfondir

Module M12 (Beginner) Ce module M28 (Advanced)
Architecture Airflow basique Airflow sur Kubernetes
DAGs, Operators simples TaskFlow API, Dynamic Mapping
XCom manuel XCom automatique avec TaskFlow
Mention des alternatives Comparatif détaillé + exemples de code
OpenLineage (data lineage)
Astronomer (plateforme enterprise)

Schéma : De l’orchestration basique à l’orchestration avancée

M12 Orchestration Basics              M28 Advanced Orchestration (ce module)
┌─────────────────────────┐           ┌─────────────────────────────────────┐
│ • Cron / Task Scheduler │           │ • Airflow sur Kubernetes            │
│ • DAGs basics           │           │ • KubernetesExecutor                │
│ • Operators simples     │  ──────▶  │ • TaskFlow API                      │
│ • XCom manuel           │           │ • Dynamic Task Mapping              │
│ • SequentialExecutor    │           │ • Dagster, Prefect (comparatif)     │
└─────────────────────────┘           │ • OpenLineage (lineage)             │
                                      │ • Astronomer (enterprise)           │
                                      └─────────────────────────────────────┘

💡 Ce module est orienté “production à l’échelle” — tu vas apprendre à faire tourner des centaines de DAGs avec des milliers de tâches sur Kubernetes.

ℹ️ Le savais-tu ?

Apache Airflow a été créé par Airbnb en 2014 pour orchestrer leurs pipelines data. Il a été donné à la Apache Foundation en 2016.

Aujourd’hui, Airflow est utilisé par des milliers d’entreprises dont Uber, Lyft, Twitter, Slack, Adobe, et bien d’autres.

Astronomer, fondé en 2018, est devenu le leader des solutions Airflow managées, levant plus de 200 millions de dollars et étant le principal contributeur au projet open-source Airflow.

📖 History of Apache Airflow


1. Airflow sur Kubernetes

Déployer Airflow sur Kubernetes permet de bénéficier de l’élasticité, de l’isolation et de la scalabilité native de K8s.

Pourquoi Airflow sur K8s ?

Aspect Sans K8s (Celery/Local) Avec K8s
Scaling Workers fixes Pods à la demande
Isolation Dépendances partagées Chaque tâche dans son pod
Ressources Allocation statique Requests/Limits par tâche
Coût Serveurs 24/7 Pay-per-use (pods éphémères)
Maintenance Gérer les workers K8s gère tout

Architecture Airflow sur Kubernetes

┌─────────────────────────────────────────────────────────────────────────────┐
│                        AIRFLOW ON KUBERNETES                                │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      Namespace: airflow                              │   │
│  │                                                                      │   │
│  │   ┌─────────────┐   ┌─────────────┐   ┌─────────────────────────┐   │   │
│  │   │  Scheduler  │   │  Webserver  │   │   Triggerer (Airflow 2) │   │   │
│  │   │   (Pod)     │   │   (Pod)     │   │       (Pod)             │   │   │
│  │   └──────┬──────┘   └─────────────┘   └─────────────────────────┘   │   │
│  │          │                                                           │   │
│  │          │ Crée des pods pour chaque tâche                          │   │
│  │          ▼                                                           │   │
│  │   ┌─────────────────────────────────────────────────────────────┐   │   │
│  │   │              Worker Pods (éphémères)                         │   │   │
│  │   │   ┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐    │   │   │
│  │   │   │Task 1 │  │Task 2 │  │Task 3 │  │Task 4 │  │Task 5 │    │   │   │
│  │   │   │ Pod   │  │ Pod   │  │ Pod   │  │ Pod   │  │ Pod   │    │   │   │
│  │   │   └───────┘  └───────┘  └───────┘  └───────┘  └───────┘    │   │   │
│  │   └─────────────────────────────────────────────────────────────┘   │   │
│  │                                                                      │   │
│  │   ┌─────────────────┐   ┌─────────────────────────────────────┐     │   │
│  │   │   PostgreSQL    │   │            DAGs (PVC/Git-Sync)      │     │   │
│  │   │  (Metadata DB)  │   │                                     │     │   │
│  │   └─────────────────┘   └─────────────────────────────────────┘     │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────┘

Les Executors Airflow

Executor Description Quand l’utiliser
SequentialExecutor 1 tâche à la fois Dev/test uniquement
LocalExecutor Parallèle sur 1 machine Petite production
CeleryExecutor Workers Celery distribués Production classique
KubernetesExecutor 1 pod K8s par tâche Production K8s
CeleryKubernetesExecutor Hybrid Celery + K8s Workloads mixtes

Installation avec le Helm Chart Officiel

# Ajouter le repo Helm officiel Airflow
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Créer le namespace
kubectl create namespace airflow

# Installer Airflow avec KubernetesExecutor
helm install airflow apache-airflow/airflow \
  --namespace airflow \
  --set executor=KubernetesExecutor \
  --set webserver.defaultUser.password=admin \
  --set dags.persistence.enabled=true \
  --set dags.gitSync.enabled=true \
  --set dags.gitSync.repo=https://github.com/myorg/airflow-dags.git \
  --set dags.gitSync.branch=main \
  --set dags.gitSync.subPath=dags

# Vérifier l'installation
kubectl get pods -n airflow

# Accéder au webserver
kubectl port-forward svc/airflow-webserver -n airflow 8080:8080
# http://localhost:8080 (admin / admin)

Configuration values.yaml avancée

# values-production.yaml
executor: KubernetesExecutor

# Webserver
webserver:
  replicas: 2
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"

# Scheduler
scheduler:
  replicas: 2    # HA Scheduler (Airflow 2.0+)
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"

# Configuration KubernetesExecutor
config:
  kubernetes:
    # Namespace pour les worker pods
    namespace: airflow
    # Supprimer les pods après exécution
    delete_worker_pods: "True"
    delete_worker_pods_on_failure: "False"  # Garder pour debug
    # Image par défaut pour les workers
    worker_container_repository: apache/airflow
    worker_container_tag: 2.8.0-python3.11

# Git-Sync pour les DAGs
dags:
  persistence:
    enabled: false
  gitSync:
    enabled: true
    repo: git@github.com:myorg/airflow-dags.git
    branch: main
    subPath: dags
    sshKeySecret: airflow-git-ssh-key
    wait: 60  # Sync toutes les 60 secondes

# Logs dans un stockage externe
logs:
  persistence:
    enabled: true
    size: 10Gi

# PostgreSQL (ou utiliser un service externe)
postgresql:
  enabled: true
  auth:
    postgresPassword: airflow
    username: airflow
    password: airflow
    database: airflow

KubernetesExecutor : Comment ça marche

1. DAG est schedulé
       │
       ▼
2. Scheduler parse le DAG et identifie les tâches à exécuter
       │
       ▼
3. Pour chaque tâche, le Scheduler crée un Pod K8s
       │
   ┌───┴───────────────────────────────────────┐
   │  Pod Spec généré automatiquement :        │
   │  - Image: airflow (ou custom)             │
   │  - Command: airflow tasks run ...         │
   │  - Env: connexions, variables             │
   │  - Resources: requests/limits             │
   └───────────────────────────────────────────┘
       │
       ▼
4. K8s schedule le pod sur un node
       │
       ▼
5. La tâche s'exécute
       │
       ▼
6. Pod terminé → supprimé (si delete_worker_pods=True)

KubernetesPodOperator

Pour exécuter des tâches avec des images Docker personnalisées :

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime

with DAG(
    dag_id="etl_with_custom_image",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    
    # Tâche avec image Spark
    spark_job = KubernetesPodOperator(
        task_id="run_spark_etl",
        name="spark-etl-job",
        namespace="airflow",
        image="my-registry/spark-etl:1.0",
        cmds=["spark-submit"],
        arguments=[
            "--master", "k8s://https://kubernetes.default.svc",
            "/app/etl_job.py"
        ],
        # Ressources
        container_resources={
            "requests": {"cpu": "1", "memory": "2Gi"},
            "limits": {"cpu": "2", "memory": "4Gi"},
        },
        # Variables d'environnement
        env_vars={
            "SOURCE_PATH": "s3://bucket/raw/",
            "DEST_PATH": "s3://bucket/processed/",
        },
        # Secrets
        secrets=[
            {"secret": "aws-credentials", "key": "AWS_ACCESS_KEY_ID", "env": "AWS_ACCESS_KEY_ID"},
            {"secret": "aws-credentials", "key": "AWS_SECRET_ACCESS_KEY", "env": "AWS_SECRET_ACCESS_KEY"},
        ],
        # Configuration K8s
        is_delete_operator_pod=True,
        get_logs=True,
        startup_timeout_seconds=300,
        # Affinity/Tolerations
        affinity={
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [{
                        "matchExpressions": [{
                            "key": "node-type",
                            "operator": "In",
                            "values": ["compute"]
                        }]
                    }]
                }
            }
        },
    )
    
    # Tâche avec image Python custom
    python_job = KubernetesPodOperator(
        task_id="run_python_transform",
        name="python-transform",
        namespace="airflow",
        image="my-registry/python-etl:2.0",
        cmds=["python", "/app/transform.py"],
        container_resources={
            "requests": {"cpu": "500m", "memory": "1Gi"},
            "limits": {"cpu": "1", "memory": "2Gi"},
        },
        is_delete_operator_pod=True,
        get_logs=True,
    )
    
    spark_job >> python_job

pod_template_file : Personnalisation avancée

Pour des configurations complexes, utiliser un template YAML :

# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  labels:
    app: airflow-worker
spec:
  serviceAccountName: airflow-worker
  containers:
    - name: base
      image: apache/airflow:2.8.0-python3.11
      imagePullPolicy: IfNotPresent
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "LocalExecutor"
      resources:
        requests:
          cpu: "500m"
          memory: "512Mi"
        limits:
          cpu: "1000m"
          memory: "1Gi"
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
          readOnly: true
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
# Dans le DAG, référencer le template
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

task = KubernetesPodOperator(
    task_id="task_with_template",
    name="custom-task",
    namespace="airflow",
    pod_template_file="/opt/airflow/pod_templates/pod_template.yaml",
    # Override l'image du template
    image="my-custom-image:1.0",
)

2. TaskFlow API

La TaskFlow API (introduite dans Airflow 2.0) permet d’écrire des DAGs de manière Pythonic avec des décorateurs @dag et @task, en gérant automatiquement les XComs.

Pourquoi TaskFlow ?

Approche Traditionnelle TaskFlow API
PythonOperator(python_callable=fn) @task sur la fonction
xcom_push / xcom_pull manuels Passage de données automatique
Verbeux Concis et lisible
Dépendances explicites >> Dépendances implicites par appel

Exemple comparatif

Avant (approche traditionnelle) :

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract(**context):
    data = {"users": 100, "orders": 500}
    context['ti'].xcom_push(key='extracted_data', value=data)
    return data

def transform(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract_task', key='extracted_data')
    transformed = {"total": data['users'] + data['orders']}
    ti.xcom_push(key='transformed_data', value=transformed)
    return transformed

def load(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='transform_task', key='transformed_data')
    print(f"Loading: {data}")

with DAG(
    dag_id='traditional_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
) as dag:
    
    extract_task = PythonOperator(
        task_id='extract_task',
        python_callable=extract,
    )
    
    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=transform,
    )
    
    load_task = PythonOperator(
        task_id='load_task',
        python_callable=load,
    )
    
    extract_task >> transform_task >> load_task

Après (TaskFlow API) :

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='taskflow_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def taskflow_etl():
    
    @task
    def extract() -> dict:
        return {"users": 100, "orders": 500}
    
    @task
    def transform(data: dict) -> dict:
        return {"total": data['users'] + data['orders']}
    
    @task
    def load(data: dict):
        print(f"Loading: {data}")
    
    # Dépendances implicites par appel de fonction !
    raw_data = extract()
    transformed_data = transform(raw_data)
    load(transformed_data)

# Instancier le DAG
taskflow_etl()

Avantages TaskFlow

Avantage Description
Code Pythonic Ressemble à du Python standard
XCom automatique Les retours de fonction sont automatiquement passés
Type hints Support des annotations de type
Moins de boilerplate Pas de PythonOperator explicite
Dépendances claires Le flux de données définit les dépendances

TaskFlow avec plusieurs outputs

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='taskflow_multiple_outputs',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def etl_pipeline():
    
    @task(multiple_outputs=True)
    def extract() -> dict:
        """Retourne plusieurs valeurs nommées"""
        return {
            "users": [{"id": 1, "name": "Alice"}],
            "orders": [{"id": 101, "amount": 99.99}],
            "metadata": {"source": "api", "timestamp": "2024-01-01"}
        }
    
    @task
    def process_users(users: list) -> list:
        return [u['name'].upper() for u in users]
    
    @task
    def process_orders(orders: list) -> float:
        return sum(o['amount'] for o in orders)
    
    @task
    def combine(users: list, total: float, metadata: dict):
        print(f"Users: {users}")
        print(f"Total orders: {total}")
        print(f"Source: {metadata['source']}")
    
    # Extraire les données
    data = extract()
    
    # Accéder aux outputs individuels
    processed_users = process_users(data['users'])
    orders_total = process_orders(data['orders'])
    
    # Combiner
    combine(processed_users, orders_total, data['metadata'])

etl_pipeline()

Mixing TaskFlow avec des Operators classiques

from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

@dag(
    dag_id='mixed_taskflow',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def mixed_pipeline():
    
    # TaskFlow task
    @task
    def prepare_config() -> dict:
        return {"batch_size": 1000, "date": "2024-01-01"}
    
    # Operator classique
    run_spark = KubernetesPodOperator(
        task_id="run_spark",
        name="spark-job",
        namespace="airflow",
        image="apache/spark:3.5.0",
        cmds=["spark-submit", "/app/job.py"],
    )
    
    # TaskFlow task qui dépend d'un Operator
    @task
    def validate_output():
        print("Validating Spark output...")
        return True
    
    # Bash operator
    notify = BashOperator(
        task_id="notify",
        bash_command="echo 'Pipeline completed!'",
    )
    
    # Définir les dépendances
    config = prepare_config()
    config >> run_spark >> validate_output() >> notify

mixed_pipeline()

3. Dynamic Task Mapping

Le Dynamic Task Mapping (Airflow 2.3+) permet de créer un nombre variable de tâches à runtime, basé sur les données.

Pourquoi Dynamic Mapping ?

Problème Solution
Nombre de fichiers inconnu à l’avance .expand() sur la liste de fichiers
Traiter N partitions dynamiquement Map sur les partitions
Paralléliser sur une liste variable Créer N tâches automatiquement

Schéma Dynamic Mapping

                    ┌─────────────────┐
                    │  list_files()   │
                    │ [f1, f2, f3, f4]│
                    └────────┬────────┘
                             │
            ┌────────────────┼────────────────┐
            │                │                │
            ▼                ▼                ▼
    ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
    │ process(f1)   │ │ process(f2)   │ │ process(f3)   │ ...
    │   [mapped]    │ │   [mapped]    │ │   [mapped]    │
    └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
            │                 │                 │
            └─────────────────┼─────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │   aggregate()   │
                    │ Combine results │
                    └─────────────────┘

Exemple : Traiter des fichiers dynamiquement

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='dynamic_file_processing',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def process_files_dynamically():
    
    @task
    def list_files() -> list[str]:
        """Liste les fichiers à traiter (nombre variable)"""
        # En réalité : lister depuis S3, GCS, etc.
        return [
            "s3://bucket/data/file1.parquet",
            "s3://bucket/data/file2.parquet",
            "s3://bucket/data/file3.parquet",
            "s3://bucket/data/file4.parquet",
        ]
    
    @task
    def process_file(file_path: str) -> dict:
        """Traite UN fichier — sera mappé dynamiquement"""
        print(f"Processing: {file_path}")
        # Simuler le traitement
        row_count = len(file_path) * 100  # Fake
        return {"file": file_path, "rows": row_count}
    
    @task
    def aggregate_results(results: list[dict]) -> dict:
        """Agrège tous les résultats"""
        total_rows = sum(r['rows'] for r in results)
        return {
            "files_processed": len(results),
            "total_rows": total_rows,
        }
    
    # Récupérer la liste de fichiers
    files = list_files()
    
    # 🎯 DYNAMIC MAPPING : .expand() crée N tâches
    processed = process_file.expand(file_path=files)
    
    # Agréger les résultats
    aggregate_results(processed)

process_files_dynamically()

expand() avec plusieurs paramètres

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='dynamic_multi_param',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def multi_param_mapping():
    
    @task
    def get_partitions() -> list[dict]:
        return [
            {"date": "2024-01-01", "region": "EU"},
            {"date": "2024-01-01", "region": "US"},
            {"date": "2024-01-02", "region": "EU"},
            {"date": "2024-01-02", "region": "US"},
        ]
    
    @task
    def process_partition(date: str, region: str) -> dict:
        print(f"Processing {region} for {date}")
        return {"date": date, "region": region, "status": "done"}
    
    partitions = get_partitions()
    
    # expand_kwargs pour mapper plusieurs paramètres
    process_partition.expand_kwargs(partitions)

multi_param_mapping()

Limiter le parallélisme

@task(max_active_tis_per_dag=5)  # Max 5 instances en parallèle
def process_file(file_path: str) -> dict:
    # ...
    pass

Mapping sur un Operator (non-TaskFlow)

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id='dynamic_bash',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
) as dag:
    
    # Liste statique (ou XCom d'une tâche précédente)
    files = ["file1.csv", "file2.csv", "file3.csv"]
    
    process = BashOperator.partial(
        task_id="process_files",
    ).expand(
        bash_command=[f"python process.py {f}" for f in files]
    )

4. Airflow vs Dagster vs Prefect

Il existe plusieurs orchestrateurs modernes. Voici une comparaison approfondie.

Vue d’ensemble

Critère Airflow Dagster Prefect
Créé par Airbnb (2014) Elementl (2018) Prefect (2018)
Philosophie DAG-centric Asset-centric Flow-centric
Maturité ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐
Communauté Très large En croissance En croissance
Adoption Standard industrie Startups, ML Startups, Data
Learning curve Moyenne Plus raide Plus douce

Comparaison détaillée

Aspect Airflow Dagster Prefect
Définition DAGs Python Assets + Ops Flows + Tasks
Scheduling Cron-like Cron + Sensors Cron + Events
Data Lineage Via OpenLineage Natif (Assets) Via intégrations
Testing Difficile Excellent Bon
Type checking Non natif Natif (I/O types) Pydantic
UI Fonctionnelle Moderne Moderne
Local dev Complexe Excellent Excellent
Cloud offering MWAA, Composer, Astronomer Dagster Cloud Prefect Cloud

Exemples de code comparés

Même pipeline dans les 3 outils :

Airflow (TaskFlow)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def etl_pipeline():
    
    @task
    def extract() -> dict:
        return {"data": [1, 2, 3, 4, 5]}
    
    @task
    def transform(raw: dict) -> dict:
        return {"data": [x * 2 for x in raw['data']]}
    
    @task
    def load(transformed: dict):
        print(f"Loading: {transformed}")
    
    raw = extract()
    transformed = transform(raw)
    load(transformed)

etl_pipeline()

Dagster (Asset-based)

from dagster import asset, Definitions, define_asset_job

@asset
def raw_data() -> dict:
    """Asset: données brutes"""
    return {"data": [1, 2, 3, 4, 5]}

@asset
def transformed_data(raw_data: dict) -> dict:
    """Asset: données transformées (dépend de raw_data)"""
    return {"data": [x * 2 for x in raw_data['data']]}

@asset
def loaded_data(transformed_data: dict) -> None:
    """Asset: données chargées"""
    print(f"Loading: {transformed_data}")

# Job pour exécuter tous les assets
etl_job = define_asset_job("etl_job", selection="*")

# Définitions Dagster
defs = Definitions(
    assets=[raw_data, transformed_data, loaded_data],
    jobs=[etl_job],
)

Dagster : Concepts clés

Concept Description
Asset Objet de données persistant (table, fichier)
Op Unité de calcul (comme un Operator)
Graph Composition d’Ops
Job Graph exécutable avec config
Resource Connexion externe (DB, S3)
I/O Manager Gère la persistance des assets

Prefect (Flow-based)

from prefect import flow, task

@task
def extract() -> dict:
    return {"data": [1, 2, 3, 4, 5]}

@task
def transform(raw: dict) -> dict:
    return {"data": [x * 2 for x in raw['data']]}

@task
def load(transformed: dict):
    print(f"Loading: {transformed}")

@flow(name="ETL Pipeline")
def etl_pipeline():
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# Exécuter
if __name__ == "__main__":
    etl_pipeline()

Prefect : Concepts clés

Concept Description
Flow Pipeline de tâches
Task Unité de travail
Deployment Flow déployé et schedulable
Work Pool Groupe de workers
Block Credentials et configs réutilisables

Quand utiliser quoi ?

Scénario Recommandation Pourquoi
Grande entreprise, équipe mature Airflow Standard, très documenté, écosystème large
Pipelines ML avec assets Dagster Asset-centric, excellent testing, types
Startup, itération rapide Prefect Simple, moderne, local dev facile
Déjà sur Airflow Rester sur Airflow Migration coûteuse, TaskFlow moderne
Nouveau projet, équipe data Dagster ou Prefect Approches modernes
K8s natif requis Airflow ou Dagster KubernetesExecutor mature

Tableau de décision

                            Complexité du pipeline
                     Faible ◄─────────────────► Élevée
                        │                          │
    Taille équipe       │                          │
         │              │                          │
     Petite  ───────────┼──► Prefect    ──────────►│ Dagster
         │              │                          │
         │              │                          │
    Grande  ────────────┼──► Prefect    ──────────►│ Airflow
         │              │    ou Dagster            │
         ▼              │                          │

5. OpenLineage — Data Lineage

OpenLineage est un standard ouvert pour collecter et partager les métadonnées de lineage des pipelines data. Il répond à la question : “D’où viennent mes données et où vont-elles ?”

Pourquoi le Data Lineage ?

Question Lineage répond
D’où viennent les données de ce dashboard ? ✅ Traçabilité amont
Si cette table change, quoi d’autre est impacté ? ✅ Impact analysis
Ce job a échoué, quelles données sont corrompues ? ✅ Root cause analysis
Sommes-nous conformes RGPD ? ✅ Audit et gouvernance

Schéma OpenLineage

┌─────────────────────────────────────────────────────────────────────────────┐
│                         OPENLINEAGE ECOSYSTEM                               │
│                                                                             │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │
│   │   Airflow   │   │   Spark     │   │    dbt      │   │   Flink     │    │
│   │  + OL Plugin│   │  + OL Lib   │   │  + OL Plugin│   │  + OL Lib   │    │
│   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘    │
│          │                 │                 │                 │            │
│          │    OpenLineage Events (JSON)      │                 │            │
│          └─────────────────┴─────────────────┴─────────────────┘            │
│                                    │                                        │
│                                    ▼                                        │
│                    ┌───────────────────────────────┐                       │
│                    │     OpenLineage Backend       │                       │
│                    │  (Marquez, Atlan, DataHub...) │                       │
│                    └───────────────────────────────┘                       │
│                                    │                                        │
│                                    ▼                                        │
│                    ┌───────────────────────────────┐                       │
│                    │    Lineage Visualization      │                       │
│                    │    Impact Analysis            │                       │
│                    │    Data Catalog               │                       │
│                    └───────────────────────────────┘                       │
└─────────────────────────────────────────────────────────────────────────────┘

Concepts OpenLineage

Concept Description Exemple
Job Pipeline ou transformation DAG Airflow, job Spark
Run Exécution d’un job DAG run avec ID unique
Dataset Source ou destination de données Table SQL, fichier S3
Facet Métadonnées additionnelles Schema, stats, owner

Activer OpenLineage dans Airflow

# Installer le provider
pip install apache-airflow-providers-openlineage
# airflow.cfg ou variables d'environnement
[openlineage]
transport = '{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
namespace = "my_airflow_instance"

Ou via environnement :

export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
export AIRFLOW__OPENLINEAGE__NAMESPACE="my_airflow_instance"

Marquez : Backend OpenLineage

Marquez est le backend de référence pour OpenLineage (open-source par WeWork/Linux Foundation).

# Déployer Marquez avec Docker Compose
git clone https://github.com/MarquezProject/marquez.git
cd marquez
docker-compose up -d

# UI disponible sur http://localhost:3000
# API sur http://localhost:5000

DAG avec lineage explicite

from airflow.decorators import dag, task
from airflow.lineage.entities import Table, File
from datetime import datetime

@dag(
    dag_id='etl_with_lineage',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def etl_with_lineage():
    
    @task(
        inlets=[File(url="s3://bucket/raw/")],
        outlets=[Table(cluster="warehouse", database="analytics", name="staging_orders")],
    )
    def extract_and_stage():
        """Extract depuis S3 et charge dans staging"""
        print("Extracting from S3 to staging...")
    
    @task(
        inlets=[Table(cluster="warehouse", database="analytics", name="staging_orders")],
        outlets=[Table(cluster="warehouse", database="analytics", name="fact_orders")],
    )
    def transform_to_fact():
        """Transforme staging en fact table"""
        print("Transforming to fact table...")
    
    extract_and_stage() >> transform_to_fact()

etl_with_lineage()

Lineage automatique avec certains Operators

Certains operators extraient automatiquement le lineage :

Operator Lineage auto
SnowflakeOperator ✅ (parse SQL)
BigQueryOperator
PostgresOperator
SparkSubmitOperator ✅ (avec Spark OL)
PythonOperator ❌ (manuel)

Alternatives à Marquez

Outil Type Description
Marquez Open-source Backend de référence
DataHub Open-source Catalogue + lineage (LinkedIn)
Atlan SaaS Plateforme data catalog complète
Collibra Enterprise Gouvernance et lineage
Alation Enterprise Data catalog

6. Astronomer — Airflow Enterprise

Astronomer est la plateforme enterprise pour Apache Airflow. Elle fournit des outils, un hosting managé, et des SDKs pour industrialiser Airflow.

Pourquoi Astronomer ?

Problème Solution Astronomer
Installer/maintenir Airflow est complexe Astro Cloud (fully managed)
Dev local difficile Astro CLI (environment identique)
Écrire du code Airflow verbeux Astro SDK (API simplifiée)
Pas de support Support enterprise 24/7

Écosystème Astronomer

┌─────────────────────────────────────────────────────────────────────────────┐
│                         ASTRONOMER ECOSYSTEM                                │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                        Astro Cloud                                   │  │
│   │  • Airflow fully managed                                            │  │
│   │  • Auto-scaling                                                      │  │
│   │  • Observability intégrée                                           │  │
│   │  • CI/CD intégré                                                    │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌───────────────────────┐   ┌───────────────────────┐                    │
│   │      Astro CLI        │   │      Astro SDK        │                    │
│   │  • Dev local          │   │  • API Pythonic       │                    │
│   │  • Deploy en 1 cmd    │   │  • SQL/Python tasks   │                    │
│   │  • Tests intégrés     │   │  • Data quality       │                    │
│   └───────────────────────┘   └───────────────────────┘                    │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    Astronomer Registry                               │  │
│   │  • Providers certifiés                                               │  │
│   │  • DAGs de référence                                                 │  │
│   │  • Documentation enrichie                                            │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

Astro CLI

L’outil en ligne de commande pour développer et déployer Airflow.

# Installation
# macOS
brew install astro

# Linux
curl -sSL install.astronomer.io | sudo bash -s

# Windows (WSL)
curl -sSL install.astronomer.io | sudo bash -s
# Créer un nouveau projet Airflow
mkdir my-airflow-project && cd my-airflow-project
astro dev init

# Structure générée :
# .
# ├── dags/                 # Tes DAGs
# │   └── example_dag.py
# ├── include/              # Fichiers additionnels
# ├── plugins/              # Plugins custom
# ├── tests/                # Tests
# ├── Dockerfile            # Image Airflow custom
# ├── packages.txt          # Packages système
# ├── requirements.txt      # Dépendances Python
# └── airflow_settings.yaml # Variables, connexions

# Démarrer l'environnement local
astro dev start
# UI sur http://localhost:8080 (admin/admin)

# Voir les logs
astro dev logs

# Exécuter les tests
astro dev pytest

# Parser les DAGs (vérifier les erreurs)
astro dev parse

# Arrêter
astro dev stop

# Déployer sur Astro Cloud
astro deploy

Astro SDK

Une API Python simplifiée pour écrire des DAGs, particulièrement pour les opérations SQL/dataframe.

pip install astro-sdk-python

Exemple : ETL SQL simplifié

from datetime import datetime
from airflow.decorators import dag
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table

@dag(
    dag_id='astro_sdk_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def astro_sdk_etl():
    
    # Charger un CSV dans une table
    raw_orders = aql.load_file(
        input_file=File("s3://bucket/orders.csv"),
        output_table=Table(
            name="raw_orders",
            conn_id="postgres_conn",
        ),
    )
    
    # Transformation SQL (inline)
    @aql.transform
    def transform_orders(input_table: Table):
        return """
            SELECT 
                order_id,
                customer_id,
                order_date,
                amount,
                amount * 1.2 as amount_with_tax
            FROM {{ input_table }}
            WHERE amount > 0
        """
    
    # Appliquer la transformation
    transformed = transform_orders(
        input_table=raw_orders,
        output_table=Table(
            name="transformed_orders",
            conn_id="postgres_conn",
        ),
    )
    
    # Exporter vers S3
    aql.export_to_file(
        input_data=transformed,
        output_file=File("s3://bucket/processed/orders.parquet"),
        if_exists="replace",
    )
    
    # Nettoyer les tables temporaires
    aql.cleanup()

astro_sdk_etl()

Fonctionnalités Astro SDK

Fonction Description
aql.load_file() Charger CSV/Parquet/JSON dans une table
aql.transform() SQL transformation avec Jinja
aql.run_raw_sql() Exécuter du SQL brut
aql.export_to_file() Exporter une table vers fichier
aql.merge() Merge/Upsert entre tables
aql.append() Append data to table
aql.dataframe() Transformer avec Pandas
aql.cleanup() Supprimer les tables temporaires

Astro SDK : Transformation DataFrame

from astro import sql as aql
from astro.sql.table import Table
import pandas as pd

@aql.dataframe
def process_with_pandas(df: pd.DataFrame) -> pd.DataFrame:
    """Transformation avec Pandas"""
    df['total'] = df['quantity'] * df['price']
    df['processed_at'] = pd.Timestamp.now()
    return df

# Dans le DAG :
processed = process_with_pandas(
    df=some_table,
    output_table=Table(name="processed", conn_id="postgres"),
)

Astro Cloud vs Self-Hosted

Aspect Astro Cloud Self-Hosted (Helm)
Setup Minutes Heures/jours
Maintenance Astronomer Ton équipe
Scaling Automatique Manuel
Updates Automatiques Manuels
Coût Par usage Infrastructure
Contrôle Moyen Total
Sécurité SOC 2, HIPAA À implémenter

7. Exercices Pratiques

Exercice 1 : DAG TaskFlow avec Dynamic Mapping

Objectif : Créer un DAG qui traite dynamiquement une liste de fichiers.

Instructions : 1. Créer une tâche list_files() qui retourne une liste de chemins 2. Créer une tâche process_file(path) mappée dynamiquement 3. Créer une tâche report(results) qui agrège les résultats 4. Limiter à 3 tâches parallèles maximum

💡 Indice

Utilise @task(max_active_tis_per_dag=3) et .expand()


Exercice 2 : KubernetesPodOperator avec Spark

Objectif : Créer un DAG qui exécute un job Spark via KubernetesPodOperator.

Instructions : 1. Utiliser l’image apache/spark:3.5.0 2. Configurer les ressources : 2 CPU, 4Gi RAM 3. Passer des variables d’environnement pour la config 4. Utiliser une node affinity pour cibler les nodes compute


Exercice 3 : Comparatif Airflow/Dagster/Prefect

Objectif : Implémenter le même pipeline simple dans les 3 outils.

Pipeline : 1. Lire un fichier JSON 2. Filtrer les enregistrements (amount > 100) 3. Calculer une somme 4. Écrire le résultat

Comparer : - Nombre de lignes de code - Facilité de test local - Lisibilité


Exercice 4 : OpenLineage avec Marquez

Objectif : Configurer OpenLineage et visualiser le lineage.

Instructions : 1. Déployer Marquez en local (Docker Compose) 2. Configurer Airflow pour envoyer les events à Marquez 3. Créer un DAG avec inlets et outlets explicites 4. Visualiser le lineage dans l’UI Marquez


Exercice 5 : Astro CLI Project

Objectif : Créer un projet Airflow complet avec Astro CLI.

Instructions : 1. Initialiser un projet avec astro dev init 2. Créer un DAG utilisant Astro SDK 3. Ajouter des tests dans /tests 4. Valider avec astro dev pytest


8. Mini-Projet : Pipeline Data Production-Ready

Objectif

Créer un pipeline ETL production-ready déployé sur Kubernetes avec monitoring et lineage.

Architecture cible

┌─────────────────────────────────────────────────────────────────────────────┐
│                          MINI-PROJET M28                                    │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                    Airflow sur Kubernetes                            │  │
│   │                                                                      │  │
│   │   ┌─────────────────────────────────────────────────────────────┐   │  │
│   │   │            DAG: data_pipeline_advanced                       │   │  │
│   │   │                                                              │   │  │
│   │   │   ┌─────────┐    ┌─────────────┐    ┌─────────────────┐     │   │  │
│   │   │   │ Extract │───▶│  Transform  │───▶│      Load       │     │   │  │
│   │   │   │ (K8sPod)│    │ (TaskFlow)  │    │  (Astro SDK)    │     │   │  │
│   │   │   └─────────┘    └──────┬──────┘    └─────────────────┘     │   │  │
│   │   │                         │                                    │   │  │
│   │   │            Dynamic Mapping (N fichiers)                      │   │  │
│   │   └─────────────────────────────────────────────────────────────┘   │  │
│   │                                                                      │  │
│   │   ┌─────────────┐   ┌─────────────┐   ┌─────────────────────────┐   │  │
│   │   │  Scheduler  │   │  Webserver  │   │    Worker Pods (K8s)    │   │  │
│   │   └─────────────┘   └─────────────┘   └─────────────────────────┘   │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                    │                                        │
│                         OpenLineage Events                                  │
│                                    ▼                                        │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                         Marquez                                      │  │
│   │              (Data Lineage Visualization)                           │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

Instructions

Étape 1 : Setup Astro Project

mkdir advanced-pipeline && cd advanced-pipeline
astro dev init

Étape 2 : Créer le DAG principal

Le DAG doit : - Lister des fichiers depuis S3 (simulé) - Traiter chaque fichier avec Dynamic Mapping - Utiliser TaskFlow API - Avoir des inlets/outlets pour le lineage

Étape 3 : Configurer OpenLineage

  • Ajouter la config dans airflow_settings.yaml
  • Vérifier que les events arrivent dans Marquez

Étape 4 : Tests

  • Écrire des tests unitaires pour les tâches
  • Valider avec astro dev pytest

Étape 5 : Déployer (optionnel)

  • Déployer sur un cluster K8s avec le Helm chart
  • Ou utiliser Astro Cloud

✅ Solution du mini-projet

📥 Afficher la solution complète

1. Structure du projet Astro

advanced-pipeline/
├── dags/
│   └── data_pipeline_advanced.py
├── include/
│   └── sql/
│       └── transform.sql
├── tests/
│   └── test_pipeline.py
├── Dockerfile
├── requirements.txt
└── airflow_settings.yaml

2. requirements.txt

astro-sdk-python>=1.5.0
apache-airflow-providers-openlineage>=1.0.0
apache-airflow-providers-cncf-kubernetes>=7.0.0
pandas>=2.0.0

3. airflow_settings.yaml

airflow:
  connections:
    - conn_id: postgres_conn
      conn_type: postgres
      host: postgres
      login: airflow
      password: airflow
      schema: airflow
      port: 5432

  variables:
    - variable_name: data_bucket
      variable_value: s3://my-data-bucket

  # OpenLineage config
  openlineage:
    transport: '{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'
    namespace: advanced-pipeline

4. dags/data_pipeline_advanced.py

from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.lineage.entities import File, Table
from astro import sql as aql
from astro.sql.table import Table as AstroTable
import pandas as pd

@dag(
    dag_id='data_pipeline_advanced',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    tags=['production', 'etl'],
    doc_md="""## Pipeline Data Avancé
    
    Ce pipeline:
    1. Liste les fichiers à traiter
    2. Extrait chaque fichier (Dynamic Mapping)
    3. Transforme avec Astro SDK
    4. Charge dans PostgreSQL
    """,
)
def data_pipeline_advanced():
    
    # --- ÉTAPE 1 : Lister les fichiers ---
    @task
    def list_files() -> list[str]:
        """Liste les fichiers à traiter (simulé)"""
        # En production: lister depuis S3
        return [
            "orders_2024_01.parquet",
            "orders_2024_02.parquet",
            "orders_2024_03.parquet",
        ]
    
    # --- ÉTAPE 2 : Extraire chaque fichier ---
    @task(
        max_active_tis_per_dag=3,
        inlets=[File(url="s3://bucket/raw/")],
    )
    def extract_file(file_name: str) -> dict:
        """Extrait un fichier (simulé)"""
        print(f"Extracting: {file_name}")
        # Simuler des données
        return {
            "file": file_name,
            "rows": 1000,
            "data": [
                {"order_id": 1, "amount": 100},
                {"order_id": 2, "amount": 200},
            ]
        }
    
    # --- ÉTAPE 3 : Transformer avec Pandas ---
    @task(
        outlets=[Table(cluster="warehouse", database="analytics", name="orders_processed")],
    )
    def transform_data(extracted_list: list[dict]) -> list[dict]:
        """Agrège et transforme les données"""
        all_data = []
        for extracted in extracted_list:
            for record in extracted['data']:
                record['source_file'] = extracted['file']
                record['amount_with_tax'] = record['amount'] * 1.2
                all_data.append(record)
        
        print(f"Transformed {len(all_data)} records")
        return all_data
    
    # --- ÉTAPE 4 : Charger dans PostgreSQL ---
    @aql.dataframe
    def load_to_postgres(data: list[dict]) -> pd.DataFrame:
        """Charge dans PostgreSQL via Astro SDK"""
        df = pd.DataFrame(data)
        df['loaded_at'] = pd.Timestamp.now()
        return df
    
    # --- ÉTAPE 5 : Rapport final ---
    @task
    def generate_report(row_count: int):
        """Génère un rapport"""
        print(f"Pipeline completed. Total rows: {row_count}")
    
    # --- ORCHESTRATION ---
    files = list_files()
    
    # Dynamic mapping sur les fichiers
    extracted = extract_file.expand(file_name=files)
    
    # Transformer
    transformed = transform_data(extracted)
    
    # Charger
    loaded = load_to_postgres(
        data=transformed,
        output_table=AstroTable(
            name="orders_processed",
            conn_id="postgres_conn",
        ),
    )
    
    # Rapport
    generate_report(len(transformed))
    
    # Cleanup Astro SDK
    aql.cleanup()

# Instancier le DAG
data_pipeline_advanced()

5. tests/test_pipeline.py

import pytest
from dags.data_pipeline_advanced import data_pipeline_advanced

def test_dag_loads():
    """Test que le DAG se charge sans erreur"""
    dag = data_pipeline_advanced()
    assert dag is not None
    assert dag.dag_id == "data_pipeline_advanced"

def test_dag_has_expected_tasks():
    """Test que le DAG a les bonnes tâches"""
    dag = data_pipeline_advanced()
    task_ids = [t.task_id for t in dag.tasks]
    
    assert "list_files" in task_ids
    assert "transform_data" in task_ids

def test_extract_file():
    """Test unitaire de la fonction extract"""
    # Import direct de la fonction
    from dags.data_pipeline_advanced import extract_file
    
    # Appeler la fonction wrapped
    result = extract_file.function("test.parquet")
    
    assert "file" in result
    assert "rows" in result
    assert result["file"] == "test.parquet"

6. Commandes pour tester

# Démarrer l'environnement
astro dev start

# Parser les DAGs
astro dev parse

# Lancer les tests
astro dev pytest

# Voir l'UI
open http://localhost:8080

# Déclencher le DAG
astro dev run dags trigger data_pipeline_advanced

7. docker-compose.override.yml (pour Marquez)

version: "3"
services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"
      - "5001:5001"
    environment:
      - MARQUEZ_PORT=5000
      - MARQUEZ_ADMIN_PORT=5001
  
  marquez-web:
    image: marquezproject/marquez-web:latest
    ports:
      - "3000:3000"
    environment:
      - MARQUEZ_HOST=marquez
      - MARQUEZ_PORT=5000

📚 Ressources pour aller plus loin

🌐 Documentation officielle

🎮 Pratique

📖 Livres & Articles

  • Data Pipelines with Apache Airflow — Bas Harenslak & Julian de Ruiter
  • Fundamentals of Data Engineering — Joe Reis & Matt Housley
  • Astronomer Blog — Best practices Airflow

🔧 Outils


➡️ Prochaine étape

Maintenant que tu maîtrises l’orchestration avancée, passons au messaging distribué !

👉 Module suivant : 29_distributed_messaging — Kafka avancé, RabbitMQ, Pulsar, Debezium

Tu vas apprendre : - Kafka avancé (Quotas, Tiered Storage) - Alternatives : RabbitMQ, Apache Pulsar - Change Data Capture avec Debezium - Patterns de messaging distribué


🎉 Félicitations ! Tu as terminé le module Advanced Orchestration pour Data Engineers.

Retour au sommet