Ce module présente les outils d’orchestration pour automatiser l’exécution de vos pipelines de données.
Prérequis
Niveau
Compétence
✅ Requis
Avoir suivi le module 11_pyspark_for_data_engineering
✅ Requis
Comprendre les pipelines ETL
✅ Requis
Maîtriser Python (modules 04-05)
✅ Requis
Connaître les bases de Linux (ligne de commande)
Objectifs du module
À la fin de ce notebook, tu seras capable de :
Comprendre ce qu’est l’orchestration de pipelines
Utiliser le Planificateur Windows (niveau débutant)
Configurer des tâches avec Crontab (niveau intermédiaire)
Comprendre l’architecture d’Apache Airflow
Créer des DAGs avec Apache Airflow
Utiliser les différents types d’Operators
Gérer les dépendances et le passage de données (XCom)
Configurer les alertes et le monitoring
Choisir le bon outil selon ton besoin
🎯 L’orchestration dans l’écosystème Data Engineering
Tu as appris à créer des pipelines ETL avec PySpark. Mais comment les automatiser pour qu’ils s’exécutent régulièrement sans intervention manuelle ?
Le problème
Sans orchestration :
┌─────────────────────────────────────────────────────────────┐
│ │
│ 😰 "Il faut que je lance mon script tous les jours..." │
│ 😰 "J'ai oublié de lancer le pipeline hier !" │
│ 😰 "Le script B a planté car A n'avait pas fini..." │
│ │
└─────────────────────────────────────────────────────────────┘
Avec orchestration :
┌─────────────────────────────────────────────────────────────┐
│ │
│ ✅ Scripts exécutés automatiquement │
│ ✅ Alertes en cas d'échec │
│ ✅ Dépendances respectées (A → B → C) │
│ ✅ Logs et monitoring centralisés │
│ │
└─────────────────────────────────────────────────────────────┘
Un outil intégré à Windows pour exécuter des programmes automatiquement.
Comment l’utiliser ?
Ouvrir le planificateur :
Windows + R → Taper 'taskschd.msc' → Entrée
Créer une tâche :
Action → Créer une tâche de base
Nom : “Mon script quotidien”
Déclencheur : Quotidien à 2h du matin
Action : Démarrer python.exe avec C:\scripts\mon_script.py
Terminer
✅ Voilà ! Votre script s’exécutera automatiquement tous les jours à 2h.
✅ Forces
✅ Très facile - Interface graphique intuitive
✅ Déjà installé - Pas de setup
✅ Parfait pour débuter - Pas de code complexe
❌ Faiblesses
❌ Pas de dépendances - Si tâche A doit finir avant tâche B → compliqué
❌ Monitoring limité - Difficile de voir l’état global
❌ Windows uniquement - Ne fonctionne pas sur Linux
Quand l’utiliser ?
✅ OUI : Vous avez 1-5 scripts simples sur Windows
❌ NON : Vous avez besoin que script B attende script A
Niveau 2 : Crontab (Linux/Mac)
C’est quoi ?
Le planificateur standard sur Linux/Mac.
Syntaxe de base
minute heure jour mois jour_semaine commande
┌───────────── minute (0 - 59)
│ ┌─────────── heure (0 - 23)
│ │ ┌───────── jour du mois (1 - 31)
│ │ │ ┌─────── mois (1 - 12)
│ │ │ │ ┌───── jour de la semaine (0 - 6) (dimanche = 0)
│ │ │ │ │
* * * * * commande
Exemples simples :
# Tous les jours à 2h du matin0 2 *** python3 /home/user/script.py# Toutes les heures0**** python3 /home/user/hourly.py# Lundi à vendredi à 9h0 9 ** 1-5 python3 /home/user/weekday.py# Toutes les 15 minutes*/15**** python3 /home/user/check.py# Le 1er de chaque mois à minuit0 0 1 ** python3 /home/user/monthly.py
Comment l’utiliser ?
# Éditer votre crontabcrontab-e# Voir les tâches planifiéescrontab-l# Ajouter vos lignes0 2 *** python3 /home/user/backup.py >> /var/log/backup.log 2>&1# Sauvegarder et quitter# ✅ C'est fait !
💡 Astuce : Utilise crontab.guru pour tester tes expressions !
Forces
Universel - Sur TOUS les serveurs Linux Très léger - Presque pas de ressources Simple - Une ligne = une tâche Gratuit - Déjà installé
Faiblesses
Pas de dépendances - Même problème que Windows Pas de monitoring - Aucune interface Pas de retry - Si ça échoue, il faut attendre le prochain run Logs manuels - Il faut les gérer soi-même
Quand l’utiliser ?
OUI : Serveur Linux, 5-15 scripts indépendants NON : Scripts avec dépendances complexes
Niveau 3 : Apache Airflow
C’est quoi ?
Un orchestrateur professionnel pour gérer des workflows complexes, créé par Airbnb en 2014 et devenu projet Apache en 2016.
┌─────────────────────────────────────────────────────────────────┐
│ APACHE AIRFLOW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ "Airflow is a platform to programmatically author, │
│ schedule, and monitor workflows." │
│ │
│ • Créé par Airbnb (2014) │
│ • Apache Top-Level Project (2019) │
│ • 30,000+ GitHub stars │
│ • Utilisé par : Airbnb, Netflix, Spotify, Twitter, Adobe... │
│ │
└─────────────────────────────────────────────────────────────────┘
La grande différence avec Cron :
# ❌ Avec Cron (problème) :02*** python extract.py302*** python transform.py # Et si extract prend plus de 30min ?03*** python load.py # Et si transform a planté ?# ✅ Avec Airflow (solution) :extract >> transform >> load # transform ATTEND que extract soit terminé !
Créer le fichier ~/airflow/dags/mon_premier_dag.py :
Voir le code
# ~/airflow/dags/mon_premier_dag.pyfrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# Arguments par défaut pour toutes les tâchesdefault_args = {'owner': 'data_engineer','depends_on_past': False,'email': ['data-team@company.com'],'email_on_failure': True,'email_on_retry': False,'retries': 3,'retry_delay': timedelta(minutes=5),}# Définir le DAGdag = DAG( dag_id='mon_premier_dag', default_args=default_args, description='Mon premier pipeline ETL', schedule_interval='@daily', # Exécution quotidienne start_date=datetime(2024, 1, 1), catchup=False, # Ne pas exécuter les runs passés tags=['etl', 'tutorial'],)# Fonctions Pythondef extract():"""Extraire les données"""print("📥 Extraction des données depuis l'API...")# Simuler extraction data = {'records': 1000, 'source': 'api'}return data # Retourné via XComdef transform(**context):"""Transformer les données"""# Récupérer les données de extract via XCom ti = context['ti'] data = ti.xcom_pull(task_ids='extract')print(f"🔄 Transformation de {data['records']} enregistrements")return {'records': data['records'], 'cleaned': True}def load(**context):"""Charger les données""" ti = context['ti'] data = ti.xcom_pull(task_ids='transform')print(f"💾 Chargement de {data['records']} enregistrements nettoyés")# Créer les tâchestask_start = BashOperator( task_id='start', bash_command='echo "🚀 Démarrage du pipeline à $(date)"', dag=dag,)task_extract = PythonOperator( task_id='extract', python_callable=extract, dag=dag,)task_transform = PythonOperator( task_id='transform', python_callable=transform, dag=dag,)task_load = PythonOperator( task_id='load', python_callable=load, dag=dag,)task_end = BashOperator( task_id='end', bash_command='echo "✅ Pipeline terminé avec succès !"', dag=dag,)# Définir les dépendancestask_start >> task_extract >> task_transform >> task_load >> task_end# Équivalent à :# task_start.set_downstream(task_extract)# task_extract.set_downstream(task_transform)# etc.
Paramètres importants du DAG
Schedule Interval (fréquence d’exécution)
Preset
Équivalent Cron
Description
@once
-
Une seule fois
@hourly
0 * * * *
Chaque heure
@daily
0 0 * * *
Chaque jour à minuit
@weekly
0 0 * * 0
Chaque dimanche
@monthly
0 0 1 * *
Le 1er du mois
@yearly
0 0 1 1 *
Le 1er janvier
None
-
Déclenché manuellement
'0 6 * * 1-5'
Cron
Lun-Ven à 6h
Catchup
# catchup=True (défaut) :# Si start_date=2024-01-01 et on est le 2024-01-10,# Airflow va exécuter les 10 DAG Runs manqués !# catchup=False :# Exécute seulement à partir de maintenant
Default Args importants
default_args = {'owner': 'data_team', # Propriétaire'depends_on_past': False, # Dépend du run précédent ?'email_on_failure': True, # Email si échec'retries': 3, # Nombre de retry'retry_delay': timedelta(minutes=5), # Délai entre retries'execution_timeout': timedelta(hours=1), # Timeout'sla': timedelta(hours=2), # SLA (alerte si dépassé)}
Les Operators les plus utilisés
PythonOperator
from airflow.operators.python import PythonOperatordef my_function(name, **context):print(f"Hello {name}!")print(f"Execution date: {context['ds']}")return"success"task = PythonOperator( task_id='python_task', python_callable=my_function, op_kwargs={'name': 'World'}, # Arguments de la fonction)
from airflow.providers.postgres.operators.postgres import PostgresOperatortask = PostgresOperator( task_id='create_table', postgres_conn_id='my_postgres', # Connexion définie dans UI sql=""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(100) ); """,)
from airflow.operators.empty import EmptyOperator# Utile pour créer des points de jonctionstart = EmptyOperator(task_id='start')end = EmptyOperator(task_id='end')
XCom — Passer des données entre tâches
XCom (Cross-Communication) permet de partager des données entre tâches.
┌──────────┐ XCom ┌───────────┐
│ Task A │ ─────── data ───────► │ Task B │
│ return │ │ xcom_pull │
└──────────┘ └───────────┘
Méthode 1 : Return (automatique)
Voir le code
# XCom avec return (automatique)def extract(): data = {'records': 1000, 'status': 'ok'}return data # Automatiquement stocké dans XComdef transform(**context):# Récupérer via ti (task instance) ti = context['ti'] data = ti.xcom_pull(task_ids='extract')print(f"Reçu: {data}")return {'records': data['records'], 'transformed': True}
Méthode 2 : Push/Pull explicite
def task_a(**context):# Push explicite avec une clé context['ti'].xcom_push(key='my_data', value={'count': 42}) context['ti'].xcom_push(key='status', value='success')def task_b(**context):# Pull avec la clé data = context['ti'].xcom_pull(task_ids='task_a', key='my_data') status = context['ti'].xcom_pull(task_ids='task_a', key='status')
⚠️ Limites de XCom
Limite
Description
Taille
Max ~48KB par défaut (stocké en DB)
Sérialisation
Doit être JSON-sérialisable
Pas pour big data
Utiliser S3/GCS pour gros fichiers
# ❌ Mauvaise pratiquedef extract(): df = pd.read_csv('big_file.csv') # 1GBreturn df.to_dict() # ❌ Trop gros pour XCom !# ✅ Bonne pratiquedef extract(): df = pd.read_csv('big_file.csv') path ='/data/output/extract_2024-01-15.parquet' df.to_parquet(path)return path # ✅ Passer le chemin, pas les données
TaskFlow API (Airflow 2.0+)
Syntaxe moderne et plus Pythonic avec des décorateurs :
Voir le code
# TaskFlow API - Syntaxe moderne (Airflow 2.0+)from airflow.decorators import dag, taskfrom datetime import datetime@dag( dag_id='etl_taskflow', schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl', 'taskflow'],)def etl_pipeline():@task()def extract():"""Extraire les données"""return {'records': 1000, 'source': 'api'}@task()def transform(data: dict):"""Transformer les données"""return {'records': data['records'],'cleaned': True }@task()def load(data: dict):"""Charger les données"""print(f"Loading {data['records']} records")# Définir le flow - XCom automatique ! raw_data = extract() clean_data = transform(raw_data) load(clean_data)# Instancier le DAGetl_pipeline()
Avantages TaskFlow API
Avantage
Description
XCom automatique
Les retours sont passés automatiquement
Code plus lisible
Ressemble à du Python normal
Type hints
Support des annotations de type
Moins de boilerplate
Pas besoin de créer des Operators
Sensors — Attendre une condition
Les Sensors attendent qu’une condition soit remplie avant de continuer.
# Branching - Exécution conditionnellefrom airflow.operators.python import BranchPythonOperatorfrom airflow.operators.empty import EmptyOperatordef choose_branch(**context):"""Décider quelle branche exécuter"""# Exemple : vérifier le jour de la semaine day = context['ds_nodash'] # Format YYYYMMDD# Logique métierifint(day) %2==0:return'process_even'# Retourner le task_id à exécuterelse:return'process_odd'branch = BranchPythonOperator( task_id='branch', python_callable=choose_branch,)process_even = EmptyOperator(task_id='process_even')process_odd = EmptyOperator(task_id='process_odd')end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')# Définir le flowbranch >> [process_even, process_odd] >> end
Trigger Rules
Contrôler quand une tâche s’exécute en fonction du statut des tâches parentes.
Trigger Rule
Exécute si…
all_success
Tous les parents ont réussi (défaut)
all_failed
Tous les parents ont échoué
all_done
Tous les parents sont terminés (peu importe le statut)
one_success
Au moins un parent a réussi
one_failed
Au moins un parent a échoué
none_failed
Aucun parent n’a échoué (succès ou skipped)
none_skipped
Aucun parent n’a été skipped
from airflow.utils.trigger_rule import TriggerRule# Tâche de notification en cas d'échecnotify_failure = EmailOperator( task_id='notify_failure', to='team@company.com', subject='Pipeline Failed!', html_content='...', trigger_rule=TriggerRule.ONE_FAILED, # Exécute si un parent échoue)# Tâche finale qui s'exécute toujourscleanup = BashOperator( task_id='cleanup', bash_command='rm -rf /tmp/data/*', trigger_rule=TriggerRule.ALL_DONE, # Toujours exécuter)
Connections et Variables
Connections
Stocker les informations de connexion aux systèmes externes.
# ❌ Non idempotent - accumule des donnéesdef bad_load(): db.execute("INSERT INTO table VALUES (...)")# ✅ Idempotent - même résultat si relancédef good_load(): db.execute("DELETE FROM table WHERE date = '{{ ds }}'") db.execute("INSERT INTO table SELECT ... WHERE date = '{{ ds }}'")
# ❌ Code exécuté à chaque parsingdata = fetch_from_api() # Appelé toutes les 30s !# ✅ Logique dans les tasks@taskdef fetch_data():return fetch_from_api()
Gestion des dépendances - A >> B = B attend A Retry automatique - Réessaie en cas d’échec Interface web - Visualisation complète Monitoring - Logs centralisés Alertes - Email/Slack en cas d’échec Scalable - Gère 100+ pipelines Extensible - Custom operators, hooks Communauté - Très active, beaucoup de providers
❌ Faiblesses d’Airflow
Complexe - Courbe d’apprentissage Ressources - Besoin de 4-8 GB RAM Overkill - Pour 1-3 scripts simples Setup - Installation et configuration Pas pour le streaming - Batch only (utiliser Kafka)
Quand utiliser Airflow ?
OUI : 10+ pipelines, dépendances complexes, production NON : 1-5 scripts simples sans dépendances
Tableau de décision
Quel outil choisir ?
Situation
Outil recommandé
J’ai 1-3 scripts sur Windows
🪟 Task Scheduler
J’ai 1-3 scripts sur Linux
🐧 Crontab
J’ai 5-10 scripts indépendants
🐧 Crontab
J’ai 10+ scripts avec dépendances
🌬️ Airflow
Script B doit attendre script A
🌬️ Airflow
Je veux un dashboard
🌬️ Airflow
Je débute en automatisation
🪟 Task Scheduler
Production critique
🌬️ Airflow
Progression naturelle
1. Débutez avec Task Scheduler ou Cron
2. Quand vous avez 5+ scripts → Pensez à migrer
3. Quand vous avez des dépendances → Migrez vers Airflow
Comment l’utiliser ?
Ouvrir le planificateur :
Créer une tâche :
python.exeavecC:\scripts\mon_script.py✅ Voilà ! Votre script s’exécutera automatiquement tous les jours à 2h.