⚙️ MLOps · Avancé

Pipeline MLOps de recommandation d'articles

⏱ 90 minutes🐍 Python 3.11🧪 MLflow 2.x🌬 Airflow 2.9☸️ Kubernetes🏪 Feast

Ce cours construit un pipeline MLOps end-to-end pour recommander des articles sur une plateforme web (type blog, e-commerce éditorial ou média). On passe d'un notebook expérimental à un service de recommandation déployé, versionné, monitoré, avec re-entraînement automatique : ingestion, feature store, expérimentations MLflow, orchestration Airflow, conteneurisation Docker, serving FastAPI sur Kubernetes, monitoring de drift et boucle de feedback utilisateur.

1. Problème métier et architecture MLOps

📖 Terme : MLOps

Définition : MLOps (Machine Learning Operations) est l'ensemble des pratiques qui industrialisent le cycle de vie d'un modèle ML : versioning des données et du code, expérimentations reproductibles, déploiement continu, monitoring en production et re-entraînement automatique. C'est l'équivalent du DevOps, mais appliqué aux systèmes de Machine Learning.

But : Faire passer un modèle du notebook à la production de manière fiable, traçable et automatisée, puis maintenir sa qualité dans le temps.

Pourquoi ici : Un système de recommandation en production n'est pas un modèle figé : les contenus changent, les utilisateurs évoluent, la distribution des données dérive. Sans MLOps, les recommandations se dégradent silencieusement en quelques semaines. MLOps apporte la rigueur nécessaire pour maintenir la qualité dans la durée.

Dans une plateforme web d'articles (blog, média, e-commerce éditorial), le taux de clic (CTR) sur les recommandations est un KPI business direct. Un modèle qui dérive de 30% coûte directement en engagement et en revenus publicitaires. MLOps permet de détecter la dégradation avant que les métriques business ne chutent.

architecture.txt
┌─────────────────────────────────────────────────────────────────────┐
│                    PLATEFORME WEB (articles)                        │
│  ┌────────────┐   ┌──────────────┐   ┌────────────────────────────┐ │
│  │ Front React│──►│ API Gateway  │──►│ /recommend (FastAPI + K8s) │ │
│  └────────────┘   └──────────────┘   └─────────────┬──────────────┘ │
│        │                                            │                │
│        │ events (clic, vue, durée)                  │ features       │
│        ▼                                            ▼                │
│  ┌──────────────┐       ┌─────────────────┐   ┌──────────────────┐  │
│  │ Kafka / Pub- │──────►│ Data Lake (GCS) │◄──│ Feature Store    │  │
│  │ Sub Events   │       │ bronze/silver   │   │ (Feast + Redis)  │  │
│  └──────────────┘       └────────┬────────┘   └──────────────────┘  │
│                                  │                     ▲             │
│                                  ▼                     │             │
│                       ┌────────────────────┐           │             │
│                       │ DVC / data version │───────────┘             │
│                       └─────────┬──────────┘                         │
│                                 │                                    │
│   ┌─────────────────────────────▼────────────────────────────────┐   │
│   │ AIRFLOW / Kubeflow — orchestration (DAG journalier)          │   │
│   │   ingest → features → train (MLflow) → eval → register       │   │
│   │           → canary deploy → monitor                          │   │
│   └──────────────────────────────────────────────────────────────┘   │
│                                 │                                    │
│                                 ▼                                    │
│                       ┌────────────────────┐                         │
│                       │ MLflow Registry    │──► image Docker ──►K8s │
│                       │ (staging / prod)   │                         │
│                       └────────────────────┘                         │
└─────────────────────────────────────────────────────────────────────┘
Le pipeline est divisé en 4 couches : (1) data layer — les événements utilisateurs (clics, lectures, scroll) sont collectés via Kafka/Pub-Sub et stockés en lake ; (2) feature layer — Feast calcule les features en offline (batch) et les sert en online (Redis, <10 ms) ; (3) training layer — Airflow orchestre le DAG quotidien, MLflow tracke expérimentations et registre des modèles ; (4) serving layer — une API FastAPI sur Kubernetes avec autoscaling sert les recommandations avec un déploiement canary contrôlé.

