Projet Intégrateur : Video Games Analytics Platform

Du CSV au Dashboard — Ton Premier Pipeline Data Complet


Bienvenue dans ce projet intégrateur ! Tu vas construire une plateforme d’analyse de jeux vidéo de A à Z, en mobilisant toutes les compétences acquises dans les modules précédents.

Approche Pédagogique

Ce projet est structuré en défis. Pour chaque étape :

  1. Lis le défi et les consignes
  2. Réfléchis et essaie de coder toi-même
  3. Consulte les indices si tu bloques
  4. Vérifie ta solution en déroulant les réponses

⚠️ Important : Ne regarde pas les solutions avant d’avoir essayé ! C’est en pratiquant qu’on apprend.


Ce que tu vas construire

┌─────────────────────────────────────────────────────────────────────────────────┐
│                        VIDEO GAMES ANALYTICS PLATFORM                           │
└─────────────────────────────────────────────────────────────────────────────────┘

  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐      ┌──────────────┐
  │  📥 SOURCES  │      │ ⚙️ PROCESSING │      │ 💾 STOCKAGE  │      │📊 EXPOSITION │
  └──────────────┘      └──────────────┘      └──────────────┘      └──────────────┘
         │                     │                     │                     │
         ▼                     ▼                     ▼                     ▼
  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐      ┌──────────────┐
  │    Kaggle    │─────▶│    Pandas    │─────▶│   DuckDB     │─────▶│   FastAPI    │
  │  Video Games │      │  Nettoyage   │      │ SQL Analytics│      │   REST API   │
  └──────────────┘      └──────────────┘      └──────────────┘      └──────────────┘
         │                     │                     │                     │
         ▼                     ▼                     ▼                     ▼
  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐      ┌──────────────┐
  │ Web Scraping │─────▶│   PySpark    │─────▶│Elasticsearch │─────▶│  Streamlit   │
  │   RAWG API   │      │ Agrégations  │      │  Recherche   │      │  Dashboard   │
  └──────────────┘      └──────────────┘      └──────────────┘      └──────────────┘

Compétences Mobilisées

Module Compétence Application dans le projet
M01 Concepts Data Engineering Architecture du pipeline
M02 Bash & Linux Scripts d’automatisation
M03 Git Versioning du projet
M04-05 Python & Pandas Traitement de données
M06-07 SQL & Databases Requêtes analytiques avec DuckDB
M08 Big Data Concepts Pensée distribuée
M10 Elasticsearch Recherche full-text
M11 PySpark Traitement à l’échelle
M12 Orchestration Pipeline automatisé
M13 FastAPI API REST
NEW Web Scraping BeautifulSoup, Requests
NEW Streamlit Dashboard interactif

Le Dataset

Kaggle - Video Game Sales with Ratings

🔗 https://www.kaggle.com/datasets/rush4ratio/video-game-sales-with-ratings

Colonne Description
Name Nom du jeu
Platform Console (PS4, Xbox, PC…)
Year_of_Release Année de sortie
Genre Genre (Action, Sports, RPG…)
Publisher Éditeur
NA_Sales, EU_Sales, JP_Sales, Other_Sales Ventes par région (millions)
Global_Sales Ventes mondiales
Critic_Score Note Metacritic (0-100)
User_Score Note utilisateurs (0-10)
Rating Classification ESRB (E, T, M…)

Phase 0 : Setup du Projet

Avant de coder, il faut préparer l’environnement de travail.

Défi 0.1 : Créer la structure du projet

Consigne

Crée une structure de dossiers pour le projet videogames-analytics avec : - Un dossier data/ avec des sous-dossiers raw/, processed/, enriched/ - Un dossier scripts/ pour les scripts Python - Un dossier api/ pour FastAPI - Un dossier dashboard/ pour Streamlit - Un dossier notebooks/ pour l’exploration - Les fichiers requirements.txt, .gitignore, README.md

Structure attendue :

videogames-analytics/
├── data/
│   ├── raw/           # Données brutes (CSV Kaggle)
│   ├── processed/     # Données nettoyées (Parquet)
│   └── enriched/      # Données enrichies (scraping)
├── scripts/           # Scripts Python du pipeline
├── api/               # API FastAPI
├── dashboard/         # Dashboard Streamlit
├── notebooks/         # Notebooks d'exploration
├── tests/             # Tests unitaires
├── requirements.txt   # Dépendances Python
├── .gitignore         # Fichiers à ignorer
└── README.md          # Documentation

Questions pour réfléchir

  1. Quelle commande Bash permet de créer plusieurs dossiers en une seule ligne ?
  2. Comment créer des sous-dossiers imbriqués qui n’existent pas encore ?
  3. Quels fichiers/dossiers doit-on ignorer dans Git pour un projet Python data ?

Prends le temps de réfléchir et d’essayer avant de regarder les indices ou la solution ⬇️

