BONUS : FastAPI pour Data Engineers

Ce module bonus couvre FastAPI, le framework Python moderne pour créer des APIs REST performantes. En tant que Data Engineer, tu auras souvent besoin d’exposer des données via des APIs.


Prérequis

Niveau Compétence
✅ Requis Module 04_python_basics (fonctions, classes, type hints)
✅ Requis Module 05_python_data_processing (Pandas, JSON)
⭐ Bonus Module 09_mongodb (pour les exemples avec DB)

Objectifs du module

À la fin de ce notebook, tu seras capable de :

  • Créer une API REST avec FastAPI
  • Définir des modèles de données avec Pydantic
  • Créer des endpoints CRUD (Create, Read, Update, Delete)
  • Valider automatiquement les données entrantes
  • Servir des données Pandas via API
  • Connecter une API à une base de données
  • Générer une documentation Swagger automatique
  • Déployer une API avec Uvicorn

💡 Note : FastAPI s’exécute comme un serveur. Certains exemples seront à tester hors du notebook, dans des fichiers .py.


Pourquoi FastAPI pour un Data Engineer ?

┌─────────────────────────────────────────────────────────────────┐
│              CAS D'USAGE DATA ENGINEERING                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│     Exposer des données traitées (datamart → API)               │
│     Créer des webhooks pour déclencher des pipelines            │
│     Servir des prédictions ML en production                     │
│     Fournir des métriques pour les dashboards                   │
│     Créer des microservices data                                │
│     Valider des données avant ingestion                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

FastAPI vs autres frameworks

Framework Performance Typing Doc auto Async Complexité
FastAPI ⭐⭐⭐ ✅ Natif ✅ Swagger Simple
Flask ⭐⭐ ❌ Manuel ❌ Extension Simple
Django REST ⭐⭐ ❌ Manuel ⚠️ Complexe
Express (Node) ⭐⭐⭐ Simple

💡 FastAPI combine le meilleur : performance, simplicité, et documentation automatique.


Installation

# Installation de base
pip install fastapi uvicorn

# Avec toutes les dépendances optionnelles
pip install "fastapi[all]"
Package Rôle
fastapi Le framework
uvicorn Serveur ASGI (pour lancer l’API)
pydantic Validation de données (inclus avec FastAPI)
Voir le code
# Installation
!pip install fastapi uvicorn pydantic pandas --quiet

print("Packages installés !")

1️⃣ Ta première API FastAPI

Structure minimale

Créer un fichier main.py :

# main.py
from fastapi import FastAPI

# Créer l'application
app = FastAPI()

# Premier endpoint
@app.get("/")
def root():
    return {"message": "Hello Data Engineer!"}

@app.get("/health")
def health_check():
    return {"status": "healthy"}

Lancer le serveur

# Dans le terminal
uvicorn main:app --reload

# main = fichier main.py
# app = variable FastAPI()
# --reload = redémarre automatiquement si le code change

Résultat :

INFO:     Uvicorn running on http://127.0.0.1:8000
INFO:     Started reloader process

Tester l’API

URL Résultat
http://127.0.0.1:8000 {"message": "Hello Data Engineer!"}
http://127.0.0.1:8000/health {"status": "healthy"}
http://127.0.0.1:8000/docs Documentation Swagger
http://127.0.0.1:8000/redoc Documentation ReDoc

Les méthodes HTTP

┌─────────────────────────────────────────────────────────────────┐
│                    MÉTHODES HTTP (CRUD)                         │
├──────────┬──────────────────┬───────────────────────────────────┤
│ Méthode  │ Action           │ Exemple                           │
├──────────┼──────────────────┼───────────────────────────────────┤
│ GET      │ Lire (Read)      │ GET /users → Liste des users      │
│ POST     │ Créer (Create)   │ POST /users → Créer un user       │
│ PUT      │ Remplacer        │ PUT /users/1 → Remplacer user 1   │
│ PATCH    │ Modifier         │ PATCH /users/1 → Modifier user 1  │
│ DELETE   │ Supprimer        │ DELETE /users/1 → Supprimer       │
└──────────┴──────────────────┴───────────────────────────────────┘
from fastapi import FastAPI

app = FastAPI()

@app.get("/items")
def get_items():
    return {"action": "Liste des items"}

@app.post("/items")
def create_item():
    return {"action": "Création d'un item"}

