Orchestration de Pipelines Data

Ce module présente les outils d’orchestration pour automatiser l’exécution de vos pipelines de données.


Prérequis

Niveau Compétence
✅ Requis Avoir suivi le module 11_pyspark_for_data_engineering
✅ Requis Comprendre les pipelines ETL
✅ Requis Maîtriser Python (modules 04-05)
✅ Requis Connaître les bases de Linux (ligne de commande)

Objectifs du module

À la fin de ce notebook, tu seras capable de :

  • Comprendre ce qu’est l’orchestration de pipelines
  • Utiliser le Planificateur Windows (niveau débutant)
  • Configurer des tâches avec Crontab (niveau intermédiaire)
  • Comprendre l’architecture d’Apache Airflow
  • Créer des DAGs avec Apache Airflow
  • Utiliser les différents types d’Operators
  • Gérer les dépendances et le passage de données (XCom)
  • Configurer les alertes et le monitoring
  • Choisir le bon outil selon ton besoin

🎯 L’orchestration dans l’écosystème Data Engineering

Tu as appris à créer des pipelines ETL avec PySpark. Mais comment les automatiser pour qu’ils s’exécutent régulièrement sans intervention manuelle ?

Le problème

Sans orchestration :
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   😰 "Il faut que je lance mon script tous les jours..."   │
│   😰 "J'ai oublié de lancer le pipeline hier !"            │
│   😰 "Le script B a planté car A n'avait pas fini..."      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Avec orchestration :
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   ✅ Scripts exécutés automatiquement                       │
│   ✅ Alertes en cas d'échec                                 │
│   ✅ Dépendances respectées (A → B → C)                     │
│   ✅ Logs et monitoring centralisés                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Position dans le pipeline Data

┌─────────────────────────────────────────────────────────────────┐
│                     PIPELINE DATA                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Sources        ETL              Destination                   │
│   ────────       ───              ───────────                   │
│                                                                 │
│   APIs     ─┐                 ┌─►  Data Warehouse               │
│   Fichiers ─┼──►  PySpark  ───┼─►  Data Lake                    │
│   BDD      ─┘                 └─►  Dashboard                    │
│                                                                 │
│            ▲                                                    │
│            │                                                    │
│   ┌────────┴────────┐                                           │
│   │  ORCHESTRATION  │  ◄── Quand ? Dans quel ordre ?            │
│   │  (Airflow/Cron) │                                           │
│   └─────────────────┘                                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

📊 Comparaison rapide des outils

Critère Windows Task Crontab Airflow
Facilité ⭐⭐⭐ ⭐⭐
Interface GUI CLI Web
Dépendances
Monitoring Basique Non Complet
Retry auto
Idéal pour 1-5 scripts 5-15 scripts 10+ pipelines

Niveau 1 : Planificateur Windows

C’est quoi ?

Un outil intégré à Windows pour exécuter des programmes automatiquement.

Comment l’utiliser ?

Ouvrir le planificateur :

Windows + R → Taper 'taskschd.msc' → Entrée

Créer une tâche :

  1. Action → Créer une tâche de base
  2. Nom : “Mon script quotidien”
  3. Déclencheur : Quotidien à 2h du matin
  4. Action : Démarrer python.exe avec C:\scripts\mon_script.py
  5. Terminer

Voilà ! Votre script s’exécutera automatiquement tous les jours à 2h.

✅ Forces

Très facile - Interface graphique intuitive
Déjà installé - Pas de setup
Parfait pour débuter - Pas de code complexe

❌ Faiblesses

Pas de dépendances - Si tâche A doit finir avant tâche B → compliqué
Monitoring limité - Difficile de voir l’état global
Windows uniquement - Ne fonctionne pas sur Linux

Quand l’utiliser ?

OUI : Vous avez 1-5 scripts simples sur Windows
NON : Vous avez besoin que script B attende script A


Niveau 2 : Crontab (Linux/Mac)

C’est quoi ?

Le planificateur standard sur Linux/Mac.

Syntaxe de base

minute heure jour mois jour_semaine commande
┌───────────── minute (0 - 59)
│ ┌─────────── heure (0 - 23)
│ │ ┌───────── jour du mois (1 - 31)
│ │ │ ┌─────── mois (1 - 12)
│ │ │ │ ┌───── jour de la semaine (0 - 6) (dimanche = 0)
│ │ │ │ │
* * * * * commande