Voir le code
%%bash
# TON CODE ICI
# Crée la structure du projet videogames-analytics
💡 Cliquer pour voir les indices
  • Utilise mkdir -p pour créer des dossiers imbriqués (l’option -p crée les parents si nécessaire)
  • Tu peux utiliser les accolades {} pour créer plusieurs dossiers : mkdir -p projet/{dossier1,dossier2}
  • Pour le .gitignore, pense à : __pycache__/, .env, data/raw/*, *.pyc, .venv/, *.db
  • Utilise cat << 'EOF' > fichier pour créer un fichier multi-lignes

Cliquer pour voir la solution complète
# Création de la structure en une commande
mkdir -p videogames-analytics/{data/{raw,processed,enriched},scripts,api,dashboard,notebooks,tests}

cd videogames-analytics

# Créer requirements.txt
cat << 'EOF' > requirements.txt
# Data Processing
pandas>=2.0.0
numpy>=1.24.0
pyarrow>=12.0.0

# Web Scraping
requests>=2.31.0
beautifulsoup4>=4.12.0
lxml>=4.9.0

# Databases
duckdb>=0.9.0
elasticsearch>=8.0.0

# Big Data
pyspark>=3.5.0

# API
fastapi>=0.104.0
uvicorn>=0.24.0

# Dashboard
streamlit>=1.28.0
plotly>=5.18.0

# Utilities
python-dotenv>=1.0.0
tqdm>=4.66.0
EOF

# Créer .gitignore
cat << 'EOF' > .gitignore
# Data (on ne versionne pas les données)
data/raw/*
data/processed/*
data/enriched/*
!data/*/.gitkeep
*.db

# Python
__pycache__/
*.pyc
.venv/
venv/

# Environment
.env
*.log

# IDE
.idea/
.vscode/

# Jupyter
.ipynb_checkpoints/
EOF

# Créer les .gitkeep pour garder les dossiers vides dans Git
touch data/raw/.gitkeep data/processed/.gitkeep data/enriched/.gitkeep

echo "✅ Structure créée !"

Explications : - mkdir -p crée tous les dossiers parents manquants - Les accolades {a,b,c} créent plusieurs dossiers en une commande - cat << 'EOF' > fichier permet d’écrire plusieurs lignes dans un fichier - .gitkeep est une convention pour garder les dossiers vides dans Git

Défi 0.2 : Initialiser Git

Consigne

  1. Initialise un dépôt Git dans le dossier videogames-analytics
  2. Ajoute tous les fichiers
  3. Fais un premier commit avec le message : "🎮 Initial commit: project structure"

Questions pour réfléchir

  1. Quelle commande initialise un nouveau dépôt Git ?
  2. Comment ajouter tous les fichiers d’un coup au staging ?
  3. Quelle est la syntaxe pour créer un commit avec un message ?

Voir le code
%%bash
# TON CODE ICI
# Initialise Git et fais le premier commit
💡 Cliquer pour voir les indices

Les 3 commandes Git essentielles : - git init : initialise un nouveau dépôt - git add . : ajoute tous les fichiers au staging - git commit -m "message" : crée un commit


Cliquer pour voir la solution
cd videogames-analytics

# Initialiser le dépôt Git
git init

# Ajouter tous les fichiers au staging
git add .

# Créer le premier commit
git commit -m "🎮 Initial commit: project structure"

# Vérifier l'historique
git log --oneline

Résultat attendu :

abc1234 🎮 Initial commit: project structure

Phase 1 : Ingestion des Données

Téléchargement du Dataset

Télécharge le dataset depuis Kaggle : 1. Va sur https://www.kaggle.com/datasets/rush4ratio/video-game-sales-with-ratings 2. Télécharge le ZIP 3. Extrait le CSV dans data/raw/

💡 Si tu n’as pas de compte Kaggle, utilise le code ci-dessous pour générer des données d’exemple.

Voir le code
# Génération de données d'exemple (exécute cette cellule si tu n'as pas Kaggle)
import pandas as pd
import numpy as np
from pathlib import Path

PROJECT_ROOT = Path("videogames-analytics")
RAW_DIR = PROJECT_ROOT / "data" / "raw"
RAW_DIR.mkdir(parents=True, exist_ok=True)

np.random.seed(42)
n_games = 5000

platforms = ['PS4', 'XOne', 'PC', 'WiiU', 'PS3', 'X360', 'Wii', 'PSV', '3DS', 'PS2']
genres = ['Action', 'Sports', 'Shooter', 'Role-Playing', 'Racing', 'Platform', 
          'Fighting', 'Simulation', 'Adventure', 'Strategy', 'Puzzle', 'Misc']
publishers = ['Electronic Arts', 'Activision', 'Ubisoft', 'Nintendo', 'Sony', 
              'Take-Two', 'Sega', 'Capcom', 'Konami', 'Bandai Namco', 'Square Enix']
ratings = ['E', 'E10+', 'T', 'M', 'RP', None]

game_prefixes = ['Super', 'Ultimate', 'Call of', 'Legend of', 'Final', 'Grand', 'Dark']
game_suffixes = ['Warriors', 'Quest', 'Adventure', 'Legends', 'Chronicles', 'Heroes']
game_names = [f"{np.random.choice(game_prefixes)} {np.random.choice(game_suffixes)} {i}" 
              for i in range(n_games)]