@app.put("/items/{item_id}")
def update_item(item_id: int):
    return {"action": f"Mise à jour de l'item {item_id}"}

@app.delete("/items/{item_id}")
def delete_item(item_id: int):
    return {"action": f"Suppression de l'item {item_id}"}

Paramètres de chemin (Path Parameters)

Les paramètres dans l’URL permettent d’identifier une ressource spécifique.

from fastapi import FastAPI

app = FastAPI()

# Paramètre simple
@app.get("/users/{user_id}")
def get_user(user_id: int):  # Typage automatique !
    return {"user_id": user_id}

# Plusieurs paramètres
@app.get("/users/{user_id}/orders/{order_id}")
def get_user_order(user_id: int, order_id: int):
    return {
        "user_id": user_id,
        "order_id": order_id
    }

Validation automatique

GET /users/42        → {"user_id": 42}       ✅
GET /users/abc       → Erreur 422            ❌ (pas un int)
GET /users/-1        → {"user_id": -1}       ✅ (mais logiquement faux)

Validation avancée avec Path

from fastapi import FastAPI, Path

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(
    user_id: int = Path(
        ...,  # Requis
        title="User ID",
        description="L'identifiant unique de l'utilisateur",
        ge=1,  # Greater or Equal (>= 1)
        le=10000  # Less or Equal (<= 10000)
    )
):
    return {"user_id": user_id}

Paramètres de requête (Query Parameters)

Les paramètres après le ? dans l’URL pour filtrer, paginer, etc.

GET /users?skip=0&limit=10&active=true
         └──────────┬──────────────┘
              Query parameters
from fastapi import FastAPI
from typing import Optional

app = FastAPI()

@app.get("/users")
def get_users(
    skip: int = 0,           # Défaut = 0
    limit: int = 10,         # Défaut = 10
    active: bool = True,     # Défaut = True
    search: Optional[str] = None  # Optionnel
):
    return {
        "skip": skip,
        "limit": limit,
        "active": active,
        "search": search
    }

Cas d’usage Data Engineering : Pagination

@app.get("/data")
def get_data(
    page: int = 1,
    page_size: int = 100,
    sort_by: str = "created_at",
    order: str = "desc"
):
    # Calculer l'offset
    offset = (page - 1) * page_size
    
    return {
        "page": page,
        "page_size": page_size,
        "offset": offset,
        "sort_by": sort_by,
        "order": order
    }

2️⃣ Pydantic — Validation des données

Pydantic est au cœur de FastAPI. Il permet de : - Définir des schémas de données - Valider automatiquement les entrées - Convertir les types - Générer la documentation


Définir un modèle

from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime

class User(BaseModel):
    """Modèle utilisateur pour l'API."""
    
    id: Optional[int] = None
    nom: str = Field(..., min_length=2, max_length=50)
    email: EmailStr
    age: int = Field(..., ge=0, le=150)
    actif: bool = True
    created_at: datetime = Field(default_factory=datetime.now)
    
    class Config:
        # Exemple pour la documentation
        json_schema_extra = {
            "example": {
                "nom": "Alice Dupont",
                "email": "alice@example.com",
                "age": 30,
                "actif": True
            }
        }

Validateurs disponibles

Validateur Exemple Description
min_length Field(min_length=2) Longueur min string
max_length Field(max_length=50) Longueur max string
ge Field(ge=0) Greater or Equal
le Field(le=100) Less or Equal
gt Field(gt=0) Greater Than
lt Field(lt=100) Less Than
regex Field(regex='^[A-Z]') Pattern regex
EmailStr Type spécial Valide format email
Voir le code
# Exemple Pydantic dans le notebook
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime

class Transaction(BaseModel):
    """Modèle de transaction pour pipeline ETL."""
    
    id: Optional[int] = None
    montant: float = Field(..., gt=0, description="Montant positif")
    devise: str = Field(..., min_length=3, max_length=3)
    description: str = Field(..., min_length=1, max_length=200)
    timestamp: datetime = Field(default_factory=datetime.now)
    
    @field_validator('devise')
    @classmethod
    def devise_uppercase(cls, v):
        return v.upper()

# Test de validation
try:
    # Valide
    tx1 = Transaction(montant=100.50, devise="eur", description="Achat")
    print(f" Transaction valide : {tx1}")
    print(f" Devise convertie : {tx1.devise}")
    
    # Invalide
    tx2 = Transaction(montant=-50, devise="EUR", description="Test")