2. Structure du projet et versioning

📖 Terme : DVC (Data Version Control)

Définition : DVC est un outil open-source qui étend Git pour versionner fichiers de données, modèles et pipelines trop lourds pour Git. Les fichiers binaires restent sur un stockage distant (S3, GCS), Git ne trace que des métadonnées légères (.dvc).

But : Reproduire exactement un entraînement en pointant un commit Git + un état des données.

Pourquoi ici : Sans DVC, on ne peut pas répondre à la question "avec quelles données exactes ce modèle de prod a-t-il été entraîné ?" — question indispensable pour l'audit, le debug et la conformité RGPD.

arborescence.txt
reco-articles/
├── .github/workflows/ci-cd.yml          # CI/CD GitHub Actions
├── airflow/dags/reco_daily.py           # DAG d'orchestration
├── data/                                # versionné DVC, pas Git
│   ├── raw/events.parquet.dvc
│   └── processed/features.parquet.dvc
├── feature_repo/                        # Feast feature store
│   ├── feature_store.yaml
│   └── features.py
├── src/
│   ├── ingestion/kafka_consumer.py
│   ├── features/build_features.py
│   ├── training/train.py                # MLflow
│   ├── serving/app.py                   # FastAPI
│   └── monitoring/drift.py              # Evidently
├── k8s/
│   ├── deployment.yaml
│   ├── service.yaml
│   └── hpa.yaml                         # Horizontal Pod Autoscaler
├── Dockerfile
├── dvc.yaml                             # pipeline DVC reproductible
├── mlflow.yaml
├── requirements.txt
└── README.md
dvc.yaml
stages:
  ingest:
    cmd: python src/ingestion/extract.py
    deps: [src/ingestion/extract.py]
    outs: [data/raw/events.parquet]

  features:
    cmd: python src/features/build_features.py
    deps:
      - data/raw/events.parquet
      - src/features/build_features.py
    outs: [data/processed/features.parquet]

  train:
    cmd: python src/training/train.py
    deps:
      - data/processed/features.parquet
      - src/training/train.py
    params: [train.lr, train.n_factors, train.epochs]
    outs: [models/reco_model.pkl]
    metrics: [metrics/train_metrics.json]
DVC définit un pipeline reproductible via dvc.yaml. Chaque étape a ses dépendances (code + données d'entrée), ses sorties et ses paramètres. dvc repro ne re-exécute que les étapes dont les dépendances ont changé — un peu comme make mais qui comprend les données. Combiné à Git + un remote S3/GCS, on obtient un contrat exact entre code, données, params et modèle produit.

3. Feature Store : calcul offline et serving online

📖 Terme : Feature Store

Définition : Un Feature Store est une base spécialisée qui stocke, versionne et sert les features ML. Il expose les mêmes features en offline (pour l'entraînement, via batch SQL) et en online (pour l'inférence, via Redis avec latence < 10 ms).

But : Éliminer le train-serving skew (écart entre features d'entraînement et features de prod) en garantissant un calcul unique.

Pourquoi ici : Dans un système de reco, 40% des bugs de production viennent d'un écart train/serving (ex. l'heure est en UTC à l'entraînement et en local en prod). Un feature store définit la feature une seule fois, les deux environnements consomment la même définition.

feature_repo/features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String

# ── Entités (clés primaires des features) ──
user = Entity(name="user_id", value_type=ValueType.STRING)
article = Entity(name="article_id", value_type=ValueType.STRING)

# ── Source de données (batch, parquet sur GCS) ──
user_stats_src = FileSource(
    path="gs://reco-data/features/user_stats.parquet",
    timestamp_field="event_ts",
)

# ── Feature View : features utilisateur ──
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=7),        # expiration online
    schema=[
        Field(name="clicks_7d", dtype=Int64),
        Field(name="read_ratio_7d", dtype=Float32),
        Field(name="avg_session_len_s", dtype=Float32),
        Field(name="top_category", dtype=String),
    ],
    source=user_stats_src,
    online=True,                      # servi via Redis à l'inférence
)