games_df = pd.DataFrame({
    'Name': game_names,
    'Platform': np.random.choice(platforms, n_games),
    'Year_of_Release': np.random.choice(range(2000, 2024), n_games),
    'Genre': np.random.choice(genres, n_games),
    'Publisher': np.random.choice(publishers, n_games),
    'NA_Sales': np.round(np.random.exponential(0.5, n_games), 2),
    'EU_Sales': np.round(np.random.exponential(0.3, n_games), 2),
    'JP_Sales': np.round(np.random.exponential(0.2, n_games), 2),
    'Other_Sales': np.round(np.random.exponential(0.1, n_games), 2),
    'Critic_Score': np.where(np.random.random(n_games) > 0.2, 
                             np.random.randint(40, 100, n_games), np.nan),
    'User_Score': np.where(np.random.random(n_games) > 0.3,
                           np.round(np.random.uniform(3, 10, n_games), 1), np.nan),
    'Rating': np.random.choice(ratings, n_games)
})

games_df['Global_Sales'] = (games_df['NA_Sales'] + games_df['EU_Sales'] + 
                            games_df['JP_Sales'] + games_df['Other_Sales']).round(2)

games_df.to_csv(RAW_DIR / 'Video_Games_Sales.csv', index=False)

print(f"✅ Dataset créé : {len(games_df):,} jeux")
print(f"📁 Fichier : {RAW_DIR / 'Video_Games_Sales.csv'}")
games_df.head()

Défi 1.1 : Explorer les données

Consigne

Charge le CSV et réponds à ces questions :

  1. Combien de jeux contient le dataset ?
  2. Quelles colonnes ont des valeurs manquantes ? Quel pourcentage ?
  3. Quel est le jeu le plus vendu ?
  4. Quels sont les 5 genres les plus représentés ?
  5. Quelle est la plage d’années couverte par le dataset ?

Questions pour réfléchir

  • Quelle méthode Pandas donne les dimensions d’un DataFrame ?
  • Comment compter les valeurs manquantes par colonne ?
  • Comment trouver la ligne avec la valeur maximale d’une colonne ?
  • Comment compter les occurrences de chaque valeur d’une colonne ?

Voir le code
# TON CODE ICI
# Explore les données et réponds aux 5 questions

import pandas as pd

# Charge le CSV
df = pd.read_csv('videogames-analytics/data/raw/Video_Games_Sales.csv')

# Question 1 : Combien de jeux ?


# Question 2 : Valeurs manquantes ?


# Question 3 : Jeu le plus vendu ?


# Question 4 : Top 5 genres ?


# Question 5 : Plage d'années ?
💡 Cliquer pour voir les indices
  • Nombre de lignes : len(df) ou df.shape[0]
  • Valeurs manquantes : df.isnull().sum()
  • Pourcentage : df.isnull().sum() / len(df) * 100
  • Ligne avec max : df.loc[df['colonne'].idxmax()]
  • Compter par catégorie : df['colonne'].value_counts()
  • Min/Max : df['colonne'].min(), df['colonne'].max()

Cliquer pour voir la solution
import pandas as pd

# Charger les données
df = pd.read_csv('videogames-analytics/data/raw/Video_Games_Sales.csv')

# 1. Nombre de jeux
print(f"📊 Nombre de jeux : {len(df):,}")
print(f"   (ou avec shape : {df.shape[0]:,} lignes, {df.shape[1]} colonnes)")

# 2. Valeurs manquantes
print("\n❓ Valeurs manquantes :")
missing = df.isnull().sum()
missing_pct = (missing / len(df) * 100).round(1)
missing_df = pd.DataFrame({'Manquantes': missing, '%': missing_pct})
print(missing_df[missing_df['Manquantes'] > 0])

# 3. Jeu le plus vendu
top_game = df.loc[df['Global_Sales'].idxmax()]
print(f"\n🏆 Jeu le plus vendu : {top_game['Name']}")
print(f"   Ventes : {top_game['Global_Sales']}M$ | Plateforme : {top_game['Platform']}")

# 4. Top 5 genres
print("\n🎯 Top 5 genres :")
print(df['Genre'].value_counts().head())

# 5. Plage d'années
min_year = df['Year_of_Release'].min()
max_year = df['Year_of_Release'].max()
print(f"\n📅 Années : {min_year:.0f} - {max_year:.0f}")

Défi 1.2 : Nettoyer les données

Consigne

Crée une fonction clean_videogames_data(input_path, output_path) qui :

  1. Supprime les doublons sur les colonnes (Name, Platform, Year_of_Release)
  2. Convertit Year_of_Release en entier (en gérant les NaN)
  3. Remplit les ventes manquantes par 0
  4. Crée une colonne Decade : la décennie (ex: 2010 pour l’année 2015)
  5. Crée une colonne Sales_Category basée sur Global_Sales :
    • < 0.1 → “Flop”
    • 0.1 - 1 → “Niche”
    • 1 - 5 → “Hit”
    • > 5 → “Blockbuster”
  6. Sauvegarde le résultat en Parquet dans data/processed/

Questions pour réfléchir

  • Comment supprimer les doublons sur certaines colonnes seulement ?
  • Comment créer une colonne basée sur des conditions multiples (bins) ?
  • Pourquoi choisir Parquet plutôt que CSV pour les données nettoyées ?

