s3://my-bucket/bronze/2024/01/transactions.parquet
└───┬───┘ └──────────┬─────────┘ └─────┬──────┘
Bucket Prefix Object Key
⚠️ IMPORTANT : Les préfixes NE SONT PAS des dossiers !
C'est juste une convention de nommage (clé-valeur plat)
Concept
Description
Exemple
Bucket
Conteneur racine, nom unique global
my-company-datalake
Key
Identifiant unique de l’objet
bronze/2024/01/data.parquet
Prefix
“Faux dossier”, filtre de listing
bronze/2024/
Object
Le fichier + ses metadata
Parquet, CSV, JSON…
5.2 Metadata & Tags
Chaque objet peut avoir des metadata custom :
Voir le code
# Exemple de metadata sur un objet S3metadata_example = {# Metadata système (automatiques)"Content-Type": "application/octet-stream","Content-Length": 1048576,"Last-Modified": "2024-01-15T10:30:00Z","ETag": "d41d8cd98f00b204e9800998ecf8427e",# Metadata custom (x-amz-meta-*)"x-amz-meta-source": "kafka-topic-orders","x-amz-meta-pipeline": "etl-daily","x-amz-meta-schema-version": "2.1",}# Tags (pour billing, governance)tags = {"Environment": "production","Team": "data-engineering","CostCenter": "DE-001",}print("Metadata et Tags permettent de :")print("- Tracer l'origine des données")print("- Filtrer pour la gouvernance")print("- Allouer les coûts par équipe")
5.3 Versioning & Lifecycle Policies
Versioning : Garder plusieurs versions d’un même objet
s3://bucket/data.csv
│
├── Version 1 (2024-01-01) ← Ancienne
├── Version 2 (2024-01-15) ← Ancienne
└── Version 3 (2024-02-01) ← Current
Lifecycle Policies : Automatiser la gestion du stockage
┌─────────────┐ 30 jours ┌─────────────┐ 90 jours ┌─────────────┐
│ Standard │ ─────────────▶ │ Infrequent │ ────────────▶ │ Glacier │
│ $0.023/GB │ │ $0.0125/GB│ │ $0.004/GB │
└─────────────┘ └─────────────┘ └─────────────┘
Hot Cool Archive
5.4 Classes de stockage
Classe
Usage
Latence
Coût stockage
Standard
Accès fréquent
ms
$0.023/GB
IA (Infrequent Access)
Accès rare
ms
$0.0125/GB
Glacier
Archivage
minutes-heures
$0.004/GB
Glacier Deep Archive
Compliance
heures
$0.00099/GB
5.5 Protocoles & Drivers
🔥 Section essentielle — Source de confusion fréquente !
Protocole
Cloud
Usage
Driver/SDK
Exemple
s3://
AWS
CLI, Python
boto3, aws cli
s3://bucket/key
s3a://
AWS
Spark/Hadoop
hadoop-aws
s3a://bucket/key
abfs://
Azure
Spark (legacy)
hadoop-azure
abfs://container@account.dfs.core.windows.net/
abfss://
Azure
Spark (TLS)
hadoop-azure
abfss://container@account.dfs.core.windows.net/
gs://
GCP
Spark, CLI
gcs-connector
gs://bucket/key
https://
Tous
Direct HTTP
requests
Signed URLs
⚠️ ATTENTION :
s3:// ≠ s3a://
- s3:// → AWS CLI, boto3 (haut niveau)
- s3a:// → Hadoop/Spark (bas niveau, optimisé Big Data)
Dans Spark, TOUJOURS utiliser s3a://, abfss://, ou gs://
5.6 Metadata Catalogs — Transition vers le module 23
L’Object Storage stocke des fichiers bruts. Mais pour faire du SQL, on a besoin de : - Schéma des tables (colonnes, types) - Localisation des partitions - Statistiques pour l’optimiseur
💡 Preview Module 23 : Delta Lake et Iceberg intègrent leur propre Transaction Log directement dans l’Object Storage. Plus besoin de catalogue externe !
Sans optimisation (tout en Standard) : 750 × $0.023 = $17.25/mois
Économie : 30% !
6. AWS S3 — Deep Dive
6.1 Concepts & Classes de stockage
Classe
Durabilité
Disponibilité
Min storage
Retrieval
Standard
11 nines
99.99%
-
Immédiat
Intelligent-Tiering
11 nines
99.9%
-
Immédiat
Standard-IA
11 nines
99.9%
30 jours
Immédiat
Glacier Instant
11 nines
99.9%
90 jours
ms
Glacier Flexible
11 nines
99.99%
90 jours
1-12h
Glacier Deep Archive
11 nines
99.99%
180 jours
12-48h
6.2 Opérations CLI
Voir le code
s3_cli_commands ="""# Lister les bucketsaws s3 ls# Lister le contenu d'un bucketaws s3 ls s3://my-bucket/bronze/# Copier un fichier local vers S3aws s3 cp data.csv s3://my-bucket/bronze/data.csv# Copier un fichier S3 vers localaws s3 cp s3://my-bucket/bronze/data.csv ./data.csv# Synchroniser un dossieraws s3 sync ./local-folder/ s3://my-bucket/bronze/# Supprimer un fichieraws s3 rm s3://my-bucket/bronze/old-data.csv# Supprimer récursivementaws s3 rm s3://my-bucket/temp/ --recursive"""print(s3_cli_commands)
6.3 Python avec boto3
Voir le code
# Installation : pip install boto3import boto3from botocore.exceptions import ClientError# Créer un client S3s3 = boto3.client('s3')# --- Upload ---def upload_file(file_path, bucket, key):"""Upload un fichier vers S3.""" s3.upload_file(file_path, bucket, key)print(f"✅ Uploaded {file_path} to s3://{bucket}/{key}")# --- Download ---def download_file(bucket, key, file_path):"""Download un fichier depuis S3.""" s3.download_file(bucket, key, file_path)print(f"✅ Downloaded s3://{bucket}/{key} to {file_path}")# --- List objects ---def list_objects(bucket, prefix=""):"""Liste les objets dans un bucket.""" response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)if'Contents'in response:for obj in response['Contents']:print(f" {obj['Key']} ({obj['Size']} bytes)")return response.get('Contents', [])# --- Presigned URL ---def generate_presigned_url(bucket, key, expiration=3600):"""Génère une URL temporaire pour accès direct.""" url = s3.generate_presigned_url('get_object', Params={'Bucket': bucket, 'Key': key}, ExpiresIn=expiration )return url# Exemple d'utilisation (commenté car pas de credentials)# upload_file('data.csv', 'my-bucket', 'bronze/data.csv')# list_objects('my-bucket', 'bronze/')print("📝 Fonctions boto3 définies (upload, download, list, presigned URL)")
🔐 Best Practice : JAMAIS de credentials dans le code !
Utiliser :
- Variables d'environnement
- Instance Profiles (EC2)
- IRSA (Kubernetes) ← Module 21
- AWS Secrets Manager
gcs_cli_commands ="""# gsutil - CLI pour GCS# Lister les bucketsgsutil ls# Lister le contenu d'un bucketgsutil ls gs://my-bucket/bronze/# Copiergsutil cp data.csv gs://my-bucket/bronze/gsutil cp gs://my-bucket/bronze/data.csv ./# Synchroniser (comme rsync)gsutil rsync -r ./local/ gs://my-bucket/bronze/# Copie parallèle (gros fichiers)gsutil -m cp -r ./data/ gs://my-bucket/bronze/"""print(gcs_cli_commands)
Voir le code
# Installation : pip install google-cloud-storagegcs_python_example ="""from google.cloud import storage# Créer le client (utilise GOOGLE_APPLICATION_CREDENTIALS)client = storage.Client()# Accéder au bucketbucket = client.bucket("my-bucket")# Uploadblob = bucket.blob("bronze/data.csv")blob.upload_from_filename("data.csv")# Downloadblob.download_to_filename("downloaded.csv")# List blobsblobs = bucket.list_blobs(prefix="bronze/")for blob in blobs: print(blob.name)# Signed URL (temporaire)url = blob.generate_signed_url(expiration=3600) # 1 heure"""print(gcs_python_example)
8.3 GCS avec Spark (gs://)
Voir le code
spark_gcs_config ="""# Configuration Spark pour GCS# Option 1 : Service Account Key (dev)spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")spark.conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")# Option 2 : Application Default Credentials (GKE, Cloud Functions)# Pas de config nécessaire si ADC est configuré# Lire depuis GCSdf = spark.read.parquet("gs://my-bucket/silver/data/")# Écrire vers GCSdf.write.mode("overwrite").parquet("gs://my-bucket/gold/aggregates/")"""print(spark_gcs_config)
8.4 Authentification GCP
Méthode
Sécurité
Usage
Production ?
Service Account Key
⚠️ Moyenne
Dev, CI/CD
⚠️ Avec précaution
Workload Identity
✅ Haute
GKE
✅ Oui
ADC (Application Default Credentials)
✅ Haute
Cloud Functions, Cloud Run
✅ Oui
Exercice 6 : Lister et télécharger depuis GCS
Écris un script qui liste tous les fichiers .parquet dans un bucket et télécharge le premier.
💡 Voir la solution
from google.cloud import storageclient = storage.Client()bucket = client.bucket("my-bucket")# Lister les fichiers .parquetparquet_files = []for blob in bucket.list_blobs(prefix="silver/"):if blob.name.endswith('.parquet'): parquet_files.append(blob)print(f"Found: {blob.name}")# Télécharger le premierif parquet_files: first_file = parquet_files[0] first_file.download_to_filename("downloaded.parquet")print(f"Downloaded: {first_file.name}")
9. MinIO — Object Storage Local
9.1 Pourquoi MinIO ?
Avantage
Description
100% S3 compatible
Même API, même code boto3
Gratuit
Open-source, pas de compte cloud
Local
Parfait pour dev/test
Léger
Docker, un seul binaire
Production-ready
Utilisé aussi en production (on-premise)
💡 Le code écrit pour MinIO fonctionne sur S3 sans modification !
Il suffit de changer l'endpoint.
minio_commands ="""# Démarrer MinIOdocker-compose up -d# Accéder à la console web# http://localhost:9001# Login: minioadmin / minioadmin# Installer mc (MinIO Client)# Linuxwget https://dl.min.io/client/mc/release/linux-amd64/mcchmod +x mcsudo mv mc /usr/local/bin/# Macbrew install minio/stable/mc# Configurer mcmc alias set myminio http://localhost:9000 minioadmin minioadmin# Créer des bucketsmc mb myminio/bronzemc mb myminio/silvermc mb myminio/gold# Listermc ls myminio/# Uploadmc cp data.csv myminio/bronze/"""print(minio_commands)
9.4 Python avec MinIO (boto3)
Voir le code
# Le même code boto3 fonctionne avec MinIO !# Il suffit de spécifier endpoint_urlminio_boto3_example ="""import boto3from botocore.client import Config# Configuration pour MinIOs3 = boto3.client( 's3', endpoint_url='http://localhost:9000', # ← La seule différence ! aws_access_key_id='minioadmin', aws_secret_access_key='minioadmin', config=Config(signature_version='s3v4'))# Créer un buckets3.create_bucket(Bucket='bronze')# Upload (identique à S3)s3.upload_file('data.csv', 'bronze', 'raw/data.csv')# List (identique à S3)response = s3.list_objects_v2(Bucket='bronze')for obj in response.get('Contents', []): print(obj['Key'])# Download (identique à S3)s3.download_file('bronze', 'raw/data.csv', 'downloaded.csv')"""print(minio_boto3_example)print("\n💡 Ce code fonctionne sur S3 en enlevant juste endpoint_url !")
# Solution rapidedocker run -d--name minio \-p 9000:9000 -p 9001:9001 \-e MINIO_ROOT_USER=minioadmin \-e MINIO_ROOT_PASSWORD=minioadmin \ minio/minio server /data --console-address":9001"
10. Performance & Optimisation
10.1 Le problème des petits fichiers
❌ MAUVAIS : 10,000 fichiers × 1 MB = 10 GB
- 10,000 requêtes API (LIST + GET)
- Overhead énorme pour Spark
- Temps de lecture : minutes
✅ BON : 100 fichiers × 100 MB = 10 GB
- 100 requêtes API
- Parallélisme optimal
- Temps de lecture : secondes
Taille idéale : 100 MB - 1 GB par fichier
Solutions : - df.coalesce(n) ou df.repartition(n) avant écriture - Compaction périodique - Delta Lake / Iceberg (Module 23) = compaction automatique
Voir le code
# Éviter les petits fichiers avec Sparksmall_files_solution ="""# Lecture de beaucoup de petits fichiersdf = spark.read.parquet("s3a://bronze/data/") # 10,000 fichiers# ❌ Écriture directe = même nombre de fichiers# df.write.parquet("s3a://silver/data/")# ✅ Repartitionner avant d'écriredf.repartition(100) \ # 100 fichiers de ~100 MB .write.mode("overwrite") \ .parquet("s3a://silver/data/")# ✅ Ou coalesce (moins de shuffle)df.coalesce(100) \ .write.mode("overwrite") \ .parquet("s3a://silver/data/")"""print(small_files_solution)
Avantages : - Partition pruning (Spark ne lit que les partitions nécessaires) - Requêtes plus rapides
⚠️ Attention au over-partitioning :
Trop de partitions = trop de petits fichiers
Règle : max 10,000 partitions
Voir le code
# Partitionnement avec Sparkpartitioning_example ="""# Écriture partitionnéedf.write \ .partitionBy("year", "month") \ .mode("overwrite") \ .parquet("s3a://silver/sales/")# Lecture avec partition pruningdf = spark.read.parquet("s3a://silver/sales/")# Cette requête ne lit QUE year=2024/month=01df.filter("year = 2024 AND month = 1").show()"""print(partitioning_example)
10.3 Formats de fichiers
Format
Type
Compression
Lecture colonnes
Use case
CSV
Row
Non
❌
Échange, debug
JSON
Row
Non
❌
APIs, logs
Avro
Row
Oui
❌
Streaming, Kafka
Parquet
Columnar
Oui
✅
Analytics, Data Lake
ORC
Columnar
Oui
✅
Hive, analytics
💡 Pour le Data Engineering : PARQUET est le standard
- Compression excellente (snappy, zstd)
- Lecture par colonnes
- Predicate pushdown
- Schema intégré
Exercice 8 : Comparer Parquet vs CSV
Voir le code
# Exercice : Comparer la taille et le temps de lectureformat_comparison ="""import timefrom pyspark.sql import SparkSessionspark = SparkSession.builder.appName("Format Comparison").getOrCreate()# Créer un DataFrame de test (1M lignes)df = spark.range(1000000).toDF("id") \ .withColumn("name", lit("test_name")) \ .withColumn("amount", rand() * 1000)# Écrire en CSVstart = time.time()df.write.mode("overwrite").csv("s3a://test/csv/")csv_write_time = time.time() - start# Écrire en Parquetstart = time.time()df.write.mode("overwrite").parquet("s3a://test/parquet/")parquet_write_time = time.time() - start# Lire CSVstart = time.time()df_csv = spark.read.csv("s3a://test/csv/").count()csv_read_time = time.time() - start# Lire Parquetstart = time.time()df_parquet = spark.read.parquet("s3a://test/parquet/").count()parquet_read_time = time.time() - startprint(f"CSV Write: {csv_write_time:.2f}s, Read: {csv_read_time:.2f}s")print(f"Parquet Write: {parquet_write_time:.2f}s, Read: {parquet_read_time:.2f}s")"""print(format_comparison)print("\n💡 Résultat attendu : Parquet 5-10x plus rapide et 5-10x plus petit")
Exercice 9 : Impact de la taille des fichiers
Objectif : Mesurer l’impact du nombre de fichiers sur les performances.
# Scénario A : 1000 petits fichiersdf.repartition(1000).write.parquet("s3a://test/small-files/")# Scénario B : 10 gros fichiersdf.repartition(10).write.parquet("s3a://test/large-files/")# Mesurer le temps de lecture pour chaque
Résultat attendu : Scénario B sera beaucoup plus rapide.
11. Sécurité & Gouvernance
11.1 Encryption
Type
Description
Qui gère la clé ?
SSE-S3
Encryption côté serveur, clé AWS
AWS
SSE-KMS
Encryption avec AWS KMS
AWS (tu choisis la clé)
SSE-C
Encryption avec clé client
Toi
Client-side
Encryption avant upload
Toi
💡 Best Practice : SSE-KMS pour la plupart des cas
- Rotation automatique des clés
- Audit dans CloudTrail
- Contrôle d'accès fin
⚠️ Attention aux coûts cachés :
1. LISTING fréquent
- Spark fait un LIST avant chaque lecture
- 1M de fichiers = 1000 LIST calls = $5
2. EGRESS
- Données sortant du cloud = coûteux
- Cross-region = $0.02/GB
- Vers Internet = $0.09/GB
3. Small files
- Plus de requêtes API
- Plus de listing
# Étape 4 : Agréger avec Spark SQL (Silver → Gold)step4_code ="""# Lire depuis Silverdf_silver = spark.read.parquet("s3a://silver/sales/")# Créer une vue temporairedf_silver.createOrReplaceTempView("sales")# Agrégation avec Spark SQLdf_gold = spark.sql(\"\"\" SELECT category, COUNT(*) as num_transactions, SUM(quantity) as total_quantity, SUM(total) as total_revenue, AVG(total) as avg_transaction FROM sales GROUP BY category ORDER BY total_revenue DESC\"\"\")print("📊 Agrégations Gold:")df_gold.show()# Écrire vers Golddf_gold.coalesce(1) \ .write \ .mode("overwrite") \ .parquet("s3a://gold/category_summary/")print("✅ Données écrites vers gold/category_summary/")"""print(step4_code)
Voir le code
# Étape 5 : Vérifier les résultatsstep5_code ="""# Lister les fichiers créésimport subprocess# Avec mc CLIprint("📁 Contenu de bronze/:")!mc ls myminio/bronze/ --recursiveprint("\n📁 Contenu de silver/:")!mc ls myminio/silver/ --recursiveprint("\n📁 Contenu de gold/:")!mc ls myminio/gold/ --recursive# Ou avec boto3for bucket in ['bronze', 'silver', 'gold']: print(f"\n📁 {bucket}/") response = s3.list_objects_v2(Bucket=bucket) for obj in response.get('Contents', []): print(f" {obj['Key']} ({obj['Size']} bytes)")"""print(step5_code)
Quiz de fin de module
❓ Q1. Quelle est la différence entre un prefix et un dossier dans S3 ?
Aucune différence
Un prefix est un vrai dossier créé par S3
Un prefix est juste une convention de nommage, S3 est un key-value store plat
Un dossier peut contenir des sous-dossiers, pas un prefix
💡 Voir la réponse
✅ Réponse : c — S3 est un key-value store plat. Les “/” dans les clés sont juste des caractères comme les autres.
❓ Q2. Quel protocole utiliser pour lire S3 avec Spark ?
s3://
s3a://
https://
hdfs://
💡 Voir la réponse
✅ Réponse : b — s3a:// est le protocole Hadoop optimisé pour Spark. s3:// est pour AWS CLI/boto3.
❓ Q3. Pourquoi les petits fichiers sont-ils un problème ?
Ils prennent plus de place
Ils génèrent trop de requêtes API et d’overhead
Ils ne sont pas supportés par Parquet
Ils ne peuvent pas être partitionnés
💡 Voir la réponse
✅ Réponse : b — Chaque fichier = une requête API. 10,000 fichiers = 10,000 GET requests = lent et coûteux.
❓ Q4. Quelle est la différence entre SAS Token et Managed Identity sur Azure ?
SAS Token est plus sécurisé
Managed Identity est temporaire, SAS est permanent
SAS Token est temporaire et partageable, Managed Identity est liée à une ressource Azure
Aucune différence
💡 Voir la réponse
✅ Réponse : c — SAS Token = URL temporaire partageable. Managed Identity = identité attachée à une VM/AKS, plus sécurisé.
❓ Q5. Pourquoi MinIO est-il compatible avec S3 ?
C’est un produit AWS
Il implémente la même API REST que S3
Il utilise les mêmes serveurs
Il copie les données depuis S3
💡 Voir la réponse
✅ Réponse : b — MinIO implémente l’API S3 (REST). Le même code boto3 fonctionne avec les deux.
❓ Q6. Quelle classe de stockage pour des données rarement lues ?
S3 Standard
S3 Intelligent-Tiering
S3 Glacier
S3 One Zone-IA
💡 Voir la réponse
✅ Réponse : c — Glacier pour l’archivage (données rarement lues). Intelligent-Tiering si le pattern d’accès est imprévisible.
❓ Q7. Quel est l’avantage du partitionnement dans un Data Lake ?
Les fichiers sont plus petits
Spark peut ignorer les partitions non pertinentes (partition pruning)
Le stockage coûte moins cher
Les données sont automatiquement compressées
💡 Voir la réponse
✅ Réponse : b — Partition pruning = Spark lit uniquement les partitions qui matchent le filtre.
❓ Q8. Comment authentifier Spark on K8s vers S3 en production ?
Access Keys dans le code
Variables d’environnement
IAM Roles for Service Accounts (IRSA)
Fichier de config local
💡 Voir la réponse
✅ Réponse : c — IRSA permet d’associer un IAM Role à un ServiceAccount K8s. Pas de credentials dans le code.