# ── Feature View : popularité article ──
article_stats_src = FileSource(
    path="gs://reco-data/features/article_stats.parquet",
    timestamp_field="event_ts",
)

article_stats_fv = FeatureView(
    name="article_stats",
    entities=[article],
    ttl=timedelta(days=30),
    schema=[
        Field(name="views_24h", dtype=Int64),
        Field(name="ctr_24h", dtype=Float32),
        Field(name="avg_read_time_s", dtype=Float32),
        Field(name="category", dtype=String),
    ],
    source=article_stats_src,
    online=True,
)
Ce fichier déclare deux FeatureViews : une pour l'utilisateur (comportement sur 7j) et une pour l'article (popularité sur 24h). Feast matérialise périodiquement les features du lake vers Redis (store online). À l'entraînement, on interroge l'offline store en point-in-time correct (on récupère les features telles qu'elles étaient au moment de l'événement — crucial pour éviter la fuite temporelle). À l'inférence, l'API récupère les mêmes features en < 10 ms via Redis.
📖 Terme : Point-in-time join

Définition : Jointure qui reconstitue l'état des features tel qu'il était à un horodatage passé — par exemple, les features de l'utilisateur à l'instant T où il a cliqué sur un article, et pas ses features d'aujourd'hui.

But : Éviter la fuite temporelle (data leakage) qui rendrait l'entraînement trop optimiste.

Pourquoi ici : Si on entraîne avec les features actuelles sur un événement vieux de 6 mois, le modèle "voit" le futur — en prod, ses prédictions s'effondrent. Feast gère automatiquement ce join temporel via get_historical_features.

4. Entraînement et tracking MLflow

📖 Terme : MLflow Tracking & Registry

Définition : MLflow est une plateforme open-source qui enregistre chaque run d'entraînement (params, métriques, artefacts) et publie les modèles dans un registre avec des stages (None → Staging → Production → Archived).

But : Traçabilité totale des expérimentations et promotion formelle d'un modèle vers la production.

Pourquoi ici : En reco, on teste des dizaines de variantes (matrix factorization, LightFM, two-tower, transformers). Sans tracking, on perd la trace des hyperparams qui ont donné le meilleur NDCG. Le registry évite aussi qu'un data scientist "pousse son .pkl" à la main en prod.

src/training/train.py
import mlflow
import mlflow.sklearn
from feast import FeatureStore
from implicit.als import AlternatingLeastSquares
from sklearn.metrics import ndcg_score
import pandas as pd
import scipy.sparse as sp
import numpy as np

mlflow.set_tracking_uri("http://mlflow.ml-platform.svc:5000")
mlflow.set_experiment("reco-articles")

# ── 1. Récupérer les features depuis Feast (offline) ──
store = FeatureStore(repo_path="feature_repo/")
entity_df = pd.read_parquet("data/raw/clicks.parquet")  # user_id, article_id, event_ts, label

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:clicks_7d",
        "user_stats:read_ratio_7d",
        "user_stats:top_category",
        "article_stats:ctr_24h",
        "article_stats:avg_read_time_s",
        "article_stats:category",
    ],
).to_df()

# ── 2. Matrice interaction user × article (implicit feedback) ──
user_idx = {u: i for i, u in enumerate(training_df["user_id"].unique())}
art_idx  = {a: i for i, a in enumerate(training_df["article_id"].unique())}

rows = training_df["user_id"].map(user_idx)
cols = training_df["article_id"].map(art_idx)
vals = training_df["label"].astype("float32")  # 1 = clic, 0 = skip
matrix = sp.coo_matrix((vals, (rows, cols))).tocsr()

