High Performance Python

Bienvenue dans ce module où tu vas apprendre à accélérer considérablement tes pipelines Python. Tu découvriras comment contourner les limitations du GIL, paralléliser tes traitements, et gérer des fichiers plus grands que ta RAM !


Prérequis

Niveau Compétence
✅ Requis Maîtriser Python (fonctions, classes)
✅ Requis Avoir suivi le module 17_polars_for_data_engineering
💡 Recommandé Connaître Pandas

🎯 Objectifs du module

À la fin de ce module, tu seras capable de :

  • Comprendre le GIL et ses implications sur la performance
  • Profiler ton code pour identifier les vrais goulots d’étranglement
  • Utiliser concurrent.futures pour paralléliser simplement
  • Maîtriser asyncio pour l’I/O massivement parallèle
  • Exploiter Dask pour traiter des fichiers plus grands que la RAM
  • Choisir le bon outil selon ton problème

1. Le GIL : Comprendre la limitation fondamentale

Cette section est essentielle pour comprendre pourquoi certaines techniques fonctionnent et d’autres non.

1.1 Qu’est-ce que le GIL ?

Le Global Interpreter Lock (GIL) est un verrou interne à Python qui empêche deux threads d’exécuter du code Python en même temps.

Pourquoi le GIL existe ?

Python gère la mémoire automatiquement (garbage collector). Sans le GIL, deux threads pourraient modifier le même objet simultanément → corruption de mémoire. Le GIL est une solution simple mais qui limite la performance.

Visualisation du GIL

╔══════════════════════════════════════════════════════════════════════════╗
║                          AVEC LE GIL (threading)                         ║
╠══════════════════════════════════════════════════════════════════════════╣
║                                                                          ║
║  Thread 1: ████████░░░░░░░░████████░░░░░░░░████████░░░░░░░░             ║
║  Thread 2: ░░░░░░░░████████░░░░░░░░████████░░░░░░░░████████             ║
║            ──────────────────────────────────────────────────▶ temps    ║
║                                                                          ║
║  → Un seul thread s'exécute à la fois !                                 ║
║  → Les threads ALTERNENT, ils ne sont pas vraiment parallèles           ║
║  → Pour du calcul CPU : PAS de gain de performance !                    ║
║                                                                          ║
╠══════════════════════════════════════════════════════════════════════════╣
║                       SANS GIL (multiprocessing)                         ║
╠══════════════════════════════════════════════════════════════════════════╣
║                                                                          ║
║  Process 1: ████████████████████████████████████████████████             ║
║  Process 2: ████████████████████████████████████████████████             ║
║             ──────────────────────────────────────────────────▶ temps   ║
║                                                                          ║
║  → VRAIE exécution parallèle sur plusieurs CPUs !                       ║
║  → Chaque process a son propre interpréteur Python                      ║
║  → Pour du calcul CPU : gain de performance proportionnel aux CPUs      ║
║                                                                          ║
╚══════════════════════════════════════════════════════════════════════════╝

1.2 CPU-bound vs I/O-bound : La distinction CRUCIALE

Avant de choisir une technique, tu DOIS savoir si ton problème est CPU-bound ou I/O-bound :

Type C’est quoi ? Exemples Le GIL bloque ? Solution
CPU-bound Le CPU travaille en continu Calculs mathématiques, transformations de données, parsing, compression OUI multiprocessing, ProcessPoolExecutor
I/O-bound Le CPU attend des données externes Requêtes API, lecture fichiers, requêtes base de données NON threading, asyncio

Pourquoi le GIL ne bloque PAS l’I/O ?

Quand Python attend une réponse (réseau, disque, etc.), il relâche le GIL automatiquement :

Thread 1:  [code Python]──▶[attend réseau]──▶[code Python]
                 GIL ↓           ↓ GIL libre     ↓ GIL
                 
Thread 2:  [attend GIL]──▶[code Python]──▶[attend GIL]
                              GIL ↓

→ Pendant que Thread 1 attend le réseau, Thread 2 peut s'exécuter !

1.3 Démonstration : GIL en action

ℹ️ Le savais-tu ?

Le GIL a été introduit dans Python pour simplifier la gestion de la mémoire. Il rend Python thread-safe par défaut, mais au prix de la performance multi-thread.

Des projets comme nogil (Python 3.13+) et subinterpreters travaillent à supprimer ou contourner cette limitation.

En attendant, les Data Engineers utilisent multiprocessing pour contourner le GIL !

Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  DÉMONSTRATION DU GIL : Threading vs Multiprocessing                     ║
# ╚══════════════════════════════════════════════════════════════════════════╝

import time
import threading
from multiprocessing import Process

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION CPU-INTENSIVE
# ─────────────────────────────────────────────────────────────────────────────
# Cette fonction fait des calculs lourds (boucle + opérations mathématiques)
# C'est du "CPU-bound" car le CPU travaille en continu sans attendre

def cpu_intensive(n):
    """
    Tâche CPU-bound : calcul intensif.
    
    Args:
        n: Nombre d'itérations (plus c'est grand, plus c'est long)
    
    Returns:
        La somme des carrés de 0 à n-1
    """
    total = 0
    for i in range(n):
        total += i ** 2  # Opération CPU : élévation au carré
    return total

# Paramètre : 5 millions d'itérations par appel
N = 5_000_000

print("=" * 60)
print("DÉMONSTRATION : Impact du GIL sur les tâches CPU-bound")
print("=" * 60)

# ─────────────────────────────────────────────────────────────────────────────
# TEST 1 : EXÉCUTION SÉQUENTIELLE (baseline)
# ─────────────────────────────────────────────────────────────────────────────
# On exécute la fonction 2 fois, l'une après l'autre
# C'est notre référence pour mesurer le gain des autres méthodes

print("\n Test 1 : Séquentiel (référence)")
start = time.time()

cpu_intensive(N)  # Premier appel
cpu_intensive(N)  # Deuxième appel (attend que le premier finisse)