except Exception as e:
    print(f"\n Erreur de validation : {e}")

Corps de requête (Request Body)

Pour les requêtes POST/PUT, les données sont envoyées dans le body (corps) en JSON.

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class UserCreate(BaseModel):
    nom: str
    email: str
    age: int

class UserResponse(BaseModel):
    id: int
    nom: str
    email: str
    age: int

# Base de données fictive
users_db = []
user_id_counter = 1

@app.post("/users", response_model=UserResponse)
def create_user(user: UserCreate):
    global user_id_counter
    
    # Créer le user avec un ID
    new_user = {
        "id": user_id_counter,
        "nom": user.nom,
        "email": user.email,
        "age": user.age
    }
    
    users_db.append(new_user)
    user_id_counter += 1
    
    return new_user

Requête avec curl

curl -X POST "http://127.0.0.1:8000/users" \
  -H "Content-Type: application/json" \
  -d '{"nom": "Alice", "email": "alice@test.com", "age": 30}'

Requête avec Python requests

import requests

response = requests.post(
    "http://127.0.0.1:8000/users",
    json={"nom": "Alice", "email": "alice@test.com", "age": 30}
)
print(response.json())

3️⃣ API CRUD complète

Voici une API complète avec toutes les opérations CRUD :

# crud_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime

app = FastAPI(
    title="Data Pipeline API",
    description="API pour gérer les pipelines de données",
    version="1.0.0"
)

# ============ MODÈLES ============

class PipelineBase(BaseModel):
    nom: str = Field(..., min_length=3, max_length=100)
    description: Optional[str] = None
    schedule: str = Field(..., description="Cron expression")
    actif: bool = True

class PipelineCreate(PipelineBase):
    pass

class PipelineUpdate(BaseModel):
    nom: Optional[str] = None
    description: Optional[str] = None
    schedule: Optional[str] = None
    actif: Optional[bool] = None

class Pipeline(PipelineBase):
    id: int
    created_at: datetime
    updated_at: datetime

# ============ BASE DE DONNÉES (simulée) ============

pipelines_db: dict[int, dict] = {}
pipeline_id_counter = 1

# ============ ENDPOINTS ============

# CREATE
@app.post("/pipelines", response_model=Pipeline, status_code=201)
def create_pipeline(pipeline: PipelineCreate):
    global pipeline_id_counter
    
    now = datetime.now()
    new_pipeline = {
        "id": pipeline_id_counter,
        **pipeline.model_dump(),
        "created_at": now,
        "updated_at": now
    }
    
    pipelines_db[pipeline_id_counter] = new_pipeline
    pipeline_id_counter += 1
    
    return new_pipeline

# READ ALL
@app.get("/pipelines", response_model=list[Pipeline])
def list_pipelines(skip: int = 0, limit: int = 10, actif: Optional[bool] = None):
    pipelines = list(pipelines_db.values())
    
    # Filtrer par statut actif
    if actif is not None:
        pipelines = [p for p in pipelines if p["actif"] == actif]
    
    return pipelines[skip:skip + limit]

# READ ONE
@app.get("/pipelines/{pipeline_id}", response_model=Pipeline)
def get_pipeline(pipeline_id: int):
    if pipeline_id not in pipelines_db:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    return pipelines_db[pipeline_id]

# UPDATE
@app.patch("/pipelines/{pipeline_id}", response_model=Pipeline)
def update_pipeline(pipeline_id: int, pipeline: PipelineUpdate):
    if pipeline_id not in pipelines_db:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    
    stored = pipelines_db[pipeline_id]
    update_data = pipeline.model_dump(exclude_unset=True)
    
    for key, value in update_data.items():
        stored[key] = value
    
    stored["updated_at"] = datetime.now()
    
    return stored

# DELETE
@app.delete("/pipelines/{pipeline_id}", status_code=204)
def delete_pipeline(pipeline_id: int):
    if pipeline_id not in pipelines_db:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    
    del pipelines_db[pipeline_id]
    return None

Gestion des erreurs

FastAPI utilise HTTPException pour retourner des erreurs HTTP propres.

from fastapi import FastAPI, HTTPException, status

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(user_id: int):
    user = find_user_by_id(user_id)  # Fonction fictive
    
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User {user_id} non trouvé",
            headers={"X-Error": "User not found"}
        )
    
    return user