Voir le code
# TON CODE ICI
# Crée la fonction de nettoyage

import pandas as pd
import numpy as np
from pathlib import Path

def clean_videogames_data(input_path: Path, output_path: Path) -> pd.DataFrame:
    """
    Nettoie le dataset de jeux vidéo.
    
    Args:
        input_path: Chemin vers le CSV brut
        output_path: Chemin pour sauvegarder le Parquet nettoyé
    
    Returns:
        DataFrame nettoyé
    """
    # TON CODE ICI
    pass


# Test ta fonction
# PROJECT_ROOT = Path('videogames-analytics')
# cleaned_df = clean_videogames_data(
#     input_path=PROJECT_ROOT / 'data' / 'raw' / 'Video_Games_Sales.csv',
#     output_path=PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'
# )
💡 Cliquer pour voir les indices
  • Doublons sur colonnes spécifiques : df.drop_duplicates(subset=['col1', 'col2'])
  • Conversion avec NaN : pd.to_numeric(df['col'], errors='coerce') puis .astype('Int64') (nullable integer)
  • Décennie : df['Year'] // 10 * 10 (division entière puis multiplication)
  • Catégories avec bins : pd.cut(df['col'], bins=[...], labels=[...])
  • Parquet : df.to_parquet('fichier.parquet', index=False)

Cliquer pour voir la solution
import pandas as pd
import numpy as np
from pathlib import Path

def clean_videogames_data(input_path: Path, output_path: Path) -> pd.DataFrame:
    """
    Nettoie le dataset de jeux vidéo.
    """
    print("🧹 Nettoyage des données...")
    
    # Charger
    df = pd.read_csv(input_path)
    initial_count = len(df)
    print(f"   Lignes initiales : {initial_count:,}")
    
    # 1. Supprimer les doublons
    df = df.drop_duplicates(subset=['Name', 'Platform', 'Year_of_Release'])
    print(f"   Après déduplication : {len(df):,} (-{initial_count - len(df)})")
    
    # 2. Convertir Year_of_Release en entier nullable
    df['Year_of_Release'] = pd.to_numeric(df['Year_of_Release'], errors='coerce')
    df['Year_of_Release'] = df['Year_of_Release'].astype('Int64')
    
    # 3. Remplir les ventes manquantes par 0
    sales_cols = ['NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales', 'Global_Sales']
    df[sales_cols] = df[sales_cols].fillna(0)
    
    # 4. Créer la décennie
    df['Decade'] = (df['Year_of_Release'] // 10 * 10).astype('Int64')
    
    # 5. Créer la catégorie de ventes
    df['Sales_Category'] = pd.cut(
        df['Global_Sales'],
        bins=[-np.inf, 0.1, 1, 5, np.inf],
        labels=['Flop', 'Niche', 'Hit', 'Blockbuster']
    )
    
    # 6. Sauvegarder en Parquet
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)
    
    print(f"   ✅ Sauvegardé : {output_path}")
    print(f"   Lignes finales : {len(df):,}")
    print(f"   Nouvelles colonnes : Decade, Sales_Category")
    
    return df

# Exécution
PROJECT_ROOT = Path('videogames-analytics')
cleaned_df = clean_videogames_data(
    input_path=PROJECT_ROOT / 'data' / 'raw' / 'Video_Games_Sales.csv',
    output_path=PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'
)

# Vérification
print("\n📊 Aperçu :")
print(cleaned_df[['Name', 'Year_of_Release', 'Decade', 'Global_Sales', 'Sales_Category']].head())

Pourquoi Parquet ? - Compression : fichier 5-10x plus petit que CSV - Types préservés : pas de perte des types (dates, entiers, catégories) - Lecture rapide : format colonne optimisé pour l’analytique


Phase 2 : Web Scraping

Le Web Scraping consiste à extraire des données depuis des pages web automatiquement.

┌─────────────────────────────────────────────────────────────────┐
│                      WEB SCRAPING PIPELINE                      │
└─────────────────────────────────────────────────────────────────┘

   ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
   │   URL    │────▶│ requests │────▶│   HTML   │────▶│   BS4    │────▶ DataFrame
   │          │     │  GET     │     │  brut    │     │  parse   │
   └──────────┘     └──────────┘     └──────────┘     └──────────┘

Outils Python

Outil Usage
requests Envoyer des requêtes HTTP et récupérer le HTML
BeautifulSoup Parser le HTML et extraire les éléments
lxml Parser HTML/XML (plus rapide)

Règles d’Éthique du Scraping

  1. Respecter le robots.txt — vérifie ce que le site autorise
  2. Ajouter des délaistime.sleep() entre les requêtes
  3. S’identifier — utilise un User-Agent descriptif
  4. Ne pas surcharger — limite le nombre de requêtes
  5. Vérifier les CGU — certains sites interdisent le scraping

Défi 2.1 : Scraper Wikipedia

Consigne

Écris une fonction scrape_bestselling_games() qui :

  1. Récupère la page : https://en.wikipedia.org/wiki/List_of_best-selling_video_games
  2. Trouve le premier tableau avec la classe wikitable
  3. Extrait les 10 premiers jeux avec : Nom, Ventes, Plateforme(s)
  4. Retourne un DataFrame pandas