Exemples simples :

# Tous les jours à 2h du matin
0 2 * * * python3 /home/user/script.py

# Toutes les heures
0 * * * * python3 /home/user/hourly.py

# Lundi à vendredi à 9h
0 9 * * 1-5 python3 /home/user/weekday.py

# Toutes les 15 minutes
*/15 * * * * python3 /home/user/check.py

# Le 1er de chaque mois à minuit
0 0 1 * * python3 /home/user/monthly.py

Comment l’utiliser ?

# Éditer votre crontab
crontab -e

# Voir les tâches planifiées
crontab -l

# Ajouter vos lignes
0 2 * * * python3 /home/user/backup.py >> /var/log/backup.log 2>&1

# Sauvegarder et quitter
# ✅ C'est fait !

💡 Astuce : Utilise crontab.guru pour tester tes expressions !

Forces

Universel - Sur TOUS les serveurs Linux
Très léger - Presque pas de ressources
Simple - Une ligne = une tâche
Gratuit - Déjà installé

Faiblesses

Pas de dépendances - Même problème que Windows
Pas de monitoring - Aucune interface
Pas de retry - Si ça échoue, il faut attendre le prochain run
Logs manuels - Il faut les gérer soi-même

Quand l’utiliser ?

OUI : Serveur Linux, 5-15 scripts indépendants
NON : Scripts avec dépendances complexes


Niveau 3 : Apache Airflow

C’est quoi ?

Un orchestrateur professionnel pour gérer des workflows complexes, créé par Airbnb en 2014 et devenu projet Apache en 2016.

┌─────────────────────────────────────────────────────────────────┐
│                    APACHE AIRFLOW                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   "Airflow is a platform to programmatically author,           │
│    schedule, and monitor workflows."                            │
│                                                                 │
│   • Créé par Airbnb (2014)                                      │
│   • Apache Top-Level Project (2019)                             │
│   • 30,000+ GitHub stars                                        │
│   • Utilisé par : Airbnb, Netflix, Spotify, Twitter, Adobe...   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

La grande différence avec Cron :

# ❌ Avec Cron (problème) :
0 2 * * * python extract.py
30 2 * * * python transform.py  # Et si extract prend plus de 30min ?
0 3 * * * python load.py        # Et si transform a planté ?

# ✅ Avec Airflow (solution) :
extract >> transform >> load  # transform ATTEND que extract soit terminé !

Architecture d’Airflow

┌─────────────────────────────────────────────────────────────────────────────┐
│                        ARCHITECTURE AIRFLOW                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────┐         ┌─────────────────┐         ┌─────────────┐  │
│   │                 │         │                 │         │             │  │
│   │   WEB SERVER    │◄───────►│    SCHEDULER    │◄───────►│  EXECUTOR   │  │
│   │   (Flask UI)    │         │  (Orchestrate)  │         │  (Workers)  │  │
│   │                 │         │                 │         │             │  │
│   └────────┬────────┘         └────────┬────────┘         └──────┬──────┘  │
│            │                           │                         │         │
│            │                           ▼                         │         │
│            │                  ┌─────────────────┐                │         │
│            │                  │                 │                │         │
│            └─────────────────►│    METADATA     │◄───────────────┘         │
│                               │    DATABASE     │                          │
│                               │  (PostgreSQL)   │                          │
│                               │                 │                          │
│                               └─────────────────┘                          │
│                                        ▲                                   │
│                                        │                                   │
│                               ┌────────┴────────┐                          │
│                               │                 │                          │
│                               │    DAG FILES    │                          │
│                               │   (Python .py)  │                          │
│                               │                 │                          │
│                               └─────────────────┘                          │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Composants clés

Composant Rôle Description
Web Server Interface UI Dashboard Flask pour visualiser les DAGs
Scheduler Planification Décide quand exécuter les tâches
Executor Exécution Lance les tâches (Local, Celery, K8s…)
Metadata DB Stockage État des DAGs, logs, historique
DAG Files Définition Fichiers Python définissant les workflows

Types d’Executors

Executor Usage Scalabilité
SequentialExecutor Dev/Test 1 tâche à la fois
LocalExecutor Petite prod Parallèle sur 1 machine
CeleryExecutor Production Workers distribués
KubernetesExecutor Cloud Pod par tâche