seq_time = time.time() - start
print(f"   Temps : {seq_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# TEST 2 : THREADING (bloqué par le GIL)
# ─────────────────────────────────────────────────────────────────────────────
# On crée 2 threads qui exécutent cpu_intensive en "parallèle"
# MAIS : le GIL empêche l'exécution simultanée !
# Résultat attendu : temps SIMILAIRE au séquentiel (pas de gain)

print("\n Test 2 : Threading (2 threads)")
start = time.time()

# Créer les threads
t1 = threading.Thread(target=cpu_intensive, args=(N,))  # Thread 1
t2 = threading.Thread(target=cpu_intensive, args=(N,))  # Thread 2

# Démarrer les threads
t1.start()  # Lance Thread 1
t2.start()  # Lance Thread 2 (mais GIL bloque l'exécution simultanée !)

# Attendre que les threads finissent
t1.join()  # Attend Thread 1
t2.join()  # Attend Thread 2

thread_time = time.time() - start
print(f"   Temps : {thread_time:.2f}s")
print(f"   Pas plus rapide ! Le GIL empêche la parallélisation.")

# ─────────────────────────────────────────────────────────────────────────────
# TEST 3 : MULTIPROCESSING (contourne le GIL)
# ─────────────────────────────────────────────────────────────────────────────
# On crée 2 PROCESSUS séparés (pas des threads)
# Chaque processus a son propre interpréteur Python = son propre GIL
# Résultat attendu : temps DIVISÉ PAR 2 (vraie parallélisation)

print("\n Test 3 : Multiprocessing (2 processus)")
start = time.time()

# Créer les processus
p1 = Process(target=cpu_intensive, args=(N,))  # Processus 1
p2 = Process(target=cpu_intensive, args=(N,))  # Processus 2

# Démarrer les processus
p1.start()  # Lance Processus 1 sur CPU 1
p2.start()  # Lance Processus 2 sur CPU 2 (VRAIMENT en parallèle !)

# Attendre que les processus finissent
p1.join()  # Attend Processus 1
p2.join()  # Attend Processus 2

proc_time = time.time() - start
print(f"   Temps : {proc_time:.2f}s")
print(f"    ~2x plus rapide ! Le multiprocessing contourne le GIL.")

# ─────────────────────────────────────────────────────────────────────────────
# RÉSUMÉ
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("RÉSUMÉ")
print("=" * 60)
print(f"   Séquentiel      : {seq_time:.2f}s (référence)")
print(f"   Threading       : {thread_time:.2f}s (speedup : {seq_time/thread_time:.1f}x)")
print(f"   Multiprocessing : {proc_time:.2f}s (speedup : {seq_time/proc_time:.1f}x)")
print()
print("CONCLUSION :")
print("   Pour du CPU-bound → utilise multiprocessing (ou ProcessPoolExecutor)")
print("   Pour de l\'I/O-bound → threading ou asyncio fonctionnent bien")

2. Identifier les goulots : Profiling

⚠️ “Premature optimization is the root of all evil” — Donald Knuth

Avant d’optimiser, il faut mesurer pour identifier le vrai problème.

2.1 Outils de profiling

Outil Usage Comment l’utiliser
%%time Temps d’une cellule Jupyter magic
%%timeit Temps moyen (plusieurs runs) Jupyter magic
cProfile Profiling par fonction python -m cProfile script.py
line_profiler Profiling ligne par ligne @profile decorator
memory_profiler Usage RAM @profile + mprof run
Voir le code
# %%time - mesure le temps d'exécution d'une cellule
import time

def slow_function():
    total = 0
    for i in range(1_000_000):
        total += i
    return total

%time result = slow_function()
Voir le code
# %%timeit - moyenne sur plusieurs exécutions
def fast_function():
    return sum(range(1_000_000))

%timeit fast_function()
Voir le code
import cProfile
import pstats
from io import StringIO

def main_pipeline():
    """Pipeline simulé avec plusieurs étapes"""
    data = list(range(100_000))
    
    # Étape 1 : transformation
    transformed = [x ** 2 for x in data]
    
    # Étape 2 : filtrage
    filtered = [x for x in transformed if x % 2 == 0]
    
    # Étape 3 : agrégation
    result = sum(filtered)
    
    return result

# Profiler le code
profiler = cProfile.Profile()
profiler.enable()

result = main_pipeline()

profiler.disable()

# Afficher les résultats
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream).sort_stats('cumulative')
stats.print_stats(10)
print(stream.getvalue())

3. Stratégies de performance : Vue d’ensemble

Besoin Solution Quand l’utiliser
Multi-CPU (CPU-bound) ProcessPoolExecutor ETL lourd, calculs
I/O parallèle (simple) ThreadPoolExecutor < 20 requêtes/fichiers
I/O parallèle (massif) asyncio 100+ requêtes API
Gros fichiers (> RAM) Polars streaming, Dask 10-100+ Go
Parallélisation simple joblib Boucles, ML

Arbre de décision

Ton problème est...
│
├─▶ CPU-bound (calculs, transformations) ?
│   ├─▶ Simple/boucle → joblib
│   └─▶ Complexe/chunks → ProcessPoolExecutor
│
├─▶ I/O-bound (API, fichiers, DB) ?
│   ├─▶ < 20 requêtes → ThreadPoolExecutor
│   └─▶ 100+ requêtes → asyncio
│
└─▶ Gros fichiers (> RAM) ?
    ├─▶ Single file, Polars-like → Polars streaming
    ├─▶ Multi-files, Pandas-like → Dask
    └─▶ Cluster distribué → Spark (module 19)

4. concurrent.futures — L’API moderne et simple

Recommandé : Plus simple que multiprocessing et threading bruts Interface unifiée : Même API pour threads et processes

4.1 ThreadPoolExecutor : Pour les tâches I/O-bound

Le ThreadPoolExecutor crée un pool de threads réutilisables. C’est idéal pour : - Télécharger plusieurs fichiers - Faire plusieurs requêtes API - Lire/écrire plusieurs fichiers

Comment ça fonctionne ?

╔══════════════════════════════════════════════════════════════════════════╗
║                      ThreadPoolExecutor (5 workers)                       ║
╠══════════════════════════════════════════════════════════════════════════╣
║                                                                          ║
║   Tâches à faire : [T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]            ║
║                                                                          ║
║   Pool de threads :                                                      ║
║   ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐          ║
║   │Thread 1 │ │Thread 2 │ │Thread 3 │ │Thread 4 │ │Thread 5 │          ║
║   │   T1    │ │   T2    │ │   T3    │ │   T4    │ │   T5    │          ║
║   └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘          ║
║        │           │           │           │           │                 ║
║        │ T1 fini   │           │           │           │                 ║
║        ▼           │           │           │           │                 ║
║   ┌─────────┐      │           │           │           │                 ║
║   │Thread 1 │      │           │           │           │                 ║
║   │   T6    │ ◀────┴── Les threads prennent la tâche suivante           ║
║   └─────────┘          dès qu'ils ont fini                              ║
║                                                                          ║
║   → Max 5 tâches en parallèle, les autres attendent                     ║
║                                                                          ║
╚══════════════════════════════════════════════════════════════════════════╝
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  ThreadPoolExecutor : Paralléliser des tâches I/O-bound                  ║
# ╚══════════════════════════════════════════════════════════════════════════╝

from concurrent.futures import ThreadPoolExecutor
import time

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION QUI SIMULE UNE TÂCHE I/O (requête API, lecture fichier, etc.)
# ─────────────────────────────────────────────────────────────────────────────
def simulate_io_task(task_id):
    """
    Simule une tâche I/O-bound (ex: requête API).
    
    Dans la vraie vie, ça pourrait être :
    - requests.get("https://api.example.com/data")
    - open("fichier.csv").read()
    - cursor.execute("SELECT * FROM table")
    
    Args:
        task_id: Identifiant de la tâche (pour le logging)
    
    Returns:
        Message de confirmation
    """
    print(f"    Tâche {task_id} : début")
    time.sleep(0.5)  # Simule 500ms de latence réseau
    print(f"    Tâche {task_id} : terminée")
    return f"Résultat de la tâche {task_id}"

# Liste des tâches à effectuer (10 tâches)
tasks = list(range(10))

print("=" * 60)
print("🔬 ThreadPoolExecutor : Paralléliser des tâches I/O")
print("=" * 60)

# ─────────────────────────────────────────────────────────────────────────────
# MÉTHODE 1 : SÉQUENTIEL (pour comparer)
# ─────────────────────────────────────────────────────────────────────────────
# 10 tâches × 0.5s = 5s au total

print("\n Méthode 1 : Séquentiel")
print("-" * 40)
start = time.time()

results_seq = [simulate_io_task(t) for t in tasks]