Questions pour réfléchir

  • Comment envoyer une requête HTTP GET en Python ?
  • Pourquoi faut-il spécifier un User-Agent ?
  • Comment trouver un élément HTML par sa classe avec BeautifulSoup ?
  • Comment extraire le texte d’une balise HTML ?

Voir le code
# TON CODE ICI
# Crée la fonction de scraping Wikipedia

import requests
from bs4 import BeautifulSoup
import pandas as pd

def scrape_bestselling_games() -> pd.DataFrame:
    """
    Scrape la liste des jeux les plus vendus depuis Wikipedia.
    
    Returns:
        DataFrame avec colonnes: name, sales, platform
    """
    # TON CODE ICI
    pass


# Test ta fonction
# df_wiki = scrape_bestselling_games()
# print(df_wiki)
💡 Cliquer pour voir les indices
import requests
from bs4 import BeautifulSoup

# 1. Requête HTTP avec User-Agent
headers = {'User-Agent': 'MonBot/1.0 (contact@example.com)'}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()  # Lève une exception si erreur HTTP

# 2. Parser le HTML
soup = BeautifulSoup(response.text, 'lxml')

# 3. Trouver un élément par classe
table = soup.find('table', {'class': 'wikitable'})

# 4. Trouver toutes les lignes
rows = table.find_all('tr')

# 5. Extraire le texte d'une cellule
cell.get_text(strip=True)  # strip=True enlève les espaces

Cliquer pour voir la solution
import requests
from bs4 import BeautifulSoup
import pandas as pd

def scrape_bestselling_games() -> pd.DataFrame:
    """
    Scrape la liste des jeux les plus vendus depuis Wikipedia.
    """
    url = "https://en.wikipedia.org/wiki/List_of_best-selling_video_games"
    
    # Headers pour s'identifier (bonne pratique)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Educational Bot - Data Engineering Bootcamp)'
    }
    
    # Requête HTTP
    print(f"🌐 Récupération de {url}...")
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()  # Erreur si status != 200
    
    # Parser le HTML
    soup = BeautifulSoup(response.text, 'lxml')
    
    # Trouver le premier tableau wikitable
    table = soup.find('table', {'class': 'wikitable'})
    
    if not table:
        raise ValueError("❌ Tableau non trouvé sur la page")
    
    # Extraire les données
    games = []
    rows = table.find_all('tr')[1:11]  # Skip header, prendre 10 lignes
    
    for row in rows:
        cells = row.find_all(['td', 'th'])
        if len(cells) >= 3:
            games.append({
                'name': cells[0].get_text(strip=True),
                'sales': cells[1].get_text(strip=True),
                'platform': cells[2].get_text(strip=True) if len(cells) > 2 else 'N/A'
            })
    
    print(f"✅ {len(games)} jeux extraits")
    return pd.DataFrame(games)


# Test
try:
    df_wiki = scrape_bestselling_games()
    print("\n🎮 Top 10 jeux les plus vendus (Wikipedia) :")
    print(df_wiki.to_string(index=False))
except Exception as e:
    print(f"❌ Erreur : {e}")

Explications : - raise_for_status() lève une exception si le serveur retourne une erreur (404, 500, etc.) - find_all('tr')[1:11] : on ignore la première ligne (header) et on prend les 10 suivantes - get_text(strip=True) extrait le texte en enlevant les espaces superflus


Phase 3 : Stockage des Données

On va stocker nos données dans deux systèmes complémentaires :

┌─────────────────────────────────────────────────────────────────┐
│                     ARCHITECTURE STOCKAGE                       │
└─────────────────────────────────────────────────────────────────┘

                        ┌─────────────────┐
                        │  games_cleaned  │
                        │    .parquet     │
                        └────────┬────────┘
                                 │
                    ┌────────────┴────────────┐
                    │                         │
                    ▼                         ▼
          ┌─────────────────┐       ┌─────────────────┐
          │     DuckDB      │       │  Elasticsearch  │
          │                 │       │                 │
          │  • SQL queries  │       │  • Full-text    │
          │  • Analytics    │       │  • Fuzzy search │
          │  • Aggregations │       │  • Suggestions  │
          └─────────────────┘       └─────────────────┘
Système Usage Avantage
DuckDB Requêtes SQL analytiques Ultra-rapide, zero config
Elasticsearch Recherche full-text Recherche fuzzy, suggestions

Défi 3.1 : Charger dans DuckDB

Consigne

  1. Crée une base DuckDB data/videogames.db
  2. Charge le fichier Parquet nettoyé dans une table games
  3. Écris et exécute les requêtes SQL suivantes :
    • Top 10 jeux les plus vendus
    • Ventes totales par genre (triées par ventes décroissantes)
    • Top 3 jeux par genre (utilise une Window Function)

Questions pour réfléchir

  • Comment DuckDB peut lire directement un fichier Parquet sans le charger en mémoire ?
  • Quelle est la différence entre GROUP BY et PARTITION BY ?
  • Quelle fonction SQL permet de numéroter les lignes dans chaque groupe ?

Voir le code
# TON CODE ICI
# Charge les données dans DuckDB et exécute les requêtes