Concepts clés d’Airflow

1️⃣ DAG (Directed Acyclic Graph)

Un DAG est un graphe de tâches sans cycle : les données vont toujours dans une direction.

     ┌─────────┐
     │ Extract │
     └────┬────┘
          │
          ▼
     ┌─────────┐
     │Transform│
     └────┬────┘
          │
    ┌─────┴─────┐
    ▼           ▼
┌───────┐  ┌────────┐
│Load DW│  │Load S3 │
└───────┘  └────────┘

2️⃣ Task

Une Task est une unité de travail dans un DAG (une étape).

3️⃣ Operator

Un Operator définit ce que fait une tâche.

Operator Usage
PythonOperator Exécuter une fonction Python
BashOperator Exécuter une commande bash
EmailOperator Envoyer un email
PostgresOperator Exécuter du SQL
S3ToRedshiftOperator Copier S3 → Redshift

4️⃣ Task Instance

Une Task Instance = une exécution spécifique d’une Task à une date donnée.

5️⃣ DAG Run

Un DAG Run = une exécution complète du DAG.

DAG: etl_pipeline
├── DAG Run 2024-01-15 ✅
│   ├── extract (Task Instance) ✅
│   ├── transform (Task Instance) ✅
│   └── load (Task Instance) ✅
│
├── DAG Run 2024-01-16 ⏳
│   ├── extract (Task Instance) ✅
│   ├── transform (Task Instance) ⏳ running
│   └── load (Task Instance) ⏸️ waiting

Installation locale d’Airflow

# Créer un environnement virtuel
python -m venv airflow_venv
source airflow_venv/bin/activate  # Linux/Mac
# ou : airflow_venv\Scripts\activate  # Windows

# Définir le home Airflow
export AIRFLOW_HOME=~/airflow

# Installer Airflow (version contrainte pour compatibilité)
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Initialiser la base de données
airflow db init

# Créer un utilisateur admin
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

# Lancer le webserver (Terminal 1)
airflow webserver --port 8080

# Lancer le scheduler (Terminal 2)
airflow scheduler

👉 Accéder : http://localhost:8080 (login: admin / admin)


Ton premier DAG

Créer le fichier ~/airflow/dags/mon_premier_dag.py :

Voir le code
# ~/airflow/dags/mon_premier_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Arguments par défaut pour toutes les tâches
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Définir le DAG
dag = DAG(
    dag_id='mon_premier_dag',
    default_args=default_args,
    description='Mon premier pipeline ETL',
    schedule_interval='@daily',  # Exécution quotidienne
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Ne pas exécuter les runs passés
    tags=['etl', 'tutorial'],
)

# Fonctions Python
def extract():
    """Extraire les données"""
    print("📥 Extraction des données depuis l'API...")
    # Simuler extraction
    data = {'records': 1000, 'source': 'api'}
    return data  # Retourné via XCom

def transform(**context):
    """Transformer les données"""
    # Récupérer les données de extract via XCom
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract')
    print(f"🔄 Transformation de {data['records']} enregistrements")
    return {'records': data['records'], 'cleaned': True}

def load(**context):
    """Charger les données"""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='transform')
    print(f"💾 Chargement de {data['records']} enregistrements nettoyés")

# Créer les tâches
task_start = BashOperator(
    task_id='start',
    bash_command='echo "🚀 Démarrage du pipeline à $(date)"',
    dag=dag,
)

task_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

task_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

task_load = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

task_end = BashOperator(
    task_id='end',
    bash_command='echo "✅ Pipeline terminé avec succès !"',
    dag=dag,
)

# Définir les dépendances
task_start >> task_extract >> task_transform >> task_load >> task_end

# Équivalent à :
# task_start.set_downstream(task_extract)
# task_extract.set_downstream(task_transform)
# etc.

Paramètres importants du DAG

Schedule Interval (fréquence d’exécution)

Preset Équivalent Cron Description
@once - Une seule fois
@hourly 0 * * * * Chaque heure
@daily 0 0 * * * Chaque jour à minuit
@weekly 0 0 * * 0 Chaque dimanche
@monthly 0 0 1 * * Le 1er du mois
@yearly 0 0 1 1 * Le 1er janvier
None - Déclenché manuellement
'0 6 * * 1-5' Cron Lun-Ven à 6h