seq_time = time.time() - start
print(f"\n Temps séquentiel : {seq_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# MÉTHODE 2 : THREADPOOLEXECUTOR
# ─────────────────────────────────────────────────────────────────────────────
# Avec 5 workers : 10 tâches / 5 workers = 2 vagues = ~1s

print("\n Méthode 2 : ThreadPoolExecutor (5 workers)")
print("-" * 40)
start = time.time()

# Créer un pool de 5 threads
# with ... as : le pool se ferme automatiquement à la fin
with ThreadPoolExecutor(max_workers=5) as executor:
    #
    # executor.map() applique la fonction à chaque élément de la liste
    # - Distribue les tâches aux threads disponibles
    # - Retourne les résultats DANS L'ORDRE de la liste originale
    # - Bloque jusqu'à ce que toutes les tâches soient terminées
    #
    results_parallel = list(executor.map(simulate_io_task, tasks))

parallel_time = time.time() - start
print(f"\n Temps parallèle : {parallel_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# COMPARAISON
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("COMPARAISON")
print("=" * 60)
print(f"   Séquentiel : {seq_time:.2f}s")
print(f"   Parallèle  : {parallel_time:.2f}s")
print(f"   Speedup    : {seq_time/parallel_time:.1f}x plus rapide !")
print()
print("Avec 5 workers pour 10 tâches :")
print("   → 2 vagues de 5 tâches")
print("   → 2 × 0.5s = ~1s au lieu de 10 × 0.5s = 5s")

4.2 ProcessPoolExecutor (CPU-bound)

Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  ProcessPoolExecutor : Paralléliser des tâches CPU-bound                 ║
# ╚══════════════════════════════════════════════════════════════════════════╝

from concurrent.futures import ProcessPoolExecutor
import time
import os

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION CPU-INTENSIVE
# ─────────────────────────────────────────────────────────────────────────────
def cpu_task(n):
    """
    Tâche CPU-bound : calcul intensif.
    
    Cette fonction fait travailler le CPU en continu.
    Exemples réels : parsing, compression, transformations de données,
    calculs mathématiques, feature engineering.
    
    Args:
        n: Nombre d'itérations
    
    Returns:
        Somme des carrés
    """
    return sum(i ** 2 for i in range(n))

# ─────────────────────────────────────────────────────────────────────────────
# DONNÉES À TRAITER
# ─────────────────────────────────────────────────────────────────────────────
# On simule 8 "chunks" de données à traiter
# Chaque chunk nécessite 1 million d'itérations

data_chunks = [1_000_000] * 8  # 8 chunks de 1M itérations chacun

print("=" * 60)
print(" ProcessPoolExecutor : Paralléliser des tâches CPU-bound")
print("=" * 60)
print(f" CPUs disponibles : {os.cpu_count()}")
print(f" Chunks à traiter : {len(data_chunks)}")

# ─────────────────────────────────────────────────────────────────────────────
# MÉTHODE 1 : SÉQUENTIEL
# ─────────────────────────────────────────────────────────────────────────────
print("\n Méthode 1 : Séquentiel")
print("-" * 40)
start = time.time()

results_seq = [cpu_task(chunk) for chunk in data_chunks]

seq_time = time.time() - start
print(f" Temps séquentiel : {seq_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# MÉTHODE 2 : PROCESSPOOL
# ─────────────────────────────────────────────────────────────────────────────
# Chaque worker est un PROCESSUS séparé avec son propre GIL
# → Vraie parallélisation sur plusieurs CPUs !

print("\n Méthode 2 : ProcessPoolExecutor (4 workers)")
print("-" * 40)
start = time.time()

# Créer un pool de 4 processus
# Note : max_workers = nombre de CPUs est généralement optimal
with ProcessPoolExecutor(max_workers=4) as executor:
    #
    # executor.map() distribue les chunks aux processus
    # - Processus 1 traite chunk[0], chunk[4]
    # - Processus 2 traite chunk[1], chunk[5]
    # - etc.
    #
    results_parallel = list(executor.map(cpu_task, data_chunks))

proc_time = time.time() - start
print(f" Temps parallèle : {proc_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# COMPARAISON
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("COMPARAISON")
print("=" * 60)
print(f"   Séquentiel : {seq_time:.2f}s")
print(f"   Parallèle  : {proc_time:.2f}s")
print(f"   Speedup    : {seq_time/proc_time:.1f}x plus rapide !")
print()
print("Explication :")
print("   → 8 chunks avec 4 workers = 2 vagues")
print("   → Chaque processus utilise 100% d'un CPU")
print("   → Speedup théorique max = min(workers, chunks) = 4x")
print()
print("⚠️ IMPORTANT :")
print("   → ProcessPoolExecutor pour CPU-bound (contourne le GIL)")
print("   → ThreadPoolExecutor pour I/O-bound (GIL pas bloquant)")

4.3 Gestion avancée : submit() et as_completed()

Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  submit() et as_completed() : Contrôle avancé des tâches                 ║
# ╚══════════════════════════════════════════════════════════════════════════╝
#
# executor.map() est simple mais limité :
# - Attend que TOUTES les tâches finissent
# - Retourne les résultats dans l'ordre original
#
# submit() + as_completed() permet :
# - Traiter les résultats dès qu'ils arrivent
# - Gérer les erreurs individuellement
# - Ajouter des timeouts par tâche

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION AVEC TEMPS D'EXÉCUTION VARIABLE
# ─────────────────────────────────────────────────────────────────────────────
def fetch_data(task_id):
    """
    Simule une requête avec temps variable.
    Certaines tâches sont rapides, d'autres lentes.
    """
    delay = random.uniform(0.1, 1.0)  # Entre 0.1s et 1s
    time.sleep(delay)
    
    # Simuler une erreur aléatoire (10% de chance)
    if random.random() < 0.1:
        raise Exception(f"Erreur sur la tâche {task_id}")
    
    return {"task_id": task_id, "delay": round(delay, 2)}

print("=" * 60)
print(" submit() et as_completed() : Contrôle avancé")
print("=" * 60)

# ─────────────────────────────────────────────────────────────────────────────
# UTILISATION DE submit() + as_completed()
# ─────────────────────────────────────────────────────────────────────────────
with ThreadPoolExecutor(max_workers=5) as executor:
    
    # ─────────────────────────────────────────────────────────────────────
    # ÉTAPE 1 : Soumettre les tâches avec submit()
    # ─────────────────────────────────────────────────────────────────────
    # submit() retourne un objet Future (promesse de résultat futur)
    # On crée un dictionnaire {future: task_id} pour identifier les tâches
    
    futures = {}
    for i in range(5):
        future = executor.submit(fetch_data, i)  # Soumet la tâche
        futures[future] = i  # Associe le future à l'ID de la tâche
    
    print(f"\n {len(futures)} tâches soumises")
    print("-" * 40)
    
    # ─────────────────────────────────────────────────────────────────────
    # ÉTAPE 2 : Traiter les résultats avec as_completed()
    # ─────────────────────────────────────────────────────────────────────
    # as_completed() itère sur les futures dans l'ORDRE DE COMPLÉTION
    # (pas l'ordre de soumission !)
    # → La première tâche terminée est traitée en premier
    
    for future in as_completed(futures):
        task_id = futures[future]  # Récupérer l'ID de la tâche
        
        try:
            # future.result() récupère le résultat
            # timeout=5 : lève TimeoutError si > 5 secondes
            result = future.result(timeout=5)
            print(f" Tâche {task_id} terminée en {result['delay']}s")
            
        except Exception as e:
            # Gestion individuelle des erreurs
            print(f"❌ Tâche {task_id} a échoué : {e}")

print("\n AVANTAGES de submit() + as_completed() :")
print("   → Traiter les résultats dès qu'ils arrivent")
print("   → Gérer les erreurs individuellement")
print("   → Ajouter des timeouts par tâche")
print("   → Plus de contrôle que executor.map()")

4.4 Quand utiliser quoi ?

Executor GIL contourné ? Usage Exemple
ThreadPoolExecutor ❌ Non I/O : API, fichiers Télécharger 50 fichiers
ProcessPoolExecutor ✅ Oui CPU : calculs, ETL Transformer 8 chunks de données

🔧 5. multiprocessing — Contrôle avancé

Pour les cas où concurrent.futures ne suffit pas.

5.1 Pool avec map

Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  multiprocessing.Pool : Traitement parallèle avec partitioning           ║
# ╚══════════════════════════════════════════════════════════════════════════╝
#
# Pool est utile quand tu veux :
# - Partitionner des données en chunks
# - Traiter chaque chunk en parallèle
# - Combiner les résultats

from multiprocessing import Pool, cpu_count
import numpy as np

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION DE TRAITEMENT D'UN CHUNK
# ─────────────────────────────────────────────────────────────────────────────
def process_chunk(chunk):
    """
    Traite un chunk de données.
    
    Dans la vraie vie :
    - Transformation de données
    - Feature engineering
    - Calculs statistiques
    
    Args:
        chunk: numpy array (portion des données)
    
    Returns:
        Résultat du traitement (ici : somme des carrés)
    """
    return np.sum(chunk ** 2)

print("=" * 60)
print(" multiprocessing.Pool : Partitionner et paralléliser")
print("=" * 60)

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 1 : Créer les données
# ─────────────────────────────────────────────────────────────────────────────
big_array = np.random.rand(1_000_000)  # 1 million de valeurs
print(f"\n📊 Données : {len(big_array):,} valeurs")

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 2 : Partitionner en chunks
# ─────────────────────────────────────────────────────────────────────────────
# On divise les données en autant de chunks que de CPUs
# Chaque CPU traitera un chunk

n_workers = cpu_count()
chunks = np.array_split(big_array, n_workers)

print(f" CPUs disponibles : {n_workers}")
print(f" Chunks créés : {len(chunks)}")
print(f"   Taille de chaque chunk : ~{len(chunks[0]):,} valeurs")

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 3 : Traitement parallèle avec Pool
# ─────────────────────────────────────────────────────────────────────────────
print("\n Traitement parallèle...")

# Pool() crée un pool de processus réutilisables
# with ... as : le pool se ferme automatiquement à la fin
with Pool(processes=n_workers) as pool:
    #
    # pool.map() applique process_chunk à chaque chunk
    # - Les chunks sont distribués aux processus disponibles
    # - Chaque processus traite son chunk indépendamment
    # - Les résultats sont retournés dans l'ordre des chunks
    #
    results = pool.map(process_chunk, chunks)

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 4 : Combiner les résultats
# ─────────────────────────────────────────────────────────────────────────────
# Chaque processus a calculé la somme des carrés de son chunk
# On additionne tous les résultats partiels

total = sum(results)
print(f"\n Résultat total : {total:.2f}")

print("\n PATTERN CLASSIQUE : Map-Reduce")
print("   1. SPLIT : Diviser les données en chunks")
print("   2. MAP : Traiter chaque chunk en parallèle")
print("   3. REDUCE : Combiner les résultats (ici: sum)")

5.2 starmap pour plusieurs arguments

Voir le code
from multiprocessing import Pool

def process_with_params(data, multiplier, offset):
    """Fonction avec plusieurs paramètres"""
    return sum(data) * multiplier + offset

# Préparer les arguments
args_list = [
    ([1, 2, 3], 2, 10),
    ([4, 5, 6], 3, 20),
    ([7, 8, 9], 4, 30),
]

with Pool(3) as pool:
    results = pool.starmap(process_with_params, args_list)

print("Résultats:", results)

5.3 Limites et précautions

⚠️ Limite Explication
Overhead Créer des process prend du temps (~100ms)
Sérialisation Les données sont copiées (pickle)
if __name__ == "__main__" Obligatoire sur Windows
Mémoire Chaque process a sa propre mémoire
# ⚠️ Toujours protéger avec if __name__ == "__main__"
if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(my_func, data)

6. asyncio — I/O massivement parallèle

Idéal pour : 100+ requêtes API, ingestion massive, crawling web ❌ Pas pour : Calculs CPU-intensive

6.1 Comprendre async/await

asyncio utilise un modèle single-thread non-bloquant. Au lieu de créer plusieurs threads, un seul thread gère plusieurs tâches en switchant entre elles quand l’une attend.

Comment ça fonctionne ?

╔══════════════════════════════════════════════════════════════════════════╗
║                         asyncio : Single thread                          ║
╠══════════════════════════════════════════════════════════════════════════╣
║                                                                          ║
║  UN SEUL THREAD gère plusieurs tâches :                                 ║
║                                                                          ║
║  Tâche 1: [code]──▶[await: attend API]──────────────▶[code]──▶ done    ║
║                           ↓                              ↑               ║
║                    Le thread switch                      │               ║
║                           ↓                              │               ║
║  Tâche 2: [attend]──▶[code]──▶[await: attend DB]────────│──▶ done      ║
║                                      ↓                  │               ║
║                               Le thread switch          │               ║
║                                      ↓                  │               ║
║  Tâche 3: [attend]─────────────▶[code]──▶[await]────────┴──▶ done      ║
║                                                                          ║
║  → Le thread ne reste jamais "bloqué" à attendre                        ║
║  → Dès qu'une tâche attend, il passe à une autre                        ║
║  → Très efficace pour l'I/O : un seul thread peut gérer 1000+ requêtes  ║
║                                                                          ║
╚══════════════════════════════════════════════════════════════════════════╝

6.2 Les mots-clés async/await

Mot-clé Ce que ça fait Quand l’utiliser
async def Déclare une fonction coroutine Fonctions qui font de l’I/O asynchrone
await Attend le résultat d’une coroutine Quand tu appelles une fonction async
asyncio.gather() Lance plusieurs coroutines en parallèle Pour paralléliser des tâches
asyncio.run() Point d’entrée pour exécuter du code async Dans un script (pas dans Jupyter)
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  asyncio : Comprendre async/await                                        ║
# ╚══════════════════════════════════════════════════════════════════════════╝

import asyncio
import time

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION ASYNCHRONE (coroutine)
# ─────────────────────────────────────────────────────────────────────────────
# "async def" crée une COROUTINE, pas une fonction normale
# Une coroutine peut être "mise en pause" avec await

async def fetch_data(task_id):
    """
    Simule une requête API asynchrone.
    
    Dans la vraie vie, ça serait :
    - async with aiohttp.ClientSession() as session:
    -     async with session.get(url) as response:
    -         return await response.json()
    
    Args:
        task_id: Identifiant de la tâche
    
    Returns:
        Résultat simulé
    """
    print(f"   Tâche {task_id} : début")
    
    # await asyncio.sleep() = attente NON-BLOQUANTE
    # Pendant que cette tâche attend, d'autres tâches peuvent s'exécuter !
    # C'est LA différence avec time.sleep() qui bloque tout
    await asyncio.sleep(1)  # Simule 1s de latence réseau
    
    print(f"   Tâche {task_id} : terminée")
    return f"result_{task_id}"

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION PRINCIPALE ASYNCHRONE
# ─────────────────────────────────────────────────────────────────────────────
async def main():
    """
    Fonction principale qui orchestre les tâches.
    
    asyncio.gather() lance plusieurs coroutines EN PARALLÈLE
    et attend que TOUTES soient terminées.
    """
    print("=" * 60)
    print("asyncio : Lancer 5 tâches en parallèle")
    print("=" * 60)
    
    # Créer une liste de coroutines (pas encore exécutées !)
    # Note : fetch_data(i) retourne un objet coroutine, pas le résultat
    tasks = [fetch_data(i) for i in range(5)]
    
    print(f"\n {len(tasks)} tâches créées")
    print("=" * 60)
    
    # asyncio.gather() lance TOUTES les tâches en parallèle
    # - Les 5 tâches démarrent en même temps
    # - Chaque tâche attend 1s (await asyncio.sleep(1))
    # - MAIS elles attendent en parallèle !
    # - Temps total : ~1s au lieu de 5s
    results = await asyncio.gather(*tasks)
    
    # *tasks "déballe" la liste : gather(task[0], task[1], task[2], ...)
    
    return results

# ─────────────────────────────────────────────────────────────────────────────
# EXÉCUTION
# ─────────────────────────────────────────────────────────────────────────────
# Dans Jupyter : await main() fonctionne directement
# Dans un script Python normal : asyncio.run(main())

start = time.time()
results = await main()  # Jupyter permet d'utiliser await directement
total_time = time.time() - start

print("=" * 60)
print("RÉSULTAT")
print("=" * 60)
print(f" Temps total : {total_time:.2f}s")
print(f" Résultats : {results}")
print()
print(" EXPLICATION :")
print("   → 5 tâches de 1s chacune")
print("   → En séquentiel : 5 × 1s = 5s")
print("   → En parallèle (asyncio) : ~1s (toutes en même temps)")
print()
print("⚠️ IMPORTANT :")
print("   → await asyncio.sleep() ≠ time.sleep()")
print("   → asyncio.sleep() libère le thread pour d'autres tâches")
print("   → time.sleep() bloque TOUT le programme")

6.2 Exemple réel avec aiohttp

Voir le code
# Installation : pip install aiohttp
import asyncio

# Simulons aiohttp pour l'exemple (sans vraies requêtes)
async def fetch_url(session, url):
    """Simule une requête HTTP"""
    await asyncio.sleep(0.1)  # Simule latence
    return {"url": url, "status": 200}

async def fetch_all_urls(urls):
    """Fetch toutes les URLs en parallèle"""
    session = None  # En vrai : async with aiohttp.ClientSession() as session:
    
    tasks = [fetch_url(session, url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Simuler 20 URLs
urls = [f"https://api.example.com/data/{i}" for i in range(20)]

start = time.time()
results = await fetch_all_urls(urls)
print(f" 20 requêtes en {time.time() - start:.2f}s")
print(f" Succès : {len([r for r in results if isinstance(r, dict)])}")

6.3 Semaphore : limiter les connexions simultanées

Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  Semaphore : Limiter le nombre de connexions simultanées                 ║
# ╚══════════════════════════════════════════════════════════════════════════╝
#
# PROBLÈME : Tu veux faire 100 requêtes API, mais l'API limite à 5
#            connexions simultanées (rate limiting).
#
# SOLUTION : Semaphore = un compteur qui limite les accès concurrents

import asyncio
import time

print("=" * 60)
print("🔬 Semaphore : Limiter la concurrence")
print("=" * 60)

# ─────────────────────────────────────────────────────────────────────────────
# CRÉER UN SEMAPHORE
# ─────────────────────────────────────────────────────────────────────────────
# Semaphore(5) = maximum 5 tâches peuvent s'exécuter en même temps
# Les autres attendent qu'une place se libère

MAX_CONCURRENT = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

print(f"\n🚦 Semaphore créé : max {MAX_CONCURRENT} tâches simultanées")

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION AVEC SEMAPHORE
# ─────────────────────────────────────────────────────────────────────────────
async def fetch_limited(task_id):
    """
    Fait une requête en respectant la limite de concurrence.
    
    async with semaphore :
    - Tente d'acquérir le semaphore (décrémente le compteur)
    - Si compteur = 0, ATTEND qu'une tâche se termine
    - À la sortie du with, libère le semaphore (incrémente le compteur)
    """
    # async with semaphore : attend si déjà 5 tâches en cours
    async with semaphore:
        print(f"   Tâche {task_id:2d} démarre (slot acquis)")
        await asyncio.sleep(0.5)  # Simule la requête
        print(f"   Tâche {task_id:2d} terminée (slot libéré)")
        return task_id

# ─────────────────────────────────────────────────────────────────────────────
# EXÉCUTION : 15 tâches avec max 5 simultanées
# ─────────────────────────────────────────────────────────────────────────────
async def main():
    print("\n" + "-" * 40)
    print(f" Lancement de 15 tâches (max {MAX_CONCURRENT} simultanées)")
    print("-" * 40 + "\n")
    
    # Créer 15 tâches
    tasks = [fetch_limited(i) for i in range(15)]
    
    # Lancer toutes les tâches
    # MAIS le semaphore limite à 5 simultanées !
    results = await asyncio.gather(*tasks)
    
    return results

start = time.time()
results = await main()
total_time = time.time() - start

print("\n" + "=" * 60)
print("RÉSULTAT")
print("=" * 60)
print(f"Temps total : {total_time:.2f}s")
print(f"Tâches complétées : {len(results)}")

print("\n EXPLICATION :")
print(f"   → 15 tâches de 0.5s chacune")
print(f"   → Max {MAX_CONCURRENT} simultanées = 3 vagues")
print(f"   → 3 vagues × 0.5s = ~1.5s (au lieu de 7.5s séquentiel)")
print("\n USE CASES :")
print("   → Rate limiting API (ex: max 10 req/s)")
print("   → Limiter les connexions DB")
print("   → Éviter de surcharger un service")

6.4 Quand NE PAS utiliser asyncio

Situation asyncio efficace ? Alternative
100+ appels API Oui -
Lecture S3/DB massives Oui -
Calculs CPU Non ProcessPoolExecutor
5 requêtes simples ❌ Overkill ThreadPoolExecutor
Code synchrone existant ❌ Refactoring lourd ThreadPoolExecutor

7. Dask — Traiter des fichiers plus grands que la RAM

Le plus utile quand tes données ne tiennent pas en mémoire.

7.1 Pourquoi Dask ?

Problème Solution classique Solution Dask
Fichier de 50 Go MemoryError ! ✅ Traité par chunks
1000 fichiers CSV Boucle lente ✅ Parallélisé automatiquement
Besoin d’apprendre une nouvelle API 😫 ✅ API quasi-identique à Pandas

7.2 Comment Dask fonctionne ?

╔══════════════════════════════════════════════════════════════════════════╗
║                    Dask : Lazy Evaluation + Partitions                   ║
╠══════════════════════════════════════════════════════════════════════════╣
║                                                                          ║
║  ÉTAPE 1 : Lecture (LAZY - pas de chargement en mémoire !)              ║
║  ─────────────────────────────────────────────────────────               ║
║                                                                          ║
║  ddf = dd.read_csv("data/*.csv")                                        ║
║                                                                          ║
║  → Dask SCANNE les fichiers (schéma, taille)                            ║
║  → Mais NE CHARGE PAS les données !                                     ║
║  → Crée un "plan d'exécution"                                           ║
║                                                                          ║
║  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐       ║
║  │ Partition 1 │ │ Partition 2 │ │ Partition 3 │ │ Partition 4 │       ║
║  │ (fichier 1) │ │ (fichier 2) │ │ (fichier 3) │ │ (fichier 4) │       ║
║  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘       ║
║                                                                          ║
║  ÉTAPE 2 : Opérations (LAZY - construit le plan)                        ║
║  ────────────────────────────────────────────────                        ║
║                                                                          ║
║  result = ddf.groupby("col").sum()   # Pas encore exécuté !             ║
║                                                                          ║
║  ÉTAPE 3 : compute() (EXÉCUTION - traitement réel)                      ║
║  ─────────────────────────────────────────────────                       ║
║                                                                          ║
║  result.compute()                                                        ║
║                                                                          ║
║  → Dask charge partition 1, la traite, la décharge                      ║
║  → Puis partition 2, etc.                                               ║
║  → Seule 1 partition en mémoire à la fois !                             ║
║  → Utilise tous les CPUs pour traiter les partitions en parallèle       ║
║                                                                          ║
╚══════════════════════════════════════════════════════════════════════════╝
Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  Dask DataFrame : Traiter des fichiers plus grands que la RAM            ║
# ╚══════════════════════════════════════════════════════════════════════════╝

# Installation : pip install "dask[complete]"
import dask.dataframe as dd
import pandas as pd
import os
import time

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 0 : Créer des fichiers de test
# ─────────────────────────────────────────────────────────────────────────────
print("=" * 60)
print("Création des fichiers de test")
print("=" * 60)

os.makedirs("data/dask_demo", exist_ok=True)

for i in range(5):
    df = pd.DataFrame({
        "id": range(i * 10000, (i + 1) * 10000),
        "category": [f"cat_{j % 5}" for j in range(10000)],
        "amount": [float(j % 1000) for j in range(10000)]
    })
    df.to_csv(f"data/dask_demo/file_{i}.csv", index=False)

print("5 fichiers CSV créés (50,000 lignes au total)")

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 1 : Lecture LAZY avec Dask
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("Étape 1 : Lecture avec Dask (LAZY)")
print("=" * 60)

# dd.read_csv() avec pattern glob : lit TOUS les fichiers correspondants
# IMPORTANT : les données ne sont PAS chargées en mémoire !
ddf = dd.read_csv("data/dask_demo/*.csv")

print(f"\nType de l'objet : {type(ddf)}")
print(f"Nombre de partitions : {ddf.npartitions}")
print(f"   → Chaque fichier = 1 partition")
print(f"   → Les partitions seront traitées indépendamment")

print("\n Schéma des données (sans les charger) :")
print(ddf)

print("\n⚠️ IMPORTANT : À ce stade, les données ne sont PAS en mémoire !")
print("   Dask a juste lu les en-têtes et créé un plan d'exécution.")

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 2 : Pipeline de transformations (LAZY)
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("Étape 2 : Pipeline de transformations (LAZY)")
print("=" * 60)

# Toutes ces opérations sont LAZY : rien n'est calculé !
# Dask construit un graphe d'exécution (DAG) optimisé

result = (
    ddf
    # Filtrer : garder seulement les montants > 100
    .query("amount > 100")
    
    # Créer une nouvelle colonne
    .assign(amount_doubled=ddf.amount * 2)
    
    # Grouper par catégorie et calculer la somme
    .groupby("category")
    .amount.sum()
)

print("Pipeline défini :")
print("   1. Filtrer amount > 100")
print("   2. Créer colonne amount_doubled")
print("   3. GroupBy category + sum")
print("\n Toujours LAZY ! Rien n'est calculé.")
print(f"Type du résultat : {type(result)}")

# ─────────────────────────────────────────────────────────────────────────────
# ÉTAPE 3 : Exécution avec compute()
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("Étape 3 : Exécution avec compute()")
print("=" * 60)

start = time.time()

# compute() déclenche l'exécution RÉELLE du pipeline
# - Dask optimise le plan d'exécution
# - Traite les partitions en parallèle (utilise tous les CPUs)
# - Retourne un objet Pandas (DataFrame ou Series)
final_result = result.compute()

print(f"\n Temps d'exécution : {time.time() - start:.2f}s")
print(f"Type du résultat final : {type(final_result)}")
print("\n Résultat :")
print(final_result)

print("\n CE QU'IL FAUT RETENIR :")
print("   → dd.read_csv() : lecture lazy (pas de chargement)")
print("   → Opérations (filter, groupby...) : construisent le plan")
print("   → compute() : exécute le plan et retourne un Pandas")
print("   → Une seule partition en mémoire à la fois !")
Voir le code
import dask.dataframe as dd
import time

# Lire TOUS les fichiers avec glob pattern (lazy !)
ddf = dd.read_csv("data/dask_demo/*.csv")

print("Type:", type(ddf))
print(f"Partitions: {ddf.npartitions}")
print("\n Les données ne sont PAS encore chargées !")
print(ddf)  # Affiche le schéma, pas les données
Voir le code
# Pipeline Dask (lazy)
result = (
    ddf
    .filter(ddf.amount > 100)  # Filtrage
    .assign(amount_doubled=ddf.amount * 2)  # Transformation
    .groupby("category")  # Agrégation
    .amount.sum()
)

print("Pipeline défini (lazy) :")
print(result)

# Exécuter avec compute()
start = time.time()
final_result = result.compute()
print(f"\n Exécution : {time.time() - start:.2f}s")
print("\n Résultat :")
print(final_result)

7.2 Dask vs Polars vs Spark

Aspect Polars Dask Spark
Single machine ⭐⭐⭐ ⭐⭐
Cluster ⭐⭐ ⭐⭐⭐
API Propre Pandas-like Propre
Setup Simple Simple Complexe
Vitesse (single node) ⭐⭐⭐ ⭐⭐
Écosystème Nouveau Mature Très mature

7.3 Quand utiliser Dask ?

  • ✅ Fichiers > RAM mais < 100 Go
  • ✅ Tu connais déjà Pandas
  • ✅ Pas besoin d’un cluster Spark
  • ✅ Traitement multi-fichiers
  • ❌ Si single file < 10 Go → utilise Polars
  • ❌ Si > 100 Go ou cluster → utilise Spark

8. joblib — Parallélisation simple

Ultra-simple — parfait pour paralléliser une boucle rapidement Très utilisé en Data Science (sklearn l’utilise en interne)

Voir le code
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  joblib : Parallélisation ultra-simple                                   ║
# ╚══════════════════════════════════════════════════════════════════════════╝
#
# joblib est LA solution la plus simple pour paralléliser une boucle
# Très utilisé en Data Science (scikit-learn l'utilise en interne)
#
# Installation : pip install joblib

from joblib import Parallel, delayed
import time

print("=" * 60)
print("joblib : Paralléliser une boucle en 1 ligne")
print("=" * 60)

# ─────────────────────────────────────────────────────────────────────────────
# FONCTION À PARALLÉLISER
# ─────────────────────────────────────────────────────────────────────────────
def expensive_computation(x):
    """
    Calcul coûteux à paralléliser.
    
    Dans la vraie vie :
    - Entraînement d'un modèle
    - Transformation de fichier
    - Calcul de features
    """
    time.sleep(0.1)  # Simule un calcul de 100ms
    return x ** 2

# Données à traiter
data = list(range(20))

# ─────────────────────────────────────────────────────────────────────────────
# MÉTHODE 1 : SÉQUENTIEL (classique)
# ─────────────────────────────────────────────────────────────────────────────
print("\n Méthode 1 : List comprehension (séquentiel)")
print("-" * 40)

start = time.time()
results_seq = [expensive_computation(x) for x in data]
seq_time = time.time() - start

print(f"Temps : {seq_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# MÉTHODE 2 : JOBLIB (parallèle)
# ─────────────────────────────────────────────────────────────────────────────
print("\nMéthode 2 : joblib.Parallel (parallèle)")
print("-" * 40)

start = time.time()

# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  Parallel(n_jobs=-1)(delayed(func)(args) for args in data)               ║
# ╠══════════════════════════════════════════════════════════════════════════╣
# ║                                                                          ║
# ║  Décomposition :                                                         ║
# ║                                                                          ║
# ║  Parallel(n_jobs=-1)     → Crée un pool de workers                      ║
# ║                            n_jobs=-1 = utiliser TOUS les CPUs           ║
# ║                            n_jobs=4 = utiliser 4 CPUs                   ║
# ║                                                                          ║
# ║  delayed(func)           → Wrapper qui "retarde" l'exécution            ║
# ║                            La fonction n'est pas appelée immédiatement   ║
# ║                                                                          ║
# ║  delayed(func)(args)     → Prépare l'appel func(args)                   ║
# ║                            Retourne un "callable" différé                ║
# ║                                                                          ║
# ║  for x in data           → Génère tous les appels différés              ║
# ║                                                                          ║
# ║  Le tout entre ()        → Parallel distribue et exécute les appels     ║
# ║                                                                          ║
# ╚══════════════════════════════════════════════════════════════════════════╝

results_parallel = Parallel(n_jobs=-1)(
    delayed(expensive_computation)(x) for x in data
)

parallel_time = time.time() - start
print(f"Temps : {parallel_time:.2f}s")

# ─────────────────────────────────────────────────────────────────────────────
# COMPARAISON
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("COMPARAISON")
print("=" * 60)
print(f"   Séquentiel : {seq_time:.2f}s")
print(f"   Parallèle  : {parallel_time:.2f}s")
print(f"   Speedup    : {seq_time/parallel_time:.1f}x plus rapide !")

print("\n💡 SYNTAXE JOBLIB :")
print("   Parallel(n_jobs=-1)(delayed(func)(arg) for arg in data)")
print("         │                   │      │         │")
print("         │                   │      │         └─ Données à traiter")
print("         │                   │      └─ Argument de la fonction")
print("         │                   └─ Fonction à paralléliser")
print("         └─ -1 = tous les CPUs")

print("\n🎯 QUAND UTILISER JOBLIB :")
print("   Paralléliser une boucle simple")
print("   Data Science / ML (cross-validation, grid search)")
print("   Quand tu veux quelque chose qui marche vite")
print("   Pour du I/O massif (préfère asyncio)")
Voir le code
from joblib import Parallel, delayed

def io_task(x):
    time.sleep(0.1)
    return x

# Options utiles
results = Parallel(
    n_jobs=4,              # Nombre de workers
    backend="threading",   # "threading" pour I/O, "loky" (défaut) pour CPU
    verbose=1              # Affiche la progression
)(delayed(io_task)(x) for x in range(10))

print(f"\n Résultats : {results}")

🌳 9. Choisir la bonne technologie

Récapitulatif

Outil Type GIL contourné Complexité Use case
ThreadPoolExecutor I/O ❌ Non < 20 requêtes/fichiers
ProcessPoolExecutor CPU ✅ Oui ⭐⭐ ETL, calculs
asyncio I/O ❌ Non ⭐⭐⭐ 100+ requêtes API
joblib CPU/I/O ✅ (loky) Boucles simples, ML
Dask Big Data ✅ Oui ⭐⭐ Fichiers > RAM

🖼️ Arbre de décision visuel

                    ┌─────────────────┐
                    │  Quel problème? │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐        ┌─────────┐        ┌─────────────┐
    │CPU-bound│        │I/O-bound│        │Fichiers >RAM│
    └────┬────┘        └────┬────┘        └──────┬──────┘
         │                  │                    │
    ┌────┴────┐        ┌────┴────┐         ┌────┴────┐
    │         │        │         │         │         │
    ▼         ▼        ▼         ▼         ▼         ▼
 Simple?  Complex?  <20 req?  100+ req?  <100Go?  >100Go?
    │         │        │         │         │         │
    ▼         ▼        ▼         ▼         ▼         ▼
 joblib   Process   Thread    asyncio    Dask     Spark
          Pool      Pool                        (mod 19)

10. Bonnes pratiques & Erreurs fréquentes

❌ Erreurs fréquentes

Erreur Pourquoi c’est faux Solution
Threading pour CPU GIL bloque ProcessPoolExecutor
Async pour calculs Pas de gain ProcessPoolExecutor
Pandas sur 50 Go Crash RAM Dask ou Polars streaming
100 workers pour 10 tâches Overhead inutile Adapter au workload
Pas de profiling Optimise au hasard Toujours profiler d’abord
Oublier if __name__ Crash sur Windows Toujours protéger

✅ Bonnes pratiques

Pratique Pourquoi
Profiler d’abord Identifier le vrai goulot
Écrire en Parquet I/O plus rapide
Partitionner intelligemment Évite surcharge mémoire
Tester avec peu de workers Puis augmenter progressivement
if __name__ == "__main__" Obligatoire pour multiprocessing
Utiliser n_jobs=-1 Utilise tous les CPUs disponibles

Quiz de fin de module


❓ Q1. Qu’est-ce que le GIL empêche en Python ?

  1. L’exécution de code Python
  2. L’exécution simultanée de plusieurs threads Python
  3. L’utilisation de la mémoire
  4. La lecture de fichiers
💡 Voir la réponse

Réponse : b — Le GIL (Global Interpreter Lock) empêche l’exécution simultanée de plusieurs threads Python, les forçant à s’exécuter en alternance.


❓ Q2. Pour un traitement CPU-intensive, quel outil utiliser ?

  1. ThreadPoolExecutor
  2. ProcessPoolExecutor
  3. asyncio
  4. Tous sont équivalents
💡 Voir la réponse

Réponse : bProcessPoolExecutor contourne le GIL en utilisant des processus séparés, permettant une vraie parallélisation CPU.


❓ Q3. Quand utiliser asyncio ?

  1. Pour des calculs mathématiques complexes
  2. Pour télécharger 100+ fichiers depuis une API
  3. Pour trier un gros tableau
  4. Pour compresser des fichiers
💡 Voir la réponse

Réponse : basyncio est idéal pour l’I/O massivement parallèle (requêtes API, téléchargements). Les autres sont CPU-bound.


❓ Q4. Que fait ddf.compute() dans Dask ?

  1. Définit le pipeline
  2. Affiche le schéma
  3. Déclenche l’exécution et retourne un DataFrame Pandas
  4. Sauvegarde les données
💡 Voir la réponse

Réponse : c.compute() déclenche l’exécution du pipeline lazy et retourne le résultat sous forme de DataFrame Pandas.


❓ Q5. Que signifie n_jobs=-1 dans joblib ?

  1. Désactive le parallélisme
  2. Utilise un seul CPU
  3. Utilise tous les CPUs disponibles
  4. Erreur de configuration
💡 Voir la réponse

Réponse : cn_jobs=-1 indique à joblib d’utiliser tous les CPUs disponibles sur la machine.


❓ Q6. Pourquoi ThreadPoolExecutor ne accélère pas les calculs CPU ?

  1. Parce qu’il est mal implémenté
  2. Parce que le GIL force l’exécution séquentielle des threads Python
  3. Parce qu’il utilise trop de mémoire
  4. Parce qu’il est obsolète
💡 Voir la réponse

Réponse : b — Le GIL empêche les threads Python de s’exécuter en parallèle. Pour du CPU-bound, il faut utiliser des processus.


❓ Q7. Pour traiter un fichier de 50 Go avec une API Pandas-like, quel outil choisir ?

  1. Pandas
  2. Polars
  3. Dask
  4. asyncio
💡 Voir la réponse

Réponse : c — Dask permet de traiter des fichiers plus grands que la RAM avec une API similaire à Pandas.


❓ Q8. Quelle est la première étape avant d’optimiser du code ?

  1. Ajouter du multiprocessing
  2. Réécrire en Rust
  3. Profiler pour identifier le goulot d’étranglement
  4. Utiliser asyncio
💡 Voir la réponse

Réponse : c — “Premature optimization is the root of all evil”. Il faut d’abord mesurer pour savoir où optimiser.


Mini-projet : Pipeline haute performance

Objectif

Combiner plusieurs techniques pour créer un pipeline performant : - ProcessPoolExecutor pour transformation CPU-intensive - Dask pour agrégation - Export Parquet

Architecture

data/raw/*.csv
      │
      ▼
┌─────────────────────┐
│ ProcessPoolExecutor │  Transformation parallèle (CPU)
│  - Nettoyage        │
│  - Feature eng.     │
└──────────┬──────────┘
           │
           ▼
data/intermediate/*.csv
           │
           ▼
┌─────────────────────┐
│   Dask DataFrame    │  Agrégation (multi-fichiers)
└──────────┬──────────┘
           │
           ▼
data/processed/result.parquet
Voir le code
# Setup : créer les données de test
import pandas as pd
import numpy as np
import os

os.makedirs("data/raw", exist_ok=True)
os.makedirs("data/intermediate", exist_ok=True)
os.makedirs("data/processed", exist_ok=True)

# Créer 10 fichiers CSV (simule des logs)
np.random.seed(42)
categories = ["web", "api", "db", "cache", "auth"]
statuses = ["success", "error", "timeout"]

for i in range(10):
    n_rows = 10000
    df = pd.DataFrame({
        "timestamp": pd.date_range("2024-01-01", periods=n_rows, freq="s"),
        "category": np.random.choice(categories, n_rows),
        "status": np.random.choice(statuses, n_rows, p=[0.8, 0.15, 0.05]),
        "response_time_ms": np.random.exponential(100, n_rows),
        "bytes_sent": np.random.randint(100, 10000, n_rows)
    })
    df.to_csv(f"data/raw/logs_{i:02d}.csv", index=False)

print("10 fichiers CSV créés (100,000 lignes au total)")
Voir le code
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import numpy as np
import glob
import time

def transform_file(filepath):
    """
    Transformation CPU-intensive d'un fichier.
    - Nettoyage
    - Feature engineering
    - Export intermédiaire
    """
    # Lire
    df = pd.read_csv(filepath)
    
    # Nettoyage : filtrer les timeouts extrêmes
    df = df[df["response_time_ms"] < 10000]
    
    # Feature engineering (CPU-intensive)
    df["response_time_log"] = np.log1p(df["response_time_ms"])
    df["is_error"] = (df["status"] != "success").astype(int)
    df["throughput"] = df["bytes_sent"] / (df["response_time_ms"] + 1)
    
    # Extraction date
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["hour"] = df["timestamp"].dt.hour
    df["day_of_week"] = df["timestamp"].dt.dayofweek
    
    # Export intermédiaire
    output_path = filepath.replace("raw", "intermediate")
    df.to_csv(output_path, index=False)
    
    return output_path

# Liste des fichiers
input_files = sorted(glob.glob("data/raw/*.csv"))
print(f"📁 {len(input_files)} fichiers à traiter")

# ============ TRANSFORMATION PARALLÈLE ============
start = time.time()

with ProcessPoolExecutor(max_workers=4) as executor:
    output_files = list(executor.map(transform_file, input_files))

print(f"\n⏱️ Transformation : {time.time() - start:.2f}s")
print(f"{len(output_files)} fichiers transformés")
Voir le code
import dask.dataframe as dd
import time

# ============ AGRÉGATION AVEC DASK ============
start = time.time()

# Lire tous les fichiers intermédiaires (lazy)
ddf = dd.read_csv("data/intermediate/*.csv")

# Pipeline d'agrégation
result = (
    ddf
    .groupby(["category", "status", "hour"])
    .agg({
        "response_time_ms": ["mean", "max", "count"],
        "bytes_sent": "sum",
        "is_error": "sum",
        "throughput": "mean"
    })
    .compute()  # Exécution
)

# Aplatir les colonnes multi-index
result.columns = ['_'.join(col).strip() for col in result.columns.values]
result = result.reset_index()

print(f"⏱️ Agrégation Dask : {time.time() - start:.2f}s")
print(f"\n📊 Résultat : {len(result)} lignes")
print(result.head(10))
Voir le code
# ============ EXPORT PARQUET ============
result.to_parquet("data/processed/aggregated_logs.parquet", index=False)
print("✅ Résultat exporté : data/processed/aggregated_logs.parquet")

# Vérification
import os
size_bytes = os.path.getsize("data/processed/aggregated_logs.parquet")
print(f"📦 Taille : {size_bytes / 1024:.1f} KB")
Voir le code
# ============ RÉSUMÉ DU PIPELINE ============
print("\n" + "="*50)
print("RÉSUMÉ DU PIPELINE HAUTE PERFORMANCE")
print("="*50)
print(f"""  
1️⃣ Input : 10 fichiers CSV (100K lignes)
2️⃣ Transformation : ProcessPoolExecutor (4 workers)
   - Nettoyage
   - Feature engineering
3️⃣ Agrégation : Dask DataFrame
   - GroupBy multi-colonnes
   - Aggregations multiples
4️⃣ Output : Parquet ({size_bytes / 1024:.1f} KB)
""")

📚 Ressources pour aller plus loin

🌐 Documentation officielle

📖 Articles & Tutoriels

🔧 Outils de profiling

  • py-spy — Sampling profiler
  • Scalene — CPU + mémoire + GPU profiler

➡️ Prochaine étape

Maintenant que tu maîtrises les techniques de performance en Python, passons au traitement distribué à grande échelle avec Spark !

👉 Module suivant : 19_pyspark_advanced — PySpark Avancé

Tu vas apprendre : - Architecture Spark (Driver, Executors) - RDDs et DataFrames Spark - Optimisations (partitioning, caching) - Spark sur Kubernetes


📝 Récapitulatif de ce module

Outil Type Quand l’utiliser
ThreadPoolExecutor I/O < 20 requêtes/fichiers
ProcessPoolExecutor CPU Calculs, transformations
asyncio I/O massif 100+ requêtes API
joblib Simple Paralléliser une boucle
Dask Big Data Fichiers > RAM, API Pandas

🎉 Félicitations ! Tu as terminé le module High Performance Python.

Voir le code
# Nettoyage des fichiers temporaires (optionnel)
import shutil
import os

# Décommenter pour nettoyer
# for folder in ["data/raw", "data/intermediate", "data/processed", "data/dask_demo"]:
#     if os.path.exists(folder):
#         shutil.rmtree(folder)
# print("🧹 Fichiers temporaires supprimés")
Retour au sommet