# ── 3. Entraînement ALS avec tracking MLflow ──
with mlflow.start_run(run_name="als_v2") as run:
    params = {"factors": 128, "regularization": 0.01, "iterations": 30}
    mlflow.log_params(params)

    model = AlternatingLeastSquares(**params)
    model.fit(matrix)

    # ── 4. Évaluation hors-ligne (NDCG@10, Recall@20) ──
    test_df = pd.read_parquet("data/raw/clicks_test.parquet")
    y_true, y_score = score_test_set(model, test_df, user_idx, art_idx)

    ndcg10 = ndcg_score(y_true, y_score, k=10)
    recall20 = recall_at_k(y_true, y_score, k=20)

    mlflow.log_metrics({"ndcg@10": ndcg10, "recall@20": recall20})
    mlflow.sklearn.log_model(model, artifact_path="model",
                             registered_model_name="reco-articles")

    # ── 5. Gate qualité : on ne promeut que si le modèle bat la baseline ──
    baseline_ndcg = load_baseline_metric("ndcg@10")
    if ndcg10 > baseline_ndcg * 1.01:   # +1% requis
        client = mlflow.tracking.MlflowClient()
        client.transition_model_version_stage(
            name="reco-articles",
            version=run.info.run_id,
            stage="Staging",
        )
        print(f"✅ Promu en Staging (NDCG@10 = {ndcg10:.4f})")
    else:
        print(f"❌ Pas de promotion (NDCG@10 = {ndcg10:.4f} vs baseline {baseline_ndcg:.4f})")
Ce script illustre le cœur MLOps : (1) les features sont servies depuis Feast avec point-in-time correctness, (2) le run est versionné dans MLflow avec ses params, métriques et le modèle sérialisé comme artefact, (3) un quality gate automatique ne promeut le modèle en Staging que s'il dépasse la baseline de +1%. Plus de "tu te souviens avec quels params on avait eu 0.84 en juillet ?" — tout est dans MLflow.
📖 Terme : NDCG@k (Normalized Discounted Cumulative Gain)

Définition : Métrique standard d'évaluation de recommandations : mesure la qualité du ranking des k premiers items recommandés, en pondérant davantage les bonnes prédictions en tête de liste.

But : Valoriser les recommandations pertinentes placées en haut (où l'utilisateur clique en pratique).

Pourquoi ici : L'accuracy classique est inutile en reco (il y a 10 000 articles, tu en recommandes 10 — 99,9% "d'accuracy" sur les articles non-recommandés n'a aucun sens). NDCG@10 évalue directement ce qui compte : est-ce que les 10 meilleures suggestions sont pertinentes, et dans le bon ordre ?

5. Orchestration Airflow

📖 Terme : DAG Airflow

Définition : Un DAG (Directed Acyclic Graph) est une description Python des étapes d'un pipeline Airflow, de leurs dépendances et de leur planification. Airflow exécute les tâches dans le bon ordre, retry en cas d'échec, et expose une UI de monitoring.

But : Automatiser l'exécution du pipeline avec planification, retry, alerting et observabilité.

Pourquoi ici : Un pipeline de reco se ré-entraîne tous les jours à 3h du matin. Sans orchestrateur, c'est un cron fragile sans logs centralisés. Airflow donne retry automatique, SLAs, alerting Slack et backfill historique en une ligne.

airflow/dags/reco_daily.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

default_args = {
    "owner": "alderi",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "sla": timedelta(hours=2),
}

with DAG(
    dag_id="reco_articles_daily",
    default_args=default_args,
    description="Pipeline MLOps reco — quotidien 03h UTC",
    schedule="0 3 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["mlops", "reco"],
) as dag:

    ingest = BashOperator(
        task_id="ingest_events",
        bash_command="python /opt/src/ingestion/extract.py --date {{ ds }}",
    )

    features = BashOperator(
        task_id="materialize_features",
        bash_command="cd /opt/feature_repo && feast materialize-incremental {{ ds }}",
    )

    train = KubernetesPodOperator(
        task_id="train_model",
        name="reco-train",
        image="gcr.io/alderi/reco-train:latest",
        cmds=["python", "/opt/src/training/train.py"],
        resources={"request_memory": "8Gi", "limit_cpu": "4", "limit_gpu": 1},
        is_delete_operator_pod=True,
    )

    evaluate = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_against_baseline,
    )

    deploy = PythonOperator(
        task_id="canary_deploy",
        python_callable=canary_rollout,     # 5% du trafic sur v+1
        trigger_rule="all_success",
    )

    monitor = PythonOperator(
        task_id="monitor_drift",
        python_callable=check_data_drift,
    )

    ingest >> features >> train >> evaluate >> deploy >> monitor