Codes HTTP courants

Code Nom Usage
200 OK Succès (GET, PUT)
201 Created Création réussie (POST)
204 No Content Suppression réussie (DELETE)
400 Bad Request Requête invalide
401 Unauthorized Non authentifié
403 Forbidden Non autorisé
404 Not Found Ressource introuvable
422 Unprocessable Entity Erreur de validation
500 Internal Server Error Erreur serveur

4️⃣ Servir des données Pandas via API

C’est le cas d’usage le plus courant pour un Data Engineer : exposer des données traitées.

# data_api.py
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse, StreamingResponse
import pandas as pd
import io

app = FastAPI(title="Data API")

# Charger les données (en production: depuis une DB ou un fichier)
df = pd.DataFrame({
    "id": range(1, 101),
    "nom": [f"User_{i}" for i in range(1, 101)],
    "age": [20 + (i % 40) for i in range(100)],
    "ville": ["Paris", "Lyon", "Marseille", "Toulouse", "Bordeaux"] * 20,
    "salaire": [30000 + (i * 500) for i in range(100)]
})

# ============ ENDPOINT JSON ============

@app.get("/data")
def get_data(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    ville: str = None,
    min_salaire: int = None
):
    """Retourne les données avec pagination et filtres."""
    
    result = df.copy()
    
    # Appliquer les filtres
    if ville:
        result = result[result["ville"] == ville]
    if min_salaire:
        result = result[result["salaire"] >= min_salaire]
    
    # Pagination
    total = len(result)
    start = (page - 1) * page_size
    end = start + page_size
    
    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "pages": (total + page_size - 1) // page_size,
        "data": result.iloc[start:end].to_dict(orient="records")
    }

# ============ ENDPOINT CSV ============

@app.get("/data/export/csv")
def export_csv():
    """Exporte les données en CSV."""
    
    stream = io.StringIO()
    df.to_csv(stream, index=False)
    
    response = StreamingResponse(
        iter([stream.getvalue()]),
        media_type="text/csv"
    )
    response.headers["Content-Disposition"] = "attachment; filename=data.csv"
    
    return response

# ============ ENDPOINT AGRÉGATIONS ============

@app.get("/data/stats")
def get_stats():
    """Retourne des statistiques agrégées."""
    
    stats = {
        "total_users": len(df),
        "age_moyen": round(df["age"].mean(), 2),
        "salaire_moyen": round(df["salaire"].mean(), 2),
        "salaire_median": round(df["salaire"].median(), 2),
        "par_ville": df.groupby("ville").agg({
            "id": "count",
            "salaire": "mean"
        }).round(2).to_dict(orient="index")
    }
    
    return stats
Voir le code
# Démonstration : Conversion DataFrame → JSON
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "nom": ["Alice", "Bob", "Charlie"],
    "salaire": [50000, 60000, 55000]
})

# Différentes conversions
print("📊 DataFrame original:")
print(df)

print("\n📄 to_dict('records') → pour API JSON:")
print(df.to_dict(orient="records"))

print("\n📄 to_dict('index') → pour stats par clé:")
print(df.set_index('nom')['salaire'].to_dict())

5️⃣ Connexion à une base de données

Avec SQLAlchemy (PostgreSQL, MySQL, SQLite)

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

# URL de connexion
DATABASE_URL = "postgresql://user:password@localhost:5432/mydb"
# Pour SQLite: "sqlite:///./data.db"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Dependency pour injecter la session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
# models.py
from sqlalchemy import Column, Integer, String, Float, DateTime
from database import Base
from datetime import datetime

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    nom = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, index=True)
    age = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)
# main.py
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from database import get_db
import models

app = FastAPI()

@app.get("/users")
def get_users(db: Session = Depends(get_db), skip: int = 0, limit: int = 10):
    users = db.query(models.User).offset(skip).limit(limit).all()
    return users

Avec MongoDB (PyMongo)

# mongodb_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from pymongo import MongoClient
from bson import ObjectId
from typing import Optional

app = FastAPI(title="MongoDB API")

# Connexion MongoDB
client = MongoClient("mongodb://localhost:27017")
db = client["data_engineering"]
collection = db["pipelines"]

# Helper pour ObjectId
class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate
    
    @classmethod
    def validate(cls, v, field):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)

