Voir le code
%%bash
# TON CODE ICI
# Crée la structure du projet videogames-analytics
Bienvenue dans ce projet intégrateur ! Tu vas construire une plateforme d’analyse de jeux vidéo de A à Z, en mobilisant toutes les compétences acquises dans les modules précédents.
Ce projet est structuré en défis. Pour chaque étape :
⚠️ Important : Ne regarde pas les solutions avant d’avoir essayé ! C’est en pratiquant qu’on apprend.
┌─────────────────────────────────────────────────────────────────────────────────┐
│ VIDEO GAMES ANALYTICS PLATFORM │
└─────────────────────────────────────────────────────────────────────────────────┘
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 📥 SOURCES │ │ ⚙️ PROCESSING │ │ 💾 STOCKAGE │ │📊 EXPOSITION │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Kaggle │─────▶│ Pandas │─────▶│ DuckDB │─────▶│ FastAPI │
│ Video Games │ │ Nettoyage │ │ SQL Analytics│ │ REST API │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Web Scraping │─────▶│ PySpark │─────▶│Elasticsearch │─────▶│ Streamlit │
│ RAWG API │ │ Agrégations │ │ Recherche │ │ Dashboard │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
| Module | Compétence | Application dans le projet |
|---|---|---|
| M01 | Concepts Data Engineering | Architecture du pipeline |
| M02 | Bash & Linux | Scripts d’automatisation |
| M03 | Git | Versioning du projet |
| M04-05 | Python & Pandas | Traitement de données |
| M06-07 | SQL & Databases | Requêtes analytiques avec DuckDB |
| M08 | Big Data Concepts | Pensée distribuée |
| M10 | Elasticsearch | Recherche full-text |
| M11 | PySpark | Traitement à l’échelle |
| M12 | Orchestration | Pipeline automatisé |
| M13 | FastAPI | API REST |
| NEW | Web Scraping | BeautifulSoup, Requests |
| NEW | Streamlit | Dashboard interactif |
Kaggle - Video Game Sales with Ratings
🔗 https://www.kaggle.com/datasets/rush4ratio/video-game-sales-with-ratings
| Colonne | Description |
|---|---|
Name |
Nom du jeu |
Platform |
Console (PS4, Xbox, PC…) |
Year_of_Release |
Année de sortie |
Genre |
Genre (Action, Sports, RPG…) |
Publisher |
Éditeur |
NA_Sales, EU_Sales, JP_Sales, Other_Sales |
Ventes par région (millions) |
Global_Sales |
Ventes mondiales |
Critic_Score |
Note Metacritic (0-100) |
User_Score |
Note utilisateurs (0-10) |
Rating |
Classification ESRB (E, T, M…) |
Avant de coder, il faut préparer l’environnement de travail.
Crée une structure de dossiers pour le projet videogames-analytics avec : - Un dossier data/ avec des sous-dossiers raw/, processed/, enriched/ - Un dossier scripts/ pour les scripts Python - Un dossier api/ pour FastAPI - Un dossier dashboard/ pour Streamlit - Un dossier notebooks/ pour l’exploration - Les fichiers requirements.txt, .gitignore, README.md
Structure attendue :
videogames-analytics/
├── data/
│ ├── raw/ # Données brutes (CSV Kaggle)
│ ├── processed/ # Données nettoyées (Parquet)
│ └── enriched/ # Données enrichies (scraping)
├── scripts/ # Scripts Python du pipeline
├── api/ # API FastAPI
├── dashboard/ # Dashboard Streamlit
├── notebooks/ # Notebooks d'exploration
├── tests/ # Tests unitaires
├── requirements.txt # Dépendances Python
├── .gitignore # Fichiers à ignorer
└── README.md # Documentation
Prends le temps de réfléchir et d’essayer avant de regarder les indices ou la solution ⬇️
mkdir -p pour créer des dossiers imbriqués (l’option -p crée les parents si nécessaire){} pour créer plusieurs dossiers : mkdir -p projet/{dossier1,dossier2}.gitignore, pense à : __pycache__/, .env, data/raw/*, *.pyc, .venv/, *.dbcat << 'EOF' > fichier pour créer un fichier multi-lignes# Création de la structure en une commande
mkdir -p videogames-analytics/{data/{raw,processed,enriched},scripts,api,dashboard,notebooks,tests}
cd videogames-analytics
# Créer requirements.txt
cat << 'EOF' > requirements.txt
# Data Processing
pandas>=2.0.0
numpy>=1.24.0
pyarrow>=12.0.0
# Web Scraping
requests>=2.31.0
beautifulsoup4>=4.12.0
lxml>=4.9.0
# Databases
duckdb>=0.9.0
elasticsearch>=8.0.0
# Big Data
pyspark>=3.5.0
# API
fastapi>=0.104.0
uvicorn>=0.24.0
# Dashboard
streamlit>=1.28.0
plotly>=5.18.0
# Utilities
python-dotenv>=1.0.0
tqdm>=4.66.0
EOF
# Créer .gitignore
cat << 'EOF' > .gitignore
# Data (on ne versionne pas les données)
data/raw/*
data/processed/*
data/enriched/*
!data/*/.gitkeep
*.db
# Python
__pycache__/
*.pyc
.venv/
venv/
# Environment
.env
*.log
# IDE
.idea/
.vscode/
# Jupyter
.ipynb_checkpoints/
EOF
# Créer les .gitkeep pour garder les dossiers vides dans Git
touch data/raw/.gitkeep data/processed/.gitkeep data/enriched/.gitkeep
echo "✅ Structure créée !"Explications : - mkdir -p crée tous les dossiers parents manquants - Les accolades {a,b,c} créent plusieurs dossiers en une commande - cat << 'EOF' > fichier permet d’écrire plusieurs lignes dans un fichier - .gitkeep est une convention pour garder les dossiers vides dans Git
videogames-analytics"🎮 Initial commit: project structure"Les 3 commandes Git essentielles : - git init : initialise un nouveau dépôt - git add . : ajoute tous les fichiers au staging - git commit -m "message" : crée un commit
Résultat attendu :
abc1234 🎮 Initial commit: project structure
Télécharge le dataset depuis Kaggle : 1. Va sur https://www.kaggle.com/datasets/rush4ratio/video-game-sales-with-ratings 2. Télécharge le ZIP 3. Extrait le CSV dans data/raw/
💡 Si tu n’as pas de compte Kaggle, utilise le code ci-dessous pour générer des données d’exemple.
# Génération de données d'exemple (exécute cette cellule si tu n'as pas Kaggle)
import pandas as pd
import numpy as np
from pathlib import Path
PROJECT_ROOT = Path("videogames-analytics")
RAW_DIR = PROJECT_ROOT / "data" / "raw"
RAW_DIR.mkdir(parents=True, exist_ok=True)
np.random.seed(42)
n_games = 5000
platforms = ['PS4', 'XOne', 'PC', 'WiiU', 'PS3', 'X360', 'Wii', 'PSV', '3DS', 'PS2']
genres = ['Action', 'Sports', 'Shooter', 'Role-Playing', 'Racing', 'Platform',
'Fighting', 'Simulation', 'Adventure', 'Strategy', 'Puzzle', 'Misc']
publishers = ['Electronic Arts', 'Activision', 'Ubisoft', 'Nintendo', 'Sony',
'Take-Two', 'Sega', 'Capcom', 'Konami', 'Bandai Namco', 'Square Enix']
ratings = ['E', 'E10+', 'T', 'M', 'RP', None]
game_prefixes = ['Super', 'Ultimate', 'Call of', 'Legend of', 'Final', 'Grand', 'Dark']
game_suffixes = ['Warriors', 'Quest', 'Adventure', 'Legends', 'Chronicles', 'Heroes']
game_names = [f"{np.random.choice(game_prefixes)} {np.random.choice(game_suffixes)} {i}"
for i in range(n_games)]
games_df = pd.DataFrame({
'Name': game_names,
'Platform': np.random.choice(platforms, n_games),
'Year_of_Release': np.random.choice(range(2000, 2024), n_games),
'Genre': np.random.choice(genres, n_games),
'Publisher': np.random.choice(publishers, n_games),
'NA_Sales': np.round(np.random.exponential(0.5, n_games), 2),
'EU_Sales': np.round(np.random.exponential(0.3, n_games), 2),
'JP_Sales': np.round(np.random.exponential(0.2, n_games), 2),
'Other_Sales': np.round(np.random.exponential(0.1, n_games), 2),
'Critic_Score': np.where(np.random.random(n_games) > 0.2,
np.random.randint(40, 100, n_games), np.nan),
'User_Score': np.where(np.random.random(n_games) > 0.3,
np.round(np.random.uniform(3, 10, n_games), 1), np.nan),
'Rating': np.random.choice(ratings, n_games)
})
games_df['Global_Sales'] = (games_df['NA_Sales'] + games_df['EU_Sales'] +
games_df['JP_Sales'] + games_df['Other_Sales']).round(2)
games_df.to_csv(RAW_DIR / 'Video_Games_Sales.csv', index=False)
print(f"✅ Dataset créé : {len(games_df):,} jeux")
print(f"📁 Fichier : {RAW_DIR / 'Video_Games_Sales.csv'}")
games_df.head()Charge le CSV et réponds à ces questions :
# TON CODE ICI
# Explore les données et réponds aux 5 questions
import pandas as pd
# Charge le CSV
df = pd.read_csv('videogames-analytics/data/raw/Video_Games_Sales.csv')
# Question 1 : Combien de jeux ?
# Question 2 : Valeurs manquantes ?
# Question 3 : Jeu le plus vendu ?
# Question 4 : Top 5 genres ?
# Question 5 : Plage d'années ?
len(df) ou df.shape[0]df.isnull().sum()df.isnull().sum() / len(df) * 100df.loc[df['colonne'].idxmax()]df['colonne'].value_counts()df['colonne'].min(), df['colonne'].max()import pandas as pd
# Charger les données
df = pd.read_csv('videogames-analytics/data/raw/Video_Games_Sales.csv')
# 1. Nombre de jeux
print(f"📊 Nombre de jeux : {len(df):,}")
print(f" (ou avec shape : {df.shape[0]:,} lignes, {df.shape[1]} colonnes)")
# 2. Valeurs manquantes
print("\n❓ Valeurs manquantes :")
missing = df.isnull().sum()
missing_pct = (missing / len(df) * 100).round(1)
missing_df = pd.DataFrame({'Manquantes': missing, '%': missing_pct})
print(missing_df[missing_df['Manquantes'] > 0])
# 3. Jeu le plus vendu
top_game = df.loc[df['Global_Sales'].idxmax()]
print(f"\n🏆 Jeu le plus vendu : {top_game['Name']}")
print(f" Ventes : {top_game['Global_Sales']}M$ | Plateforme : {top_game['Platform']}")
# 4. Top 5 genres
print("\n🎯 Top 5 genres :")
print(df['Genre'].value_counts().head())
# 5. Plage d'années
min_year = df['Year_of_Release'].min()
max_year = df['Year_of_Release'].max()
print(f"\n📅 Années : {min_year:.0f} - {max_year:.0f}")Crée une fonction clean_videogames_data(input_path, output_path) qui :
Year_of_Release en entier (en gérant les NaN)Decade : la décennie (ex: 2010 pour l’année 2015)Sales_Category basée sur Global_Sales :
< 0.1 → “Flop”0.1 - 1 → “Niche”1 - 5 → “Hit”> 5 → “Blockbuster”data/processed/# TON CODE ICI
# Crée la fonction de nettoyage
import pandas as pd
import numpy as np
from pathlib import Path
def clean_videogames_data(input_path: Path, output_path: Path) -> pd.DataFrame:
"""
Nettoie le dataset de jeux vidéo.
Args:
input_path: Chemin vers le CSV brut
output_path: Chemin pour sauvegarder le Parquet nettoyé
Returns:
DataFrame nettoyé
"""
# TON CODE ICI
pass
# Test ta fonction
# PROJECT_ROOT = Path('videogames-analytics')
# cleaned_df = clean_videogames_data(
# input_path=PROJECT_ROOT / 'data' / 'raw' / 'Video_Games_Sales.csv',
# output_path=PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'
# )df.drop_duplicates(subset=['col1', 'col2'])pd.to_numeric(df['col'], errors='coerce') puis .astype('Int64') (nullable integer)df['Year'] // 10 * 10 (division entière puis multiplication)pd.cut(df['col'], bins=[...], labels=[...])df.to_parquet('fichier.parquet', index=False)import pandas as pd
import numpy as np
from pathlib import Path
def clean_videogames_data(input_path: Path, output_path: Path) -> pd.DataFrame:
"""
Nettoie le dataset de jeux vidéo.
"""
print("🧹 Nettoyage des données...")
# Charger
df = pd.read_csv(input_path)
initial_count = len(df)
print(f" Lignes initiales : {initial_count:,}")
# 1. Supprimer les doublons
df = df.drop_duplicates(subset=['Name', 'Platform', 'Year_of_Release'])
print(f" Après déduplication : {len(df):,} (-{initial_count - len(df)})")
# 2. Convertir Year_of_Release en entier nullable
df['Year_of_Release'] = pd.to_numeric(df['Year_of_Release'], errors='coerce')
df['Year_of_Release'] = df['Year_of_Release'].astype('Int64')
# 3. Remplir les ventes manquantes par 0
sales_cols = ['NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales', 'Global_Sales']
df[sales_cols] = df[sales_cols].fillna(0)
# 4. Créer la décennie
df['Decade'] = (df['Year_of_Release'] // 10 * 10).astype('Int64')
# 5. Créer la catégorie de ventes
df['Sales_Category'] = pd.cut(
df['Global_Sales'],
bins=[-np.inf, 0.1, 1, 5, np.inf],
labels=['Flop', 'Niche', 'Hit', 'Blockbuster']
)
# 6. Sauvegarder en Parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(output_path, index=False)
print(f" ✅ Sauvegardé : {output_path}")
print(f" Lignes finales : {len(df):,}")
print(f" Nouvelles colonnes : Decade, Sales_Category")
return df
# Exécution
PROJECT_ROOT = Path('videogames-analytics')
cleaned_df = clean_videogames_data(
input_path=PROJECT_ROOT / 'data' / 'raw' / 'Video_Games_Sales.csv',
output_path=PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'
)
# Vérification
print("\n📊 Aperçu :")
print(cleaned_df[['Name', 'Year_of_Release', 'Decade', 'Global_Sales', 'Sales_Category']].head())Pourquoi Parquet ? - Compression : fichier 5-10x plus petit que CSV - Types préservés : pas de perte des types (dates, entiers, catégories) - Lecture rapide : format colonne optimisé pour l’analytique
Le Web Scraping consiste à extraire des données depuis des pages web automatiquement.
┌─────────────────────────────────────────────────────────────────┐
│ WEB SCRAPING PIPELINE │
└─────────────────────────────────────────────────────────────────┘
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ URL │────▶│ requests │────▶│ HTML │────▶│ BS4 │────▶ DataFrame
│ │ │ GET │ │ brut │ │ parse │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
| Outil | Usage |
|---|---|
requests |
Envoyer des requêtes HTTP et récupérer le HTML |
BeautifulSoup |
Parser le HTML et extraire les éléments |
lxml |
Parser HTML/XML (plus rapide) |
robots.txt — vérifie ce que le site autorisetime.sleep() entre les requêtesÉcris une fonction scrape_bestselling_games() qui :
wikitable# TON CODE ICI
# Crée la fonction de scraping Wikipedia
import requests
from bs4 import BeautifulSoup
import pandas as pd
def scrape_bestselling_games() -> pd.DataFrame:
"""
Scrape la liste des jeux les plus vendus depuis Wikipedia.
Returns:
DataFrame avec colonnes: name, sales, platform
"""
# TON CODE ICI
pass
# Test ta fonction
# df_wiki = scrape_bestselling_games()
# print(df_wiki)import requests
from bs4 import BeautifulSoup
# 1. Requête HTTP avec User-Agent
headers = {'User-Agent': 'MonBot/1.0 (contact@example.com)'}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # Lève une exception si erreur HTTP
# 2. Parser le HTML
soup = BeautifulSoup(response.text, 'lxml')
# 3. Trouver un élément par classe
table = soup.find('table', {'class': 'wikitable'})
# 4. Trouver toutes les lignes
rows = table.find_all('tr')
# 5. Extraire le texte d'une cellule
cell.get_text(strip=True) # strip=True enlève les espacesimport requests
from bs4 import BeautifulSoup
import pandas as pd
def scrape_bestselling_games() -> pd.DataFrame:
"""
Scrape la liste des jeux les plus vendus depuis Wikipedia.
"""
url = "https://en.wikipedia.org/wiki/List_of_best-selling_video_games"
# Headers pour s'identifier (bonne pratique)
headers = {
'User-Agent': 'Mozilla/5.0 (Educational Bot - Data Engineering Bootcamp)'
}
# Requête HTTP
print(f"🌐 Récupération de {url}...")
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # Erreur si status != 200
# Parser le HTML
soup = BeautifulSoup(response.text, 'lxml')
# Trouver le premier tableau wikitable
table = soup.find('table', {'class': 'wikitable'})
if not table:
raise ValueError("❌ Tableau non trouvé sur la page")
# Extraire les données
games = []
rows = table.find_all('tr')[1:11] # Skip header, prendre 10 lignes
for row in rows:
cells = row.find_all(['td', 'th'])
if len(cells) >= 3:
games.append({
'name': cells[0].get_text(strip=True),
'sales': cells[1].get_text(strip=True),
'platform': cells[2].get_text(strip=True) if len(cells) > 2 else 'N/A'
})
print(f"✅ {len(games)} jeux extraits")
return pd.DataFrame(games)
# Test
try:
df_wiki = scrape_bestselling_games()
print("\n🎮 Top 10 jeux les plus vendus (Wikipedia) :")
print(df_wiki.to_string(index=False))
except Exception as e:
print(f"❌ Erreur : {e}")Explications : - raise_for_status() lève une exception si le serveur retourne une erreur (404, 500, etc.) - find_all('tr')[1:11] : on ignore la première ligne (header) et on prend les 10 suivantes - get_text(strip=True) extrait le texte en enlevant les espaces superflus
On va stocker nos données dans deux systèmes complémentaires :
┌─────────────────────────────────────────────────────────────────┐
│ ARCHITECTURE STOCKAGE │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────┐
│ games_cleaned │
│ .parquet │
└────────┬────────┘
│
┌────────────┴────────────┐
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ DuckDB │ │ Elasticsearch │
│ │ │ │
│ • SQL queries │ │ • Full-text │
│ • Analytics │ │ • Fuzzy search │
│ • Aggregations │ │ • Suggestions │
└─────────────────┘ └─────────────────┘
| Système | Usage | Avantage |
|---|---|---|
| DuckDB | Requêtes SQL analytiques | Ultra-rapide, zero config |
| Elasticsearch | Recherche full-text | Recherche fuzzy, suggestions |
data/videogames.dbgamesGROUP BY et PARTITION BY ?# TON CODE ICI
# Charge les données dans DuckDB et exécute les requêtes
import duckdb
from pathlib import Path
PROJECT_ROOT = Path('videogames-analytics')
DB_PATH = PROJECT_ROOT / 'data' / 'videogames.db'
# 1. Connexion et création de la table
# 2. Top 10 jeux les plus vendus
# 3. Ventes totales par genre
# 4. Top 3 jeux par genre (Window Function)
# Connexion DuckDB
conn = duckdb.connect(str(DB_PATH))
# Créer une table depuis un Parquet
conn.execute("""
CREATE OR REPLACE TABLE games AS
SELECT * FROM read_parquet('chemin/fichier.parquet')
""")
# Window Function pour classement par groupe
ROW_NUMBER() OVER (PARTITION BY genre ORDER BY sales DESC) as rank
# Puis filtrer avec WHERE rank <= 3import duckdb
from pathlib import Path
PROJECT_ROOT = Path('videogames-analytics')
DB_PATH = PROJECT_ROOT / 'data' / 'videogames.db'
PARQUET_PATH = PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'
# Connexion
conn = duckdb.connect(str(DB_PATH))
# 1. Créer la table depuis le Parquet
conn.execute(f"""
CREATE OR REPLACE TABLE games AS
SELECT * FROM read_parquet('{PARQUET_PATH}')
""")
print(f"✅ Table 'games' créée avec {conn.execute('SELECT COUNT(*) FROM games').fetchone()[0]:,} lignes")
# 2. Top 10 jeux les plus vendus
print("\n🏆 Top 10 jeux les plus vendus :")
print(conn.execute("""
SELECT Name, Platform, Genre, Global_Sales
FROM games
ORDER BY Global_Sales DESC
LIMIT 10
""").fetchdf())
# 3. Ventes totales par genre
print("\n📊 Ventes par genre :")
print(conn.execute("""
SELECT
Genre,
COUNT(*) as nb_games,
ROUND(SUM(Global_Sales), 2) as total_sales,
ROUND(AVG(Global_Sales), 2) as avg_sales
FROM games
GROUP BY Genre
ORDER BY total_sales DESC
""").fetchdf())
# 4. Top 3 jeux par genre (Window Function)
print("\n🎯 Top 3 par genre :")
print(conn.execute("""
WITH ranked AS (
SELECT
Genre,
Name,
Global_Sales,
ROW_NUMBER() OVER (
PARTITION BY Genre
ORDER BY Global_Sales DESC
) as rank
FROM games
)
SELECT Genre, rank, Name, Global_Sales
FROM ranked
WHERE rank <= 3
ORDER BY Genre, rank
""").fetchdf().head(20))
conn.close()
print(f"\n✅ Base sauvegardée : {DB_PATH}")Explication Window Function : - PARTITION BY Genre : crée des “fenêtres” par genre - ORDER BY Global_Sales DESC : ordonne dans chaque fenêtre - ROW_NUMBER() : numérote de 1 à N dans chaque fenêtre - On filtre ensuite WHERE rank <= 3 pour garder le top 3
Lance Elasticsearch en local (comme vu dans le M10) :
index_games_to_es(df, index_name) qui :
videogames avec un mapping appropriésearch_games(query) qui :
Nametext et keyword dans ES ?# TON CODE ICI
# Indexe les jeux dans Elasticsearch
from elasticsearch import Elasticsearch, helpers
import pandas as pd
from typing import List, Dict
def index_games_to_es(df: pd.DataFrame, index_name: str = "videogames") -> int:
"""
Indexe les jeux dans Elasticsearch.
Args:
df: DataFrame des jeux
index_name: Nom de l'index ES
Returns:
Nombre de documents indexés
"""
# TON CODE ICI
pass
def search_games(query: str, index_name: str = "videogames") -> List[Dict]:
"""
Recherche des jeux par nom (fuzzy search).
Args:
query: Terme de recherche
index_name: Nom de l'index ES
Returns:
Liste des résultats
"""
# TON CODE ICI
passfrom elasticsearch import Elasticsearch, helpers
# Connexion
es = Elasticsearch("http://localhost:9200")
es.ping() # Vérifie la connexion
# Supprimer un index
es.indices.delete(index="videogames", ignore=[404])
# Créer avec mapping
es.indices.create(index="videogames", body={"mappings": {...}})
# Bulk indexation
actions = [{"_index": "videogames", "_source": doc} for doc in docs]
helpers.bulk(es, actions)
# Recherche fuzzy
es.search(index="videogames", query={
"match": {"Name": {"query": "...", "fuzziness": "AUTO"}}
})from elasticsearch import Elasticsearch, helpers
import pandas as pd
from typing import List, Dict
def index_games_to_es(df: pd.DataFrame, index_name: str = "videogames") -> int:
"""
Indexe les jeux dans Elasticsearch.
"""
# Connexion
es = Elasticsearch("http://localhost:9200")
if not es.ping():
raise ConnectionError("❌ Elasticsearch non disponible sur localhost:9200")
print(f"✅ Connecté à Elasticsearch")
# Supprimer l'index s'il existe
if es.indices.exists(index=index_name):
es.indices.delete(index=index_name)
print(f" Index '{index_name}' supprimé")
# Créer l'index avec mapping
mapping = {
"mappings": {
"properties": {
"Name": {"type": "text", "analyzer": "standard"},
"Platform": {"type": "keyword"},
"Genre": {"type": "keyword"},
"Publisher": {"type": "keyword"},
"Year_of_Release": {"type": "integer"},
"Global_Sales": {"type": "float"},
"Critic_Score": {"type": "float"},
"Sales_Category": {"type": "keyword"}
}
}
}
es.indices.create(index=index_name, body=mapping)
print(f" Index '{index_name}' créé")
# Préparer les documents (remplacer NaN par None)
records = df.where(pd.notnull(df), None).to_dict('records')
# Bulk indexation
actions = [
{"_index": index_name, "_source": record}
for record in records
]
success, errors = helpers.bulk(es, actions, raise_on_error=False)
print(f"✅ {success} documents indexés")
if errors:
print(f"⚠️ {len(errors)} erreurs")
return success
def search_games(query: str, index_name: str = "videogames") -> List[Dict]:
"""
Recherche des jeux par nom (fuzzy search).
"""
es = Elasticsearch("http://localhost:9200")
response = es.search(
index=index_name,
query={
"match": {
"Name": {
"query": query,
"fuzziness": "AUTO" # Tolère les fautes de frappe
}
}
},
size=10
)
results = []
for hit in response['hits']['hits']:
results.append({
'score': hit['_score'],
**hit['_source']
})
return results
# Test
try:
# Charger et indexer
df = pd.read_parquet('videogames-analytics/data/processed/games_cleaned.parquet')
index_games_to_es(df)
# Rechercher
print("\n🔍 Recherche 'Final Fantasi' (avec faute) :")
results = search_games("Final Fantasi")
for r in results[:5]:
print(f" {r['Name']} ({r['Platform']}) - Score: {r['score']:.2f}")
except Exception as e:
print(f"⚠️ Erreur : {e}")
print(" Assure-toi qu'Elasticsearch est lancé sur localhost:9200")Explications : - text : le champ est analysé (tokenisé, stemming) → pour la recherche full-text - keyword : valeur exacte → pour les filtres et agrégations - fuzziness: AUTO : tolère 1-2 caractères d’erreur selon la longueur du mot
Utilisons PySpark pour des analyses à grande échelle.
Utilise PySpark pour :
ROW_NUMBER() pour classer les jeux par ventes dans chaque genregroupBy().agg() et les Window Functions ?# TON CODE ICI
# Analyses PySpark avec Window Functions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Créer la session Spark
spark = SparkSession.builder \
.appName("VideoGamesAnalytics") \
.getOrCreate()
# 1. Charger les données
# 2. Stats par Publisher
# 3. Classement par genre avec Window
# 4. Sauvegarder
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Agrégation par groupe
df.groupBy("Publisher").agg(
F.count("*").alias("nb_games"),
F.sum("Global_Sales").alias("total_sales")
)
# Window Function
window = Window.partitionBy("Genre").orderBy(F.desc("Global_Sales"))
df.withColumn("rank", F.row_number().over(window))from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pathlib import Path
PROJECT_ROOT = Path('videogames-analytics')
# Créer la session Spark
spark = SparkSession.builder \
.appName("VideoGamesAnalytics") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
print(f"✅ Spark {spark.version} initialisé")
# 1. Charger les données
games_sdf = spark.read.parquet(str(PROJECT_ROOT / 'data' / 'processed' / 'games_cleaned.parquet'))
print(f"📊 {games_sdf.count():,} jeux chargés")
# 2. Stats par Publisher
print("\n🏢 Statistiques par Publisher :")
publisher_stats = games_sdf.groupBy("Publisher").agg(
F.count("*").alias("nb_games"),
F.round(F.sum("Global_Sales"), 2).alias("total_sales"),
F.round(F.avg("Critic_Score"), 1).alias("avg_critic"),
F.countDistinct("Genre").alias("nb_genres")
).orderBy(F.desc("total_sales"))
publisher_stats.show(10)
# 3. Classement par genre avec Window
print("\n🏆 Top 3 jeux par genre :")
window_genre = Window.partitionBy("Genre").orderBy(F.desc("Global_Sales"))
ranked_games = games_sdf.withColumn(
"rank_in_genre", F.row_number().over(window_genre)
).filter(
F.col("rank_in_genre") <= 3
).select(
"Genre", "rank_in_genre", "Name", "Platform", "Global_Sales"
).orderBy("Genre", "rank_in_genre")
ranked_games.show(20)
# 4. Sauvegarder
output_path = PROJECT_ROOT / 'data' / 'processed' / 'publisher_stats'
publisher_stats.write.mode("overwrite").parquet(str(output_path))
print(f"✅ Résultats sauvegardés : {output_path}")
spark.stop()Crée api/main.py avec les endpoints suivants :
| Endpoint | Méthode | Description |
|---|---|---|
/ |
GET | Message de bienvenue |
/games |
GET | Liste avec filtres (genre, platform, min_sales) |
/games/{name} |
GET | Détails d’un jeu |
/stats/genres |
GET | Stats par genre |
/stats/publishers |
GET | Top publishers |
/search?q=... |
GET | Recherche Elasticsearch |
Lancer l’API : uvicorn api.main:app --reload
Crée dashboard/app.py avec :
Lancer le dashboard : streamlit run dashboard/app.py
Crée scripts/run_pipeline.sh qui :
pipeline.log💡 Note : Les solutions complètes pour les phases 4-7 suivent le même format que les phases précédentes. Essaie d’abord par toi-même !
Tu as terminé le Projet Intégrateur Débutant !
┌─────────────────────────────────────────────────────────────────────────────┐
│ VIDEO GAMES ANALYTICS - COMPÉTENCES │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐
│ Video Games │
│ Analytics │
└────────┬────────┘
│
┌────────────────────────────┼────────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ INGESTION │ │ PROCESSING │ │ STORAGE │
├─────────────┤ ├─────────────┤ ├─────────────┤
│ • Kaggle │ │ • Pandas │ │ • DuckDB │
│ • Scraping │ │ • PySpark │ │ • ES │
│ • BS4 │ │ • Window │ │ • Parquet │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└────────────────────────────┼────────────────────────────┘
│
┌────────────────────────────┼────────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ SERVING │ │ DEVOPS │ │ DASHBOARD │
├─────────────┤ ├─────────────┤ ├─────────────┤
│ • FastAPI │ │ • Git │ │ • Streamlit │
│ • REST API │ │ • Bash │ │ • Plotly │
│ • Pydantic │ │ • Automation│ │ • Filters │
└─────────────┘ └─────────────┘ └─────────────┘
| Domaine | Compétences |
|---|---|
| Ingestion | CSV, Web Scraping, APIs |
| Processing | Pandas, PySpark, SQL |
| Storage | DuckDB, Elasticsearch, Parquet |
| Serving | FastAPI, Streamlit |
| DevOps | Git, Bash, Automation |
👉 Niveau Intermédiaire : Docker, Kubernetes, Kafka, Delta Lake, dbt, Airflow…
🎮 Video Games Analytics Platform — Data Engineering Bootcamp