import duckdb
from pathlib import Path

PROJECT_ROOT = Path('videogames-analytics')
DB_PATH = PROJECT_ROOT / 'data' / 'videogames.db'

# 1. Connexion et création de la table


# 2. Top 10 jeux les plus vendus


# 3. Ventes totales par genre


# 4. Top 3 jeux par genre (Window Function)
💡 Cliquer pour voir les indices
# Connexion DuckDB
conn = duckdb.connect(str(DB_PATH))

# Créer une table depuis un Parquet
conn.execute("""
    CREATE OR REPLACE TABLE games AS 
    SELECT * FROM read_parquet('chemin/fichier.parquet')
""")

# Window Function pour classement par groupe
ROW_NUMBER() OVER (PARTITION BY genre ORDER BY sales DESC) as rank

# Puis filtrer avec WHERE rank <= 3

Cliquer pour voir la solution
import duckdb
from pathlib import Path

PROJECT_ROOT = Path('videogames-analytics')
DB_PATH = PROJECT_ROOT / 'data' / 'videogames.db'
PARQUET_PATH = PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'

# Connexion
conn = duckdb.connect(str(DB_PATH))

# 1. Créer la table depuis le Parquet
conn.execute(f"""
    CREATE OR REPLACE TABLE games AS 
    SELECT * FROM read_parquet('{PARQUET_PATH}')
""")
print(f"✅ Table 'games' créée avec {conn.execute('SELECT COUNT(*) FROM games').fetchone()[0]:,} lignes")

# 2. Top 10 jeux les plus vendus
print("\n🏆 Top 10 jeux les plus vendus :")
print(conn.execute("""
    SELECT Name, Platform, Genre, Global_Sales
    FROM games
    ORDER BY Global_Sales DESC
    LIMIT 10
""").fetchdf())

# 3. Ventes totales par genre
print("\n📊 Ventes par genre :")
print(conn.execute("""
    SELECT 
        Genre,
        COUNT(*) as nb_games,
        ROUND(SUM(Global_Sales), 2) as total_sales,
        ROUND(AVG(Global_Sales), 2) as avg_sales
    FROM games
    GROUP BY Genre
    ORDER BY total_sales DESC
""").fetchdf())

# 4. Top 3 jeux par genre (Window Function)
print("\n🎯 Top 3 par genre :")
print(conn.execute("""
    WITH ranked AS (
        SELECT 
            Genre,
            Name,
            Global_Sales,
            ROW_NUMBER() OVER (
                PARTITION BY Genre 
                ORDER BY Global_Sales DESC
            ) as rank
        FROM games
    )
    SELECT Genre, rank, Name, Global_Sales
    FROM ranked
    WHERE rank <= 3
    ORDER BY Genre, rank
""").fetchdf().head(20))

conn.close()
print(f"\n✅ Base sauvegardée : {DB_PATH}")

Explication Window Function : - PARTITION BY Genre : crée des “fenêtres” par genre - ORDER BY Global_Sales DESC : ordonne dans chaque fenêtre - ROW_NUMBER() : numérote de 1 à N dans chaque fenêtre - On filtre ensuite WHERE rank <= 3 pour garder le top 3

Défi 3.2 : Indexer dans Elasticsearch

Prérequis

Lance Elasticsearch en local (comme vu dans le M10) :

cd elasticsearch-8.x.x
./bin/elasticsearch