# Modèle
class Pipeline(BaseModel):
    id: Optional[str] = Field(default=None, alias="_id")
    nom: str
    description: str
    schedule: str
    
    class Config:
        populate_by_name = True

# CRUD
@app.get("/pipelines")
def list_pipelines():
    pipelines = []
    for doc in collection.find():
        doc["_id"] = str(doc["_id"])
        pipelines.append(doc)
    return pipelines

@app.post("/pipelines")
def create_pipeline(pipeline: Pipeline):
    result = collection.insert_one(pipeline.model_dump(exclude={"id"}))
    return {"id": str(result.inserted_id)}

@app.get("/pipelines/{pipeline_id}")
def get_pipeline(pipeline_id: str):
    doc = collection.find_one({"_id": ObjectId(pipeline_id)})
    if not doc:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    doc["_id"] = str(doc["_id"])
    return doc

@app.delete("/pipelines/{pipeline_id}")
def delete_pipeline(pipeline_id: str):
    result = collection.delete_one({"_id": ObjectId(pipeline_id)})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    return {"deleted": True}

6️⃣ Fonctionnalités avancées

Background Tasks (tâches en arrière-plan)

Utile pour les webhooks et le déclenchement de pipelines.

from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def run_etl_pipeline(pipeline_id: int):
    """Simule l'exécution d'un pipeline ETL."""
    print(f" Démarrage du pipeline {pipeline_id}")
    time.sleep(10)  # Simule un traitement long
    print(f" Pipeline {pipeline_id} terminé")

@app.post("/pipelines/{pipeline_id}/run")
def trigger_pipeline(pipeline_id: int, background_tasks: BackgroundTasks):
    """Déclenche un pipeline en arrière-plan."""
    
    # Ajoute la tâche en arrière-plan
    background_tasks.add_task(run_etl_pipeline, pipeline_id)
    
    # Répond immédiatement
    return {
        "message": f"Pipeline {pipeline_id} déclenché",
        "status": "running"
    }

Webhook pour déclencher un pipeline

@app.post("/webhook/data-arrival")
def webhook_data_arrival(
    payload: dict,
    background_tasks: BackgroundTasks
):
    """Webhook appelé quand de nouvelles données arrivent."""
    
    source = payload.get("source")
    file_path = payload.get("file_path")
    
    # Déclencher le pipeline approprié
    background_tasks.add_task(
        process_new_data, 
        source=source, 
        file_path=file_path
    )
    
    return {"status": "accepted", "message": "Pipeline déclenché"}

Upload de fichiers

Permet d’ingérer des fichiers (CSV, JSON, Parquet) via API.

from fastapi import FastAPI, UploadFile, File
import pandas as pd
import io

app = FastAPI()

@app.post("/upload/csv")
async def upload_csv(file: UploadFile = File(...)):
    """Upload et analyse un fichier CSV."""
    
    # Vérifier l'extension
    if not file.filename.endswith('.csv'):
        raise HTTPException(status_code=400, detail="Fichier CSV requis")
    
    # Lire le contenu
    contents = await file.read()
    
    # Charger dans Pandas
    df = pd.read_csv(io.StringIO(contents.decode('utf-8')))
    
    return {
        "filename": file.filename,
        "rows": len(df),
        "columns": list(df.columns),
        "dtypes": df.dtypes.astype(str).to_dict(),
        "preview": df.head(5).to_dict(orient="records")
    }

@app.post("/upload/batch")
async def upload_multiple(files: list[UploadFile] = File(...)):
    """Upload plusieurs fichiers."""
    
    results = []
    for file in files:
        results.append({
            "filename": file.filename,
            "size": len(await file.read())
        })
    
    return {"uploaded": len(files), "files": results}

Documentation automatique

FastAPI génère automatiquement une documentation interactive !

URLs de documentation

URL Type Description
/docs Swagger UI Interface interactive pour tester
/redoc ReDoc Documentation lisible
/openapi.json OpenAPI Schéma JSON de l’API

Personnaliser la documentation

from fastapi import FastAPI

app = FastAPI(
    title="Data Pipeline API",
    description="""
    ## API pour gérer les pipelines de données
    
    Cette API permet de :
    - Gérer les pipelines ETL
    - Consulter les données
    - Déclencher des exécutions
    
    ### Authentification
    Utilise un token Bearer dans le header `Authorization`.
    """,
    version="1.0.0",
    contact={
        "name": "Data Team",
        "email": "data@company.com"
    },
    license_info={
        "name": "MIT"
    }
)

