Voir le code
# Installation
!pip install fastapi uvicorn pydantic pandas --quiet
print("Packages installés !")Ce module bonus couvre FastAPI, le framework Python moderne pour créer des APIs REST performantes. En tant que Data Engineer, tu auras souvent besoin d’exposer des données via des APIs.
| Niveau | Compétence |
|---|---|
| ✅ Requis | Module 04_python_basics (fonctions, classes, type hints) |
| ✅ Requis | Module 05_python_data_processing (Pandas, JSON) |
| ⭐ Bonus | Module 09_mongodb (pour les exemples avec DB) |
À la fin de ce notebook, tu seras capable de :
💡 Note : FastAPI s’exécute comme un serveur. Certains exemples seront à tester hors du notebook, dans des fichiers
.py.
┌─────────────────────────────────────────────────────────────────┐
│ CAS D'USAGE DATA ENGINEERING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Exposer des données traitées (datamart → API) │
│ Créer des webhooks pour déclencher des pipelines │
│ Servir des prédictions ML en production │
│ Fournir des métriques pour les dashboards │
│ Créer des microservices data │
│ Valider des données avant ingestion │
│ │
└─────────────────────────────────────────────────────────────────┘
| Framework | Performance | Typing | Doc auto | Async | Complexité |
|---|---|---|---|---|---|
| FastAPI | ⭐⭐⭐ | ✅ Natif | ✅ Swagger | ✅ | Simple |
| Flask | ⭐⭐ | ❌ Manuel | ❌ Extension | ❌ | Simple |
| Django REST | ⭐⭐ | ❌ Manuel | ✅ | ⚠️ | Complexe |
| Express (Node) | ⭐⭐⭐ | ❌ | ❌ | ✅ | Simple |
💡 FastAPI combine le meilleur : performance, simplicité, et documentation automatique.
| Package | Rôle |
|---|---|
fastapi |
Le framework |
uvicorn |
Serveur ASGI (pour lancer l’API) |
pydantic |
Validation de données (inclus avec FastAPI) |
Créer un fichier main.py :
Résultat :
INFO: Uvicorn running on http://127.0.0.1:8000
INFO: Started reloader process
| URL | Résultat |
|---|---|
| http://127.0.0.1:8000 | {"message": "Hello Data Engineer!"} |
| http://127.0.0.1:8000/health | {"status": "healthy"} |
| http://127.0.0.1:8000/docs | Documentation Swagger |
| http://127.0.0.1:8000/redoc | Documentation ReDoc |
┌─────────────────────────────────────────────────────────────────┐
│ MÉTHODES HTTP (CRUD) │
├──────────┬──────────────────┬───────────────────────────────────┤
│ Méthode │ Action │ Exemple │
├──────────┼──────────────────┼───────────────────────────────────┤
│ GET │ Lire (Read) │ GET /users → Liste des users │
│ POST │ Créer (Create) │ POST /users → Créer un user │
│ PUT │ Remplacer │ PUT /users/1 → Remplacer user 1 │
│ PATCH │ Modifier │ PATCH /users/1 → Modifier user 1 │
│ DELETE │ Supprimer │ DELETE /users/1 → Supprimer │
└──────────┴──────────────────┴───────────────────────────────────┘
from fastapi import FastAPI
app = FastAPI()
@app.get("/items")
def get_items():
return {"action": "Liste des items"}
@app.post("/items")
def create_item():
return {"action": "Création d'un item"}
@app.put("/items/{item_id}")
def update_item(item_id: int):
return {"action": f"Mise à jour de l'item {item_id}"}
@app.delete("/items/{item_id}")
def delete_item(item_id: int):
return {"action": f"Suppression de l'item {item_id}"}Les paramètres dans l’URL permettent d’identifier une ressource spécifique.
from fastapi import FastAPI
app = FastAPI()
# Paramètre simple
@app.get("/users/{user_id}")
def get_user(user_id: int): # Typage automatique !
return {"user_id": user_id}
# Plusieurs paramètres
@app.get("/users/{user_id}/orders/{order_id}")
def get_user_order(user_id: int, order_id: int):
return {
"user_id": user_id,
"order_id": order_id
}GET /users/42 → {"user_id": 42} ✅
GET /users/abc → Erreur 422 ❌ (pas un int)
GET /users/-1 → {"user_id": -1} ✅ (mais logiquement faux)
Les paramètres après le ? dans l’URL pour filtrer, paginer, etc.
GET /users?skip=0&limit=10&active=true
└──────────┬──────────────┘
Query parameters
from fastapi import FastAPI
from typing import Optional
app = FastAPI()
@app.get("/users")
def get_users(
skip: int = 0, # Défaut = 0
limit: int = 10, # Défaut = 10
active: bool = True, # Défaut = True
search: Optional[str] = None # Optionnel
):
return {
"skip": skip,
"limit": limit,
"active": active,
"search": search
}Pydantic est au cœur de FastAPI. Il permet de : - Définir des schémas de données - Valider automatiquement les entrées - Convertir les types - Générer la documentation
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime
class User(BaseModel):
"""Modèle utilisateur pour l'API."""
id: Optional[int] = None
nom: str = Field(..., min_length=2, max_length=50)
email: EmailStr
age: int = Field(..., ge=0, le=150)
actif: bool = True
created_at: datetime = Field(default_factory=datetime.now)
class Config:
# Exemple pour la documentation
json_schema_extra = {
"example": {
"nom": "Alice Dupont",
"email": "alice@example.com",
"age": 30,
"actif": True
}
}| Validateur | Exemple | Description |
|---|---|---|
min_length |
Field(min_length=2) |
Longueur min string |
max_length |
Field(max_length=50) |
Longueur max string |
ge |
Field(ge=0) |
Greater or Equal |
le |
Field(le=100) |
Less or Equal |
gt |
Field(gt=0) |
Greater Than |
lt |
Field(lt=100) |
Less Than |
regex |
Field(regex='^[A-Z]') |
Pattern regex |
EmailStr |
Type spécial | Valide format email |
# Exemple Pydantic dans le notebook
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime
class Transaction(BaseModel):
"""Modèle de transaction pour pipeline ETL."""
id: Optional[int] = None
montant: float = Field(..., gt=0, description="Montant positif")
devise: str = Field(..., min_length=3, max_length=3)
description: str = Field(..., min_length=1, max_length=200)
timestamp: datetime = Field(default_factory=datetime.now)
@field_validator('devise')
@classmethod
def devise_uppercase(cls, v):
return v.upper()
# Test de validation
try:
# Valide
tx1 = Transaction(montant=100.50, devise="eur", description="Achat")
print(f" Transaction valide : {tx1}")
print(f" Devise convertie : {tx1.devise}")
# Invalide
tx2 = Transaction(montant=-50, devise="EUR", description="Test")
except Exception as e:
print(f"\n Erreur de validation : {e}")Pour les requêtes POST/PUT, les données sont envoyées dans le body (corps) en JSON.
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class UserCreate(BaseModel):
nom: str
email: str
age: int
class UserResponse(BaseModel):
id: int
nom: str
email: str
age: int
# Base de données fictive
users_db = []
user_id_counter = 1
@app.post("/users", response_model=UserResponse)
def create_user(user: UserCreate):
global user_id_counter
# Créer le user avec un ID
new_user = {
"id": user_id_counter,
"nom": user.nom,
"email": user.email,
"age": user.age
}
users_db.append(new_user)
user_id_counter += 1
return new_userVoici une API complète avec toutes les opérations CRUD :
# crud_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
app = FastAPI(
title="Data Pipeline API",
description="API pour gérer les pipelines de données",
version="1.0.0"
)
# ============ MODÈLES ============
class PipelineBase(BaseModel):
nom: str = Field(..., min_length=3, max_length=100)
description: Optional[str] = None
schedule: str = Field(..., description="Cron expression")
actif: bool = True
class PipelineCreate(PipelineBase):
pass
class PipelineUpdate(BaseModel):
nom: Optional[str] = None
description: Optional[str] = None
schedule: Optional[str] = None
actif: Optional[bool] = None
class Pipeline(PipelineBase):
id: int
created_at: datetime
updated_at: datetime
# ============ BASE DE DONNÉES (simulée) ============
pipelines_db: dict[int, dict] = {}
pipeline_id_counter = 1
# ============ ENDPOINTS ============
# CREATE
@app.post("/pipelines", response_model=Pipeline, status_code=201)
def create_pipeline(pipeline: PipelineCreate):
global pipeline_id_counter
now = datetime.now()
new_pipeline = {
"id": pipeline_id_counter,
**pipeline.model_dump(),
"created_at": now,
"updated_at": now
}
pipelines_db[pipeline_id_counter] = new_pipeline
pipeline_id_counter += 1
return new_pipeline
# READ ALL
@app.get("/pipelines", response_model=list[Pipeline])
def list_pipelines(skip: int = 0, limit: int = 10, actif: Optional[bool] = None):
pipelines = list(pipelines_db.values())
# Filtrer par statut actif
if actif is not None:
pipelines = [p for p in pipelines if p["actif"] == actif]
return pipelines[skip:skip + limit]
# READ ONE
@app.get("/pipelines/{pipeline_id}", response_model=Pipeline)
def get_pipeline(pipeline_id: int):
if pipeline_id not in pipelines_db:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
return pipelines_db[pipeline_id]
# UPDATE
@app.patch("/pipelines/{pipeline_id}", response_model=Pipeline)
def update_pipeline(pipeline_id: int, pipeline: PipelineUpdate):
if pipeline_id not in pipelines_db:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
stored = pipelines_db[pipeline_id]
update_data = pipeline.model_dump(exclude_unset=True)
for key, value in update_data.items():
stored[key] = value
stored["updated_at"] = datetime.now()
return stored
# DELETE
@app.delete("/pipelines/{pipeline_id}", status_code=204)
def delete_pipeline(pipeline_id: int):
if pipeline_id not in pipelines_db:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
del pipelines_db[pipeline_id]
return NoneFastAPI utilise HTTPException pour retourner des erreurs HTTP propres.
from fastapi import FastAPI, HTTPException, status
app = FastAPI()
@app.get("/users/{user_id}")
def get_user(user_id: int):
user = find_user_by_id(user_id) # Fonction fictive
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} non trouvé",
headers={"X-Error": "User not found"}
)
return user| Code | Nom | Usage |
|---|---|---|
| 200 | OK | Succès (GET, PUT) |
| 201 | Created | Création réussie (POST) |
| 204 | No Content | Suppression réussie (DELETE) |
| 400 | Bad Request | Requête invalide |
| 401 | Unauthorized | Non authentifié |
| 403 | Forbidden | Non autorisé |
| 404 | Not Found | Ressource introuvable |
| 422 | Unprocessable Entity | Erreur de validation |
| 500 | Internal Server Error | Erreur serveur |
C’est le cas d’usage le plus courant pour un Data Engineer : exposer des données traitées.
# data_api.py
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse, StreamingResponse
import pandas as pd
import io
app = FastAPI(title="Data API")
# Charger les données (en production: depuis une DB ou un fichier)
df = pd.DataFrame({
"id": range(1, 101),
"nom": [f"User_{i}" for i in range(1, 101)],
"age": [20 + (i % 40) for i in range(100)],
"ville": ["Paris", "Lyon", "Marseille", "Toulouse", "Bordeaux"] * 20,
"salaire": [30000 + (i * 500) for i in range(100)]
})
# ============ ENDPOINT JSON ============
@app.get("/data")
def get_data(
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
ville: str = None,
min_salaire: int = None
):
"""Retourne les données avec pagination et filtres."""
result = df.copy()
# Appliquer les filtres
if ville:
result = result[result["ville"] == ville]
if min_salaire:
result = result[result["salaire"] >= min_salaire]
# Pagination
total = len(result)
start = (page - 1) * page_size
end = start + page_size
return {
"total": total,
"page": page,
"page_size": page_size,
"pages": (total + page_size - 1) // page_size,
"data": result.iloc[start:end].to_dict(orient="records")
}
# ============ ENDPOINT CSV ============
@app.get("/data/export/csv")
def export_csv():
"""Exporte les données en CSV."""
stream = io.StringIO()
df.to_csv(stream, index=False)
response = StreamingResponse(
iter([stream.getvalue()]),
media_type="text/csv"
)
response.headers["Content-Disposition"] = "attachment; filename=data.csv"
return response
# ============ ENDPOINT AGRÉGATIONS ============
@app.get("/data/stats")
def get_stats():
"""Retourne des statistiques agrégées."""
stats = {
"total_users": len(df),
"age_moyen": round(df["age"].mean(), 2),
"salaire_moyen": round(df["salaire"].mean(), 2),
"salaire_median": round(df["salaire"].median(), 2),
"par_ville": df.groupby("ville").agg({
"id": "count",
"salaire": "mean"
}).round(2).to_dict(orient="index")
}
return stats# Démonstration : Conversion DataFrame → JSON
import pandas as pd
df = pd.DataFrame({
"id": [1, 2, 3],
"nom": ["Alice", "Bob", "Charlie"],
"salaire": [50000, 60000, 55000]
})
# Différentes conversions
print("📊 DataFrame original:")
print(df)
print("\n📄 to_dict('records') → pour API JSON:")
print(df.to_dict(orient="records"))
print("\n📄 to_dict('index') → pour stats par clé:")
print(df.set_index('nom')['salaire'].to_dict())# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
# URL de connexion
DATABASE_URL = "postgresql://user:password@localhost:5432/mydb"
# Pour SQLite: "sqlite:///./data.db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency pour injecter la session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()# models.py
from sqlalchemy import Column, Integer, String, Float, DateTime
from database import Base
from datetime import datetime
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
nom = Column(String(100), nullable=False)
email = Column(String(100), unique=True, index=True)
age = Column(Integer)
created_at = Column(DateTime, default=datetime.now)# main.py
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from database import get_db
import models
app = FastAPI()
@app.get("/users")
def get_users(db: Session = Depends(get_db), skip: int = 0, limit: int = 10):
users = db.query(models.User).offset(skip).limit(limit).all()
return users# mongodb_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from pymongo import MongoClient
from bson import ObjectId
from typing import Optional
app = FastAPI(title="MongoDB API")
# Connexion MongoDB
client = MongoClient("mongodb://localhost:27017")
db = client["data_engineering"]
collection = db["pipelines"]
# Helper pour ObjectId
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v, field):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
# Modèle
class Pipeline(BaseModel):
id: Optional[str] = Field(default=None, alias="_id")
nom: str
description: str
schedule: str
class Config:
populate_by_name = True
# CRUD
@app.get("/pipelines")
def list_pipelines():
pipelines = []
for doc in collection.find():
doc["_id"] = str(doc["_id"])
pipelines.append(doc)
return pipelines
@app.post("/pipelines")
def create_pipeline(pipeline: Pipeline):
result = collection.insert_one(pipeline.model_dump(exclude={"id"}))
return {"id": str(result.inserted_id)}
@app.get("/pipelines/{pipeline_id}")
def get_pipeline(pipeline_id: str):
doc = collection.find_one({"_id": ObjectId(pipeline_id)})
if not doc:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
doc["_id"] = str(doc["_id"])
return doc
@app.delete("/pipelines/{pipeline_id}")
def delete_pipeline(pipeline_id: str):
result = collection.delete_one({"_id": ObjectId(pipeline_id)})
if result.deleted_count == 0:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
return {"deleted": True}Utile pour les webhooks et le déclenchement de pipelines.
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def run_etl_pipeline(pipeline_id: int):
"""Simule l'exécution d'un pipeline ETL."""
print(f" Démarrage du pipeline {pipeline_id}")
time.sleep(10) # Simule un traitement long
print(f" Pipeline {pipeline_id} terminé")
@app.post("/pipelines/{pipeline_id}/run")
def trigger_pipeline(pipeline_id: int, background_tasks: BackgroundTasks):
"""Déclenche un pipeline en arrière-plan."""
# Ajoute la tâche en arrière-plan
background_tasks.add_task(run_etl_pipeline, pipeline_id)
# Répond immédiatement
return {
"message": f"Pipeline {pipeline_id} déclenché",
"status": "running"
}@app.post("/webhook/data-arrival")
def webhook_data_arrival(
payload: dict,
background_tasks: BackgroundTasks
):
"""Webhook appelé quand de nouvelles données arrivent."""
source = payload.get("source")
file_path = payload.get("file_path")
# Déclencher le pipeline approprié
background_tasks.add_task(
process_new_data,
source=source,
file_path=file_path
)
return {"status": "accepted", "message": "Pipeline déclenché"}Permet d’ingérer des fichiers (CSV, JSON, Parquet) via API.
from fastapi import FastAPI, UploadFile, File
import pandas as pd
import io
app = FastAPI()
@app.post("/upload/csv")
async def upload_csv(file: UploadFile = File(...)):
"""Upload et analyse un fichier CSV."""
# Vérifier l'extension
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="Fichier CSV requis")
# Lire le contenu
contents = await file.read()
# Charger dans Pandas
df = pd.read_csv(io.StringIO(contents.decode('utf-8')))
return {
"filename": file.filename,
"rows": len(df),
"columns": list(df.columns),
"dtypes": df.dtypes.astype(str).to_dict(),
"preview": df.head(5).to_dict(orient="records")
}
@app.post("/upload/batch")
async def upload_multiple(files: list[UploadFile] = File(...)):
"""Upload plusieurs fichiers."""
results = []
for file in files:
results.append({
"filename": file.filename,
"size": len(await file.read())
})
return {"uploaded": len(files), "files": results}FastAPI génère automatiquement une documentation interactive !
| URL | Type | Description |
|---|---|---|
/docs |
Swagger UI | Interface interactive pour tester |
/redoc |
ReDoc | Documentation lisible |
/openapi.json |
OpenAPI | Schéma JSON de l’API |
from fastapi import FastAPI
app = FastAPI(
title="Data Pipeline API",
description="""
## API pour gérer les pipelines de données
Cette API permet de :
- Gérer les pipelines ETL
- Consulter les données
- Déclencher des exécutions
### Authentification
Utilise un token Bearer dans le header `Authorization`.
""",
version="1.0.0",
contact={
"name": "Data Team",
"email": "data@company.com"
},
license_info={
"name": "MIT"
}
)
# Documenter un endpoint
@app.get(
"/pipelines",
summary="Liste les pipelines",
description="Retourne tous les pipelines avec pagination",
response_description="Liste des pipelines",
tags=["Pipelines"]
)
def list_pipelines():
passVoici un fichier complet à copier et tester :
# pipeline_api.py
"""
API FastAPI pour Data Engineering
Lancer avec : uvicorn pipeline_api:app --reload
"""
from fastapi import FastAPI, HTTPException, Query, BackgroundTasks, UploadFile, File
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import pandas as pd
import io
import time
# ============ APPLICATION ============
app = FastAPI(
title="Data Pipeline API",
description="API pour gérer et monitorer les pipelines de données",
version="1.0.0"
)
# ============ MODÈLES ============
class PipelineCreate(BaseModel):
nom: str = Field(..., min_length=3, max_length=100)
description: Optional[str] = None
schedule: str = Field(..., description="Expression cron")
source: str = Field(..., description="Source de données")
destination: str = Field(..., description="Destination")
class Pipeline(PipelineCreate):
id: int
status: str = "idle"
last_run: Optional[datetime] = None
created_at: datetime
class RunResult(BaseModel):
pipeline_id: int
status: str
rows_processed: int
duration_seconds: float
timestamp: datetime
# ============ STORAGE (simulé) ============
pipelines_db: dict[int, dict] = {}
runs_history: list[dict] = []
next_id = 1
# ============ HELPERS ============
def simulate_pipeline_run(pipeline_id: int):
"""Simule l'exécution d'un pipeline."""
import random
pipeline = pipelines_db.get(pipeline_id)
if not pipeline:
return
pipeline["status"] = "running"
start = time.time()
# Simulation
time.sleep(random.uniform(1, 3))
rows = random.randint(1000, 100000)
duration = time.time() - start
pipeline["status"] = "idle"
pipeline["last_run"] = datetime.now()
# Enregistrer le run
runs_history.append({
"pipeline_id": pipeline_id,
"status": "success",
"rows_processed": rows,
"duration_seconds": round(duration, 2),
"timestamp": datetime.now()
})
# ============ ENDPOINTS ============
@app.get("/", tags=["Health"])
def root():
return {"status": "healthy", "service": "Data Pipeline API"}
# --- CRUD Pipelines ---
@app.post("/pipelines", response_model=Pipeline, status_code=201, tags=["Pipelines"])
def create_pipeline(pipeline: PipelineCreate):
"""Crée un nouveau pipeline."""
global next_id
new_pipeline = {
"id": next_id,
**pipeline.model_dump(),
"status": "idle",
"last_run": None,
"created_at": datetime.now()
}
pipelines_db[next_id] = new_pipeline
next_id += 1
return new_pipeline
@app.get("/pipelines", response_model=list[Pipeline], tags=["Pipelines"])
def list_pipelines(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
status: Optional[str] = None
):
"""Liste tous les pipelines."""
pipelines = list(pipelines_db.values())
if status:
pipelines = [p for p in pipelines if p["status"] == status]
return pipelines[skip:skip + limit]
@app.get("/pipelines/{pipeline_id}", response_model=Pipeline, tags=["Pipelines"])
def get_pipeline(pipeline_id: int):
"""Récupère un pipeline par ID."""
if pipeline_id not in pipelines_db:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
return pipelines_db[pipeline_id]
@app.delete("/pipelines/{pipeline_id}", status_code=204, tags=["Pipelines"])
def delete_pipeline(pipeline_id: int):
"""Supprime un pipeline."""
if pipeline_id not in pipelines_db:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
del pipelines_db[pipeline_id]
# --- Exécution ---
@app.post("/pipelines/{pipeline_id}/run", tags=["Execution"])
def run_pipeline(pipeline_id: int, background_tasks: BackgroundTasks):
"""Déclenche l'exécution d'un pipeline."""
if pipeline_id not in pipelines_db:
raise HTTPException(status_code=404, detail="Pipeline non trouvé")
pipeline = pipelines_db[pipeline_id]
if pipeline["status"] == "running":
raise HTTPException(status_code=409, detail="Pipeline déjà en cours")
background_tasks.add_task(simulate_pipeline_run, pipeline_id)
return {"message": f"Pipeline {pipeline_id} démarré", "status": "running"}
@app.get("/pipelines/{pipeline_id}/runs", response_model=list[RunResult], tags=["Execution"])
def get_pipeline_runs(pipeline_id: int, limit: int = 10):
"""Historique des exécutions d'un pipeline."""
runs = [r for r in runs_history if r["pipeline_id"] == pipeline_id]
return runs[-limit:]
# --- Data ---
@app.post("/upload", tags=["Data"])
async def upload_data(file: UploadFile = File(...)):
"""Upload un fichier de données."""
contents = await file.read()
if file.filename.endswith('.csv'):
df = pd.read_csv(io.StringIO(contents.decode('utf-8')))
elif file.filename.endswith('.json'):
df = pd.read_json(io.StringIO(contents.decode('utf-8')))
else:
raise HTTPException(status_code=400, detail="Format non supporté")
return {
"filename": file.filename,
"rows": len(df),
"columns": list(df.columns),
"preview": df.head().to_dict(orient="records")
}
# ============ MAIN ============
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)# Terminal 1 : Lancer le serveur
uvicorn pipeline_api:app --reload
# Terminal 2 : Tester avec curl
# Créer un pipeline
curl -X POST "http://localhost:8000/pipelines" \
-H "Content-Type: application/json" \
-d '{"nom": "ETL Users", "schedule": "0 * * * *", "source": "postgres", "destination": "bigquery"}'
# Lister les pipelines
curl "http://localhost:8000/pipelines"
# Déclencher un pipeline
curl -X POST "http://localhost:8000/pipelines/1/run"
# Voir l'historique
curl "http://localhost:8000/pipelines/1/runs"my_api/
├── app/
│ ├── __init__.py
│ ├── main.py # Point d'entrée FastAPI
│ ├── config.py # Configuration (env vars)
│ ├── database.py # Connexion DB
│ ├── models/ # Modèles SQLAlchemy/Pydantic
│ │ ├── __init__.py
│ │ └── pipeline.py
│ ├── schemas/ # Schémas Pydantic (request/response)
│ │ ├── __init__.py
│ │ └── pipeline.py
│ ├── routers/ # Endpoints par domaine
│ │ ├── __init__.py
│ │ ├── pipelines.py
│ │ └── data.py
│ └── services/ # Logique métier
│ ├── __init__.py
│ └── etl.py
├── tests/
│ ├── __init__.py
│ └── test_pipelines.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── README.md
| ✅ | Item |
|---|---|
| ☐ | Variables d’environnement pour les secrets |
| ☐ | Validation des entrées (Pydantic) |
| ☐ | Gestion des erreurs (HTTPException) |
| ☐ | Logging structuré |
| ☐ | Rate limiting |
| ☐ | CORS configuré |
| ☐ | Tests automatisés |
| ☐ | Documentation à jour |
| ☐ | Health check endpoint |
| Concept | Description |
|---|---|
| FastAPI | Framework moderne pour APIs REST |
| Pydantic | Validation automatique des données |
| Endpoints CRUD | GET, POST, PUT, PATCH, DELETE |
| Path/Query params | Paramètres d’URL et de requête |
| Pandas + API | Servir des données via HTTP |
| Background tasks | Exécuter des tâches asynchrones |
| Documentation | Swagger automatique |
| Cas | Exemple |
|---|---|
| Exposer des données | API pour dashboard/BI |
| Webhook | Déclencher un pipeline à l’arrivée de données |
| Validation | Vérifier les données avant ingestion |
| Monitoring | API de statut des pipelines |
| ML Serving | Exposer des prédictions |
🎉 Félicitations ! Tu maîtrises maintenant FastAPI pour le Data Engineering.