Consigne

  1. Crée une fonction index_games_to_es(df, index_name) qui :
    • Se connecte à Elasticsearch local (http://localhost:9200)
    • Supprime l’index s’il existe déjà
    • Crée un index videogames avec un mapping approprié
    • Indexe tous les jeux en utilisant le bulk API
  2. Crée une fonction search_games(query) qui :
    • Fait une recherche fuzzy sur le champ Name
    • Retourne les 10 meilleurs résultats

Questions pour réfléchir

  • Pourquoi utiliser le bulk API plutôt que des insertions une par une ?
  • Quelle est la différence entre les types text et keyword dans ES ?
  • Comment fonctionne la recherche fuzzy ?

Voir le code
# TON CODE ICI
# Indexe les jeux dans Elasticsearch

from elasticsearch import Elasticsearch, helpers
import pandas as pd
from typing import List, Dict

def index_games_to_es(df: pd.DataFrame, index_name: str = "videogames") -> int:
    """
    Indexe les jeux dans Elasticsearch.
    
    Args:
        df: DataFrame des jeux
        index_name: Nom de l'index ES
    
    Returns:
        Nombre de documents indexés
    """
    # TON CODE ICI
    pass


def search_games(query: str, index_name: str = "videogames") -> List[Dict]:
    """
    Recherche des jeux par nom (fuzzy search).
    
    Args:
        query: Terme de recherche
        index_name: Nom de l'index ES
    
    Returns:
        Liste des résultats
    """
    # TON CODE ICI
    pass
💡 Cliquer pour voir les indices
from elasticsearch import Elasticsearch, helpers

# Connexion
es = Elasticsearch("http://localhost:9200")
es.ping()  # Vérifie la connexion

# Supprimer un index
es.indices.delete(index="videogames", ignore=[404])

# Créer avec mapping
es.indices.create(index="videogames", body={"mappings": {...}})

# Bulk indexation
actions = [{"_index": "videogames", "_source": doc} for doc in docs]
helpers.bulk(es, actions)

# Recherche fuzzy
es.search(index="videogames", query={
    "match": {"Name": {"query": "...", "fuzziness": "AUTO"}}
})

Cliquer pour voir la solution
from elasticsearch import Elasticsearch, helpers
import pandas as pd
from typing import List, Dict

def index_games_to_es(df: pd.DataFrame, index_name: str = "videogames") -> int:
    """
    Indexe les jeux dans Elasticsearch.
    """
    # Connexion
    es = Elasticsearch("http://localhost:9200")
    
    if not es.ping():
        raise ConnectionError("❌ Elasticsearch non disponible sur localhost:9200")
    
    print(f"✅ Connecté à Elasticsearch")
    
    # Supprimer l'index s'il existe
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"   Index '{index_name}' supprimé")
    
    # Créer l'index avec mapping
    mapping = {
        "mappings": {
            "properties": {
                "Name": {"type": "text", "analyzer": "standard"},
                "Platform": {"type": "keyword"},
                "Genre": {"type": "keyword"},
                "Publisher": {"type": "keyword"},
                "Year_of_Release": {"type": "integer"},
                "Global_Sales": {"type": "float"},
                "Critic_Score": {"type": "float"},
                "Sales_Category": {"type": "keyword"}
            }
        }
    }
    es.indices.create(index=index_name, body=mapping)
    print(f"   Index '{index_name}' créé")
    
    # Préparer les documents (remplacer NaN par None)
    records = df.where(pd.notnull(df), None).to_dict('records')
    
    # Bulk indexation
    actions = [
        {"_index": index_name, "_source": record}
        for record in records
    ]
    
    success, errors = helpers.bulk(es, actions, raise_on_error=False)
    
    print(f"✅ {success} documents indexés")
    if errors:
        print(f"⚠️ {len(errors)} erreurs")
    
    return success


def search_games(query: str, index_name: str = "videogames") -> List[Dict]:
    """
    Recherche des jeux par nom (fuzzy search).
    """
    es = Elasticsearch("http://localhost:9200")
    
    response = es.search(
        index=index_name,
        query={
            "match": {
                "Name": {
                    "query": query,
                    "fuzziness": "AUTO"  # Tolère les fautes de frappe
                }
            }
        },
        size=10
    )
    
    results = []
    for hit in response['hits']['hits']:
        results.append({
            'score': hit['_score'],
            **hit['_source']
        })
    
    return results


# Test
try:
    # Charger et indexer
    df = pd.read_parquet('videogames-analytics/data/processed/games_cleaned.parquet')
    index_games_to_es(df)
    
    # Rechercher
    print("\n🔍 Recherche 'Final Fantasi' (avec faute) :")
    results = search_games("Final Fantasi")
    for r in results[:5]:
        print(f"   {r['Name']} ({r['Platform']}) - Score: {r['score']:.2f}")
        
except Exception as e:
    print(f"⚠️ Erreur : {e}")
    print("   Assure-toi qu'Elasticsearch est lancé sur localhost:9200")

Explications : - text : le champ est analysé (tokenisé, stemming) → pour la recherche full-text - keyword : valeur exacte → pour les filtres et agrégations - fuzziness: AUTO : tolère 1-2 caractères d’erreur selon la longueur du mot


⚡ Phase 4 : Traitement PySpark

Utilisons PySpark pour des analyses à grande échelle.

Défi 4.1 : Analyses avec Window Functions

Consigne

Utilise PySpark pour :

  1. Charger le fichier Parquet
  2. Calculer les statistiques par Publisher :
    • Nombre de jeux
    • Ventes totales
    • Score critique moyen
    • Nombre de genres différents
  3. Utiliser ROW_NUMBER() pour classer les jeux par ventes dans chaque genre
  4. Sauvegarder les résultats en Parquet

Questions pour réfléchir

  • Quelle est la différence entre groupBy().agg() et les Window Functions ?
  • Comment créer une fenêtre partitionnée en PySpark ?
  • Quelle fonction Spark donne le rang dans une fenêtre ?

Voir le code
# TON CODE ICI
# Analyses PySpark avec Window Functions

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Créer la session Spark
spark = SparkSession.builder \
    .appName("VideoGamesAnalytics") \
    .getOrCreate()

# 1. Charger les données


# 2. Stats par Publisher


# 3. Classement par genre avec Window


# 4. Sauvegarder
💡 Cliquer pour voir les indices
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Agrégation par groupe
df.groupBy("Publisher").agg(
    F.count("*").alias("nb_games"),
    F.sum("Global_Sales").alias("total_sales")
)

# Window Function
window = Window.partitionBy("Genre").orderBy(F.desc("Global_Sales"))
df.withColumn("rank", F.row_number().over(window))

Cliquer pour voir la solution
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pathlib import Path

PROJECT_ROOT = Path('videogames-analytics')

# Créer la session Spark
spark = SparkSession.builder \
    .appName("VideoGamesAnalytics") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print(f"✅ Spark {spark.version} initialisé")

