⚡ Real-Time OLAP & Dashboards

Welcome to this module, where you will discover real-time OLAP engines: databases optimized for ultra-fast analytical queries over streaming data. You will learn to build live dashboards that refresh in real time.


Prerequisites

Level           Skill
✅ Required      Kafka & Spark Streaming (M24)
✅ Required      Advanced SQL
✅ Required      Docker
💡 Recommended   Distributed Messaging (M29)

🎯 Module Objectives

By the end of this module, you will be able to:

  • Understand when to use an OLAP engine vs Spark
  • Deploy and configure ClickHouse
  • Ingest data from Kafka into ClickHouse
  • Create Materialized Views for pre-aggregation
  • Build real-time dashboards with Grafana
  • Know the alternatives: Druid and Pinot

1. Introduction: Why an OLAP Engine?

1.1 Recap: Streaming Architecture (M24)

In module M24, you learned how to build streaming pipelines:

┌─────────────────────────────────────────────────────────────────────────────┐
│                    WHAT WE COVERED IN M24                                   │
│                                                                             │
│   Source ──▶ Kafka ──▶ Spark Streaming ──▶ Delta Lake                      │
│                              │                                              │
│                              └── Transformations                            │
│                                  Agrégations                                │
│                                  Windowing                                  │
│                                                                             │
│   ✅ Real-time ingestion                                                    │
│   ✅ Complex transformations                                                │
│   ✅ Exactly-once semantics                                                 │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 The Problem: Interactive Queries

Spark is excellent for processing, but less suited to interactive queries:

Need               Spark            OLAP Engine
Query latency      Seconds          Milliseconds
Concurrent users   ~10              ~1000
Ad-hoc queries     Slow to start    Instant
Dashboard refresh  Expensive        Optimized

1.3 OLAP vs OLTP vs Streaming

┌─────────────────────────────────────────────────────────────────────────────┐
│                    OLTP vs OLAP vs STREAMING                                │
│                                                                             │
│   OLTP (PostgreSQL)         STREAMING (Kafka+Spark)    OLAP (ClickHouse)   │
│   ─────────────────         ────────────────────────   ─────────────────   │
│                                                                             │
│   • Row-oriented            • Event processing         • Column-oriented   │
│   • Single row ops          • Continuous               • Analytical        │
│   • ACID transactions       • Transformations          • Fast aggregations │
│   • Low latency writes      • State management         • Low latency reads │
│                                                                             │
│   Use: Applications         Use: Pipelines             Use: Analytics      │
│   Ex: User signup           Ex: ETL, enrichment        Ex: Dashboards      │
└─────────────────────────────────────────────────────────────────────────────┘

1.4 Complete Real-Time Analytics Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    REAL-TIME ANALYTICS ARCHITECTURE                         │
│                                                                             │
│                                                                             │
│   ┌─────────┐     ┌─────────┐     ┌─────────────┐     ┌─────────────┐     │
│   │  Apps   │────▶│  Kafka  │────▶│   Spark     │────▶│ Delta Lake  │     │
│   │  IoT    │     │         │     │  Streaming  │     │ (historical)│     │
│   │  Events │     │         │     └─────────────┘     └─────────────┘     │
│   └─────────┘     │         │                                              │
│                   │         │     ┌─────────────┐     ┌─────────────┐     │
│                   │         │────▶│ ClickHouse  │────▶│  Grafana    │     │
│                   │         │     │   (OLAP)    │     │ (Dashboard) │     │
│                   └─────────┘     └─────────────┘     └─────────────┘     │
│                                                                             │
│   M24: Kafka + Spark ────────────────────┐                                 │
│   M33: OLAP + Dashboards ────────────────┘ ◀── THIS MODULE                 │
└─────────────────────────────────────────────────────────────────────────────┘

2. ClickHouse: The Ultra-Fast OLAP Engine

2.1 What is ClickHouse?

ClickHouse is an open-source OLAP DBMS created at Yandex, designed for:

  • Analytical queries over billions of rows
  • Millisecond latency
  • High-speed ingestion (millions of rows/sec)

2.2 Why Is ClickHouse Fast?

┌─────────────────────────────────────────────────────────────────────────────┐
│                    CLICKHOUSE: PERFORMANCE SECRETS                          │
│                                                                             │
│   1. COLUMNAR STORAGE                                                       │
│   ───────────────────                                                       │
│   Row-based:    [id, name, amount, date] [id, name, amount, date] ...      │
│   Column-based: [id, id, id...] [name, name...] [amount, amount...] ✅     │
│                                                                             │
│   → Reads only the columns a query needs                                   │
│   → Excellent compression (similar values grouped together)                │
│                                                                             │
│   2. VECTORIZED EXECUTION                                                   │
│   ───────────────────────                                                   │
│   Processes data in blocks (SIMD), not row by row                          │
│                                                                             │
│   3. DATA SKIPPING                                                          │
│   ──────────────────                                                        │
│   Sparse indexes + min/max per granule → skips useless blocks              │
│                                                                             │
│   4. COMPRESSION                                                            │
│   ─────────────                                                             │
│   LZ4/ZSTD by default, 10-20x compression ratio                            │
└─────────────────────────────────────────────────────────────────────────────┘
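
To see those compression gains on your own data, you can query system.columns once a table holds some rows. A minimal sketch, assuming the events table created later in section 2.7:

-- Per-column compression ratio (assumes the 'events' table from section 2.7 exists and has data)
SELECT
    name,
    formatReadableSize(data_compressed_bytes)   AS compressed,
    formatReadableSize(data_uncompressed_bytes) AS uncompressed,
    round(data_uncompressed_bytes / data_compressed_bytes, 1) AS ratio
FROM system.columns
WHERE table = 'events'
ORDER BY data_compressed_bytes DESC;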

2.3 ClickHouse Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    CLICKHOUSE ARCHITECTURE                                  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                         CLUSTER                                     │  │
│   │                                                                     │  │
│   │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐              │  │
│   │   │   Shard 1   │   │   Shard 2   │   │   Shard 3   │              │  │
│   │   │             │   │             │   │             │              │  │
│   │   │ ┌─────────┐ │   │ ┌─────────┐ │   │ ┌─────────┐ │              │  │
│   │   │ │Replica 1│ │   │ │Replica 1│ │   │ │Replica 1│ │              │  │
│   │   │ └─────────┘ │   │ └─────────┘ │   │ └─────────┘ │              │  │
│   │   │ ┌─────────┐ │   │ ┌─────────┐ │   │ ┌─────────┐ │              │  │
│   │   │ │Replica 2│ │   │ │Replica 2│ │   │ │Replica 2│ │              │  │
│   │   │ └─────────┘ │   │ └─────────┘ │   │ └─────────┘ │              │  │
│   │   └─────────────┘   └─────────────┘   └─────────────┘              │  │
│   │                                                                     │  │
│   │   Sharding: horizontal data distribution                           │  │
│   │   Replication: high availability                                   │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │                      ZOOKEEPER / CLICKHOUSE KEEPER                  │  │
│   │                      (coordination, replication)                    │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

2.4 Installation with Docker

# Start ClickHouse (single node)
docker run -d \
    --name clickhouse-server \
    -p 8123:8123 \
    -p 9000:9000 \
    -v clickhouse_data:/var/lib/clickhouse \
    -v clickhouse_logs:/var/log/clickhouse-server \
    clickhouse/clickhouse-server:latest

# Open the CLI client
docker exec -it clickhouse-server clickhouse-client

# Or via HTTP (port 8123)
curl 'http://localhost:8123/?query=SELECT%201'

2.5 Docker Compose (ClickHouse + Kafka + Grafana)

# docker-compose.yaml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"  # HTTP
      - "9000:9000"  # Native
    volumes:
      - clickhouse_data:/var/lib/clickhouse
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_INSTALL_PLUGINS: grafana-clickhouse-datasource
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  clickhouse_data:
  grafana_data:
# Start everything
docker-compose up -d

# Access:
# - ClickHouse: http://localhost:8123
# - Grafana: http://localhost:3000 (admin/admin)

2.6 Table Engines

ClickHouse offers different table engines depending on the use case:

Engine                Use Case              Characteristics
MergeTree             Standard analytics    The most common; sorting, partitioning
ReplacingMergeTree    Deduplication         Keeps the latest version per key
SummingMergeTree      Pre-aggregation       Automatic sums per key
AggregatingMergeTree  Complex aggregations  Aggregation states
Kafka                 Kafka ingestion       Consumes a topic directly
Buffer                Write buffering       Accumulates rows before writing
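
As a quick illustration of a non-default engine, here is a minimal ReplacingMergeTree sketch. The user_profiles table is hypothetical (not part of this module's schema); it keeps the row with the highest updated_at per user_id, with deduplication happening lazily at merge time.

-- Hypothetical example: keep only the latest row per user_id
CREATE TABLE IF NOT EXISTS user_profiles
(
    user_id     String,
    email       String,
    plan        LowCardinality(String),
    updated_at  DateTime
)
ENGINE = ReplacingMergeTree(updated_at)     -- version column: highest value wins at merge time
ORDER BY user_id;

-- Deduplication is asynchronous (it happens during background merges);
-- add FINAL when you need the deduplicated view immediately
SELECT * FROM user_profiles FINAL WHERE user_id = 'user_001';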

2.7 Creating a MergeTree Table

# ClickHouse SQL example

create_table_sql = """
-- ═══════════════════════════════════════════════════════════════
-- TABLE : events (MergeTree)
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE IF NOT EXISTS events
(
    -- Columns
    event_id        UUID DEFAULT generateUUIDv4(),
    event_time      DateTime64(3),          -- Millisecond precision
    event_date      Date DEFAULT toDate(event_time),
    user_id         String,
    event_type      LowCardinality(String), -- Optimized for few distinct values
    page            String,
    country         LowCardinality(String),
    device          LowCardinality(String),
    session_id      String,
    duration_ms     UInt32,
    revenue         Decimal(10, 2) DEFAULT 0,
    properties      String                   -- JSON stored as a String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)           -- Partition by month
ORDER BY (event_date, event_type, user_id)  -- Sort key (crucial for performance)
TTL event_date + INTERVAL 90 DAY            -- 90-day retention
SETTINGS index_granularity = 8192;          -- Index granularity

-- ═══════════════════════════════════════════════════════════════
-- Insert data
-- ═══════════════════════════════════════════════════════════════

INSERT INTO events (event_time, user_id, event_type, page, country, device, session_id, duration_ms, revenue)
VALUES
    (now(), 'user_001', 'page_view', '/home', 'FR', 'mobile', 'sess_abc', 1500, 0),
    (now(), 'user_001', 'click', '/products', 'FR', 'mobile', 'sess_abc', 200, 0),
    (now(), 'user_002', 'purchase', '/checkout', 'US', 'desktop', 'sess_xyz', 5000, 99.99),
    (now(), 'user_003', 'page_view', '/home', 'DE', 'tablet', 'sess_123', 800, 0);

-- ═══════════════════════════════════════════════════════════════
-- Analytical queries
-- ═══════════════════════════════════════════════════════════════

-- Events by type today
SELECT 
    event_type,
    count() AS event_count,
    uniq(user_id) AS unique_users,
    avg(duration_ms) AS avg_duration
FROM events
WHERE event_date = today()
GROUP BY event_type
ORDER BY event_count DESC;

-- Revenue by country (last hour)
SELECT 
    country,
    sum(revenue) AS total_revenue,
    count() AS purchases
FROM events
WHERE event_type = 'purchase'
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY country
ORDER BY total_revenue DESC;

-- Funnel analysis
SELECT
    countIf(event_type = 'page_view') AS views,
    countIf(event_type = 'click') AS clicks,
    countIf(event_type = 'purchase') AS purchases,
    round(clicks / views * 100, 2) AS click_rate,
    round(purchases / clicks * 100, 2) AS conversion_rate
FROM events
WHERE event_date = today();
"""

print(create_table_sql)

2.8 ORDER BY: The Key to Performance

The ORDER BY clause is crucial in ClickHouse. It defines:

  • The physical order of the data on disk
  • The primary index (sparse index)
  • Which columns to use in WHERE filters

┌─────────────────────────────────────────────────────────────────────────────┐
│                    ORDER BY BEST PRACTICES                                  │
│                                                                             │
│   RULE 1: Put frequently filtered columns first                            │
│   ─────────────────────────────────────────────────────────────            │
│   ORDER BY (date, user_id, event_type)                                     │
│             ^^^^                                                            │
│   If you often filter by date, put it first                                │
│                                                                             │
│   RULE 2: From lowest to highest cardinality                               │
│   ─────────────────────────────────────────────                            │
│   ORDER BY (country, city, user_id)                                        │
│             ~200      ~50K   ~10M  distinct values                         │
│                                                                             │
│   RULE 3: Don't put too many columns in the key                            │
│   ─────────────────────────────────────────                                │
│   3-5 columns max, otherwise the index grows too large                     │
└─────────────────────────────────────────────────────────────────────────────┘
-- GOOD: filter on the leading columns of the ORDER BY
SELECT * FROM events
WHERE event_date = '2024-01-15' AND event_type = 'purchase';

-- LESS GOOD: filter on a column that is not in the ORDER BY
SELECT * FROM events
WHERE session_id = 'abc123';  -- May trigger a full scan
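
To check how much the sparse primary index actually prunes, you can ask ClickHouse which parts and granules a query will read. A quick way to compare the two filters above (the exact output depends on your ClickHouse version):

-- Shows the parts/granules selected by the primary key for this query
EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE event_date = '2024-01-15' AND event_type = 'purchase';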

3. Ingestion Kafka → ClickHouse

3.1 Ingestion Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    KAFKA → CLICKHOUSE INGESTION                             │
│                                                                             │
│   ┌─────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐ │
│   │  Kafka  │────▶│ Kafka Engine│────▶│ Materialized│────▶│ MergeTree   │ │
│   │  Topic  │     │   (source)  │     │    View     │     │  (storage)  │ │
│   └─────────┘     └─────────────┘     └─────────────┘     └─────────────┘ │
│                                                                             │
│   The recommended pattern:                                                 │
│   1. A Kafka Engine table consumes the topic                               │
│   2. A Materialized View transforms and inserts into the final table      │
│   3. A MergeTree table stores the data                                     │
└─────────────────────────────────────────────────────────────────────────────┘

3.2 Full Configuration

# Configuration Kafka → ClickHouse

kafka_ingestion_sql = """
-- ═══════════════════════════════════════════════════════════════
-- STEP 1: Final storage table (MergeTree)
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE IF NOT EXISTS events_final
(
    event_time      DateTime64(3),
    event_date      Date DEFAULT toDate(event_time),
    user_id         String,
    event_type      LowCardinality(String),
    page            String,
    country         LowCardinality(String),
    amount          Decimal(10, 2),
    _kafka_topic    LowCardinality(String),
    _kafka_offset   UInt64,
    _inserted_at    DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_type, user_id)
TTL event_date + INTERVAL 180 DAY;

-- ═══════════════════════════════════════════════════════════════
-- STEP 2: Kafka Engine table (source)
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE IF NOT EXISTS events_kafka
(
    raw String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:29092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONAsString',
    kafka_num_consumers = 2,                    -- Parallelism
    kafka_max_block_size = 65536,
    kafka_skip_broken_messages = 100;           -- Error tolerance

-- ═══════════════════════════════════════════════════════════════
-- STEP 3: Materialized View (transform + insert)
-- ═══════════════════════════════════════════════════════════════

CREATE MATERIALIZED VIEW IF NOT EXISTS events_kafka_mv
TO events_final
AS SELECT
    -- Parse the JSON
    parseDateTime64BestEffort(JSONExtractString(raw, 'timestamp')) AS event_time,
    JSONExtractString(raw, 'user_id') AS user_id,
    JSONExtractString(raw, 'event_type') AS event_type,
    JSONExtractString(raw, 'page') AS page,
    JSONExtractString(raw, 'country') AS country,
    toDecimal64(JSONExtractFloat(raw, 'amount'), 2) AS amount,
    _topic AS _kafka_topic,
    _offset AS _kafka_offset
FROM events_kafka;

-- ═══════════════════════════════════════════════════════════════
-- Verify the ingestion
-- ═══════════════════════════════════════════════════════════════

-- Number of ingested events
SELECT count() FROM events_final;

-- Kafka lag (latest offsets)
SELECT 
    _kafka_topic,
    max(_kafka_offset) AS latest_offset,
    max(_inserted_at) AS last_insert
FROM events_final
GROUP BY _kafka_topic;

-- Events per minute (monitoring)
SELECT 
    toStartOfMinute(event_time) AS minute,
    count() AS events
FROM events_final
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute DESC
LIMIT 10;
"""

print(kafka_ingestion_sql)

3.3 A Python Producer for Testing

# Kafka producer to send events

producer_code = '''
from kafka import KafkaProducer
import json
import random
from datetime import datetime
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

event_types = ['page_view', 'click', 'scroll', 'purchase', 'signup']
pages = ['/home', '/products', '/cart', '/checkout', '/profile']
countries = ['FR', 'US', 'DE', 'GB', 'ES', 'IT']

def generate_event():
    event_type = random.choice(event_types)
    return {
        'timestamp': datetime.utcnow().isoformat(),
        'user_id': f'user_{random.randint(1, 1000):04d}',
        'event_type': event_type,
        'page': random.choice(pages),
        'country': random.choice(countries),
        'amount': round(random.uniform(10, 500), 2) if event_type == 'purchase' else 0
    }

# Send events continuously
print("Sending events to Kafka...")
try:
    while True:
        event = generate_event()
        producer.send('events', value=event)
        print(f"Sent: {event['event_type']} from {event['country']}")
        time.sleep(0.1)  # 10 events/sec
except KeyboardInterrupt:
    print("Stopped")
finally:
    producer.close()
'''

print("# kafka_producer.py")
print(producer_code)

4. Materialized Views for Pre-Aggregation

4.1 Why Pre-Aggregate?

Approach                     Query Time             Storage   Flexibility
Raw data + on-the-fly query  Slow on large volumes  Minimal   Maximum
Materialized View            Ultra-fast             Moderate  Pre-defined

4.2 SummingMergeTree: Automatic Aggregation

# Materialized View with SummingMergeTree

mv_summing_sql = """
-- ═══════════════════════════════════════════════════════════════
-- HOURLY AGGREGATION: events by type, country, hour
-- ═══════════════════════════════════════════════════════════════

-- Destination table (SummingMergeTree)
CREATE TABLE IF NOT EXISTS events_hourly
(
    event_hour      DateTime,
    event_type      LowCardinality(String),
    country         LowCardinality(String),
    event_count     UInt64,
    unique_users    AggregateFunction(uniq, String),  -- HyperLogLog
    total_amount    Decimal(18, 2),
    avg_amount      AggregateFunction(avg, Decimal(10, 2))
)
ENGINE = SummingMergeTree((event_count, total_amount))
PARTITION BY toYYYYMM(event_hour)
ORDER BY (event_hour, event_type, country);

-- Materialized View that feeds the table
CREATE MATERIALIZED VIEW IF NOT EXISTS events_hourly_mv
TO events_hourly
AS SELECT
    toStartOfHour(event_time) AS event_hour,
    event_type,
    country,
    count() AS event_count,
    uniqState(user_id) AS unique_users,        -- State for later merging
    sum(amount) AS total_amount,
    avgState(amount) AS avg_amount             -- State for later merging
FROM events_final
GROUP BY event_hour, event_type, country;

-- ═══════════════════════════════════════════════════════════════
-- QUERIES ON THE AGGREGATE (ultra-fast!)
-- ═══════════════════════════════════════════════════════════════

-- Events per hour (last 24h)
SELECT 
    event_hour,
    sum(event_count) AS total_events,
    uniqMerge(unique_users) AS unique_users,   -- Merge the HLL states
    sum(total_amount) AS revenue
FROM events_hourly
WHERE event_hour >= now() - INTERVAL 24 HOUR
GROUP BY event_hour
ORDER BY event_hour;

-- Top countries by revenue
SELECT 
    country,
    sum(event_count) AS events,
    sum(total_amount) AS revenue,
    avgMerge(avg_amount) AS avg_order_value
FROM events_hourly
WHERE event_type = 'purchase'
  AND event_hour >= today()
GROUP BY country
ORDER BY revenue DESC;
"""

print(mv_summing_sql)
# Materialized View for real-time metrics (per minute)

mv_realtime_sql = """
-- ═══════════════════════════════════════════════════════════════
-- REAL-TIME METRICS (per minute, for dashboards)
-- ═══════════════════════════════════════════════════════════════

CREATE TABLE IF NOT EXISTS events_minute
(
    event_minute    DateTime,
    event_type      LowCardinality(String),
    event_count     UInt64,
    unique_users    UInt64,
    total_amount    Decimal(18, 2)
)
ENGINE = SummingMergeTree((event_count, unique_users, total_amount))
ORDER BY (event_minute, event_type)
TTL event_minute + INTERVAL 7 DAY;  -- Keep only 7 days

CREATE MATERIALIZED VIEW IF NOT EXISTS events_minute_mv
TO events_minute
AS SELECT
    toStartOfMinute(event_time) AS event_minute,
    event_type,
    count() AS event_count,
    uniq(user_id) AS unique_users,
    sum(amount) AS total_amount
FROM events_final
GROUP BY event_minute, event_type;

-- ═══════════════════════════════════════════════════════════════
-- QUERIES FOR A REAL-TIME DASHBOARD
-- ═══════════════════════════════════════════════════════════════

-- Last 5 minutes (refresh every 5 sec)
SELECT 
    event_minute,
    sum(event_count) AS events,
    sum(unique_users) AS users,
    sum(total_amount) AS revenue
FROM events_minute
WHERE event_minute >= now() - INTERVAL 5 MINUTE
GROUP BY event_minute
ORDER BY event_minute;

-- Events per second (approximation)
SELECT 
    sum(event_count) / 60 AS events_per_second
FROM events_minute
WHERE event_minute >= now() - INTERVAL 1 MINUTE;

-- Comparison vs the same hour yesterday
SELECT 
    'today' AS period,
    sum(event_count) AS events,
    sum(total_amount) AS revenue
FROM events_minute
WHERE event_minute >= toStartOfHour(now())

UNION ALL

SELECT 
    'yesterday' AS period,
    sum(event_count) AS events,
    sum(total_amount) AS revenue
FROM events_minute
WHERE event_minute >= toStartOfHour(now() - INTERVAL 1 DAY)
  AND event_minute < toStartOfHour(now() - INTERVAL 1 DAY) + INTERVAL 1 HOUR;
"""

print(mv_realtime_sql)

5. Alternatives: Apache Druid & Pinot

5.1 Apache Druid

Druid is an OLAP engine optimized for time series and real-time ingestion.

┌─────────────────────────────────────────────────────────────────────────────┐
│                    APACHE DRUID ARCHITECTURE                                │
│                                                                             │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐                      │
│   │   Kafka     │──▶│   Middle    │──▶│   Historical│                      │
│   │   (stream)  │   │   Manager   │   │   (segments)│                      │
│   └─────────────┘   └─────────────┘   └─────────────┘                      │
│                                              │                              │
│   ┌─────────────┐                            │                              │
│   │   Batch     │────────────────────────────┘                              │
│   │   (HDFS/S3) │                                                           │
│   └─────────────┘                                                           │
│                                              │                              │
│                           ┌─────────────────┐                               │
│                           │     Broker      │◀──────── Queries              │
│                           │ (scatter/gather)│                               │
│                           └─────────────────┘                               │
└─────────────────────────────────────────────────────────────────────────────┘

Druid characteristics:

  • Columnar storage with compression
  • Real-time AND batch ingestion
  • Automatic roll-up (pre-aggregation at ingestion time)
  • Optimized for GROUP BY on time series

When to use Druid:

  • Time-series analytics (monitoring, IoT)
  • Very high volumes (trillions of rows)
  • Roll-up needed at ingestion time

5.2 Apache Pinot

Pinot is an OLAP engine created at LinkedIn, optimized for user-facing analytics.

Pinot characteristics:

  • Ultra-low latency (<100ms P99)
  • Optimized for concurrent queries (1000+ QPS)
  • Star-tree index for pre-computed aggregations
  • Upsert support (unlike Druid)

When to use Pinot:

  • User-facing analytics (customer dashboards)
  • Very high concurrency
  • A need for upserts

5.3 Comparison: ClickHouse vs Druid vs Pinot

Feature      ClickHouse                Druid                Pinot
Type         OLAP DB                   Time-series OLAP     User-facing OLAP
SQL          Full SQL                  Druid SQL (limited)  PQL + SQL
Latency      ~10-100ms                 ~100-500ms           ~10-50ms
Concurrency  ~100                      ~100                 ~1000+
Upserts      Yes (ReplacingMergeTree)  No                   Yes
Joins        Yes                       Limited              Limited
Complexity   Simple                    Complex              Medium
Best for     General analytics         Time series          User-facing

5.4 Recommendations

┌─────────────────────────────────────────────────────────────────────────────┐
│                    WHICH OLAP ENGINE TO CHOOSE?                             │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │  You need internal analytics (dashboards, ad-hoc queries)?          │  │
│   │  → ClickHouse (simple, full SQL, flexible)                          │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │  You have very high-volume time series and need roll-up?            │  │
│   │  → Druid (optimized for exactly that, but more complex)             │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐  │
│   │  You have user-facing dashboards with 1000+ concurrent users?       │  │
│   │  → Pinot (built for that, with guaranteed latency)                  │  │
│   └─────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│   When in doubt: start with ClickHouse (the simplest to operate)           │
└─────────────────────────────────────────────────────────────────────────────┘

6. Real-Time Dashboards

6.1 Real-Time Dashboard Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                    REAL-TIME DASHBOARD ARCHITECTURE                         │
│                                                                             │
│   Option 1: PULL (Polling)                                                  │
│   ─────────────────────────                                                 │
│                                                                             │
│   Dashboard ──(every 5s)──▶ ClickHouse ──▶ Response                        │
│                                                                             │
│   ✅ Simple                                                                 │
│   ❌ Latency = refresh interval                                             │
│   ❌ Load on the DB with many clients                                       │
│                                                                             │
│   Option 2: PUSH (WebSocket/SSE)                                            │
│   ────────────────────────────                                              │
│                                                                             │
│   Kafka ──▶ Stream Processor ──▶ WebSocket ──▶ Dashboard                   │
│                                                                             │
│   ✅ Minimal latency                                                        │
│   ✅ Efficient (no polling)                                                 │
│   ❌ More complex to implement                                              │
│                                                                             │
│   Option 3: HYBRID (recommended)                                            │
│   ────────────────────────────                                              │
│                                                                             │
│   • Historical data: query ClickHouse                                      │
│   • Live metrics: WebSocket from Kafka                                     │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 Grafana + ClickHouse

Grafana is the most popular tool for real-time dashboards.

Data Source Configuration

# grafana/provisioning/datasources/clickhouse.yaml
apiVersion: 1

datasources:
  - name: ClickHouse
    type: grafana-clickhouse-datasource
    access: proxy
    url: http://clickhouse:8123
    jsonData:
      defaultDatabase: default
      port: 9000
      server: clickhouse
      username: default
      tlsSkipVerify: true
    secureJsonData:
      password: ""

Example Queries for Panels

# Grafana queries for ClickHouse

grafana_queries = """
-- ═══════════════════════════════════════════════════════════════
-- PANEL 1: Time Series - Events per minute
-- ═══════════════════════════════════════════════════════════════

SELECT 
    $__timeInterval(event_minute) AS time,
    sum(event_count) AS events
FROM events_minute
WHERE $__timeFilter(event_minute)
GROUP BY time
ORDER BY time;

-- ═══════════════════════════════════════════════════════════════
-- PANEL 2: Stat - Total events (last hour)
-- ═══════════════════════════════════════════════════════════════

SELECT sum(event_count) AS total_events
FROM events_minute
WHERE event_minute >= now() - INTERVAL 1 HOUR;

-- ═══════════════════════════════════════════════════════════════
-- PANEL 3: Pie Chart - Events by type
-- ═══════════════════════════════════════════════════════════════

SELECT 
    event_type,
    sum(event_count) AS count
FROM events_minute
WHERE event_minute >= now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY count DESC;

-- ═══════════════════════════════════════════════════════════════
-- PANEL 4: Bar Chart - Top 10 countries by revenue
-- ═══════════════════════════════════════════════════════════════

SELECT 
    country,
    sum(total_amount) AS revenue
FROM events_hourly
WHERE event_hour >= today()
  AND event_type = 'purchase'
GROUP BY country
ORDER BY revenue DESC
LIMIT 10;

-- ═══════════════════════════════════════════════════════════════
-- PANEL 5: Gauge - Conversion rate (real time)
-- ═══════════════════════════════════════════════════════════════

SELECT 
    round(
        sumIf(event_count, event_type = 'purchase') / 
        sumIf(event_count, event_type = 'page_view') * 100, 
        2
    ) AS conversion_rate
FROM events_minute
WHERE event_minute >= now() - INTERVAL 1 HOUR;

-- ═══════════════════════════════════════════════════════════════
-- PANEL 6: Table - Latest events (live)
-- ═══════════════════════════════════════════════════════════════

SELECT 
    event_time,
    user_id,
    event_type,
    country,
    amount
FROM events_final
WHERE event_time >= now() - INTERVAL 5 MINUTE
ORDER BY event_time DESC
LIMIT 100;
"""

print(grafana_queries)

6.3 Dashboard JSON (Import into Grafana)

import json

grafana_dashboard = {
    "dashboard": {
        "title": "Real-Time Analytics",
        "tags": ["clickhouse", "realtime"],
        "timezone": "browser",
        "refresh": "5s",  # Auto-refresh every 5 seconds
        "panels": [
            {
                "id": 1,
                "title": "Events per Minute",
                "type": "timeseries",
                "gridPos": {"x": 0, "y": 0, "w": 12, "h": 8},
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": "SELECT $__timeInterval(event_minute) AS time, sum(event_count) AS events FROM events_minute WHERE $__timeFilter(event_minute) GROUP BY time ORDER BY time",
                    "format": "time_series"
                }]
            },
            {
                "id": 2,
                "title": "Total Events (1h)",
                "type": "stat",
                "gridPos": {"x": 12, "y": 0, "w": 4, "h": 4},
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": "SELECT sum(event_count) AS total FROM events_minute WHERE event_minute >= now() - INTERVAL 1 HOUR",
                    "format": "table"
                }],
                "options": {
                    "colorMode": "value",
                    "graphMode": "none"
                }
            },
            {
                "id": 3,
                "title": "Revenue (1h)",
                "type": "stat",
                "gridPos": {"x": 16, "y": 0, "w": 4, "h": 4},
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": "SELECT sum(total_amount) AS revenue FROM events_minute WHERE event_minute >= now() - INTERVAL 1 HOUR",
                    "format": "table"
                }],
                "options": {
                    "colorMode": "value"
                },
                "fieldConfig": {
                    "defaults": {
                        "unit": "currencyEUR"
                    }
                }
            },
            {
                "id": 4,
                "title": "Events by Type",
                "type": "piechart",
                "gridPos": {"x": 12, "y": 4, "w": 8, "h": 8},
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": "SELECT event_type, sum(event_count) AS count FROM events_minute WHERE event_minute >= now() - INTERVAL 1 HOUR GROUP BY event_type",
                    "format": "table"
                }]
            },
            {
                "id": 5,
                "title": "Top Countries by Revenue",
                "type": "barchart",
                "gridPos": {"x": 0, "y": 8, "w": 12, "h": 8},
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": "SELECT country, sum(total_amount) AS revenue FROM events_hourly WHERE event_hour >= today() AND event_type = 'purchase' GROUP BY country ORDER BY revenue DESC LIMIT 10",
                    "format": "table"
                }]
            },
            {
                "id": 6,
                "title": "Live Events",
                "type": "table",
                "gridPos": {"x": 12, "y": 12, "w": 12, "h": 8},
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": "SELECT event_time, user_id, event_type, country, amount FROM events_final WHERE event_time >= now() - INTERVAL 5 MINUTE ORDER BY event_time DESC LIMIT 50",
                    "format": "table"
                }]
            }
        ]
    },
    "overwrite": True
}

print("📊 Grafana Dashboard JSON:")
print(json.dumps(grafana_dashboard, indent=2)[:2000] + "...")

6.4 Apache Superset (Alternative)

Apache Superset is an open-source alternative to Grafana, more oriented toward BI.

# Run Superset with Docker
docker run -d -p 8088:8088 \
    --name superset \
    -e SUPERSET_SECRET_KEY='your-secret-key' \
    apache/superset

# Initial setup
docker exec -it superset superset fab create-admin \
    --username admin \
    --firstname Admin \
    --lastname User \
    --email admin@example.com \
    --password admin

docker exec -it superset superset db upgrade
docker exec -it superset superset init

# Access: http://localhost:8088

ClickHouse connection string in Superset:

clickhousedb://default:@clickhouse:8123/default

6.5 Grafana vs Superset Comparison

Feature      Grafana                  Superset
Focus        Monitoring, time series  BI, exploration
Refresh      Excellent (auto, push)   Good (polling)
SQL Editor   Basic                    Excellent
Exploration  Limited                  Very good
Alerting     Built-in                 Via plugin
Best for     Ops dashboards           Business analytics

7. Patterns and Best Practices

7.1 Pre-Aggregation Patterns

┌─────────────────────────────────────────────────────────────────────────────┐
│                    PRE-AGGREGATION PATTERNS                                 │
│                                                                             │
│   PATTERN 1: Multi-Level Aggregation                                        │
│   ──────────────────────────────────                                        │
│                                                                             │
│   Raw Events ──▶ Per-Minute ──▶ Per-Hour ──▶ Per-Day                       │
│   (detail)       (7 days)       (90 days)    (1+ year)                     │
│                                                                             │
│   → Fast queries at every level                                            │
│   → Different retention per granularity                                    │
│                                                                             │
│   PATTERN 2: Dimension-Specific Tables                                      │
│   ─────────────────────────────────────                                     │
│                                                                             │
│   events_by_country  (aggregated by country)                               │
│   events_by_product  (aggregated by product)                               │
│   events_by_user     (aggregated by user)                                  │
│                                                                             │
│   → Ultra-fast for known dimensions                                        │
│   → Less flexible for ad-hoc queries                                       │
└─────────────────────────────────────────────────────────────────────────────┘
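
As a sketch of Pattern 1's third level, a daily table can follow exactly the same recipe as events_minute and events_hourly from section 4. The events_daily / events_daily_mv names below are assumptions; they read from the events_final table defined in section 3.

-- Daily aggregation level (sketch, same pattern as events_minute / events_hourly)
CREATE TABLE IF NOT EXISTS events_daily
(
    event_day       Date,
    event_type      LowCardinality(String),
    country         LowCardinality(String),
    event_count     UInt64,
    total_amount    Decimal(18, 2)
)
ENGINE = SummingMergeTree((event_count, total_amount))
PARTITION BY toYear(event_day)
ORDER BY (event_day, event_type, country)
TTL event_day + INTERVAL 730 DAY;               -- roughly 2 years of retention

CREATE MATERIALIZED VIEW IF NOT EXISTS events_daily_mv
TO events_daily
AS SELECT
    toDate(event_time) AS event_day,
    event_type,
    country,
    count() AS event_count,
    sum(amount) AS total_amount
FROM events_final
GROUP BY event_day, event_type, country;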

7.2 Tiered Storage

-- ClickHouse storage policies (the 'hot', 'warm' and 'cold' volumes must exist in a configured storage policy)
CREATE TABLE events_tiered
(
    event_time DateTime,
    event_type String,
    data String
)
ENGINE = MergeTree()
ORDER BY event_time
TTL 
    event_time + INTERVAL 7 DAY TO VOLUME 'hot',     -- SSD
    event_time + INTERVAL 30 DAY TO VOLUME 'warm',   -- HDD
    event_time + INTERVAL 365 DAY TO VOLUME 'cold';  -- S3

7.3 Retention Policies

-- TTL for automatic deletion
ALTER TABLE events_minute
MODIFY TTL event_minute + INTERVAL 7 DAY;

-- Check the disk space used
SELECT 
    table,
    formatReadableSize(sum(bytes_on_disk)) AS size,
    sum(rows) AS rows
FROM system.parts
WHERE active
GROUP BY table
ORDER BY sum(bytes_on_disk) DESC;

7.4 Pipeline Monitoring

-- Ingestion lag (latest Kafka offset and event time vs now)
SELECT 
    max(_kafka_offset) AS latest_offset,
    max(event_time) AS latest_event,
    dateDiff('second', max(event_time), now()) AS lag_seconds
FROM events_final;

-- Throughput (events/sec)
SELECT 
    toStartOfMinute(event_time) AS minute,
    count() / 60 AS events_per_second
FROM events_final
WHERE event_time >= now() - INTERVAL 10 MINUTE
GROUP BY minute
ORDER BY minute DESC;

-- Alert if the lag exceeds 5 minutes
SELECT 
    CASE 
        WHEN dateDiff('minute', max(event_time), now()) > 5 
        THEN 'ALERT: Ingestion lag > 5 min!'
        ELSE 'OK'
    END AS status
FROM events_final;

7.5 Cost Optimization

Technique           Impact             Implementation
Compression         -80% storage       LZ4 (default) or ZSTD
TTL                 Caps data volume   TTL date + INTERVAL X DAY
Partitioning        Efficient pruning  PARTITION BY toYYYYMM(date)
Materialized Views  -90% query cost    Pre-aggregation
LowCardinality      -50% for enums     LowCardinality(String)
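
For cold or rarely read columns, you can also trade CPU for storage by switching the compression codec. A sketch on the events table from section 2.7 (ZSTD level 3 is an assumption; tune it to your data):

-- Heavier compression on a large, rarely-read column
ALTER TABLE events
    MODIFY COLUMN properties String CODEC(ZSTD(3));

-- Check the effect per column
SELECT
    name,
    compression_codec,
    formatReadableSize(data_compressed_bytes) AS size
FROM system.columns
WHERE table = 'events';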

8. Python Client for ClickHouse

# Python client for ClickHouse

python_client_code = '''
# pip install clickhouse-connect

import clickhouse_connect
from datetime import datetime, timedelta

# ═══════════════════════════════════════════════════════════════
# CONNECTION
# ═══════════════════════════════════════════════════════════════

client = clickhouse_connect.get_client(
    host='localhost',
    port=8123,
    username='default',
    password=''
)

# ═══════════════════════════════════════════════════════════════
# QUERIES
# ═══════════════════════════════════════════════════════════════

# Simple query
result = client.query("SELECT count() FROM events_final")
print(f"Total events: {result.result_rows[0][0]}")

# Parameterized query
result = client.query(
    "SELECT event_type, count() FROM events_final "
    "WHERE event_time >= {start:DateTime} "
    "GROUP BY event_type",
    parameters={"start": datetime.now() - timedelta(hours=1)}
)

for row in result.result_rows:
    print(f"{row[0]}: {row[1]}")

# Query into a DataFrame
df = client.query_df(
    "SELECT toStartOfMinute(event_time) AS minute, count() AS events "
    "FROM events_final "
    "WHERE event_time >= now() - INTERVAL 1 HOUR "
    "GROUP BY minute ORDER BY minute"
)
print(df.head())

# ═══════════════════════════════════════════════════════════════
# INSERT
# ═══════════════════════════════════════════════════════════════

# Batch insert
data = [
    [datetime.now(), "user_001", "page_view", "FR", 0],
    [datetime.now(), "user_002", "click", "US", 0],
    [datetime.now(), "user_003", "purchase", "DE", 99.99],
]

client.insert(
    "events_final",
    data,
    column_names=["event_time", "user_id", "event_type", "country", "amount"]
)

# Insert from a DataFrame
import pandas as pd

df = pd.DataFrame({
    "event_time": [datetime.now()] * 100,
    "user_id": [f"user_{i:04d}" for i in range(100)],
    "event_type": ["page_view"] * 100,
    "country": ["FR"] * 100,
    "amount": [0.0] * 100
})

client.insert_df("events_final", df)

# ═══════════════════════════════════════════════════════════════
# ASYNC (for high throughput)
# ═══════════════════════════════════════════════════════════════

import asyncio

async def async_queries():
    # get_async_client returns an async wrapper around the client (recent clickhouse-connect versions)
    client = await clickhouse_connect.get_async_client(host="localhost")
    
    result = await client.query("SELECT count() FROM events_final")
    print(f"Async count: {result.result_rows[0][0]}")
    
    await client.close()

# asyncio.run(async_queries())
'''

print(python_client_code)

9. Practical Exercises

Exercise 1: Set Up ClickHouse + Kafka Ingestion

  1. Start ClickHouse and Kafka with Docker Compose
  2. Create a Kafka topic named events
  3. Create the events_final table (MergeTree)
  4. Create the Kafka Engine table + the Materialized View
  5. Send events with the Python producer
  6. Verify the ingestion in ClickHouse

Exercise 2: Multi-Level Materialized Views

Create 3 levels of aggregation:

  • events_minute: 7-day TTL
  • events_hourly: 90-day TTL
  • events_daily: 2-year TTL

Verify that queries at each level are fast.


Exercise 3: Grafana Dashboard

Create a dashboard with:

  • Time series: events per minute
  • Stats: total events, revenue, unique users
  • Pie chart: events by type
  • Table: latest live events

Configure auto-refresh every 5 seconds.


Exercise 4: Alerting

Create Grafana alerts for:

  • Ingestion lag > 5 minutes
  • Events/sec < 10 (traffic drop)
  • Revenue = 0 for 30 minutes


Exercise 5: Benchmark

  1. Generate 10 million events
  2. Compare query times on:
    • The raw table (MergeTree)
    • The per-minute Materialized View
    • The hourly Materialized View
  3. Document the performance gains

10. Mini-Project: Real-Time Analytics Platform

Objective

Build a complete real-time analytics platform.

┌─────────────────────────────────────────────────────────────────────────────┐
│                   MINI-PROJECT: REAL-TIME ANALYTICS                         │
│                                                                             │
│   ┌─────────────┐     ┌─────────┐     ┌─────────────┐                      │
│   │   Event     │────▶│  Kafka  │────▶│ ClickHouse  │                      │
│   │  Generator  │     │  Topic  │     │             │                      │
│   │  (Python)   │     │         │     │ ┌─────────┐ │     ┌─────────────┐ │
│   └─────────────┘     └─────────┘     │ │  Raw    │ │────▶│   Grafana   │ │
│                                       │ │  Table  │ │     │  Dashboard  │ │
│   Throughput:                         │ └────┬────┘ │     │             │ │
│   1000 events/sec                     │      │      │     │  • Live     │ │
│                                       │ ┌────▼────┐ │     │  • Refresh  │ │
│                                       │ │  MVs    │ │     │    5 sec    │ │
│                                       │ │ min/hr  │ │     │             │ │
│                                       │ └─────────┘ │     └─────────────┘ │
│                                       └─────────────┘                      │
│                                                                             │
│   Metrics to display:                                                      │
│   • Events/minute (time series)                                            │
│   • Real-time revenue                                                      │
│   • Top countries, top products                                            │
│   • Conversion funnel                                                      │
│   • Alerts on anomalies                                                    │
└─────────────────────────────────────────────────────────────────────────────┘

Deliverables

  1. docker-compose.yaml: Kafka + ClickHouse + Grafana
  2. Event Generator: a Python script generating 1000 events/sec
  3. ClickHouse Schema: tables + Materialized Views
  4. Grafana Dashboard: 6+ panels with auto-refresh
  5. Alerting: 3+ configured alerts
  6. Documentation: a README with the architecture and setup

Project Structure

realtime-analytics/
├── docker-compose.yaml
├── clickhouse/
│   └── init.sql                # Schema + MVs
├── producer/
│   ├── requirements.txt
│   └── event_generator.py
├── grafana/
│   ├── provisioning/
│   │   ├── datasources/
│   │   │   └── clickhouse.yaml
│   │   └── dashboards/
│   │       └── realtime.json
│   └── dashboards/
│       └── realtime_analytics.json
└── README.md

Success Criteria



➡️ Next Step

👉 Next module: 34_cloud_data_platform - Cloud Data Platforms (AWS/GCP/Azure)


🎉 Congratulations! You now have a solid grasp of OLAP engines and real-time dashboards.