Ce DAG orchestre 6 étapes séquentielles à 03h UTC chaque jour. L'étape train utilise KubernetesPodOperator pour lancer l'entraînement dans un pod dédié avec GPU — Airflow reste léger, le gros calcul est isolé. Chaque tâche a 2 retries automatiques, une alerte email en cas d'échec et un SLA de 2h pour l'ensemble. Le canary_deploy ne route que 5% du trafic vers le nouveau modèle — si les métriques business dérivent, on annule automatiquement.

6. Déploiement : Docker + Kubernetes

src/serving/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
import mlflow.pyfunc
import numpy as np
import os, time, logging

app = FastAPI(title="Reco Articles API", version="2.0")
log = logging.getLogger("reco")

# Chargement du modèle depuis le registre MLflow (stage Production)
MODEL_URI = os.getenv("MODEL_URI", "models:/reco-articles/Production")
model = mlflow.pyfunc.load_model(MODEL_URI)
store = FeatureStore(repo_path="/app/feature_repo")

class RecoRequest(BaseModel):
    user_id: str
    exclude: list[str] = []   # articles déjà lus
    k: int = 10

@app.get("/healthz")
def health():
    return {"status": "ok", "model_uri": MODEL_URI}

@app.post("/recommend")
def recommend(req: RecoRequest):
    t0 = time.perf_counter()

    # 1. Features online (Redis) en < 10 ms
    features = store.get_online_features(
        features=["user_stats:clicks_7d", "user_stats:top_category"],
        entity_rows=[{"user_id": req.user_id}],
    ).to_dict()

    if features["clicks_7d"][0] is None:
        # Fallback cold-start : top articles populaires
        items = get_popular_articles(k=req.k)
    else:
        # 2. Inférence modèle — top-K
        items = model.predict({
            "user_id": req.user_id,
            "features": features,
            "k": req.k + len(req.exclude),
        })
        items = [i for i in items if i not in req.exclude][:req.k]

    latency_ms = (time.perf_counter() - t0) * 1000
    # Prometheus expose reco_latency_ms et reco_requests_total
    RECO_LATENCY.observe(latency_ms)
    RECO_REQUESTS.inc()

    return {"items": items, "latency_ms": round(latency_ms, 2)}
Dockerfile
# Stage 1 — build
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --target=/deps -r requirements.txt

# Stage 2 — runtime (image légère)
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /deps /usr/local/lib/python3.11/site-packages
COPY src/ ./src/
COPY feature_repo/ ./feature_repo/