# 1. Charger les données
games_sdf = spark.read.parquet(str(PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'))
print(f"📊 {games_sdf.count():,} jeux chargés")

# 2. Stats par Publisher
print("\n🏢 Statistiques par Publisher :")
publisher_stats = games_sdf.groupBy("Publisher").agg(
    F.count("*").alias("nb_games"),
    F.round(F.sum("Global_Sales"), 2).alias("total_sales"),
    F.round(F.avg("Critic_Score"), 1).alias("avg_critic"),
    F.countDistinct("Genre").alias("nb_genres")
).orderBy(F.desc("total_sales"))

publisher_stats.show(10)

# 3. Classement par genre avec Window
print("\n🏆 Top 3 jeux par genre :")
window_genre = Window.partitionBy("Genre").orderBy(F.desc("Global_Sales"))

ranked_games = games_sdf.withColumn(
    "rank_in_genre", F.row_number().over(window_genre)
).filter(
    F.col("rank_in_genre") <= 3
).select(
    "Genre", "rank_in_genre", "Name", "Platform", "Global_Sales"
).orderBy("Genre", "rank_in_genre")

ranked_games.show(20)

# 4. Sauvegarder
output_path = PROJECT_ROOT / 'data' / 'processed' / 'publisher_stats'
publisher_stats.write.mode("overwrite").parquet(str(output_path))
print(f"✅ Résultats sauvegardés : {output_path}")

spark.stop()

Phase 5 : API REST avec FastAPI

Défi 5.1 : Créer les endpoints

Consigne

Crée api/main.py avec les endpoints suivants :

Endpoint Méthode Description
/ GET Message de bienvenue
/games GET Liste avec filtres (genre, platform, min_sales)
/games/{name} GET Détails d’un jeu
/stats/genres GET Stats par genre
/stats/publishers GET Top publishers
/search?q=... GET Recherche Elasticsearch

Lancer l’API : uvicorn api.main:app --reload


Phase 6 : Dashboard Streamlit

Défi 6.1 : Dashboard interactif

Consigne

Crée dashboard/app.py avec :

  • Sidebar : filtres (genre, plateforme, années)
  • KPIs : nombre de jeux, ventes totales, score moyen
  • Graphiques : bar chart par genre, line chart évolution, pie chart régions
  • Tableau : top 10 jeux

Lancer le dashboard : streamlit run dashboard/app.py


Phase 7 : Automatisation

Défi 7.1 : Script de pipeline

Consigne

Crée scripts/run_pipeline.sh qui :

  1. Affiche la date/heure de démarrage
  2. Installe les dépendances
  3. Exécute chaque étape du pipeline
  4. Log tout dans pipeline.log
  5. Affiche un résumé à la fin

💡 Note : Les solutions complètes pour les phases 4-7 suivent le même format que les phases précédentes. Essaie d’abord par toi-même !


🎉 Félicitations !

Tu as terminé le Projet Intégrateur Débutant !

✅ Ce que tu as construit

┌─────────────────────────────────────────────────────────────────────────────┐
│                    VIDEO GAMES ANALYTICS - COMPÉTENCES                      │
└─────────────────────────────────────────────────────────────────────────────┘

                              ┌─────────────────┐
                              │  Video Games    │
                              │   Analytics     │
                              └────────┬────────┘
                                       │
          ┌────────────────────────────┼────────────────────────────┐
          │                            │                            │
          ▼                            ▼                            ▼
   ┌─────────────┐              ┌─────────────┐              ┌─────────────┐
   │  INGESTION  │              │  PROCESSING │              │   STORAGE   │
   ├─────────────┤              ├─────────────┤              ├─────────────┤
   │ • Kaggle    │              │ • Pandas    │              │ • DuckDB    │
   │ • Scraping  │              │ • PySpark   │              │ • ES        │
   │ • BS4       │              │ • Window    │              │ • Parquet   │
   └─────────────┘              └─────────────┘              └─────────────┘
          │                            │                            │
          └────────────────────────────┼────────────────────────────┘
                                       │
          ┌────────────────────────────┼────────────────────────────┐
          │                            │                            │
          ▼                            ▼                            ▼
   ┌─────────────┐              ┌─────────────┐              ┌─────────────┐
   │   SERVING   │              │   DEVOPS    │              │  DASHBOARD  │
   ├─────────────┤              ├─────────────┤              ├─────────────┤
   │ • FastAPI   │              │ • Git       │              │ • Streamlit │
   │ • REST API  │              │ • Bash      │              │ • Plotly    │
   │ • Pydantic  │              │ • Automation│              │ • Filters   │
   └─────────────┘              └─────────────┘              └─────────────┘

📚 Compétences Validées

Domaine Compétences
Ingestion CSV, Web Scraping, APIs
Processing Pandas, PySpark, SQL
Storage DuckDB, Elasticsearch, Parquet
Serving FastAPI, Streamlit
DevOps Git, Bash, Automation

🚀 Prochaine étape

👉 Niveau Intermédiaire : Docker, Kubernetes, Kafka, Delta Lake, dbt, Airflow…


🎮 Video Games Analytics Platform — Data Engineering Bootcamp

Retour au sommet