Bienvenue dans ce module où tu vas apprendre à accélérer considérablement tes pipelines Python. Tu découvriras comment contourner les limitations du GIL, paralléliser tes traitements, et gérer des fichiers plus grands que ta RAM !
Prérequis
Niveau
Compétence
✅ Requis
Maîtriser Python (fonctions, classes)
✅ Requis
Avoir suivi le module 17_polars_for_data_engineering
💡 Recommandé
Connaître Pandas
🎯 Objectifs du module
À la fin de ce module, tu seras capable de :
Comprendre le GIL et ses implications sur la performance
Profiler ton code pour identifier les vrais goulots d’étranglement
Utiliser concurrent.futures pour paralléliser simplement
Maîtriser asyncio pour l’I/O massivement parallèle
Exploiter Dask pour traiter des fichiers plus grands que la RAM
Choisir le bon outil selon ton problème
1. Le GIL : Comprendre la limitation fondamentale
Cette section est essentielle pour comprendre pourquoi certaines techniques fonctionnent et d’autres non.
1.1 Qu’est-ce que le GIL ?
Le Global Interpreter Lock (GIL) est un verrou interne à Python qui empêche deux threads d’exécuter du code Python en même temps.
Pourquoi le GIL existe ?
Python gère la mémoire automatiquement (garbage collector). Sans le GIL, deux threads pourraient modifier le même objet simultanément → corruption de mémoire. Le GIL est une solution simple mais qui limite la performance.
Visualisation du GIL
╔══════════════════════════════════════════════════════════════════════════╗
║ AVEC LE GIL (threading) ║
╠══════════════════════════════════════════════════════════════════════════╣
║ ║
║ Thread 1: ████████░░░░░░░░████████░░░░░░░░████████░░░░░░░░ ║
║ Thread 2: ░░░░░░░░████████░░░░░░░░████████░░░░░░░░████████ ║
║ ──────────────────────────────────────────────────▶ temps ║
║ ║
║ → Un seul thread s'exécute à la fois ! ║
║ → Les threads ALTERNENT, ils ne sont pas vraiment parallèles ║
║ → Pour du calcul CPU : PAS de gain de performance ! ║
║ ║
╠══════════════════════════════════════════════════════════════════════════╣
║ SANS GIL (multiprocessing) ║
╠══════════════════════════════════════════════════════════════════════════╣
║ ║
║ Process 1: ████████████████████████████████████████████████ ║
║ Process 2: ████████████████████████████████████████████████ ║
║ ──────────────────────────────────────────────────▶ temps ║
║ ║
║ → VRAIE exécution parallèle sur plusieurs CPUs ! ║
║ → Chaque process a son propre interpréteur Python ║
║ → Pour du calcul CPU : gain de performance proportionnel aux CPUs ║
║ ║
╚══════════════════════════════════════════════════════════════════════════╝
1.2 CPU-bound vs I/O-bound : La distinction CRUCIALE
Avant de choisir une technique, tu DOIS savoir si ton problème est CPU-bound ou I/O-bound :
Type
C’est quoi ?
Exemples
Le GIL bloque ?
Solution
CPU-bound
Le CPU travaille en continu
Calculs mathématiques, transformations de données, parsing, compression
✅ OUI
multiprocessing, ProcessPoolExecutor
I/O-bound
Le CPU attend des données externes
Requêtes API, lecture fichiers, requêtes base de données
❌ NON
threading, asyncio
Pourquoi le GIL ne bloque PAS l’I/O ?
Quand Python attend une réponse (réseau, disque, etc.), il relâche le GIL automatiquement :
Thread 1: [code Python]──▶[attend réseau]──▶[code Python]
GIL ↓ ↓ GIL libre ↓ GIL
Thread 2: [attend GIL]──▶[code Python]──▶[attend GIL]
GIL ↓
→ Pendant que Thread 1 attend le réseau, Thread 2 peut s'exécuter !
1.3 Démonstration : GIL en action
ℹ️ Le savais-tu ?
Le GIL a été introduit dans Python pour simplifier la gestion de la mémoire. Il rend Python thread-safe par défaut, mais au prix de la performance multi-thread.
Des projets comme nogil (Python 3.13+) et subinterpreters travaillent à supprimer ou contourner cette limitation.
En attendant, les Data Engineers utilisent multiprocessing pour contourner le GIL !
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ DÉMONSTRATION DU GIL : Threading vs Multiprocessing ║# ╚══════════════════════════════════════════════════════════════════════════╝import timeimport threadingfrom multiprocessing import Process# ─────────────────────────────────────────────────────────────────────────────# FONCTION CPU-INTENSIVE# ─────────────────────────────────────────────────────────────────────────────# Cette fonction fait des calculs lourds (boucle + opérations mathématiques)# C'est du "CPU-bound" car le CPU travaille en continu sans attendredef cpu_intensive(n):""" Tâche CPU-bound : calcul intensif. Args: n: Nombre d'itérations (plus c'est grand, plus c'est long) Returns: La somme des carrés de 0 à n-1 """ total =0for i inrange(n): total += i **2# Opération CPU : élévation au carréreturn total# Paramètre : 5 millions d'itérations par appelN =5_000_000print("="*60)print("DÉMONSTRATION : Impact du GIL sur les tâches CPU-bound")print("="*60)# ─────────────────────────────────────────────────────────────────────────────# TEST 1 : EXÉCUTION SÉQUENTIELLE (baseline)# ─────────────────────────────────────────────────────────────────────────────# On exécute la fonction 2 fois, l'une après l'autre# C'est notre référence pour mesurer le gain des autres méthodesprint("\n Test 1 : Séquentiel (référence)")start = time.time()cpu_intensive(N) # Premier appelcpu_intensive(N) # Deuxième appel (attend que le premier finisse)seq_time = time.time() - startprint(f" Temps : {seq_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# TEST 2 : THREADING (bloqué par le GIL)# ─────────────────────────────────────────────────────────────────────────────# On crée 2 threads qui exécutent cpu_intensive en "parallèle"# MAIS : le GIL empêche l'exécution simultanée !# Résultat attendu : temps SIMILAIRE au séquentiel (pas de gain)print("\n Test 2 : Threading (2 threads)")start = time.time()# Créer les threadst1 = threading.Thread(target=cpu_intensive, args=(N,)) # Thread 1t2 = threading.Thread(target=cpu_intensive, args=(N,)) # Thread 2# Démarrer les threadst1.start() # Lance Thread 1t2.start() # Lance Thread 2 (mais GIL bloque l'exécution simultanée !)# Attendre que les threads finissentt1.join() # Attend Thread 1t2.join() # Attend Thread 2thread_time = time.time() - startprint(f" Temps : {thread_time:.2f}s")print(f" Pas plus rapide ! Le GIL empêche la parallélisation.")# ─────────────────────────────────────────────────────────────────────────────# TEST 3 : MULTIPROCESSING (contourne le GIL)# ─────────────────────────────────────────────────────────────────────────────# On crée 2 PROCESSUS séparés (pas des threads)# Chaque processus a son propre interpréteur Python = son propre GIL# Résultat attendu : temps DIVISÉ PAR 2 (vraie parallélisation)print("\n Test 3 : Multiprocessing (2 processus)")start = time.time()# Créer les processusp1 = Process(target=cpu_intensive, args=(N,)) # Processus 1p2 = Process(target=cpu_intensive, args=(N,)) # Processus 2# Démarrer les processusp1.start() # Lance Processus 1 sur CPU 1p2.start() # Lance Processus 2 sur CPU 2 (VRAIMENT en parallèle !)# Attendre que les processus finissentp1.join() # Attend Processus 1p2.join() # Attend Processus 2proc_time = time.time() - startprint(f" Temps : {proc_time:.2f}s")print(f" ~2x plus rapide ! Le multiprocessing contourne le GIL.")# ─────────────────────────────────────────────────────────────────────────────# RÉSUMÉ# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("RÉSUMÉ")print("="*60)print(f" Séquentiel : {seq_time:.2f}s (référence)")print(f" Threading : {thread_time:.2f}s (speedup : {seq_time/thread_time:.1f}x)")print(f" Multiprocessing : {proc_time:.2f}s (speedup : {seq_time/proc_time:.1f}x)")print()print("CONCLUSION :")print(" Pour du CPU-bound → utilise multiprocessing (ou ProcessPoolExecutor)")print(" Pour de l\'I/O-bound → threading ou asyncio fonctionnent bien")
2. Identifier les goulots : Profiling
⚠️ “Premature optimization is the root of all evil” — Donald Knuth
Avant d’optimiser, il faut mesurer pour identifier le vrai problème.
2.1 Outils de profiling
Outil
Usage
Comment l’utiliser
%%time
Temps d’une cellule
Jupyter magic
%%timeit
Temps moyen (plusieurs runs)
Jupyter magic
cProfile
Profiling par fonction
python -m cProfile script.py
line_profiler
Profiling ligne par ligne
@profile decorator
memory_profiler
Usage RAM
@profile + mprof run
Voir le code
# %%time - mesure le temps d'exécution d'une celluleimport timedef slow_function(): total =0for i inrange(1_000_000): total += ireturn total%time result = slow_function()
Voir le code
# %%timeit - moyenne sur plusieurs exécutionsdef fast_function():returnsum(range(1_000_000))%timeit fast_function()
Voir le code
import cProfileimport pstatsfrom io import StringIOdef main_pipeline():"""Pipeline simulé avec plusieurs étapes""" data =list(range(100_000))# Étape 1 : transformation transformed = [x **2for x in data]# Étape 2 : filtrage filtered = [x for x in transformed if x %2==0]# Étape 3 : agrégation result =sum(filtered)return result# Profiler le codeprofiler = cProfile.Profile()profiler.enable()result = main_pipeline()profiler.disable()# Afficher les résultatsstream = StringIO()stats = pstats.Stats(profiler, stream=stream).sort_stats('cumulative')stats.print_stats(10)print(stream.getvalue())
Recommandé : Plus simple que multiprocessing et threading bruts Interface unifiée : Même API pour threads et processes
4.1 ThreadPoolExecutor : Pour les tâches I/O-bound
Le ThreadPoolExecutor crée un pool de threads réutilisables. C’est idéal pour : - Télécharger plusieurs fichiers - Faire plusieurs requêtes API - Lire/écrire plusieurs fichiers
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ ThreadPoolExecutor : Paralléliser des tâches I/O-bound ║# ╚══════════════════════════════════════════════════════════════════════════╝from concurrent.futures import ThreadPoolExecutorimport time# ─────────────────────────────────────────────────────────────────────────────# FONCTION QUI SIMULE UNE TÂCHE I/O (requête API, lecture fichier, etc.)# ─────────────────────────────────────────────────────────────────────────────def simulate_io_task(task_id):""" Simule une tâche I/O-bound (ex: requête API). Dans la vraie vie, ça pourrait être : - requests.get("https://api.example.com/data") - open("fichier.csv").read() - cursor.execute("SELECT * FROM table") Args: task_id: Identifiant de la tâche (pour le logging) Returns: Message de confirmation """print(f" Tâche {task_id} : début") time.sleep(0.5) # Simule 500ms de latence réseauprint(f" Tâche {task_id} : terminée")returnf"Résultat de la tâche {task_id}"# Liste des tâches à effectuer (10 tâches)tasks =list(range(10))print("="*60)print("🔬 ThreadPoolExecutor : Paralléliser des tâches I/O")print("="*60)# ─────────────────────────────────────────────────────────────────────────────# MÉTHODE 1 : SÉQUENTIEL (pour comparer)# ─────────────────────────────────────────────────────────────────────────────# 10 tâches × 0.5s = 5s au totalprint("\n Méthode 1 : Séquentiel")print("-"*40)start = time.time()results_seq = [simulate_io_task(t) for t in tasks]seq_time = time.time() - startprint(f"\n Temps séquentiel : {seq_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# MÉTHODE 2 : THREADPOOLEXECUTOR# ─────────────────────────────────────────────────────────────────────────────# Avec 5 workers : 10 tâches / 5 workers = 2 vagues = ~1sprint("\n Méthode 2 : ThreadPoolExecutor (5 workers)")print("-"*40)start = time.time()# Créer un pool de 5 threads# with ... as : le pool se ferme automatiquement à la finwith ThreadPoolExecutor(max_workers=5) as executor:## executor.map() applique la fonction à chaque élément de la liste# - Distribue les tâches aux threads disponibles# - Retourne les résultats DANS L'ORDRE de la liste originale# - Bloque jusqu'à ce que toutes les tâches soient terminées# results_parallel =list(executor.map(simulate_io_task, tasks))parallel_time = time.time() - startprint(f"\n Temps parallèle : {parallel_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# COMPARAISON# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("COMPARAISON")print("="*60)print(f" Séquentiel : {seq_time:.2f}s")print(f" Parallèle : {parallel_time:.2f}s")print(f" Speedup : {seq_time/parallel_time:.1f}x plus rapide !")print()print("Avec 5 workers pour 10 tâches :")print(" → 2 vagues de 5 tâches")print(" → 2 × 0.5s = ~1s au lieu de 10 × 0.5s = 5s")
4.2 ProcessPoolExecutor (CPU-bound)
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ ProcessPoolExecutor : Paralléliser des tâches CPU-bound ║# ╚══════════════════════════════════════════════════════════════════════════╝from concurrent.futures import ProcessPoolExecutorimport timeimport os# ─────────────────────────────────────────────────────────────────────────────# FONCTION CPU-INTENSIVE# ─────────────────────────────────────────────────────────────────────────────def cpu_task(n):""" Tâche CPU-bound : calcul intensif. Cette fonction fait travailler le CPU en continu. Exemples réels : parsing, compression, transformations de données, calculs mathématiques, feature engineering. Args: n: Nombre d'itérations Returns: Somme des carrés """returnsum(i **2for i inrange(n))# ─────────────────────────────────────────────────────────────────────────────# DONNÉES À TRAITER# ─────────────────────────────────────────────────────────────────────────────# On simule 8 "chunks" de données à traiter# Chaque chunk nécessite 1 million d'itérationsdata_chunks = [1_000_000] *8# 8 chunks de 1M itérations chacunprint("="*60)print(" ProcessPoolExecutor : Paralléliser des tâches CPU-bound")print("="*60)print(f" CPUs disponibles : {os.cpu_count()}")print(f" Chunks à traiter : {len(data_chunks)}")# ─────────────────────────────────────────────────────────────────────────────# MÉTHODE 1 : SÉQUENTIEL# ─────────────────────────────────────────────────────────────────────────────print("\n Méthode 1 : Séquentiel")print("-"*40)start = time.time()results_seq = [cpu_task(chunk) for chunk in data_chunks]seq_time = time.time() - startprint(f" Temps séquentiel : {seq_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# MÉTHODE 2 : PROCESSPOOL# ─────────────────────────────────────────────────────────────────────────────# Chaque worker est un PROCESSUS séparé avec son propre GIL# → Vraie parallélisation sur plusieurs CPUs !print("\n Méthode 2 : ProcessPoolExecutor (4 workers)")print("-"*40)start = time.time()# Créer un pool de 4 processus# Note : max_workers = nombre de CPUs est généralement optimalwith ProcessPoolExecutor(max_workers=4) as executor:## executor.map() distribue les chunks aux processus# - Processus 1 traite chunk[0], chunk[4]# - Processus 2 traite chunk[1], chunk[5]# - etc.# results_parallel =list(executor.map(cpu_task, data_chunks))proc_time = time.time() - startprint(f" Temps parallèle : {proc_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# COMPARAISON# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("COMPARAISON")print("="*60)print(f" Séquentiel : {seq_time:.2f}s")print(f" Parallèle : {proc_time:.2f}s")print(f" Speedup : {seq_time/proc_time:.1f}x plus rapide !")print()print("Explication :")print(" → 8 chunks avec 4 workers = 2 vagues")print(" → Chaque processus utilise 100% d'un CPU")print(" → Speedup théorique max = min(workers, chunks) = 4x")print()print("⚠️ IMPORTANT :")print(" → ProcessPoolExecutor pour CPU-bound (contourne le GIL)")print(" → ThreadPoolExecutor pour I/O-bound (GIL pas bloquant)")
4.3 Gestion avancée : submit() et as_completed()
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ submit() et as_completed() : Contrôle avancé des tâches ║# ╚══════════════════════════════════════════════════════════════════════════╝## executor.map() est simple mais limité :# - Attend que TOUTES les tâches finissent# - Retourne les résultats dans l'ordre original## submit() + as_completed() permet :# - Traiter les résultats dès qu'ils arrivent# - Gérer les erreurs individuellement# - Ajouter des timeouts par tâchefrom concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport random# ─────────────────────────────────────────────────────────────────────────────# FONCTION AVEC TEMPS D'EXÉCUTION VARIABLE# ─────────────────────────────────────────────────────────────────────────────def fetch_data(task_id):""" Simule une requête avec temps variable. Certaines tâches sont rapides, d'autres lentes. """ delay = random.uniform(0.1, 1.0) # Entre 0.1s et 1s time.sleep(delay)# Simuler une erreur aléatoire (10% de chance)if random.random() <0.1:raiseException(f"Erreur sur la tâche {task_id}")return {"task_id": task_id, "delay": round(delay, 2)}print("="*60)print(" submit() et as_completed() : Contrôle avancé")print("="*60)# ─────────────────────────────────────────────────────────────────────────────# UTILISATION DE submit() + as_completed()# ─────────────────────────────────────────────────────────────────────────────with ThreadPoolExecutor(max_workers=5) as executor:# ─────────────────────────────────────────────────────────────────────# ÉTAPE 1 : Soumettre les tâches avec submit()# ─────────────────────────────────────────────────────────────────────# submit() retourne un objet Future (promesse de résultat futur)# On crée un dictionnaire {future: task_id} pour identifier les tâches futures = {}for i inrange(5): future = executor.submit(fetch_data, i) # Soumet la tâche futures[future] = i # Associe le future à l'ID de la tâcheprint(f"\n{len(futures)} tâches soumises")print("-"*40)# ─────────────────────────────────────────────────────────────────────# ÉTAPE 2 : Traiter les résultats avec as_completed()# ─────────────────────────────────────────────────────────────────────# as_completed() itère sur les futures dans l'ORDRE DE COMPLÉTION# (pas l'ordre de soumission !)# → La première tâche terminée est traitée en premierfor future in as_completed(futures): task_id = futures[future] # Récupérer l'ID de la tâchetry:# future.result() récupère le résultat# timeout=5 : lève TimeoutError si > 5 secondes result = future.result(timeout=5)print(f" Tâche {task_id} terminée en {result['delay']}s")exceptExceptionas e:# Gestion individuelle des erreursprint(f"❌ Tâche {task_id} a échoué : {e}")print("\n AVANTAGES de submit() + as_completed() :")print(" → Traiter les résultats dès qu'ils arrivent")print(" → Gérer les erreurs individuellement")print(" → Ajouter des timeouts par tâche")print(" → Plus de contrôle que executor.map()")
4.4 Quand utiliser quoi ?
Executor
GIL contourné ?
Usage
Exemple
ThreadPoolExecutor
❌ Non
I/O : API, fichiers
Télécharger 50 fichiers
ProcessPoolExecutor
✅ Oui
CPU : calculs, ETL
Transformer 8 chunks de données
🔧 5. multiprocessing — Contrôle avancé
Pour les cas où concurrent.futures ne suffit pas.
5.1 Pool avec map
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ multiprocessing.Pool : Traitement parallèle avec partitioning ║# ╚══════════════════════════════════════════════════════════════════════════╝## Pool est utile quand tu veux :# - Partitionner des données en chunks# - Traiter chaque chunk en parallèle# - Combiner les résultatsfrom multiprocessing import Pool, cpu_countimport numpy as np# ─────────────────────────────────────────────────────────────────────────────# FONCTION DE TRAITEMENT D'UN CHUNK# ─────────────────────────────────────────────────────────────────────────────def process_chunk(chunk):""" Traite un chunk de données. Dans la vraie vie : - Transformation de données - Feature engineering - Calculs statistiques Args: chunk: numpy array (portion des données) Returns: Résultat du traitement (ici : somme des carrés) """return np.sum(chunk **2)print("="*60)print(" multiprocessing.Pool : Partitionner et paralléliser")print("="*60)# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 1 : Créer les données# ─────────────────────────────────────────────────────────────────────────────big_array = np.random.rand(1_000_000) # 1 million de valeursprint(f"\n📊 Données : {len(big_array):,} valeurs")# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 2 : Partitionner en chunks# ─────────────────────────────────────────────────────────────────────────────# On divise les données en autant de chunks que de CPUs# Chaque CPU traitera un chunkn_workers = cpu_count()chunks = np.array_split(big_array, n_workers)print(f" CPUs disponibles : {n_workers}")print(f" Chunks créés : {len(chunks)}")print(f" Taille de chaque chunk : ~{len(chunks[0]):,} valeurs")# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 3 : Traitement parallèle avec Pool# ─────────────────────────────────────────────────────────────────────────────print("\n Traitement parallèle...")# Pool() crée un pool de processus réutilisables# with ... as : le pool se ferme automatiquement à la finwith Pool(processes=n_workers) as pool:## pool.map() applique process_chunk à chaque chunk# - Les chunks sont distribués aux processus disponibles# - Chaque processus traite son chunk indépendamment# - Les résultats sont retournés dans l'ordre des chunks# results = pool.map(process_chunk, chunks)# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 4 : Combiner les résultats# ─────────────────────────────────────────────────────────────────────────────# Chaque processus a calculé la somme des carrés de son chunk# On additionne tous les résultats partielstotal =sum(results)print(f"\n Résultat total : {total:.2f}")print("\n PATTERN CLASSIQUE : Map-Reduce")print(" 1. SPLIT : Diviser les données en chunks")print(" 2. MAP : Traiter chaque chunk en parallèle")print(" 3. REDUCE : Combiner les résultats (ici: sum)")
5.2 starmap pour plusieurs arguments
Voir le code
from multiprocessing import Pooldef process_with_params(data, multiplier, offset):"""Fonction avec plusieurs paramètres"""returnsum(data) * multiplier + offset# Préparer les argumentsargs_list = [ ([1, 2, 3], 2, 10), ([4, 5, 6], 3, 20), ([7, 8, 9], 4, 30),]with Pool(3) as pool: results = pool.starmap(process_with_params, args_list)print("Résultats:", results)
5.3 Limites et précautions
⚠️ Limite
Explication
Overhead
Créer des process prend du temps (~100ms)
Sérialisation
Les données sont copiées (pickle)
if __name__ == "__main__"
Obligatoire sur Windows
Mémoire
Chaque process a sa propre mémoire
# ⚠️ Toujours protéger avec if __name__ == "__main__"if__name__=="__main__":with Pool(4) as pool: results = pool.map(my_func, data)
6. asyncio — I/O massivement parallèle
✅ Idéal pour : 100+ requêtes API, ingestion massive, crawling web ❌ Pas pour : Calculs CPU-intensive
6.1 Comprendre async/await
asyncio utilise un modèle single-thread non-bloquant. Au lieu de créer plusieurs threads, un seul thread gère plusieurs tâches en switchant entre elles quand l’une attend.
Comment ça fonctionne ?
╔══════════════════════════════════════════════════════════════════════════╗
║ asyncio : Single thread ║
╠══════════════════════════════════════════════════════════════════════════╣
║ ║
║ UN SEUL THREAD gère plusieurs tâches : ║
║ ║
║ Tâche 1: [code]──▶[await: attend API]──────────────▶[code]──▶ done ║
║ ↓ ↑ ║
║ Le thread switch │ ║
║ ↓ │ ║
║ Tâche 2: [attend]──▶[code]──▶[await: attend DB]────────│──▶ done ║
║ ↓ │ ║
║ Le thread switch │ ║
║ ↓ │ ║
║ Tâche 3: [attend]─────────────▶[code]──▶[await]────────┴──▶ done ║
║ ║
║ → Le thread ne reste jamais "bloqué" à attendre ║
║ → Dès qu'une tâche attend, il passe à une autre ║
║ → Très efficace pour l'I/O : un seul thread peut gérer 1000+ requêtes ║
║ ║
╚══════════════════════════════════════════════════════════════════════════╝
6.2 Les mots-clés async/await
Mot-clé
Ce que ça fait
Quand l’utiliser
async def
Déclare une fonction coroutine
Fonctions qui font de l’I/O asynchrone
await
Attend le résultat d’une coroutine
Quand tu appelles une fonction async
asyncio.gather()
Lance plusieurs coroutines en parallèle
Pour paralléliser des tâches
asyncio.run()
Point d’entrée pour exécuter du code async
Dans un script (pas dans Jupyter)
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ asyncio : Comprendre async/await ║# ╚══════════════════════════════════════════════════════════════════════════╝import asyncioimport time# ─────────────────────────────────────────────────────────────────────────────# FONCTION ASYNCHRONE (coroutine)# ─────────────────────────────────────────────────────────────────────────────# "async def" crée une COROUTINE, pas une fonction normale# Une coroutine peut être "mise en pause" avec awaitasyncdef fetch_data(task_id):""" Simule une requête API asynchrone. Dans la vraie vie, ça serait : - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - return await response.json() Args: task_id: Identifiant de la tâche Returns: Résultat simulé """print(f" Tâche {task_id} : début")# await asyncio.sleep() = attente NON-BLOQUANTE# Pendant que cette tâche attend, d'autres tâches peuvent s'exécuter !# C'est LA différence avec time.sleep() qui bloque toutawait asyncio.sleep(1) # Simule 1s de latence réseauprint(f" Tâche {task_id} : terminée")returnf"result_{task_id}"# ─────────────────────────────────────────────────────────────────────────────# FONCTION PRINCIPALE ASYNCHRONE# ─────────────────────────────────────────────────────────────────────────────asyncdef main():""" Fonction principale qui orchestre les tâches. asyncio.gather() lance plusieurs coroutines EN PARALLÈLE et attend que TOUTES soient terminées. """print("="*60)print("asyncio : Lancer 5 tâches en parallèle")print("="*60)# Créer une liste de coroutines (pas encore exécutées !)# Note : fetch_data(i) retourne un objet coroutine, pas le résultat tasks = [fetch_data(i) for i inrange(5)]print(f"\n{len(tasks)} tâches créées")print("="*60)# asyncio.gather() lance TOUTES les tâches en parallèle# - Les 5 tâches démarrent en même temps# - Chaque tâche attend 1s (await asyncio.sleep(1))# - MAIS elles attendent en parallèle !# - Temps total : ~1s au lieu de 5s results =await asyncio.gather(*tasks)# *tasks "déballe" la liste : gather(task[0], task[1], task[2], ...)return results# ─────────────────────────────────────────────────────────────────────────────# EXÉCUTION# ─────────────────────────────────────────────────────────────────────────────# Dans Jupyter : await main() fonctionne directement# Dans un script Python normal : asyncio.run(main())start = time.time()results =await main() # Jupyter permet d'utiliser await directementtotal_time = time.time() - startprint("="*60)print("RÉSULTAT")print("="*60)print(f" Temps total : {total_time:.2f}s")print(f" Résultats : {results}")print()print(" EXPLICATION :")print(" → 5 tâches de 1s chacune")print(" → En séquentiel : 5 × 1s = 5s")print(" → En parallèle (asyncio) : ~1s (toutes en même temps)")print()print("⚠️ IMPORTANT :")print(" → await asyncio.sleep() ≠ time.sleep()")print(" → asyncio.sleep() libère le thread pour d'autres tâches")print(" → time.sleep() bloque TOUT le programme")
6.2 Exemple réel avec aiohttp
Voir le code
# Installation : pip install aiohttpimport asyncio# Simulons aiohttp pour l'exemple (sans vraies requêtes)asyncdef fetch_url(session, url):"""Simule une requête HTTP"""await asyncio.sleep(0.1) # Simule latencereturn {"url": url, "status": 200}asyncdef fetch_all_urls(urls):"""Fetch toutes les URLs en parallèle""" session =None# En vrai : async with aiohttp.ClientSession() as session: tasks = [fetch_url(session, url) for url in urls] results =await asyncio.gather(*tasks, return_exceptions=True)return results# Simuler 20 URLsurls = [f"https://api.example.com/data/{i}"for i inrange(20)]start = time.time()results =await fetch_all_urls(urls)print(f" 20 requêtes en {time.time() - start:.2f}s")print(f" Succès : {len([r for r in results ifisinstance(r, dict)])}")
6.3 Semaphore : limiter les connexions simultanées
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ Semaphore : Limiter le nombre de connexions simultanées ║# ╚══════════════════════════════════════════════════════════════════════════╝## PROBLÈME : Tu veux faire 100 requêtes API, mais l'API limite à 5# connexions simultanées (rate limiting).## SOLUTION : Semaphore = un compteur qui limite les accès concurrentsimport asyncioimport timeprint("="*60)print("🔬 Semaphore : Limiter la concurrence")print("="*60)# ─────────────────────────────────────────────────────────────────────────────# CRÉER UN SEMAPHORE# ─────────────────────────────────────────────────────────────────────────────# Semaphore(5) = maximum 5 tâches peuvent s'exécuter en même temps# Les autres attendent qu'une place se libèreMAX_CONCURRENT =5semaphore = asyncio.Semaphore(MAX_CONCURRENT)print(f"\n🚦 Semaphore créé : max {MAX_CONCURRENT} tâches simultanées")# ─────────────────────────────────────────────────────────────────────────────# FONCTION AVEC SEMAPHORE# ─────────────────────────────────────────────────────────────────────────────asyncdef fetch_limited(task_id):""" Fait une requête en respectant la limite de concurrence. async with semaphore : - Tente d'acquérir le semaphore (décrémente le compteur) - Si compteur = 0, ATTEND qu'une tâche se termine - À la sortie du with, libère le semaphore (incrémente le compteur) """# async with semaphore : attend si déjà 5 tâches en coursasyncwith semaphore:print(f" Tâche {task_id:2d} démarre (slot acquis)")await asyncio.sleep(0.5) # Simule la requêteprint(f" Tâche {task_id:2d} terminée (slot libéré)")return task_id# ─────────────────────────────────────────────────────────────────────────────# EXÉCUTION : 15 tâches avec max 5 simultanées# ─────────────────────────────────────────────────────────────────────────────asyncdef main():print("\n"+"-"*40)print(f" Lancement de 15 tâches (max {MAX_CONCURRENT} simultanées)")print("-"*40+"\n")# Créer 15 tâches tasks = [fetch_limited(i) for i inrange(15)]# Lancer toutes les tâches# MAIS le semaphore limite à 5 simultanées ! results =await asyncio.gather(*tasks)return resultsstart = time.time()results =await main()total_time = time.time() - startprint("\n"+"="*60)print("RÉSULTAT")print("="*60)print(f"Temps total : {total_time:.2f}s")print(f"Tâches complétées : {len(results)}")print("\n EXPLICATION :")print(f" → 15 tâches de 0.5s chacune")print(f" → Max {MAX_CONCURRENT} simultanées = 3 vagues")print(f" → 3 vagues × 0.5s = ~1.5s (au lieu de 7.5s séquentiel)")print("\n USE CASES :")print(" → Rate limiting API (ex: max 10 req/s)")print(" → Limiter les connexions DB")print(" → Éviter de surcharger un service")
6.4 Quand NE PAS utiliser asyncio
Situation
asyncio efficace ?
Alternative
100+ appels API
✅ Oui
-
Lecture S3/DB massives
✅ Oui
-
Calculs CPU
❌ Non
ProcessPoolExecutor
5 requêtes simples
❌ Overkill
ThreadPoolExecutor
Code synchrone existant
❌ Refactoring lourd
ThreadPoolExecutor
7. Dask — Traiter des fichiers plus grands que la RAM
Le plus utile quand tes données ne tiennent pas en mémoire.
7.1 Pourquoi Dask ?
Problème
Solution classique
Solution Dask
Fichier de 50 Go
MemoryError !
✅ Traité par chunks
1000 fichiers CSV
Boucle lente
✅ Parallélisé automatiquement
Besoin d’apprendre une nouvelle API
😫
✅ API quasi-identique à Pandas
7.2 Comment Dask fonctionne ?
╔══════════════════════════════════════════════════════════════════════════╗
║ Dask : Lazy Evaluation + Partitions ║
╠══════════════════════════════════════════════════════════════════════════╣
║ ║
║ ÉTAPE 1 : Lecture (LAZY - pas de chargement en mémoire !) ║
║ ───────────────────────────────────────────────────────── ║
║ ║
║ ddf = dd.read_csv("data/*.csv") ║
║ ║
║ → Dask SCANNE les fichiers (schéma, taille) ║
║ → Mais NE CHARGE PAS les données ! ║
║ → Crée un "plan d'exécution" ║
║ ║
║ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ║
║ │ Partition 1 │ │ Partition 2 │ │ Partition 3 │ │ Partition 4 │ ║
║ │ (fichier 1) │ │ (fichier 2) │ │ (fichier 3) │ │ (fichier 4) │ ║
║ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ ║
║ ║
║ ÉTAPE 2 : Opérations (LAZY - construit le plan) ║
║ ──────────────────────────────────────────────── ║
║ ║
║ result = ddf.groupby("col").sum() # Pas encore exécuté ! ║
║ ║
║ ÉTAPE 3 : compute() (EXÉCUTION - traitement réel) ║
║ ───────────────────────────────────────────────── ║
║ ║
║ result.compute() ║
║ ║
║ → Dask charge partition 1, la traite, la décharge ║
║ → Puis partition 2, etc. ║
║ → Seule 1 partition en mémoire à la fois ! ║
║ → Utilise tous les CPUs pour traiter les partitions en parallèle ║
║ ║
╚══════════════════════════════════════════════════════════════════════════╝
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ Dask DataFrame : Traiter des fichiers plus grands que la RAM ║# ╚══════════════════════════════════════════════════════════════════════════╝# Installation : pip install "dask[complete]"import dask.dataframe as ddimport pandas as pdimport osimport time# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 0 : Créer des fichiers de test# ─────────────────────────────────────────────────────────────────────────────print("="*60)print("Création des fichiers de test")print("="*60)os.makedirs("data/dask_demo", exist_ok=True)for i inrange(5): df = pd.DataFrame({"id": range(i *10000, (i +1) *10000),"category": [f"cat_{j %5}"for j inrange(10000)],"amount": [float(j %1000) for j inrange(10000)] }) df.to_csv(f"data/dask_demo/file_{i}.csv", index=False)print("5 fichiers CSV créés (50,000 lignes au total)")# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 1 : Lecture LAZY avec Dask# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("Étape 1 : Lecture avec Dask (LAZY)")print("="*60)# dd.read_csv() avec pattern glob : lit TOUS les fichiers correspondants# IMPORTANT : les données ne sont PAS chargées en mémoire !ddf = dd.read_csv("data/dask_demo/*.csv")print(f"\nType de l'objet : {type(ddf)}")print(f"Nombre de partitions : {ddf.npartitions}")print(f" → Chaque fichier = 1 partition")print(f" → Les partitions seront traitées indépendamment")print("\n Schéma des données (sans les charger) :")print(ddf)print("\n⚠️ IMPORTANT : À ce stade, les données ne sont PAS en mémoire !")print(" Dask a juste lu les en-têtes et créé un plan d'exécution.")# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 2 : Pipeline de transformations (LAZY)# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("Étape 2 : Pipeline de transformations (LAZY)")print("="*60)# Toutes ces opérations sont LAZY : rien n'est calculé !# Dask construit un graphe d'exécution (DAG) optimiséresult = ( ddf# Filtrer : garder seulement les montants > 100 .query("amount > 100")# Créer une nouvelle colonne .assign(amount_doubled=ddf.amount *2)# Grouper par catégorie et calculer la somme .groupby("category") .amount.sum())print("Pipeline défini :")print(" 1. Filtrer amount > 100")print(" 2. Créer colonne amount_doubled")print(" 3. GroupBy category + sum")print("\n Toujours LAZY ! Rien n'est calculé.")print(f"Type du résultat : {type(result)}")# ─────────────────────────────────────────────────────────────────────────────# ÉTAPE 3 : Exécution avec compute()# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("Étape 3 : Exécution avec compute()")print("="*60)start = time.time()# compute() déclenche l'exécution RÉELLE du pipeline# - Dask optimise le plan d'exécution# - Traite les partitions en parallèle (utilise tous les CPUs)# - Retourne un objet Pandas (DataFrame ou Series)final_result = result.compute()print(f"\n Temps d'exécution : {time.time() - start:.2f}s")print(f"Type du résultat final : {type(final_result)}")print("\n Résultat :")print(final_result)print("\n CE QU'IL FAUT RETENIR :")print(" → dd.read_csv() : lecture lazy (pas de chargement)")print(" → Opérations (filter, groupby...) : construisent le plan")print(" → compute() : exécute le plan et retourne un Pandas")print(" → Une seule partition en mémoire à la fois !")
Voir le code
import dask.dataframe as ddimport time# Lire TOUS les fichiers avec glob pattern (lazy !)ddf = dd.read_csv("data/dask_demo/*.csv")print("Type:", type(ddf))print(f"Partitions: {ddf.npartitions}")print("\n Les données ne sont PAS encore chargées !")print(ddf) # Affiche le schéma, pas les données
Ultra-simple — parfait pour paralléliser une boucle rapidement Très utilisé en Data Science (sklearn l’utilise en interne)
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗# ║ joblib : Parallélisation ultra-simple ║# ╚══════════════════════════════════════════════════════════════════════════╝## joblib est LA solution la plus simple pour paralléliser une boucle# Très utilisé en Data Science (scikit-learn l'utilise en interne)## Installation : pip install joblibfrom joblib import Parallel, delayedimport timeprint("="*60)print("joblib : Paralléliser une boucle en 1 ligne")print("="*60)# ─────────────────────────────────────────────────────────────────────────────# FONCTION À PARALLÉLISER# ─────────────────────────────────────────────────────────────────────────────def expensive_computation(x):""" Calcul coûteux à paralléliser. Dans la vraie vie : - Entraînement d'un modèle - Transformation de fichier - Calcul de features """ time.sleep(0.1) # Simule un calcul de 100msreturn x **2# Données à traiterdata =list(range(20))# ─────────────────────────────────────────────────────────────────────────────# MÉTHODE 1 : SÉQUENTIEL (classique)# ─────────────────────────────────────────────────────────────────────────────print("\n Méthode 1 : List comprehension (séquentiel)")print("-"*40)start = time.time()results_seq = [expensive_computation(x) for x in data]seq_time = time.time() - startprint(f"Temps : {seq_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# MÉTHODE 2 : JOBLIB (parallèle)# ─────────────────────────────────────────────────────────────────────────────print("\nMéthode 2 : joblib.Parallel (parallèle)")print("-"*40)start = time.time()# ╔══════════════════════════════════════════════════════════════════════════╗# ║ Parallel(n_jobs=-1)(delayed(func)(args) for args in data) ║# ╠══════════════════════════════════════════════════════════════════════════╣# ║ ║# ║ Décomposition : ║# ║ ║# ║ Parallel(n_jobs=-1) → Crée un pool de workers ║# ║ n_jobs=-1 = utiliser TOUS les CPUs ║# ║ n_jobs=4 = utiliser 4 CPUs ║# ║ ║# ║ delayed(func) → Wrapper qui "retarde" l'exécution ║# ║ La fonction n'est pas appelée immédiatement ║# ║ ║# ║ delayed(func)(args) → Prépare l'appel func(args) ║# ║ Retourne un "callable" différé ║# ║ ║# ║ for x in data → Génère tous les appels différés ║# ║ ║# ║ Le tout entre () → Parallel distribue et exécute les appels ║# ║ ║# ╚══════════════════════════════════════════════════════════════════════════╝results_parallel = Parallel(n_jobs=-1)( delayed(expensive_computation)(x) for x in data)parallel_time = time.time() - startprint(f"Temps : {parallel_time:.2f}s")# ─────────────────────────────────────────────────────────────────────────────# COMPARAISON# ─────────────────────────────────────────────────────────────────────────────print("\n"+"="*60)print("COMPARAISON")print("="*60)print(f" Séquentiel : {seq_time:.2f}s")print(f" Parallèle : {parallel_time:.2f}s")print(f" Speedup : {seq_time/parallel_time:.1f}x plus rapide !")print("\n💡 SYNTAXE JOBLIB :")print(" Parallel(n_jobs=-1)(delayed(func)(arg) for arg in data)")print(" │ │ │ │")print(" │ │ │ └─ Données à traiter")print(" │ │ └─ Argument de la fonction")print(" │ └─ Fonction à paralléliser")print(" └─ -1 = tous les CPUs")print("\n🎯 QUAND UTILISER JOBLIB :")print(" Paralléliser une boucle simple")print(" Data Science / ML (cross-validation, grid search)")print(" Quand tu veux quelque chose qui marche vite")print(" Pour du I/O massif (préfère asyncio)")
Voir le code
from joblib import Parallel, delayeddef io_task(x): time.sleep(0.1)return x# Options utilesresults = Parallel( n_jobs=4, # Nombre de workers backend="threading", # "threading" pour I/O, "loky" (défaut) pour CPU verbose=1# Affiche la progression)(delayed(io_task)(x) for x inrange(10))print(f"\n Résultats : {results}")
L’exécution simultanée de plusieurs threads Python
L’utilisation de la mémoire
La lecture de fichiers
💡 Voir la réponse
✅ Réponse : b — Le GIL (Global Interpreter Lock) empêche l’exécution simultanée de plusieurs threads Python, les forçant à s’exécuter en alternance.
❓ Q2. Pour un traitement CPU-intensive, quel outil utiliser ?
ThreadPoolExecutor
ProcessPoolExecutor
asyncio
Tous sont équivalents
💡 Voir la réponse
✅ Réponse : b — ProcessPoolExecutor contourne le GIL en utilisant des processus séparés, permettant une vraie parallélisation CPU.
❓ Q3. Quand utiliser asyncio ?
Pour des calculs mathématiques complexes
Pour télécharger 100+ fichiers depuis une API
Pour trier un gros tableau
Pour compresser des fichiers
💡 Voir la réponse
✅ Réponse : b — asyncio est idéal pour l’I/O massivement parallèle (requêtes API, téléchargements). Les autres sont CPU-bound.
❓ Q4. Que fait ddf.compute() dans Dask ?
Définit le pipeline
Affiche le schéma
Déclenche l’exécution et retourne un DataFrame Pandas
Sauvegarde les données
💡 Voir la réponse
✅ Réponse : c — .compute() déclenche l’exécution du pipeline lazy et retourne le résultat sous forme de DataFrame Pandas.
❓ Q5. Que signifie n_jobs=-1 dans joblib ?
Désactive le parallélisme
Utilise un seul CPU
Utilise tous les CPUs disponibles
Erreur de configuration
💡 Voir la réponse
✅ Réponse : c — n_jobs=-1 indique à joblib d’utiliser tous les CPUs disponibles sur la machine.
❓ Q6. Pourquoi ThreadPoolExecutor ne accélère pas les calculs CPU ?
Parce qu’il est mal implémenté
Parce que le GIL force l’exécution séquentielle des threads Python
Parce qu’il utilise trop de mémoire
Parce qu’il est obsolète
💡 Voir la réponse
✅ Réponse : b — Le GIL empêche les threads Python de s’exécuter en parallèle. Pour du CPU-bound, il faut utiliser des processus.
❓ Q7. Pour traiter un fichier de 50 Go avec une API Pandas-like, quel outil choisir ?
Pandas
Polars
Dask
asyncio
💡 Voir la réponse
✅ Réponse : c — Dask permet de traiter des fichiers plus grands que la RAM avec une API similaire à Pandas.
❓ Q8. Quelle est la première étape avant d’optimiser du code ?
Ajouter du multiprocessing
Réécrire en Rust
Profiler pour identifier le goulot d’étranglement
Utiliser asyncio
💡 Voir la réponse
✅ Réponse : c — “Premature optimization is the root of all evil”. Il faut d’abord mesurer pour savoir où optimiser.
Mini-projet : Pipeline haute performance
Objectif
Combiner plusieurs techniques pour créer un pipeline performant : - ProcessPoolExecutor pour transformation CPU-intensive - Dask pour agrégation - Export Parquet
Tu vas apprendre : - Architecture Spark (Driver, Executors) - RDDs et DataFrames Spark - Optimisations (partitioning, caching) - Spark sur Kubernetes
📝 Récapitulatif de ce module
Outil
Type
Quand l’utiliser
ThreadPoolExecutor
I/O
< 20 requêtes/fichiers
ProcessPoolExecutor
CPU
Calculs, transformations
asyncio
I/O massif
100+ requêtes API
joblib
Simple
Paralléliser une boucle
Dask
Big Data
Fichiers > RAM, API Pandas
🎉 Félicitations ! Tu as terminé le module High Performance Python.
Voir le code
# Nettoyage des fichiers temporaires (optionnel)import shutilimport os# Décommenter pour nettoyer# for folder in ["data/raw", "data/intermediate", "data/processed", "data/dask_demo"]:# if os.path.exists(folder):# shutil.rmtree(folder)# print("🧹 Fichiers temporaires supprimés")
Comment ça fonctionne ?
Voir le code