ENV PYTHONUNBUFFERED=1
ENV MODEL_URI="models:/reco-articles/Production"
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=3s CMD curl -f http://localhost:8000/healthz || exit 1
CMD ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: reco-api
  labels: {app: reco-api, tier: serving}
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate: {maxSurge: 1, maxUnavailable: 0}
  selector:
    matchLabels: {app: reco-api}
  template:
    metadata: {labels: {app: reco-api, version: v2}}
    spec:
      containers:
        - name: api
          image: gcr.io/alderi/reco-api:v2.0.0
          ports: [{containerPort: 8000}]
          resources:
            requests: {memory: "512Mi", cpu: "250m"}
            limits:   {memory: "2Gi",   cpu: "1"}
          readinessProbe:
            httpGet: {path: /healthz, port: 8000}
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet: {path: /healthz, port: 8000}
            initialDelaySeconds: 30
            periodSeconds: 30
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata: {name: reco-api-hpa}
spec:
  scaleTargetRef: {apiVersion: apps/v1, kind: Deployment, name: reco-api}
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target: {type: Utilization, averageUtilization: 70}
Le service expose /recommend derrière un Deployment Kubernetes avec 3 replicas minimum, autoscale jusqu'à 20 selon le CPU, rolling update zéro downtime, et healthchecks. Le modèle est chargé au démarrage depuis le registre MLflow (stage Production) — changer l'URI suffit à promouvoir une nouvelle version sans rebuild d'image.

7. Monitoring et détection de drift

📖 Terme : Data drift / Concept drift

Définition : Le data drift est une dérive de la distribution des features d'entrée en prod par rapport à l'entraînement (ex. l'audience rajeunit de 3 ans). Le concept drift est une dérive de la relation X → y (ex. suite à un changement éditorial, les utilisateurs cliquent sur d'autres sujets).

But : Détecter une dégradation silencieuse du modèle avant qu'elle n'affecte le business.

Pourquoi ici : La publication d'un nouvel éditorial, un événement médiatique, un changement de saison — tout modifie la distribution. Sans détection automatique, on ne s'aperçoit du problème qu'en voyant le CTR chuter, plusieurs semaines trop tard.

src/monitoring/drift.py
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
import pandas as pd
import requests, os

# Référence : features du dernier entraînement
reference = pd.read_parquet("gs://reco-data/reference/features_v2.parquet")
# Production : features observées hier
current = pd.read_parquet("gs://reco-data/prod/features_last_24h.parquet")

report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
report.run(reference_data=reference, current_data=current)

result = report.as_dict()
drift_share = result["metrics"][0]["result"]["drift_share"]

# ── Règle : si > 30% des features ont dérivé, alerter & déclencher re-train ──
if drift_share > 0.30:
    requests.post(
        os.getenv("SLACK_WEBHOOK"),
        json={"text": f"🚨 Data drift détecté : {drift_share:.0%} des features dérivent. Re-entraînement planifié."},
    )
    # Trigger un DAG Airflow de re-training manuel anticipé
    requests.post("http://airflow:8080/api/v1/dags/reco_articles_daily/dagRuns",
                  json={"conf": {"triggered_by": "drift_detector"}})

# ── Sauvegarder le rapport HTML pour audit ──
report.save_html(f"reports/drift_{pd.Timestamp.now().date()}.html")
Evidently compare la distribution actuelle des features à la référence (dernier entraînement). Si plus de 30% des features ont significativement dérivé (test de Kolmogorov-Smirnov pour les features numériques, chi-carré pour les catégorielles), on déclenche une alerte Slack et on force un re-entraînement anticipé via l'API Airflow. Les rapports HTML sont archivés pour audit.
📖 Terme : Shadow deployment & A/B testing

Définition : Le shadow mode route une copie du trafic de prod vers un nouveau modèle sans exposer ses prédictions à l'utilisateur — uniquement pour observer sa latence et ses prédictions. L'A/B testing expose le nouveau modèle à un échantillon (ex. 5 à 50%) d'utilisateurs réels pour mesurer l'impact business.

But : Valider un nouveau modèle sans risque (shadow) puis mesurer son impact réel (A/B).

Pourquoi ici : Les métriques offline (NDCG, Recall) ne reflètent pas toujours le comportement utilisateur réel. Un modèle peut avoir un meilleur NDCG mais un moins bon CTR à cause d'un biais de popularité. L'A/B test est le seul arbitre final.

8. CI/CD ML