Catchup

# catchup=True (défaut) :
# Si start_date=2024-01-01 et on est le 2024-01-10,
# Airflow va exécuter les 10 DAG Runs manqués !

# catchup=False :
# Exécute seulement à partir de maintenant

Default Args importants

default_args = {
    'owner': 'data_team',           # Propriétaire
    'depends_on_past': False,       # Dépend du run précédent ?
    'email_on_failure': True,       # Email si échec
    'retries': 3,                   # Nombre de retry
    'retry_delay': timedelta(minutes=5),  # Délai entre retries
    'execution_timeout': timedelta(hours=1),  # Timeout
    'sla': timedelta(hours=2),      # SLA (alerte si dépassé)
}

Les Operators les plus utilisés

PythonOperator

from airflow.operators.python import PythonOperator

def my_function(name, **context):
    print(f"Hello {name}!")
    print(f"Execution date: {context['ds']}")
    return "success"

task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'name': 'World'},  # Arguments de la fonction
)

BashOperator

from airflow.operators.bash import BashOperator

task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Date: {{ ds }}" && python /scripts/etl.py',
)

PostgresOperator

from airflow.providers.postgres.operators.postgres import PostgresOperator

task = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',  # Connexion définie dans UI
    sql="""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100)
        );
    """,
)

EmailOperator

from airflow.operators.email import EmailOperator

task = EmailOperator(
    task_id='send_report',
    to='team@company.com',
    subject='Pipeline Report - {{ ds }}',
    html_content='<h1>Pipeline completed!</h1>',
)

EmptyOperator (anciennement DummyOperator)

from airflow.operators.empty import EmptyOperator

# Utile pour créer des points de jonction
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')

XCom — Passer des données entre tâches

XCom (Cross-Communication) permet de partager des données entre tâches.

┌──────────┐         XCom          ┌───────────┐
│  Task A  │ ─────── data ───────► │   Task B  │
│  return  │                       │ xcom_pull │
└──────────┘                       └───────────┘

Méthode 1 : Return (automatique)

Voir le code
# XCom avec return (automatique)

def extract():
    data = {'records': 1000, 'status': 'ok'}
    return data  # Automatiquement stocké dans XCom

def transform(**context):
    # Récupérer via ti (task instance)
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract')
    print(f"Reçu: {data}")
    return {'records': data['records'], 'transformed': True}

Méthode 2 : Push/Pull explicite

def task_a(**context):
    # Push explicite avec une clé
    context['ti'].xcom_push(key='my_data', value={'count': 42})
    context['ti'].xcom_push(key='status', value='success')

def task_b(**context):
    # Pull avec la clé
    data = context['ti'].xcom_pull(task_ids='task_a', key='my_data')
    status = context['ti'].xcom_pull(task_ids='task_a', key='status')

⚠️ Limites de XCom

Limite Description
Taille Max ~48KB par défaut (stocké en DB)
Sérialisation Doit être JSON-sérialisable
Pas pour big data Utiliser S3/GCS pour gros fichiers
# ❌ Mauvaise pratique
def extract():
    df = pd.read_csv('big_file.csv')  # 1GB
    return df.to_dict()  # ❌ Trop gros pour XCom !

# ✅ Bonne pratique
def extract():
    df = pd.read_csv('big_file.csv')
    path = '/data/output/extract_2024-01-15.parquet'
    df.to_parquet(path)
    return path  # ✅ Passer le chemin, pas les données

TaskFlow API (Airflow 2.0+)

Syntaxe moderne et plus Pythonic avec des décorateurs :

Voir le code
# TaskFlow API - Syntaxe moderne (Airflow 2.0+)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='etl_taskflow',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def etl_pipeline():
    
    @task()
    def extract():
        """Extraire les données"""
        return {'records': 1000, 'source': 'api'}
    
    @task()
    def transform(data: dict):
        """Transformer les données"""
        return {
            'records': data['records'],
            'cleaned': True
        }
    
    @task()
    def load(data: dict):
        """Charger les données"""
        print(f"Loading {data['records']} records")
    
    # Définir le flow - XCom automatique !
    raw_data = extract()
    clean_data = transform(raw_data)
    load(clean_data)