# Documenter un endpoint
@app.get(
    "/pipelines",
    summary="Liste les pipelines",
    description="Retourne tous les pipelines avec pagination",
    response_description="Liste des pipelines",
    tags=["Pipelines"]
)
def list_pipelines():
    pass

7️⃣ Exemple complet : API Data Pipeline

Voici un fichier complet à copier et tester :

# pipeline_api.py
"""
API FastAPI pour Data Engineering
Lancer avec : uvicorn pipeline_api:app --reload
"""

from fastapi import FastAPI, HTTPException, Query, BackgroundTasks, UploadFile, File
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import pandas as pd
import io
import time

# ============ APPLICATION ============

app = FastAPI(
    title="Data Pipeline API",
    description="API pour gérer et monitorer les pipelines de données",
    version="1.0.0"
)

# ============ MODÈLES ============

class PipelineCreate(BaseModel):
    nom: str = Field(..., min_length=3, max_length=100)
    description: Optional[str] = None
    schedule: str = Field(..., description="Expression cron")
    source: str = Field(..., description="Source de données")
    destination: str = Field(..., description="Destination")

class Pipeline(PipelineCreate):
    id: int
    status: str = "idle"
    last_run: Optional[datetime] = None
    created_at: datetime

class RunResult(BaseModel):
    pipeline_id: int
    status: str
    rows_processed: int
    duration_seconds: float
    timestamp: datetime

# ============ STORAGE (simulé) ============

pipelines_db: dict[int, dict] = {}
runs_history: list[dict] = []
next_id = 1

# ============ HELPERS ============

def simulate_pipeline_run(pipeline_id: int):
    """Simule l'exécution d'un pipeline."""
    import random
    
    pipeline = pipelines_db.get(pipeline_id)
    if not pipeline:
        return
    
    pipeline["status"] = "running"
    start = time.time()
    
    # Simulation
    time.sleep(random.uniform(1, 3))
    rows = random.randint(1000, 100000)
    
    duration = time.time() - start
    pipeline["status"] = "idle"
    pipeline["last_run"] = datetime.now()
    
    # Enregistrer le run
    runs_history.append({
        "pipeline_id": pipeline_id,
        "status": "success",
        "rows_processed": rows,
        "duration_seconds": round(duration, 2),
        "timestamp": datetime.now()
    })

# ============ ENDPOINTS ============

@app.get("/", tags=["Health"])
def root():
    return {"status": "healthy", "service": "Data Pipeline API"}

# --- CRUD Pipelines ---

@app.post("/pipelines", response_model=Pipeline, status_code=201, tags=["Pipelines"])
def create_pipeline(pipeline: PipelineCreate):
    """Crée un nouveau pipeline."""
    global next_id
    
    new_pipeline = {
        "id": next_id,
        **pipeline.model_dump(),
        "status": "idle",
        "last_run": None,
        "created_at": datetime.now()
    }
    
    pipelines_db[next_id] = new_pipeline
    next_id += 1
    
    return new_pipeline

@app.get("/pipelines", response_model=list[Pipeline], tags=["Pipelines"])
def list_pipelines(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    status: Optional[str] = None
):
    """Liste tous les pipelines."""
    pipelines = list(pipelines_db.values())
    
    if status:
        pipelines = [p for p in pipelines if p["status"] == status]
    
    return pipelines[skip:skip + limit]

@app.get("/pipelines/{pipeline_id}", response_model=Pipeline, tags=["Pipelines"])
def get_pipeline(pipeline_id: int):
    """Récupère un pipeline par ID."""
    if pipeline_id not in pipelines_db:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    return pipelines_db[pipeline_id]

@app.delete("/pipelines/{pipeline_id}", status_code=204, tags=["Pipelines"])
def delete_pipeline(pipeline_id: int):
    """Supprime un pipeline."""
    if pipeline_id not in pipelines_db:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    del pipelines_db[pipeline_id]

# --- Exécution ---