.github/workflows/ci-cd.yml
name: reco-articles-ci-cd
on: {push: {branches: [main]}, pull_request: {}}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.11"}
      - run: pip install -r requirements.txt
      - name: Lint
        run: ruff check src/ && black --check src/
      - name: Type-check
        run: mypy src/
      - name: Unit tests + coverage
        run: pytest tests/ --cov=src --cov-fail-under=80
      - name: Data validation (Great Expectations)
        run: great_expectations checkpoint run reco_checkpoint

  build-and-push:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -t gcr.io/alderi/reco-api:${{ github.sha }} .
      - name: Scan vulnerabilities (Trivy)
        run: trivy image --exit-code 1 --severity HIGH,CRITICAL gcr.io/alderi/reco-api:${{ github.sha }}
      - name: Push
        run: docker push gcr.io/alderi/reco-api:${{ github.sha }}

  deploy-staging:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes (staging)
        run: |
          kubectl set image deployment/reco-api api=gcr.io/alderi/reco-api:${{ github.sha }} -n staging
          kubectl rollout status deployment/reco-api -n staging --timeout=5m
      - name: Smoke test
        run: ./scripts/smoke_test.sh https://reco-staging.alderi.kamtchoua.com
Le pipeline CI/CD enforce 4 gates avant la prod : (1) lint + type-check + tests unitaires avec couverture ≥ 80%, (2) validation des données avec Great Expectations (schéma, valeurs, distributions), (3) scan de sécurité Trivy sur l'image Docker, (4) smoke test post-déploiement. La promotion vers la prod est manuelle (approval GitHub) après A/B test réussi en staging.
Métriques cibles en production (exemple réel) :
  • Latence P99 : < 120 ms (features Redis + inférence)
  • NDCG@10 offline : 0.42 (baseline popularity-based : 0.28)
  • CTR uplift A/B : +18% vs baseline
  • Temps entre deux re-entraînements : 24h (drift-triggered si nécessaire)
  • Re-déploiement automatique : < 15 min (CI + canary)

9. Checklist MLOps en production

Avant de dire qu'un système de reco est "en production", vérifier cette checklist :
  • Versioning : Git pour le code, DVC pour les données, MLflow pour les modèles.
  • Reproductibilité : dvc repro reconstruit exactement le modèle de prod.
  • Train-serving parity : features servies par le même Feast en train et prod.
  • Quality gate : pas de promotion si NDCG < baseline + 1%.
  • Canary / A/B : 5 → 50 → 100% progressif, rollback automatique sur CTR −5%.
  • Monitoring : latence, throughput, data drift, concept drift, fairness.
  • Alerting : Slack / PagerDuty branchés sur les seuils critiques.
  • Retraining automatique : DAG quotidien + trigger drift.
  • Observabilité : Prometheus + Grafana + logs structurés.
  • Sécurité : scan Trivy, secrets K8s, auth JWT sur l'API, RGPD.
📖 Terme : Boucle de feedback

Définition : Mécanisme qui renvoie les interactions utilisateur (clics, temps de lecture, skips) dans le pipeline d'entraînement pour que le modèle apprenne continuellement de ses propres recommandations.

But : Maintenir la pertinence du modèle dans le temps, intégrer automatiquement les nouveaux articles et les nouveaux utilisateurs.

Pourquoi ici : Attention au biais de feedback : si le modèle ne recommande jamais certains articles, il n'a pas de signal d'apprentissage dessus → il les recommandera encore moins. Il faut injecter volontairement de l'exploration (ex. ε-greedy, Thompson sampling, bandits contextuels) pour briser la boucle.

Conclusion

Un pipeline MLOps de recommandation d'articles n'est pas un modèle, c'est un système vivant : ingestion continue, features partagées entre train et serve, expérimentations tracées dans MLflow, orchestration Airflow, déploiements Kubernetes progressifs, monitoring de drift et boucle de feedback. Les concepts présentés ici (feature store, quality gate, canary deploy, drift detection) sont directement transposables à n'importe quel cas d'usage ML en production : pricing, fraude, churn, NLP — les briques restent les mêmes.