# Instancier le DAG
etl_pipeline()

Avantages TaskFlow API

Avantage Description
XCom automatique Les retours sont passés automatiquement
Code plus lisible Ressemble à du Python normal
Type hints Support des annotations de type
Moins de boilerplate Pas besoin de créer des Operators

Sensors — Attendre une condition

Les Sensors attendent qu’une condition soit remplie avant de continuer.

┌──────────────────────────────────────────────────────────────┐
│                                                              │
│   FileSensor          S3KeySensor         HttpSensor         │
│   "Fichier existe?"   "Fichier sur S3?"   "API disponible?"  │
│                                                              │
│        ⏳                   ⏳                   ⏳            │
│        │                    │                    │           │
│        ▼                    ▼                    ▼           │
│       ✅                   ✅                   ✅           │
│   Continuer            Continuer            Continuer        │
│                                                              │
└──────────────────────────────────────────────────────────────┘

FileSensor

Voir le code
# FileSensor - Attendre qu'un fichier existe

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/input/daily_export.csv',
    poke_interval=60,      # Vérifier toutes les 60 secondes
    timeout=3600,          # Timeout après 1 heure
    mode='poke',           # poke ou reschedule
)

# Le pipeline attend le fichier avant de continuer
wait_for_file >> process_file

Autres Sensors utiles

# S3KeySensor - Attendre un fichier sur S3
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_s3 = S3KeySensor(
    task_id='wait_for_s3',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/export.csv',
    aws_conn_id='aws_default',
)

# HttpSensor - Attendre qu'une API réponde
from airflow.providers.http.sensors.http import HttpSensor

wait_api = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='api_connection',
    endpoint='health',
    response_check=lambda response: response.status_code == 200,
)

# ExternalTaskSensor - Attendre un autre DAG
from airflow.sensors.external_task import ExternalTaskSensor

wait_other_dag = ExternalTaskSensor(
    task_id='wait_upstream',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
)

Mode poke vs reschedule

Mode Description Ressources
poke Garde un worker slot Consomme plus
reschedule Libère le slot entre checks Recommandé

Branching — Logique conditionnelle

Exécuter différentes tâches selon une condition.

                    ┌──────────┐
                    │  Check   │
                    │ Condition│
                    └────┬─────┘
                         │
              ┌──────────┼──────────┐
              │          │          │
              ▼          ▼          ▼
         ┌────────┐ ┌────────┐ ┌────────┐
         │ Path A │ │ Path B │ │ Path C │
         └────────┘ └────────┘ └────────┘
Voir le code
# Branching - Exécution conditionnelle

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**context):
    """Décider quelle branche exécuter"""
    # Exemple : vérifier le jour de la semaine
    day = context['ds_nodash']  # Format YYYYMMDD
    
    # Logique métier
    if int(day) % 2 == 0:
        return 'process_even'  # Retourner le task_id à exécuter
    else:
        return 'process_odd'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)

process_even = EmptyOperator(task_id='process_even')
process_odd = EmptyOperator(task_id='process_odd')
end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')

# Définir le flow
branch >> [process_even, process_odd] >> end

Trigger Rules

Contrôler quand une tâche s’exécute en fonction du statut des tâches parentes.

Trigger Rule Exécute si…
all_success Tous les parents ont réussi (défaut)
all_failed Tous les parents ont échoué
all_done Tous les parents sont terminés (peu importe le statut)
one_success Au moins un parent a réussi
one_failed Au moins un parent a échoué
none_failed Aucun parent n’a échoué (succès ou skipped)
none_skipped Aucun parent n’a été skipped
from airflow.utils.trigger_rule import TriggerRule

# Tâche de notification en cas d'échec
notify_failure = EmailOperator(
    task_id='notify_failure',
    to='team@company.com',
    subject='Pipeline Failed!',
    html_content='...',
    trigger_rule=TriggerRule.ONE_FAILED,  # Exécute si un parent échoue
)

# Tâche finale qui s'exécute toujours
cleanup = BashOperator(
    task_id='cleanup',
    bash_command='rm -rf /tmp/data/*',
    trigger_rule=TriggerRule.ALL_DONE,  # Toujours exécuter
)

Connections et Variables

Connections

Stocker les informations de connexion aux systèmes externes.