@app.post("/pipelines/{pipeline_id}/run", tags=["Execution"])
def run_pipeline(pipeline_id: int, background_tasks: BackgroundTasks):
    """Déclenche l'exécution d'un pipeline."""
    if pipeline_id not in pipelines_db:
        raise HTTPException(status_code=404, detail="Pipeline non trouvé")
    
    pipeline = pipelines_db[pipeline_id]
    if pipeline["status"] == "running":
        raise HTTPException(status_code=409, detail="Pipeline déjà en cours")
    
    background_tasks.add_task(simulate_pipeline_run, pipeline_id)
    
    return {"message": f"Pipeline {pipeline_id} démarré", "status": "running"}

@app.get("/pipelines/{pipeline_id}/runs", response_model=list[RunResult], tags=["Execution"])
def get_pipeline_runs(pipeline_id: int, limit: int = 10):
    """Historique des exécutions d'un pipeline."""
    runs = [r for r in runs_history if r["pipeline_id"] == pipeline_id]
    return runs[-limit:]

# --- Data ---

@app.post("/upload", tags=["Data"])
async def upload_data(file: UploadFile = File(...)):
    """Upload un fichier de données."""
    contents = await file.read()
    
    if file.filename.endswith('.csv'):
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))
    elif file.filename.endswith('.json'):
        df = pd.read_json(io.StringIO(contents.decode('utf-8')))
    else:
        raise HTTPException(status_code=400, detail="Format non supporté")
    
    return {
        "filename": file.filename,
        "rows": len(df),
        "columns": list(df.columns),
        "preview": df.head().to_dict(orient="records")
    }

# ============ MAIN ============

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

🚀 Lancer et tester

# Terminal 1 : Lancer le serveur
uvicorn pipeline_api:app --reload

# Terminal 2 : Tester avec curl

# Créer un pipeline
curl -X POST "http://localhost:8000/pipelines" \
  -H "Content-Type: application/json" \
  -d '{"nom": "ETL Users", "schedule": "0 * * * *", "source": "postgres", "destination": "bigquery"}'

# Lister les pipelines
curl "http://localhost:8000/pipelines"

# Déclencher un pipeline
curl -X POST "http://localhost:8000/pipelines/1/run"

# Voir l'historique
curl "http://localhost:8000/pipelines/1/runs"

8️⃣ Bonnes pratiques

Structure de projet recommandée

my_api/
├── app/
│   ├── __init__.py
│   ├── main.py              # Point d'entrée FastAPI
│   ├── config.py            # Configuration (env vars)
│   ├── database.py          # Connexion DB
│   ├── models/              # Modèles SQLAlchemy/Pydantic
│   │   ├── __init__.py
│   │   └── pipeline.py
│   ├── schemas/             # Schémas Pydantic (request/response)
│   │   ├── __init__.py
│   │   └── pipeline.py
│   ├── routers/             # Endpoints par domaine
│   │   ├── __init__.py
│   │   ├── pipelines.py
│   │   └── data.py
│   └── services/            # Logique métier
│       ├── __init__.py
│       └── etl.py
├── tests/
│   ├── __init__.py
│   └── test_pipelines.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── README.md

Sécurité

# Ne jamais hardcoder les secrets !
import os
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
API_KEY = os.getenv("API_KEY")

Checklist production

Item
Variables d’environnement pour les secrets
Validation des entrées (Pydantic)
Gestion des erreurs (HTTPException)
Logging structuré
Rate limiting
CORS configuré
Tests automatisés
Documentation à jour
Health check endpoint

Résumé

Ce que tu as appris

Concept Description
FastAPI Framework moderne pour APIs REST
Pydantic Validation automatique des données
Endpoints CRUD GET, POST, PUT, PATCH, DELETE
Path/Query params Paramètres d’URL et de requête
Pandas + API Servir des données via HTTP
Background tasks Exécuter des tâches asynchrones
Documentation Swagger automatique

Cas d’usage Data Engineering

Cas Exemple
Exposer des données API pour dashboard/BI
Webhook Déclencher un pipeline à l’arrivée de données
Validation Vérifier les données avant ingestion
Monitoring API de statut des pipelines
ML Serving Exposer des prédictions

Commandes essentielles

# Installation
pip install fastapi uvicorn

# Lancer en dev
uvicorn main:app --reload

# Lancer en prod
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

📚 Ressources

Documentation officielle

Tutoriels

Outils complémentaires

  • SQLModel — ORM par le créateur de FastAPI
  • Strawberry — GraphQL avec FastAPI
  • Celery — Tâches asynchrones avancées

🎉 Félicitations ! Tu maîtrises maintenant FastAPI pour le Data Engineering.

Retour au sommet