Voir le code
# Installation des packages (à exécuter une seule fois)
!pip install pandas numpy requests python-dotenv pytest pandera pyarrow openpyxl matplotlib seabornCe module couvre le traitement de données avancé avec Python : Pandas, visualisation, APIs, et pipelines ETL.
| Niveau | Compétence |
|---|---|
| ✅ Requis | Avoir suivi le module 04_python_basics_for_data_engineers |
| ✅ Requis | Maîtriser les bases de Python (variables, fonctions, boucles) |
| ✅ Requis | Savoir utiliser pip et les environnements virtuels |
À la fin de ce notebook, tu seras capable de :
Avant de commencer, assurons-nous d’avoir toutes les librairies nécessaires.
# Imports de base
import pandas as pd
import numpy as np
import json
import requests
from datetime import datetime
import time
import logging
import re
from pathlib import Path
# Configuration de l'affichage
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', None)
print("✅ Imports réussis !")
print(f"Version Pandas : {pd.__version__}")
print(f"Version NumPy : {np.__version__}")Pandas est LA librairie incontournable pour manipuler des données tabulaires en Python.
# Informations générales
print("📋 Informations du DataFrame :")
print(df.info())
print("\n" + "="*50)
# Statistiques descriptives
print("\n📊 Statistiques descriptives :")
print(df.describe())
# Premières lignes
print("\n🔝 Premières lignes :")
print(df.head(3))
# Dernières lignes
print("\n🔚 Dernières lignes :")
print(df.tail(2))# Stratégies de gestion des valeurs manquantes
# 1. Supprimer les lignes avec des NaN
df_drop = df.dropna()
print("🗑️ Après suppression des lignes avec NaN :")
print(df_drop)
# 2. Remplir avec une valeur par défaut
df_fill = df.fillna({
'age': df['age'].median(),
'salaire': df['salaire'].mean()
})
print("\n✨ Après remplissage des NaN :")
print(df_fill)
# 3. Forward fill (propager la valeur précédente)
df_ffill = df.fillna(method='ffill')
print("\n➡️ Après forward fill :")
print(df_ffill)# Supprimer les doublons
df_with_duplicates = pd.DataFrame({
'id': [1, 2, 3, 2, 4],
'nom': ['Alice', 'Bob', 'Charlie', 'Bob', 'David']
})
print("Avant suppression des doublons :")
print(df_with_duplicates)
df_no_duplicates = df_with_duplicates.drop_duplicates()
print("\nAprès suppression :")
print(df_no_duplicates)# Utilisons le DataFrame nettoyé
df_clean = df_fill.copy()
# Sélectionner une colonne
print("📌 Colonne 'nom' :")
print(df_clean['nom'])
# Sélectionner plusieurs colonnes
print("\n📌 Colonnes 'nom' et 'ville' :")
print(df_clean[['nom', 'ville']])
# Filtrer les lignes
print("\n🔍 Employés de Paris :")
print(df_clean[df_clean['ville'] == 'Paris'])
# Filtres multiples
print("\n🔍 Employés de Paris avec salaire > 50000 :")
print(df_clean[(df_clean['ville'] == 'Paris') & (df_clean['salaire'] > 50000)])# Indexation avancée avec loc et iloc
# loc : par label/nom
print("📍 loc[0, 'nom'] :")
print(df_clean.loc[0, 'nom'])
# iloc : par position numérique
print("\n📍 iloc[0, 0] (première ligne, première colonne) :")
print(df_clean.iloc[0, 0])
# Sélection de plages
print("\n📍 loc[0:2, ['nom', 'age']] :")
print(df_clean.loc[0:2, ['nom', 'age']])# Grouper par ville et calculer des statistiques
print("📊 Statistiques par ville :")
grouped = df_clean.groupby('ville').agg({
'nom': 'count',
'age': ['mean', 'min', 'max'],
'salaire': ['mean', 'sum']
})
print(grouped)
# Renommer les colonnes pour plus de clarté
print("\n📊 Salaire moyen par ville :")
salaire_moyen = df_clean.groupby('ville')['salaire'].mean().round(2)
print(salaire_moyen)# Créer une colonne calculée
# Méthode 1 : Apply (plus lent mais flexible)
def categoriser_age(age):
if age < 30:
return 'Junior'
elif age < 40:
return 'Senior'
else:
return 'Expert'
df_clean['categorie_apply'] = df_clean['age'].apply(categoriser_age)
# Méthode 2 : Vectorisation (plus rapide)
df_clean['categorie_vect'] = pd.cut(
df_clean['age'],
bins=[0, 30, 40, 100],
labels=['Junior', 'Senior', 'Expert']
)
print("🔧 Colonnes calculées :")
print(df_clean[['nom', 'age', 'categorie_apply', 'categorie_vect']])# Comparaison de performance (sur un grand dataset)
import time
# Créer un grand DataFrame
big_df = pd.DataFrame({
'valeur': np.random.randint(1, 100, 100000)
})
# Méthode Apply
start = time.time()
big_df['double_apply'] = big_df['valeur'].apply(lambda x: x * 2)
time_apply = time.time() - start
# Méthode Vectorisée
start = time.time()
big_df['double_vect'] = big_df['valeur'] * 2
time_vect = time.time() - start
print(f"⏱️ Temps Apply : {time_apply:.4f}s")
print(f"⏱️ Temps Vectorisation : {time_vect:.4f}s")
print(f"🚀 Vectorisation est {time_apply/time_vect:.1f}x plus rapide !")# Optimiser les types de données
df_optimized = df_clean.copy()
# Avant optimisation
print("Avant optimisation :")
print(df_optimized.dtypes)
print(f"Mémoire : {df_optimized.memory_usage(deep=True).sum() / 1024:.2f} KB")
# Convertir en types plus efficaces
df_optimized['age'] = df_optimized['age'].astype('int8')
df_optimized['salaire'] = df_optimized['salaire'].astype('int32')
df_optimized['ville'] = df_optimized['ville'].astype('category')
print("\nAprès optimisation :")
print(df_optimized.dtypes)
print(f"Mémoire : {df_optimized.memory_usage(deep=True).sum() / 1024:.2f} KB")# Créer un DataFrame avec des dates
df_dates = pd.DataFrame({
'date_str': ['2024-01-15', '2024-02-20', '2024-03-10', '2024-04-05'],
'montant': [1000, 1500, 1200, 1800]
})
# Convertir en datetime
df_dates['date'] = pd.to_datetime(df_dates['date_str'])
# Extraire des composantes
df_dates['annee'] = df_dates['date'].dt.year
df_dates['mois'] = df_dates['date'].dt.month
df_dates['jour'] = df_dates['date'].dt.day
df_dates['nom_mois'] = df_dates['date'].dt.month_name()
df_dates['jour_semaine'] = df_dates['date'].dt.day_name()
print("📅 DataFrame avec dates extraites :")
print(df_dates)# Calculs avec les dates
df_dates['jours_depuis_debut'] = (df_dates['date'] - df_dates['date'].min()).dt.days
# Ajouter/soustraire des périodes
df_dates['date_plus_30j'] = df_dates['date'] + pd.Timedelta(days=30)
df_dates['date_moins_1mois'] = df_dates['date'] - pd.DateOffset(months=1)
print(" Calculs de dates :")
print(df_dates[['date', 'jours_depuis_debut', 'date_plus_30j', 'date_moins_1mois']])# Export CSV
df_clean.to_csv('employes_clean.csv', index=False)
print("✅ Export CSV réussi")
# Export JSON
df_clean.to_json('employes_clean.json', orient='records', indent=2)
print("✅ Export JSON réussi")
# Export Parquet (format columnar, très efficace)
df_clean.to_parquet('employes_clean.parquet', index=False)
print("✅ Export Parquet réussi")
# Export Excel
df_clean.to_excel('employes_clean.xlsx', index=False, sheet_name='Employés')
print("✅ Export Excel réussi")Objectif : Analyser un fichier de ventes
import pandas as pd
import numpy as np
# 1. Créer un DataFrame avec des données de ventes
np.random.seed(42)
dates = pd.date_range(start='2024-01-01', periods=100, freq='D')
produits = ['Laptop', 'Téléphone', 'Tablette', 'Casque', 'Souris']
ventes_data = {
'date': np.random.choice(dates, 200),
'produit': np.random.choice(produits, 200),
'quantite': np.random.randint(1, 20, 200),
'prix_unitaire': np.random.choice([999, 599, 449, 79, 29], 200)
}
df_ventes = pd.DataFrame(ventes_data)
df_ventes['montant'] = df_ventes['quantite'] * df_ventes['prix_unitaire']
# 2. Chiffre d'affaires total
ca_total = df_ventes['montant'].sum()
print(f"💰 CA total : {ca_total:,.0f} €")
# 3. Produit le plus vendu
produit_top = df_ventes.groupby('produit')['quantite'].sum().idxmax()
print(f"🏆 Produit top : {produit_top}")
# 4. Ventes mensuelles
df_ventes['mois'] = pd.to_datetime(df_ventes['date']).dt.to_period('M')
print(df_ventes.groupby('mois')['montant'].sum())
# 5. Export CSV
df_ventes.to_csv('ventes_analyse.csv', index=False)En 2024-2025, de nombreux outils permettent d’automatiser l’exploration des données et de générer des rapports complets en quelques lignes de code. Ces outils sont un gain de temps énorme pour les Data Engineers.
| Outil | Type | Points forts | Quand l’utiliser |
|---|---|---|---|
| Julius.ai | IA Cloud | Analyse en langage naturel, pas de code | Exploration rapide, non-techniques |
| ydata-profiling | Librairie | Rapport HTML complet, alertes | Premier aperçu d’un dataset |
| sweetviz | Librairie | Comparaison train/test, beau design | Comparer deux datasets |
| D-Tale | App Web | Interface interactive type Excel | Exploration interactive |
| Pygwalker | Librairie | Interface Tableau dans Jupyter | Visualisation drag & drop |
💡 Ces outils ne remplacent pas Pandas, mais accélèrent la phase d’exploration.
Julius.ai est une plateforme d’IA qui permet d’analyser des données en langage naturel, sans écrire de code.
👉 julius.ai — Gratuit avec limitations, plans payants disponibles
| Fonctionnalité | Description |
|---|---|
| Upload de fichiers | CSV, Excel, JSON, bases de données |
| Questions en français | “Quelle est la moyenne des salaires par ville ?” |
| Génération de code | Python/Pandas généré automatiquement |
| Visualisations | Graphiques créés à la demande |
| Export | Code Python, graphiques, rapports |
- "Montre-moi les 10 premières lignes"
- "Combien de valeurs manquantes par colonne ?"
- "Crée un graphique des ventes par mois"
- "Quelle est la corrélation entre age et salaire ?"
- "Nettoie les doublons et les valeurs aberrantes"
- "Génère un rapport de qualité des données"
| Situation | Comment Julius aide |
|---|---|
| Nouveau dataset inconnu | Exploration rapide sans code |
| Réunion avec non-techniques | Démo interactive |
| Prototypage rapide | Générer du code Pandas à réutiliser |
| Debugging | “Pourquoi j’ai des NaN dans cette colonne ?” |
💡 Astuce : Utilise Julius pour explorer, puis copie le code Python généré dans ton pipeline !
ydata-profiling (anciennement pandas-profiling) génère un rapport HTML interactif complet sur ton DataFrame.
# Installation (décommenter si nécessaire)
# !pip install ydata-profiling
from ydata_profiling import ProfileReport
# Créer un dataset d'exemple
df_exemple = pd.DataFrame({
'nom': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Henry'],
'age': [25, 30, 35, None, 28, 45, 32, 29],
'ville': ['Paris', 'Lyon', 'Paris', 'Marseille', 'Lyon', 'Paris', 'Lyon', 'Paris'],
'salaire': [45000, 55000, 60000, 50000, None, 75000, 52000, 48000],
'experience': [2, 5, 8, 3, 4, 15, 7, 3],
'date_embauche': pd.to_datetime(['2022-01-15', '2019-06-20', '2016-03-10',
'2021-09-01', '2020-04-15', '2010-01-01',
'2017-08-20', '2021-11-30'])
})
# Générer le rapport (mode minimal pour rapidité)
profile = ProfileReport(
df_exemple,
title="Rapport Employés",
minimal=True, # Mode rapide
explorative=True
)
# Afficher dans le notebook
profile.to_notebook_iframe()
# Ou sauvegarder en HTML
# profile.to_file("rapport_employes.html")| Section | Contenu |
|---|---|
| Overview | Nombre de lignes, colonnes, types, taille mémoire |
| Variables | Stats par colonne (min, max, mean, distribution) |
| Interactions | Corrélations entre variables |
| Correlations | Matrices de corrélation (Pearson, Spearman) |
| Missing values | Visualisation des valeurs manquantes |
| Duplicates | Détection des doublons |
| Alerts | ⚠️ Alertes automatiques (haute cardinalité, skewness, etc.) |
# Rapport complet (plus lent)
profile = ProfileReport(df, minimal=False)
# Comparer deux datasets
profile_train = ProfileReport(df_train, title="Train")
profile_test = ProfileReport(df_test, title="Test")
comparison = profile_train.compare(profile_test)
comparison.to_file("comparison.html")
# Exclure certaines analyses (plus rapide)
profile = ProfileReport(
df,
correlations=None, # Désactiver les corrélations
interactions=None # Désactiver les interactions
)Sweetviz est spécialisé dans la comparaison de datasets (train vs test, avant vs après nettoyage).
# Comparaison de deux datasets (ex: train vs test)
df_train = df_exemple.iloc[:5]
df_test = df_exemple.iloc[5:]
# Générer le rapport de comparaison
comparison_report = sv.compare([df_train, "Train"], [df_test, "Test"])
comparison_report.show_notebook()
# Analyse avec variable cible (pour ML)
# report = sv.analyze(df, target_feat="salaire")| Fonctionnalité | Description |
|---|---|
| Comparaison côte à côte | Voir les différences entre 2 datasets |
| Variable cible | Analyse par rapport à une target (ML) |
| Design moderne | Rapports visuellement attractifs |
| Rapide | Plus léger que ydata-profiling |
| Critère | ydata-profiling | Sweetviz |
|---|---|---|
| Profondeur d’analyse | ⭐⭐⭐ Très détaillé | ⭐⭐ Essentiel |
| Vitesse | 🐢 Plus lent | 🐇 Plus rapide |
| Comparaison | ✅ Possible | ⭐⭐⭐ Excellent |
| Design | Classique | Moderne |
| Alertes | ✅ Oui | ❌ Non |
D-Tale lance une interface web interactive pour explorer tes données comme dans Excel/Google Sheets, mais avec la puissance de Python derrière.
┌─────────────────────────────────────────────────────────────────┐
│ D-Tale [Export] [Code]│
├─────────────────────────────────────────────────────────────────┤
│ [Filters] [Sort] [Charts] [Correlations] [Describe] [Missing] │
├─────────────────────────────────────────────────────────────────┤
│ nom │ age │ ville │ salaire │ experience │ │
│───────────┼───────┼──────────┼─────────┼────────────│ │
│ Alice │ 25 │ Paris │ 45000 │ 2 │ │
│ Bob │ 30 │ Lyon │ 55000 │ 5 │ │
│ Charlie │ 35 │ Paris │ 60000 │ 8 │ │
│ ... │ ... │ ... │ ... │ ... │ │
└─────────────────────────────────────────────────────────────────┘
| Action | Comment |
|---|---|
| Filtrer | Cliquer sur une colonne → Filter |
| Trier | Cliquer sur l’en-tête de colonne |
| Graphiques | Menu Charts → choisir le type |
| Stats | Menu Describe → stats par colonne |
| Exporter le code | Bouton “Code Export” → copier le Pandas généré |
💡 Killer feature : D-Tale génère le code Pandas de toutes tes manipulations !
Pygwalker transforme ton DataFrame en une interface drag & drop comme Tableau/Power BI, directement dans Jupyter.
| Situation | Outil recommandé |
|---|---|
| Premier aperçu rapide d’un dataset | ydata-profiling (minimal=True) |
| Comparer data1 vs data2 | Sweetviz |
| Exploration interactive (comme Excel) | D-Tale |
| Créer des graphiques sans code | Pygwalker |
| Poser des questions en français | Julius.ai |
| Données sensibles (pas de cloud) | D-Tale ou ydata-profiling (tout local) |
| Générer du code Pandas | Julius.ai ou D-Tale |
| ✅ Faire | ❌ Éviter |
|---|---|
| Utiliser ces outils pour explorer | Les utiliser en production |
| Copier le code généré dans ton pipeline | Dépendre de l’interface pour le traitement |
| Partager les rapports HTML avec l’équipe | Envoyer des données sensibles sur Julius.ai |
| Combiner plusieurs outils | Se limiter à un seul |
💡 Workflow recommandé : 1. Julius.ai pour les premières questions 2. ydata-profiling pour un rapport complet 3. D-Tale pour explorer interactivement 4. Copier le code dans ton pipeline Pandas
Matplotlib est la bibliothèque de visualisation de base en Python. Elle permet de créer des graphiques de haute qualité et est la fondation de nombreuses autres bibliothèques de visualisation.
# Installation de Matplotlib (si nécessaire)
!pip install matplotlib
# Import de Matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Configuration pour afficher les graphiques dans le notebook
%matplotlib inline
# Configuration du style
plt.style.use('seaborn-v0_8-whitegrid') # Style plus moderne
plt.rcParams['figure.figsize'] = [10, 6] # Taille par défaut
plt.rcParams['font.size'] = 12
print("✅ Matplotlib importé avec succès !")
print(f"Version Matplotlib : {plt.matplotlib.__version__}")# Données pour un graphique linéaire
x = np.linspace(0, 10, 100)
y1 = np.sin(x)
y2 = np.cos(x)
# Création du graphique
plt.figure(figsize=(12, 6))
plt.plot(x, y1, label='Sin(x)', color='blue', linewidth=2)
plt.plot(x, y2, label='Cos(x)', color='red', linestyle='--', linewidth=2)
# Personnalisation
plt.title('Fonctions trigonométriques', fontsize=16, fontweight='bold')
plt.xlabel('x', fontsize=12)
plt.ylabel('y', fontsize=12)
plt.legend(loc='upper right')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()# Données de ventes par mois
mois = ['Jan', 'Fév', 'Mar', 'Avr', 'Mai', 'Juin']
ventes_2023 = [1200, 1500, 1800, 1600, 2000, 2200]
ventes_2024 = [1400, 1700, 1900, 1800, 2300, 2500]
x = np.arange(len(mois))
width = 0.35
fig, ax = plt.subplots(figsize=(12, 6))
# Barres groupées
bars1 = ax.bar(x - width/2, ventes_2023, width, label='2023', color='steelblue')
bars2 = ax.bar(x + width/2, ventes_2024, width, label='2024', color='coral')
# Personnalisation
ax.set_title('Comparaison des ventes 2023 vs 2024', fontsize=16, fontweight='bold')
ax.set_xlabel('Mois', fontsize=12)
ax.set_ylabel('Ventes (€)', fontsize=12)
ax.set_xticks(x)
ax.set_xticklabels(mois)
ax.legend()
# Ajouter les valeurs sur les barres
for bar in bars1:
height = bar.get_height()
ax.annotate(f'{height}', xy=(bar.get_x() + bar.get_width()/2, height),
xytext=(0, 3), textcoords='offset points', ha='center', fontsize=9)
plt.tight_layout()
plt.show()# Données aléatoires avec corrélation
np.random.seed(42)
x = np.random.randn(100)
y = 2 * x + np.random.randn(100) * 0.5
colors = np.random.rand(100)
sizes = np.random.rand(100) * 200
# Scatter plot avec couleurs et tailles variables
plt.figure(figsize=(10, 8))
scatter = plt.scatter(x, y, c=colors, s=sizes, alpha=0.6, cmap='viridis')
# Ajouter une ligne de tendance
z = np.polyfit(x, y, 1)
p = np.poly1d(z)
plt.plot(x, p(x), 'r--', linewidth=2, label=f'Tendance: y = {z[0]:.2f}x + {z[1]:.2f}')
plt.colorbar(scatter, label='Valeur')
plt.title('Nuage de points avec ligne de tendance', fontsize=16, fontweight='bold')
plt.xlabel('Variable X')
plt.ylabel('Variable Y')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()# Données de distribution
np.random.seed(42)
data_normal = np.random.normal(loc=50, scale=10, size=1000)
data_skewed = np.random.exponential(scale=10, size=1000)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Histogramme distribution normale
axes[0].hist(data_normal, bins=30, color='steelblue', edgecolor='black', alpha=0.7)
axes[0].axvline(data_normal.mean(), color='red', linestyle='--', label=f'Moyenne: {data_normal.mean():.1f}')
axes[0].set_title('Distribution Normale', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Valeur')
axes[0].set_ylabel('Fréquence')
axes[0].legend()
# Histogramme distribution exponentielle
axes[1].hist(data_skewed, bins=30, color='coral', edgecolor='black', alpha=0.7)
axes[1].axvline(data_skewed.mean(), color='red', linestyle='--', label=f'Moyenne: {data_skewed.mean():.1f}')
axes[1].set_title('Distribution Exponentielle', fontsize=14, fontweight='bold')
axes[1].set_xlabel('Valeur')
axes[1].set_ylabel('Fréquence')
axes[1].legend()
plt.tight_layout()
plt.show()# Données de répartition
categories = ['Produit A', 'Produit B', 'Produit C', 'Produit D', 'Autres']
parts = [35, 25, 20, 15, 5]
colors = ['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#ff99cc']
explode = (0.05, 0, 0, 0, 0) # Mettre en évidence le premier segment
plt.figure(figsize=(10, 8))
wedges, texts, autotexts = plt.pie(parts, labels=categories, colors=colors, explode=explode,
autopct='%1.1f%%', startangle=90, shadow=True)
# Améliorer l'apparence du texte
for autotext in autotexts:
autotext.set_fontsize(11)
autotext.set_fontweight('bold')
plt.title('Répartition des ventes par produit', fontsize=16, fontweight='bold')
plt.axis('equal') # Assure que le cercle est bien rond
plt.tight_layout()
plt.show()# Créer une grille de sous-graphiques
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Données
x = np.linspace(0, 10, 50)
# Graphique 1: Ligne
axes[0, 0].plot(x, np.sin(x), 'b-', linewidth=2)
axes[0, 0].set_title('Graphique linéaire')
axes[0, 0].set_xlabel('X')
axes[0, 0].set_ylabel('Sin(X)')
axes[0, 0].grid(True, alpha=0.3)
# Graphique 2: Barres
categories = ['A', 'B', 'C', 'D']
values = [23, 45, 56, 78]
axes[0, 1].bar(categories, values, color=['#ff6b6b', '#4ecdc4', '#45b7d1', '#96ceb4'])
axes[0, 1].set_title('Graphique à barres')
# Graphique 3: Scatter
x_scatter = np.random.rand(50)
y_scatter = np.random.rand(50)
axes[1, 0].scatter(x_scatter, y_scatter, c='purple', alpha=0.6, s=100)
axes[1, 0].set_title('Nuage de points')
# Graphique 4: Histogramme
data = np.random.randn(1000)
axes[1, 1].hist(data, bins=30, color='orange', edgecolor='black', alpha=0.7)
axes[1, 1].set_title('Histogramme')
plt.suptitle('Tableau de bord - Vue d\'ensemble', fontsize=16, fontweight='bold', y=1.02)
plt.tight_layout()
plt.show()# Créer un graphique à sauvegarder
fig, ax = plt.subplots(figsize=(10, 6))
x = np.linspace(0, 10, 100)
ax.plot(x, np.sin(x), 'b-', linewidth=2, label='Sin(x)')
ax.set_title('Graphique à exporter', fontsize=14)
ax.legend()
ax.grid(True, alpha=0.3)
# Sauvegarder dans différents formats
fig.savefig('graphique.png', dpi=300, bbox_inches='tight')
fig.savefig('graphique.pdf', bbox_inches='tight')
fig.savefig('graphique.svg', bbox_inches='tight')
print("✅ Graphiques sauvegardés en PNG, PDF et SVG")
plt.show()Objectif : Créer un tableau de bord de visualisation
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
np.random.seed(42)
mois = ['Jan', 'Fév', 'Mar', 'Avr', 'Mai', 'Juin']
data = {'mois': mois, 'ventes': np.random.randint(100, 300, 6),
'profit': np.random.randint(20, 80, 6)}
df = pd.DataFrame(data)
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
# Ligne
axes[0,0].plot(df['mois'], df['ventes'], marker='o')
axes[0,0].set_title('📈 Évolution mensuelle')
# Barres
axes[0,1].bar(df['mois'], df['ventes'], color='steelblue')
axes[0,1].set_title('📊 Ventes par mois')
# Scatter
axes[1,0].scatter(df['ventes'], df['profit'], c='coral', s=100)
axes[1,0].set_title('📍 Ventes vs Profit')
# Histogramme
axes[1,1].hist(df['profit'], bins=5, color='green', alpha=0.7)
axes[1,1].set_title('📊 Distribution profits')
plt.tight_layout()
plt.savefig('dashboard.png', dpi=150)
plt.show()Seaborn est une bibliothèque de visualisation basée sur Matplotlib qui offre une interface de haut niveau pour créer des graphiques statistiques attrayants. Elle est particulièrement utile pour l’exploration de données.
# Installation de Seaborn (si nécessaire)
!pip install seaborn
# Import de Seaborn
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
# Configuration du style
sns.set_theme(style='whitegrid', palette='muted')
plt.rcParams['figure.figsize'] = [10, 6]
print("✅ Seaborn importé avec succès !")
print(f"Version Seaborn : {sns.__version__}")# Histogramme avec KDE (Kernel Density Estimation)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Histogramme simple
sns.histplot(data=tips, x='total_bill', kde=True, ax=axes[0], color='steelblue')
axes[0].set_title('Distribution du montant total', fontsize=14, fontweight='bold')
# Histogramme avec hue (groupement)
sns.histplot(data=tips, x='total_bill', hue='time', kde=True, ax=axes[1])
axes[1].set_title('Distribution par moment de la journée', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()# Box plot - Distribution par catégorie
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Box plot simple
sns.boxplot(data=tips, x='day', y='total_bill', ax=axes[0], palette='Set2')
axes[0].set_title('Montant total par jour', fontsize=14, fontweight='bold')
# Box plot avec hue
sns.boxplot(data=tips, x='day', y='total_bill', hue='sex', ax=axes[1], palette='Set1')
axes[1].set_title('Montant total par jour et sexe', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()# Bar plot avec estimation statistique
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Bar plot avec intervalle de confiance
sns.barplot(data=tips, x='day', y='total_bill', ax=axes[0], palette='Blues_d', errorbar='ci')
axes[0].set_title('Montant moyen par jour (avec IC 95%)', fontsize=14, fontweight='bold')
# Count plot (compte les occurrences)
sns.countplot(data=tips, x='day', hue='time', ax=axes[1], palette='Set2')
axes[1].set_title('Nombre de repas par jour', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()# Scatter plot avec régression
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Scatter plot simple avec régression
sns.regplot(data=tips, x='total_bill', y='tip', ax=axes[0], color='coral')
axes[0].set_title('Relation montant/pourboire avec régression', fontsize=14, fontweight='bold')
# Scatter plot avec hue et style
sns.scatterplot(data=tips, x='total_bill', y='tip', hue='time', style='sex',
size='size', sizes=(50, 200), ax=axes[1], palette='Set1')
axes[1].set_title('Relation multidimensionnelle', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()# Matrice de corrélation
# Sélectionner uniquement les colonnes numériques
numeric_cols = tips.select_dtypes(include=[np.number])
correlation_matrix = numeric_cols.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
fmt='.2f', linewidths=0.5, square=True)
plt.title('Matrice de corrélation', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()# Heatmap de données pivotées
pivot_data = tips.pivot_table(values='tip', index='day', columns='time', aggfunc='mean')
plt.figure(figsize=(8, 6))
sns.heatmap(pivot_data, annot=True, fmt='.2f', cmap='YlOrRd', linewidths=0.5)
plt.title('Pourboire moyen par jour et moment', fontsize=14, fontweight='bold')
plt.show()# Explorer différents styles
styles = ['white', 'dark', 'whitegrid', 'darkgrid', 'ticks']
fig, axes = plt.subplots(1, 5, figsize=(20, 4))
for ax, style in zip(axes, styles):
with sns.axes_style(style):
sns.histplot(tips['total_bill'], ax=ax, color='steelblue')
ax.set_title(f"Style: {style}")
plt.tight_layout()
plt.show()# Palettes de couleurs
palettes = ['deep', 'muted', 'bright', 'pastel', 'dark', 'colorblind']
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for ax, palette in zip(axes, palettes):
sns.barplot(data=tips, x='day', y='total_bill', palette=palette, ax=ax)
ax.set_title(f"Palette: {palette}", fontsize=12)
plt.tight_layout()
plt.show()# Créer un tableau de bord d'analyse complet
sns.set_theme(style='whitegrid')
fig = plt.figure(figsize=(16, 12))
# Créer une grille personnalisée
gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)
# 1. Distribution des montants
ax1 = fig.add_subplot(gs[0, 0])
sns.histplot(data=tips, x='total_bill', kde=True, ax=ax1, color='steelblue')
ax1.set_title('Distribution des montants', fontweight='bold')
# 2. Distribution des pourboires
ax2 = fig.add_subplot(gs[0, 1])
sns.histplot(data=tips, x='tip', kde=True, ax=ax2, color='coral')
ax2.set_title('Distribution des pourboires', fontweight='bold')
# 3. Relation montant/pourboire
ax3 = fig.add_subplot(gs[0, 2])
sns.regplot(data=tips, x='total_bill', y='tip', ax=ax3, color='purple', scatter_kws={'alpha':0.5})
ax3.set_title('Montant vs Pourboire', fontweight='bold')
# 4. Boxplot par jour
ax4 = fig.add_subplot(gs[1, 0])
sns.boxplot(data=tips, x='day', y='total_bill', ax=ax4, palette='Set2')
ax4.set_title('Montants par jour', fontweight='bold')
# 5. Violin plot par temps
ax5 = fig.add_subplot(gs[1, 1])
sns.violinplot(data=tips, x='time', y='total_bill', hue='sex', split=True, ax=ax5, palette='muted')
ax5.set_title('Distribution par temps et sexe', fontweight='bold')
# 6. Count plot
ax6 = fig.add_subplot(gs[1, 2])
sns.countplot(data=tips, x='day', hue='time', ax=ax6, palette='Set1')
ax6.set_title('Nombre de repas', fontweight='bold')
# 7. Heatmap de corrélation (grande)
ax7 = fig.add_subplot(gs[2, :])
pivot = tips.pivot_table(values='tip', index='day', columns='size', aggfunc='mean')
sns.heatmap(pivot, annot=True, fmt='.2f', cmap='YlOrRd', ax=ax7, linewidths=0.5)
ax7.set_title('Pourboire moyen par jour et taille de groupe', fontweight='bold')
plt.suptitle('📊 Tableau de bord - Analyse des pourboires', fontsize=18, fontweight='bold', y=1.01)
plt.tight_layout()
plt.savefig('dashboard_seaborn.png', dpi=300, bbox_inches='tight')
print("✅ Dashboard sauvegardé en PNG")
plt.show()Objectif : Analyser le dataset ‘titanic’ de Seaborn
sns.load_dataset('titanic')import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
titanic = sns.load_dataset('titanic')
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Violin plot
sns.violinplot(data=titanic, x='class', y='age', hue='survived',
split=True, ax=axes[0,0])
axes[0,0].set_title('Âge par classe')
# Bar plot survie
titanic.groupby(['sex','class'])['survived'].mean().unstack().plot(
kind='bar', ax=axes[0,1])
axes[0,1].set_title('Survie par sexe/classe')
# Heatmap
sns.heatmap(titanic.select_dtypes(include=[np.number]).corr(),
annot=True, cmap='coolwarm', ax=axes[1,0])
# Scatter
sns.scatterplot(data=titanic, x='age', y='fare', hue='survived', ax=axes[1,1])
plt.tight_layout()
plt.savefig('titanic_dashboard.png')
plt.show()
# FacetGrid
g = sns.FacetGrid(titanic, col='sex', row='class', hue='survived')
g.map(sns.histplot, 'age')
g.add_legend()
plt.show()Le traitement de texte est essentiel en Data Engineering (logs, parsing, normalisation).
# Données textuelles brutes
textes = pd.DataFrame({
'texte': [
' BONJOUR ',
'Salut tout le monde!',
'Python_est_génial',
'Data-Engineering-2024'
]
})
# Nettoyage basique
textes['clean'] = textes['texte'].str.strip() # Supprimer espaces
textes['lower'] = textes['texte'].str.lower() # Minuscules
textes['upper'] = textes['texte'].str.upper() # Majuscules
textes['replace'] = textes['texte'].str.replace('_', ' ') # Remplacer
print("🧹 Nettoyage de texte :")
print(textes).str accessor)# Données d'exemple
df_text = pd.DataFrame({
'email': ['alice@example.com', 'bob@test.org', 'charlie@mail.fr'],
'nom_complet': ['Jean Dupont', 'Marie Martin', 'Pierre Durand'],
'telephone': ['0612345678', '06-98-76-54-32', '06 11 22 33 44']
})
# Vérifier si contient
df_text['email_gmail'] = df_text['email'].str.contains('gmail')
# Commencer/finir par
df_text['email_com'] = df_text['email'].str.endswith('.com')
# Extraire le domaine
df_text['domaine'] = df_text['email'].str.split('@').str[1]
# Séparer nom et prénom
df_text[['prenom', 'nom']] = df_text['nom_complet'].str.split(' ', expand=True)
# Longueur
df_text['longueur_nom'] = df_text['nom_complet'].str.len()
print("🔤 Méthodes string :")
print(df_text)import re
# Exemples de regex courantes
texte_test = """
Contact: alice@example.com ou bob@test.org
Téléphones: 06.12.34.56.78, 01-23-45-67-89
URL: https://www.example.com
Prix: 29.99€, 15.50€, 100€
"""
# Extraire les emails
emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', texte_test)
print("📧 Emails trouvés :")
print(emails)
# Extraire les téléphones
telephones = re.findall(r'\d{2}[-.\s]?\d{2}[-.\s]?\d{2}[-.\s]?\d{2}[-.\s]?\d{2}', texte_test)
print("\n📞 Téléphones trouvés :")
print(telephones)
# Extraire les URLs
urls = re.findall(r'https?://[^\s]+', texte_test)
print("\n🔗 URLs trouvées :")
print(urls)
# Extraire les prix
prix = re.findall(r'\d+\.?\d*€', texte_test)
print("\n💰 Prix trouvés :")
print(prix)# Validation avec regex
def valider_email(email):
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
# Test
emails_test = ['alice@example.com', 'bob@invalid', 'charlie.fr', 'david@test.org']
for email in emails_test:
valide = "✅" if valider_email(email) else "❌"
print(f"{valide} {email}")# Exemple de logs Apache/Nginx
logs = """
192.168.1.1 - - [01/Dec/2024:10:15:30 +0000] "GET /api/users HTTP/1.1" 200 1234
192.168.1.2 - - [01/Dec/2024:10:16:45 +0000] "POST /api/login HTTP/1.1" 401 567
192.168.1.3 - - [01/Dec/2024:10:17:20 +0000] "GET /api/products HTTP/1.1" 200 8901
"""
# Pattern pour parser les logs
pattern = r'(\S+) - - \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d+) (\d+)'
# Extraire les informations
matches = re.findall(pattern, logs)
# Créer un DataFrame
df_logs = pd.DataFrame(matches, columns=[
'ip', 'timestamp', 'methode', 'endpoint', 'protocole', 'status', 'bytes'
])
# Convertir les types
df_logs['status'] = df_logs['status'].astype(int)
df_logs['bytes'] = df_logs['bytes'].astype(int)
print("📋 Logs parsés :")
print(df_logs)# Créer un fichier avec encodage spécifique
texte_accentue = "Voici du texte avec des accents : éàùô çñ"
# Sauvegarder en UTF-8
with open('test_utf8.txt', 'w', encoding='utf-8') as f:
f.write(texte_accentue)
# Sauvegarder en Latin-1
with open('test_latin1.txt', 'w', encoding='latin-1') as f:
f.write(texte_accentue)
# Lire avec le bon encodage
print("✅ Lecture UTF-8 :")
with open('test_utf8.txt', 'r', encoding='utf-8') as f:
print(f.read())
print("\n✅ Lecture Latin-1 :")
with open('test_latin1.txt', 'r', encoding='latin-1') as f:
print(f.read())Objectif : Nettoyer et valider des données clients
import pandas as pd
import re
df = pd.DataFrame({
'nom': [' alice DUPONT ', 'BOB martin', 'Charlie Brown'],
'email': ['alice@gmail.com', 'bob@invalid', 'charlie@test.fr'],
'telephone': ['06 12 34 56 78', '+33698765432', '06-11-22-33-44']
})
# Nettoyer noms
df['nom_clean'] = df['nom'].str.strip().str.title()
# Valider emails
email_re = r'^[\w.+-]+@[\w-]+\.[a-z]{2,}$'
df['email_ok'] = df['email'].apply(lambda x: bool(re.match(email_re, x)))
# Normaliser téléphones
def norm_tel(t):
d = re.sub(r'\D', '', t)
if d.startswith('33'): d = '0' + d[2:]
return ' '.join([d[i:i+2] for i in range(0,10,2)]) if len(d)==10 else None
df['tel_clean'] = df['telephone'].apply(norm_tel)
# Export valides
df[df['email_ok'] & df['tel_clean'].notna()].to_csv('clients_ok.csv', index=False)Les APIs sont une source de données majeure en Data Engineering.
import json
# Créer un dictionnaire Python
data = {
"nom": "Alice",
"age": 30,
"competences": ["Python", "SQL", "Pandas"],
"actif": True
}
# Convertir en JSON
json_str = json.dumps(data, indent=2)
print("JSON formaté :")
print(json_str)
# Reconvertir en dictionnaire
data_reloaded = json.loads(json_str)
print("\n Rechargé :")
print(data_reloaded)# Sauvegarder et lire des fichiers JSON
# Sauvegarder
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print("✅ JSON sauvegardé")
# Lire
with open('data.json', 'r', encoding='utf-8') as f:
data_loaded = json.load(f)
print("\n JSON chargé :")
print(data_loaded)requestsimport requests
# API publique gratuite : JSONPlaceholder
url = "https://jsonplaceholder.typicode.com/users"
# GET Request
response = requests.get(url)
# Vérifier le statut
print(f"Status code: {response.status_code}")
if response.status_code == 200:
users = response.json()
print(f"\n✅ {len(users)} utilisateurs récupérés")
print("\nPremier utilisateur :")
print(json.dumps(users[0], indent=2))
else:
print("❌ Erreur lors de la requête")def fetch_data_safe(url):
"""Récupère des données avec gestion d'erreurs"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # Lève une exception si statut >= 400
return response.json()
except requests.exceptions.Timeout:
print("⏱️ Timeout : le serveur met trop de temps à répondre")
return None
except requests.exceptions.HTTPError as e:
print(f"❌ Erreur HTTP : {e}")
return None
except requests.exceptions.RequestException as e:
print(f"❌ Erreur de connexion : {e}")
return None
# Test avec une URL valide
data = fetch_data_safe("https://jsonplaceholder.typicode.com/users/1")
if data:
print("✅ Données récupérées :")
print(json.dumps(data, indent=2))
# Test avec une URL invalide
data = fetch_data_safe("https://jsonplaceholder.typicode.com/invalid")# Exemple 1 : API Key dans les headers
headers = {
"Authorization": "Bearer YOUR_API_KEY_HERE",
"Content-Type": "application/json"
}
# response = requests.get(url, headers=headers)
# Exemple 2 : API Key dans les paramètres
params = {
"api_key": "YOUR_API_KEY_HERE",
"format": "json"
}
# response = requests.get(url, params=params)
# Exemple 3 : Basic Auth
from requests.auth import HTTPBasicAuth
# response = requests.get(url, auth=HTTPBasicAuth('username', 'password'))
print("💡 Les exemples ci-dessus montrent différentes méthodes d'authentification")def fetch_all_pages(base_url, max_pages=5):
"""Récupère toutes les pages d'une API paginée"""
all_data = []
for page in range(1, max_pages + 1):
url = f"{base_url}?_page={page}&_limit=10"
print(f" Récupération page {page}...")
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if not data: # Plus de données
break
all_data.extend(data)
else:
print(f"❌ Erreur page {page}")
break
return all_data
# Test avec JSONPlaceholder
posts = fetch_all_pages("https://jsonplaceholder.typicode.com/posts", max_pages=3)
print(f"\n✅ Total récupéré : {len(posts)} posts")import time
from datetime import datetime
def fetch_with_retry(url, max_retries=3, delay=2):
"""Récupère des données avec retry et backoff exponentiel"""
for attempt in range(max_retries):
try:
print(f"Tentative {attempt + 1}/{max_retries}")
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"❌ Erreur : {e}")
if attempt < max_retries - 1:
wait_time = delay * (2 ** attempt) # Backoff exponentiel
print(f"⏳ Attente de {wait_time}s avant nouvelle tentative...")
time.sleep(wait_time)
else:
print("❌ Échec après toutes les tentatives")
return None
# Test
data = fetch_with_retry("https://jsonplaceholder.typicode.com/users/1")
if data:
print("\n✅ Succès !")# Rate limiting simple
def fetch_with_rate_limit(urls, requests_per_second=2):
"""Récupère plusieurs URLs en respectant un rate limit"""
delay = 1.0 / requests_per_second
results = []
for url in urls:
start = time.time()
print(f"⏬ Récupération : {url}")
response = requests.get(url)
if response.status_code == 200:
results.append(response.json())
elapsed = time.time() - start
sleep_time = max(0, delay - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
return results
# Test
urls = [
"https://jsonplaceholder.typicode.com/users/1",
"https://jsonplaceholder.typicode.com/users/2",
"https://jsonplaceholder.typicode.com/users/3"
]
start_time = time.time()
results = fetch_with_rate_limit(urls, requests_per_second=1)
total_time = time.time() - start_time
print(f"\n✅ {len(results)} URLs récupérées en {total_time:.2f}s")# JSON complexe imbriqué
complex_json = {
"id": 1,
"nom": "Entreprise A",
"employes": [
{
"id": 101,
"nom": "Alice",
"competences": ["Python", "SQL"],
"adresse": {"ville": "Paris", "code_postal": "75001"}
},
{
"id": 102,
"nom": "Bob",
"competences": ["Java", "Docker"],
"adresse": {"ville": "Lyon", "code_postal": "69001"}
}
]
}
# Normaliser avec json_normalize
df_complex = pd.json_normalize(
complex_json,
record_path='employes',
meta=['nom'],
meta_prefix='entreprise_'
)
print("🔄 JSON normalisé :")
print(df_complex)Objectif : Récupérer et analyser des données d’une API publique
import requests
import pandas as pd
# 1-2. Récupérer les posts
posts = requests.get("https://jsonplaceholder.typicode.com/posts").json()
df_posts = pd.DataFrame(posts)
# 3. Posts par utilisateur
posts_count = df_posts.groupby('userId').size().reset_index(name='nb_posts')
posts_count = posts_count.sort_values('nb_posts', ascending=False)
# 4. Détails des 5 top users
top_5 = posts_count.head(5)['userId'].tolist()
users = []
for uid in top_5:
u = requests.get(f"https://jsonplaceholder.typicode.com/users/{uid}").json()
users.append({'userId': u['id'], 'name': u['name'], 'email': u['email']})
df_result = pd.DataFrame(users).merge(posts_count, on='userId')
print(df_result)
# 5. Export JSON
df_result.to_json('top_users.json', orient='records', indent=2)La validation des données est cruciale pour garantir leur qualité.
# Créer des données de test
test_data = pd.DataFrame({
'user_id': [1, 2, 3, 2, 5],
'email': ['alice@test.com', 'bob@test', None, 'bob@test', 'eve@test.com'],
'age': [25, 150, -5, 30, 28],
'salaire': [45000, 55000, 60000, 55000, None]
})
print("Données de test :")
print(test_data)
# Vérifications
print("\n Vérifications :")
print(f"Colonnes manquantes : {set(['user_id', 'email', 'age', 'salaire']) - set(test_data.columns)}")
print(f"Valeurs nulles : {test_data.isnull().sum().sum()}")
print(f"Doublons : {test_data.duplicated().sum()}")
print(f"Types : \n{test_data.dtypes}")class DataValidator:
"""Validateur simple pour DataFrames"""
def __init__(self, df):
self.df = df
self.errors = []
def check_columns(self, required_columns):
"""Vérifie présence des colonnes requises"""
missing = set(required_columns) - set(self.df.columns)
if missing:
self.errors.append(f"Colonnes manquantes: {missing}")
return False
return True
def check_nulls(self, max_null_pct=10):
"""Vérifie le pourcentage de valeurs nulles"""
null_pct = (self.df.isnull().sum() / len(self.df)) * 100
violations = null_pct[null_pct > max_null_pct]
if not violations.empty:
self.errors.append(f"Trop de nulls: {violations.to_dict()}")
return False
return True
def check_range(self, column, min_val, max_val):
"""Vérifie que les valeurs sont dans une plage"""
if column in self.df.columns:
violations = self.df[(self.df[column] < min_val) | (self.df[column] > max_val)]
if len(violations) > 0:
self.errors.append(f"{column}: {len(violations)} valeurs hors plage [{min_val}, {max_val}]")
return False
return True
def check_duplicates(self, subset=None):
"""Vérifie les doublons"""
duplicates = self.df.duplicated(subset=subset).sum()
if duplicates > 0:
self.errors.append(f"{duplicates} doublons trouvés")
return False
return True
def check_types(self, column, expected_type):
"""Vérifie le type d'une colonne"""
if column in self.df.columns:
if self.df[column].dtype != expected_type:
self.errors.append(f"{column}: type attendu {expected_type}, obtenu {self.df[column].dtype}")
return False
return True
def validate(self):
"""Retourne True si valide, False sinon"""
return len(self.errors) == 0
def report(self):
"""Génère un rapport de validation"""
return {
'is_valid': self.validate(),
'total_errors': len(self.errors),
'errors': self.errors
}
# Utilisation
validator = DataValidator(test_data)
validator.check_columns(['user_id', 'email', 'age'])
validator.check_nulls(max_null_pct=15)
validator.check_range('age', 0, 120)
validator.check_duplicates(subset=['user_id', 'email'])
report = validator.report()
print("\n Rapport de validation:")
print(json.dumps(report, indent=2))# Définir un schéma de validation
schema = {
'user_id': {'type': 'int64', 'nullable': False, 'unique': True},
'email': {'type': 'object', 'nullable': False, 'pattern': r'.+@.+\..+'},
'age': {'type': 'int64', 'nullable': False, 'min': 0, 'max': 120},
'salaire': {'type': 'int64', 'nullable': True, 'min': 0}
}
def validate_schema(df, schema):
"""Valide un DataFrame contre un schéma"""
errors = []
for column, rules in schema.items():
# Vérifier si la colonne existe
if column not in df.columns:
errors.append(f"Colonne manquante: {column}")
continue
# Vérifier les nulls
if not rules.get('nullable', True) and df[column].isnull().any():
errors.append(f"{column}: contient des valeurs nulles")
# Vérifier l'unicité
if rules.get('unique', False) and df[column].duplicated().any():
errors.append(f"{column}: contient des doublons")
# Vérifier la plage
if 'min' in rules:
violations = df[df[column] < rules['min']]
if len(violations) > 0:
errors.append(f"{column}: {len(violations)} valeurs < {rules['min']}")
if 'max' in rules:
violations = df[df[column] > rules['max']]
if len(violations) > 0:
errors.append(f"{column}: {len(violations)} valeurs > {rules['max']}")
# Vérifier le pattern (pour les strings)
if 'pattern' in rules:
pattern = rules['pattern']
invalid = df[column].dropna()[~df[column].dropna().str.match(pattern)]
if len(invalid) > 0:
errors.append(f"{column}: {len(invalid)} valeurs ne matchent pas le pattern")
return {
'is_valid': len(errors) == 0,
'errors': errors
}
# Test
result = validate_schema(test_data, schema)
print("\n📋 Validation avec schéma :")
print(json.dumps(result, indent=2))Objectif : Créer un validateur pour des transactions
import pandas as pd
import numpy as np
# 1. Créer transactions (avec erreurs)
np.random.seed(42)
df = pd.DataFrame({
'id': list(range(1,51)) + [25, 30], # doublons
'date': pd.date_range('2024-01-01', periods=52),
'montant': list(np.random.uniform(10, 500, 50)) + [-50, 0], # négatifs
'type': np.random.choice(['achat', 'remboursement'], 52)
})
# 2-4. Validation
erreurs = []
if (df['montant'] <= 0).any():
erreurs.append(f"❌ {(df['montant']<=0).sum()} montants invalides")
if df['id'].duplicated().any():
erreurs.append(f"❌ {df['id'].duplicated().sum()} doublons")
# 5. Rapport
print("📋 RAPPORT")
print(f"Valide: {len(erreurs)==0}")
for e in erreurs: print(e)
# Nettoyage
df_clean = df[(df['montant']>0) & ~df['id'].duplicated(keep='first')]
print(f"✅ {len(df_clean)}/{len(df)} lignes valides")Construisons un pipeline ETL complet en intégrant tous les concepts.
# Créer la structure de dossiers
from pathlib import Path
dirs = ['data/raw', 'data/processed', 'data/output', 'logs']
for dir_path in dirs:
Path(dir_path).mkdir(parents=True, exist_ok=True)
print("✅ Structure de dossiers créée")
print("\n📁 Structure :")
print("""
project/
├── data/
│ ├── raw/
│ ├── processed/
│ └── output/
└── logs/
""")import logging
from datetime import datetime
# Configuration du logging
log_file = f"logs/pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
logger = logging.getLogger('ETL_Pipeline')
logger.info("🚀 Pipeline démarré")# Configuration centralisée
class Config:
"""Configuration du pipeline"""
# Chemins
RAW_DATA_DIR = 'data/raw'
PROCESSED_DATA_DIR = 'data/processed'
OUTPUT_DIR = 'data/output'
# API
API_URL = 'https://jsonplaceholder.typicode.com'
API_TIMEOUT = 10
API_MAX_RETRIES = 3
# Validation
MAX_NULL_PCT = 10
# Export
EXPORT_FORMATS = ['csv', 'parquet', 'json']
config = Config()
logger.info("⚙️ Configuration chargée")def extract_from_api(url, max_retries=3):
"""Extrait des données depuis une API"""
logger.info(f"📥 Extraction depuis {url}")
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=config.API_TIMEOUT)
response.raise_for_status()
data = response.json()
logger.info(f"✅ {len(data)} enregistrements extraits")
return data
except Exception as e:
logger.warning(f"⚠️ Tentative {attempt + 1}/{max_retries} échouée: {e}")
if attempt == max_retries - 1:
logger.error("❌ Extraction échouée")
raise
time.sleep(2 ** attempt)
# Test extraction
users_data = extract_from_api(f"{config.API_URL}/users")
df_raw = pd.DataFrame(users_data)
# Sauvegarder les données brutes
raw_file = f"{config.RAW_DATA_DIR}/users_raw_{datetime.now().strftime('%Y%m%d')}.csv"
df_raw.to_csv(raw_file, index=False)
logger.info(f"💾 Données brutes sauvegardées: {raw_file}")def transform_data(df):
"""Transforme et nettoie les données"""
logger.info("🔄 Début de la transformation")
df_transformed = df.copy()
# 1. Normaliser les colonnes imbriquées
if 'address' in df.columns:
address_df = pd.json_normalize(df['address'])
address_df.columns = ['address_' + col for col in address_df.columns]
df_transformed = pd.concat([df_transformed.drop('address', axis=1), address_df], axis=1)
logger.info("✅ Colonnes adresse normalisées")
# 2. Nettoyer les noms de colonnes
df_transformed.columns = df_transformed.columns.str.lower().str.replace('.', '_')
logger.info("✅ Noms de colonnes nettoyés")
# 3. Gérer les valeurs manquantes
null_counts = df_transformed.isnull().sum()
if null_counts.sum() > 0:
logger.warning(f"⚠️ {null_counts.sum()} valeurs manquantes détectées")
df_transformed = df_transformed.dropna()
logger.info("✅ Valeurs manquantes supprimées")
# 4. Créer des colonnes dérivées
if 'name' in df_transformed.columns:
df_transformed['name_length'] = df_transformed['name'].str.len()
logger.info("✅ Colonne dérivée 'name_length' créée")
# 5. Ajouter metadata
df_transformed['processed_at'] = datetime.now().isoformat()
logger.info(f"✅ Transformation terminée: {len(df_transformed)} lignes")
return df_transformed
# Test transformation
df_transformed = transform_data(df_raw)
print("\n📊 Données transformées :")
print(df_transformed.head())
print(f"\nColonnes: {df_transformed.columns.tolist()}")def validate_data(df):
"""Valide la qualité des données"""
logger.info("🔍 Début de la validation")
validator = DataValidator(df)
# Définir les règles de validation
required_columns = ['id', 'name', 'email']
validator.check_columns(required_columns)
validator.check_nulls(max_null_pct=config.MAX_NULL_PCT)
validator.check_duplicates(subset=['id'])
# Générer le rapport
report = validator.report()
if report['is_valid']:
logger.info("✅ Validation réussie")
else:
logger.error(f"❌ Validation échouée: {report['total_errors']} erreurs")
for error in report['errors']:
logger.error(f" - {error}")
return report
# Test validation
validation_report = validate_data(df_transformed)
print("\n📋 Rapport de validation :")
print(json.dumps(validation_report, indent=2))def load_data(df, base_filename):
"""Exporte les données dans plusieurs formats"""
logger.info("💾 Début de l'export")
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
files_created = []
for format_type in config.EXPORT_FORMATS:
filename = f"{config.OUTPUT_DIR}/{base_filename}_{timestamp}.{format_type}"
try:
if format_type == 'csv':
df.to_csv(filename, index=False)
elif format_type == 'parquet':
df.to_parquet(filename, index=False)
elif format_type == 'json':
df.to_json(filename, orient='records', indent=2)
file_size = Path(filename).stat().st_size / 1024 # KB
logger.info(f"✅ Export {format_type.upper()}: {filename} ({file_size:.2f} KB)")
files_created.append(filename)
except Exception as e:
logger.error(f"❌ Erreur export {format_type}: {e}")
return files_created
# Test export
exported_files = load_data(df_transformed, 'users_processed')
print("\n📦 Fichiers exportés :")
for file in exported_files:
print(f" - {file}")def run_pipeline():
"""Exécute le pipeline complet"""
start_time = time.time()
logger.info("="*50)
logger.info("🚀 DÉMARRAGE DU PIPELINE")
logger.info("="*50)
try:
# EXTRACT
logger.info("\n📥 PHASE 1: EXTRACTION")
data = extract_from_api(f"{config.API_URL}/users")
df = pd.DataFrame(data)
logger.info(f"Lignes extraites: {len(df)}")
# TRANSFORM
logger.info("\n🔄 PHASE 2: TRANSFORMATION")
df_clean = transform_data(df)
logger.info(f"Lignes après transformation: {len(df_clean)}")
# VALIDATE
logger.info("\n🔍 PHASE 3: VALIDATION")
validation = validate_data(df_clean)
if not validation['is_valid']:
logger.error("❌ Validation échouée, arrêt du pipeline")
return False
# LOAD
logger.info("\n💾 PHASE 4: EXPORT")
files = load_data(df_clean, 'users_final')
# STATISTIQUES
duration = time.time() - start_time
logger.info("\n" + "="*50)
logger.info("📊 STATISTIQUES DU PIPELINE")
logger.info("="*50)
logger.info(f"Durée totale: {duration:.2f}s")
logger.info(f"Lignes traitées: {len(df_clean)}")
logger.info(f"Fichiers créés: {len(files)}")
logger.info(f"Taux de réussite: 100%")
logger.info("="*50)
logger.info("✅ PIPELINE TERMINÉ AVEC SUCCÈS")
logger.info("="*50)
return True
except Exception as e:
logger.error(f"❌ ERREUR FATALE: {e}")
logger.exception("Stack trace:")
return False
# Exécuter le pipeline
success = run_pipeline()Objectif : Créer votre propre pipeline ETL
import requests, pandas as pd, logging, time
from pathlib import Path
logging.basicConfig(level=logging.INFO, format='%(message)s')
log = logging.getLogger()
def extract():
log.info("📥 Extract...")
posts = pd.DataFrame(requests.get("https://jsonplaceholder.typicode.com/posts").json())
users = pd.DataFrame(requests.get("https://jsonplaceholder.typicode.com/users").json())
return posts, users
def transform(posts, users):
log.info("🔄 Transform...")
users = users[['id','name','email']].rename(columns={'id':'userId','name':'author'})
df = posts.merge(users, on='userId')
df['words'] = df['body'].str.split().str.len()
return df
def validate(df):
log.info("🔍 Validate...")
return not df['id'].isna().any() and not df['id'].duplicated().any()
def load(df):
log.info("💾 Load...")
Path('output').mkdir(exist_ok=True)
df.to_csv('output/posts.csv', index=False)
df.to_json('output/posts.json', orient='records')
def run():
start = time.time()
log.info("🚀 START")
posts, users = extract()
df = transform(posts, users)
if not validate(df): return log.error("❌ FAILED")
load(df)
log.info(f"✅ DONE in {time.time()-start:.1f}s - {len(df)} rows")
run()Pour aller plus loin dans la professionnalisation de votre code.
# Installer python-dotenv
!pip install python-dotenv
# Créer un fichier .env (à ne JAMAIS commiter)
env_content = """
API_KEY=votre_cle_api_secrete
DATABASE_URL=postgresql://user:password@localhost:5432/db
ENVIRONMENT=development
"""
with open('.env', 'w') as f:
f.write(env_content)
print("✅ Fichier .env créé")
print("⚠️ N'oubliez pas d'ajouter .env à votre .gitignore !")from dotenv import load_dotenv
import os
# Charger les variables d'environnement
load_dotenv()
# Accéder aux variables
api_key = os.getenv('API_KEY')
db_url = os.getenv('DATABASE_URL')
env = os.getenv('ENVIRONMENT')
print(f"🔑 API Key: {api_key[:10]}...")
print(f"🗄️ Database URL: {db_url[:30]}...")
print(f"🌍 Environment: {env}")# Exemple de fonction à tester
def calculer_age_moyen(df, colonne='age'):
"""Calcule l'âge moyen d'un DataFrame"""
if colonne not in df.columns:
raise ValueError(f"Colonne '{colonne}' introuvable")
return df[colonne].mean()
# Tests
def test_calculer_age_moyen():
# Test avec données valides
df_test = pd.DataFrame({'age': [20, 30, 40]})
assert calculer_age_moyen(df_test) == 30, "Test 1 échoué"
print("✅ Test 1: données valides")
# Test avec colonne manquante
try:
calculer_age_moyen(pd.DataFrame({'nom': ['Alice']}), 'age')
print("❌ Test 2 échoué: devrait lever une exception")
except ValueError:
print("✅ Test 2: exception levée correctement")
# Test avec valeurs nulles
df_null = pd.DataFrame({'age': [20, None, 40]})
result = calculer_age_moyen(df_null)
assert result == 30, "Test 3 échoué"
print("✅ Test 3: gestion des nulls")
print("\n🎉 Tous les tests passent !")
# Exécuter les tests
test_calculer_age_moyen()| Section | Compétences acquises |
|---|---|
| Pandas | Manipulation de données, nettoyage, agrégations, merges |
| Matplotlib | Graphiques de base, personnalisation, export |
| Seaborn | Visualisations statistiques, heatmaps, pair plots |
| Texte & Regex | Nettoyage, parsing de logs, expressions régulières |
| APIs | Appels REST, pagination, retry logic |
| Validation | Schémas, checks de qualité |
| Pipeline ETL | Architecture complète Extract-Transform-Load |
| Bonnes pratiques | Logging, configuration, tests |
Maintenant que tu maîtrises le traitement de données, passons aux bases de données !
👉 Module suivant : 06_intro_databases — Introduction aux bases de données
🎉 Félicitations ! Tu as terminé le module Python Data Processing pour Data Engineers.
Comment utiliser Pygwalker