Dans l’UI : Admin → Connections → +

┌─────────────────────────────────────────────────────────────┐
│  Add Connection                                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Connection Id:   │ my_postgres                 │           │
│  Connection Type: │ Postgres           ▼        │           │
│  Host:            │ localhost                   │           │
│  Schema:          │ mydb                        │           │
│  Login:           │ admin                       │           │
│  Password:        │ ••••••••                    │           │
│  Port:            │ 5432                        │           │
│                                                             │
│                              [ Save ]                       │
└─────────────────────────────────────────────────────────────┘

Utilisation dans le code :

from airflow.hooks.postgres_hook import PostgresHook

def query_postgres():
    hook = PostgresHook(postgres_conn_id='my_postgres')
    df = hook.get_pandas_df('SELECT * FROM users')
    return df

Variables

Stocker des configurations réutilisables.

Dans l’UI : Admin → Variables → +

from airflow.models import Variable

# Récupérer une variable
api_key = Variable.get('API_KEY')

# Variable JSON
config = Variable.get('pipeline_config', deserialize_json=True)
# config = {'batch_size': 1000, 'env': 'prod'}

# Dans un template Jinja
# {{ var.value.API_KEY }}
# {{ var.json.pipeline_config.batch_size }}

Monitoring et Alertes

Interface Web

┌─────────────────────────────────────────────────────────────────────────────┐
│  Airflow - DAGs                                                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  DAG                    Schedule    Owner    Runs   Recent Tasks            │
│  ─────────────────────  ─────────   ─────    ────   ────────────            │
│  ▶ etl_pipeline         @daily      team     125    ✅✅✅✅✅                │
│  ▶ data_quality_check   @hourly     team     560    ✅✅✅❌✅                │
│  ▶ weekly_report        @weekly     team     52     ✅✅✅✅✅                │
│  ⏸ maintenance          None        admin    3      ✅✅✅                   │
│                                                                             │
│  ✅ Success  ❌ Failed  ⏳ Running  ⏸ Paused                                 │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Vues disponibles

Vue Description
Grid Vue matricielle des runs
Graph Graphe du DAG
Calendar Historique par date
Gantt Timeline d’exécution
Code Code source du DAG

Configurer les alertes email

# airflow.cfg
[smtp]
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = airflow@company.com
smtp_password = your_password
smtp_mail_from = airflow@company.com

# Dans le DAG
default_args = {
    'email': ['team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

Alertes Slack

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def alert_slack_on_failure(context):
    """Callback en cas d'échec"""
    slack_msg = f"""
        :red_circle: Task Failed!
        *DAG*: {context['dag'].dag_id}
        *Task*: {context['task'].task_id}
        *Execution Time*: {context['execution_date']}
    """
    return SlackWebhookOperator(
        task_id='slack_alert',
        slack_webhook_conn_id='slack_webhook',
        message=slack_msg,
    ).execute(context)

# Utiliser le callback
default_args = {
    'on_failure_callback': alert_slack_on_failure,
}

Bonnes pratiques Airflow

1. Structure des DAGs

airflow/
├── dags/
│   ├── __init__.py
│   ├── etl/
│   │   ├── __init__.py
│   │   ├── daily_etl.py
│   │   └── weekly_report.py
│   ├── utils/
│   │   ├── __init__.py
│   │   └── helpers.py
│   └── config/
│       └── settings.py
├── plugins/
│   └── custom_operators/
└── tests/
    └── test_dags.py

2. Idempotence

# ❌ Non idempotent - accumule des données
def bad_load():
    db.execute("INSERT INTO table VALUES (...)")

# ✅ Idempotent - même résultat si relancé
def good_load():
    db.execute("DELETE FROM table WHERE date = '{{ ds }}'")
    db.execute("INSERT INTO table SELECT ... WHERE date = '{{ ds }}'")

3. Atomicité des tâches

# ❌ Tâche trop grosse
def do_everything():
    extract()
    transform()
    load()

# ✅ Tâches atomiques
extract >> transform >> load

4. Ne pas mettre de logique dans le DAG

# ❌ Code exécuté à chaque parsing
data = fetch_from_api()  # Appelé toutes les 30s !

# ✅ Logique dans les tasks
@task
def fetch_data():
    return fetch_from_api()

5. Tester les DAGs

# tests/test_dags.py
import pytest
from airflow.models import DagBag

def test_dag_loaded():
    dag_bag = DagBag()
    assert len(dag_bag.import_errors) == 0

def test_dag_structure():
    dag_bag = DagBag()
    dag = dag_bag.get_dag('etl_pipeline')
    assert dag is not None
    assert len(dag.tasks) == 5

✅ Forces d’Airflow

Gestion des dépendances - A >> B = B attend A
Retry automatique - Réessaie en cas d’échec
Interface web - Visualisation complète
Monitoring - Logs centralisés
Alertes - Email/Slack en cas d’échec
Scalable - Gère 100+ pipelines
Extensible - Custom operators, hooks
Communauté - Très active, beaucoup de providers

❌ Faiblesses d’Airflow

Complexe - Courbe d’apprentissage
Ressources - Besoin de 4-8 GB RAM
Overkill - Pour 1-3 scripts simples
Setup - Installation et configuration
Pas pour le streaming - Batch only (utiliser Kafka)

Quand utiliser Airflow ?

OUI : 10+ pipelines, dépendances complexes, production
NON : 1-5 scripts simples sans dépendances


Tableau de décision

Quel outil choisir ?

Situation Outil recommandé
J’ai 1-3 scripts sur Windows 🪟 Task Scheduler
J’ai 1-3 scripts sur Linux 🐧 Crontab
J’ai 5-10 scripts indépendants 🐧 Crontab
J’ai 10+ scripts avec dépendances 🌬️ Airflow
Script B doit attendre script A 🌬️ Airflow
Je veux un dashboard 🌬️ Airflow
Je débute en automatisation 🪟 Task Scheduler
Production critique 🌬️ Airflow

Progression naturelle

1. Débutez avec Task Scheduler ou Cron
2. Quand vous avez 5+ scripts → Pensez à migrer
3. Quand vous avez des dépendances → Migrez vers Airflow

Résumé

Points clés

Windows Task Scheduler

  • Pour qui : Débutants sur Windows
  • Force : Très facile (GUI)
  • Faiblesse : Pas de dépendances
  • Limite : 5 scripts max

Crontab

  • Pour qui : Utilisateurs Linux
  • Force : Universel, léger
  • Faiblesse : Pas de monitoring
  • Limite : 15 scripts max

Airflow

  • Pour qui : Production, équipes data
  • Force : Dépendances, monitoring, scalable
  • Faiblesse : Complexe, ressources
  • Limite : Aucune (scalable)

Concepts Airflow à retenir

Concept Description
DAG Graphe de tâches (workflow)
Task Unité de travail
Operator Type de tâche (Python, Bash, SQL…)
XCom Passage de données entre tâches
Sensor Attendre une condition
Connection Credentials stockés
Variable Configuration stockée

Ressources


Quiz Final

❓ Q1. Vous avez 2 scripts Python à exécuter tous les jours sur Windows. Quel outil ?

  1. Airflow
  2. Crontab
  3. Windows Task Scheduler
  4. Kubernetes
💡 Voir la réponse Réponse : c – Pour 2 scripts simples sur Windows, utilisez le Planificateur de tâches : facile, natif, gratuit.

❓ Q2. Expression crontab pour “tous les lundis à 9h” ?

  1. 0 9 * * 1
  2. 9 0 * * 1
  3. * 9 * * 1
  4. 0 9 1 * *
💡 Voir la réponse Réponse : a – Format : minute heure jour mois jour_semaine. 0 9 * * 1 = 0 minutes, 9h, lundi (1).

❓ Q3. Que signifie DAG dans Airflow ?

  1. Data Analysis Graph
  2. Directed Acyclic Graph
  3. Dynamic Airflow Generator
  4. Database Access Gateway
💡 Voir la réponse Réponse : bDirected Acyclic Graph = graphe orienté sans cycle. Les tâches vont toujours dans une direction.

❓ Q4. Dans Airflow, que signifie task_a >> task_b ?

  1. task_a et task_b en parallèle
  2. task_b attend que task_a finisse
  3. task_a attend task_b
  4. Erreur de syntaxe
💡 Voir la réponse Réponse : bA >> B signifie “B attend que A soit terminé”. C’est la gestion des dépendances !

❓ Q5. Quel composant Airflow décide quand exécuter les tâches ?

  1. Web Server
  2. Executor
  3. Scheduler
  4. Metadata DB
💡 Voir la réponse Réponse : c – Le Scheduler analyse les DAGs et décide quand lancer les tâches.

❓ Q6. Comment passer des données entre tâches Airflow ?

  1. Variables globales
  2. XCom
  3. Fichiers partagés uniquement
  4. Pas possible
💡 Voir la réponse Réponse : bXCom (Cross-Communication) permet de passer des données entre tâches via xcom_push et xcom_pull.

❓ Q7. Qu’est-ce qu’un Sensor dans Airflow ?

  1. Un outil de monitoring
  2. Un Operator qui attend une condition
  3. Un type de DAG
  4. Un système d’alerte
💡 Voir la réponse Réponse : b – Un Sensor est un Operator spécial qui attend qu’une condition soit remplie (fichier existe, API disponible…).

❓ Q8. Quand NE PAS utiliser Airflow ?

  1. 10+ pipelines complexes
  2. 1-3 scripts simples
  3. Production critique
  4. Besoin de monitoring
💡 Voir la réponse Réponse : b – Pour 1-3 scripts simples, Airflow est overkill. Utilisez Cron ou Task Scheduler à la place.

❓ Q9. Quel est l’avantage de la TaskFlow API ?

  1. Plus performant
  2. XCom automatique et code plus lisible
  3. Compatible Python 2
  4. Pas besoin de Scheduler
💡 Voir la réponse Réponse : b – TaskFlow API (décorateurs @dag, @task) rend le code plus Pythonic avec XCom automatique.

❓ Q10. Vous avez 15 scripts avec des dépendances. Quel outil ?

  1. Windows Task Scheduler
  2. Crontab
  3. Apache Airflow
  4. Aucun, faire manuellement
💡 Voir la réponse Réponse : c – Avec 15 scripts et des dépendances, il est temps de passer à Airflow !

📊 Votre score

  • 10/10 : 🏆 Expert orchestration !
  • 8-9/10 : 🌟 Très bien !
  • 6-7/10 : 💪 Bon début !
  • < 6/10 : 📚 Relisez le notebook

📚 Ressources

Outils

Alternatives à Airflow

Outil Description Cas d’usage
Prefect Orchestration moderne, Pythonic Alternative plus simple à Airflow
Dagster Data orchestration avec types Pipelines ML
Luigi Par Spotify, simple Pipelines batch
Mage Low-code, moderne Prototypage rapide
Kestra Event-driven, YAML Workflows déclaratifs

Cloud managed

Cloud Service
AWS MWAA (Managed Airflow), Step Functions
GCP Cloud Composer (Managed Airflow)
Azure Data Factory, Synapse Pipelines

🎉 Fin du niveau débutant !

Tu as terminé le parcours Data Engineering - From Zero to Hero niveau débutant ! 🎉

Récapitulatif des modules

# Module Compétence acquise
01 Introduction Vision du métier
02 Bash Ligne de commande
03 Git Versioning
04 Python Basics Programmation
05 Python Data Processing Pandas, visualisation
06 Intro Bases Relationnelles Concepts relationnels
07 SQL Requêtes SQL
08 Big Data & NoSQL Systèmes distribués
09 MongoDB Base NoSQL document
10 Elasticsearch Recherche et indexation
11 PySpark Traitement distribué
12 Orchestration Airflow, pipelines

Module BONUS disponible

# Module Description
13 BONUS FastAPI Créer des APIs REST pour exposer tes données

👉 Parfait pour exposer les résultats de tes pipelines via API !


➡️ Prochaine étape : Niveau Intermédiaire

Tu es maintenant prêt pour le niveau intermédiaire qui couvrira :

Module Description
Docker Conteneurisation des pipelines
Data Lakes Parquet, Delta Lake, Iceberg
Kafka Streaming en temps réel
dbt Transformation dans le warehouse
Data Quality Great Expectations, tests
Cloud AWS / GCP / Azure
CI/CD GitHub Actions, tests automatisés
Kibana Dashboards et monitoring
Projet intégrateur Pipeline complet end-to-end

🚀 Bravo ! Tu as maintenant toutes les bases pour construire des pipelines de données !

Retour au sommet