Ce cours construit un pipeline MLOps end-to-end pour recommander des articles sur une plateforme web (type blog, e-commerce éditorial ou média). On passe d'un notebook expérimental à un service de recommandation déployé, versionné, monitoré, avec re-entraînement automatique : ingestion, feature store, expérimentations MLflow, orchestration Airflow, conteneurisation Docker, serving FastAPI sur Kubernetes, monitoring de drift et boucle de feedback utilisateur.
Définition : MLOps (Machine Learning Operations) est l'ensemble des pratiques qui industrialisent le cycle de vie d'un modèle ML : versioning des données et du code, expérimentations reproductibles, déploiement continu, monitoring en production et re-entraînement automatique. C'est l'équivalent du DevOps, mais appliqué aux systèmes de Machine Learning.
But : Faire passer un modèle du notebook à la production de manière fiable, traçable et automatisée, puis maintenir sa qualité dans le temps.
Pourquoi ici : Un système de recommandation en production n'est pas un modèle figé : les contenus changent, les utilisateurs évoluent, la distribution des données dérive. Sans MLOps, les recommandations se dégradent silencieusement en quelques semaines. MLOps apporte la rigueur nécessaire pour maintenir la qualité dans la durée.
┌─────────────────────────────────────────────────────────────────────┐
│ PLATEFORME WEB (articles) │
│ ┌────────────┐ ┌──────────────┐ ┌────────────────────────────┐ │
│ │ Front React│──►│ API Gateway │──►│ /recommend (FastAPI + K8s) │ │
│ └────────────┘ └──────────────┘ └─────────────┬──────────────┘ │
│ │ │ │
│ │ events (clic, vue, durée) │ features │
│ ▼ ▼ │
│ ┌──────────────┐ ┌─────────────────┐ ┌──────────────────┐ │
│ │ Kafka / Pub- │──────►│ Data Lake (GCS) │◄──│ Feature Store │ │
│ │ Sub Events │ │ bronze/silver │ │ (Feast + Redis) │ │
│ └──────────────┘ └────────┬────────┘ └──────────────────┘ │
│ │ ▲ │
│ ▼ │ │
│ ┌────────────────────┐ │ │
│ │ DVC / data version │───────────┘ │
│ └─────────┬──────────┘ │
│ │ │
│ ┌─────────────────────────────▼────────────────────────────────┐ │
│ │ AIRFLOW / Kubeflow — orchestration (DAG journalier) │ │
│ │ ingest → features → train (MLflow) → eval → register │ │
│ │ → canary deploy → monitor │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────┐ │
│ │ MLflow Registry │──► image Docker ──►K8s │
│ │ (staging / prod) │ │
│ └────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Définition : DVC est un outil open-source qui étend Git pour versionner fichiers de données, modèles et pipelines trop lourds pour Git. Les fichiers binaires restent sur un stockage distant (S3, GCS), Git ne trace que des métadonnées légères (.dvc).
But : Reproduire exactement un entraînement en pointant un commit Git + un état des données.
Pourquoi ici : Sans DVC, on ne peut pas répondre à la question "avec quelles données exactes ce modèle de prod a-t-il été entraîné ?" — question indispensable pour l'audit, le debug et la conformité RGPD.
reco-articles/
├── .github/workflows/ci-cd.yml # CI/CD GitHub Actions
├── airflow/dags/reco_daily.py # DAG d'orchestration
├── data/ # versionné DVC, pas Git
│ ├── raw/events.parquet.dvc
│ └── processed/features.parquet.dvc
├── feature_repo/ # Feast feature store
│ ├── feature_store.yaml
│ └── features.py
├── src/
│ ├── ingestion/kafka_consumer.py
│ ├── features/build_features.py
│ ├── training/train.py # MLflow
│ ├── serving/app.py # FastAPI
│ └── monitoring/drift.py # Evidently
├── k8s/
│ ├── deployment.yaml
│ ├── service.yaml
│ └── hpa.yaml # Horizontal Pod Autoscaler
├── Dockerfile
├── dvc.yaml # pipeline DVC reproductible
├── mlflow.yaml
├── requirements.txt
└── README.md
stages:
ingest:
cmd: python src/ingestion/extract.py
deps: [src/ingestion/extract.py]
outs: [data/raw/events.parquet]
features:
cmd: python src/features/build_features.py
deps:
- data/raw/events.parquet
- src/features/build_features.py
outs: [data/processed/features.parquet]
train:
cmd: python src/training/train.py
deps:
- data/processed/features.parquet
- src/training/train.py
params: [train.lr, train.n_factors, train.epochs]
outs: [models/reco_model.pkl]
metrics: [metrics/train_metrics.json]
dvc.yaml. Chaque étape a ses dépendances (code + données d'entrée), ses sorties et ses paramètres. dvc repro ne re-exécute que les étapes dont les dépendances ont changé — un peu comme make mais qui comprend les données. Combiné à Git + un remote S3/GCS, on obtient un contrat exact entre code, données, params et modèle produit.Définition : Un Feature Store est une base spécialisée qui stocke, versionne et sert les features ML. Il expose les mêmes features en offline (pour l'entraînement, via batch SQL) et en online (pour l'inférence, via Redis avec latence < 10 ms).
But : Éliminer le train-serving skew (écart entre features d'entraînement et features de prod) en garantissant un calcul unique.
Pourquoi ici : Dans un système de reco, 40% des bugs de production viennent d'un écart train/serving (ex. l'heure est en UTC à l'entraînement et en local en prod). Un feature store définit la feature une seule fois, les deux environnements consomment la même définition.
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String
# ── Entités (clés primaires des features) ──
user = Entity(name="user_id", value_type=ValueType.STRING)
article = Entity(name="article_id", value_type=ValueType.STRING)
# ── Source de données (batch, parquet sur GCS) ──
user_stats_src = FileSource(
path="gs://reco-data/features/user_stats.parquet",
timestamp_field="event_ts",
)
# ── Feature View : features utilisateur ──
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=7), # expiration online
schema=[
Field(name="clicks_7d", dtype=Int64),
Field(name="read_ratio_7d", dtype=Float32),
Field(name="avg_session_len_s", dtype=Float32),
Field(name="top_category", dtype=String),
],
source=user_stats_src,
online=True, # servi via Redis à l'inférence
)
# ── Feature View : popularité article ──
article_stats_src = FileSource(
path="gs://reco-data/features/article_stats.parquet",
timestamp_field="event_ts",
)
article_stats_fv = FeatureView(
name="article_stats",
entities=[article],
ttl=timedelta(days=30),
schema=[
Field(name="views_24h", dtype=Int64),
Field(name="ctr_24h", dtype=Float32),
Field(name="avg_read_time_s", dtype=Float32),
Field(name="category", dtype=String),
],
source=article_stats_src,
online=True,
)
Définition : Jointure qui reconstitue l'état des features tel qu'il était à un horodatage passé — par exemple, les features de l'utilisateur à l'instant T où il a cliqué sur un article, et pas ses features d'aujourd'hui.
But : Éviter la fuite temporelle (data leakage) qui rendrait l'entraînement trop optimiste.
Pourquoi ici : Si on entraîne avec les features actuelles sur un événement vieux de 6 mois, le modèle "voit" le futur — en prod, ses prédictions s'effondrent. Feast gère automatiquement ce join temporel via get_historical_features.
Définition : MLflow est une plateforme open-source qui enregistre chaque run d'entraînement (params, métriques, artefacts) et publie les modèles dans un registre avec des stages (None → Staging → Production → Archived).
But : Traçabilité totale des expérimentations et promotion formelle d'un modèle vers la production.
Pourquoi ici : En reco, on teste des dizaines de variantes (matrix factorization, LightFM, two-tower, transformers). Sans tracking, on perd la trace des hyperparams qui ont donné le meilleur NDCG. Le registry évite aussi qu'un data scientist "pousse son .pkl" à la main en prod.
import mlflow
import mlflow.sklearn
from feast import FeatureStore
from implicit.als import AlternatingLeastSquares
from sklearn.metrics import ndcg_score
import pandas as pd
import scipy.sparse as sp
import numpy as np
mlflow.set_tracking_uri("http://mlflow.ml-platform.svc:5000")
mlflow.set_experiment("reco-articles")
# ── 1. Récupérer les features depuis Feast (offline) ──
store = FeatureStore(repo_path="feature_repo/")
entity_df = pd.read_parquet("data/raw/clicks.parquet") # user_id, article_id, event_ts, label
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:clicks_7d",
"user_stats:read_ratio_7d",
"user_stats:top_category",
"article_stats:ctr_24h",
"article_stats:avg_read_time_s",
"article_stats:category",
],
).to_df()
# ── 2. Matrice interaction user × article (implicit feedback) ──
user_idx = {u: i for i, u in enumerate(training_df["user_id"].unique())}
art_idx = {a: i for i, a in enumerate(training_df["article_id"].unique())}
rows = training_df["user_id"].map(user_idx)
cols = training_df["article_id"].map(art_idx)
vals = training_df["label"].astype("float32") # 1 = clic, 0 = skip
matrix = sp.coo_matrix((vals, (rows, cols))).tocsr()
# ── 3. Entraînement ALS avec tracking MLflow ──
with mlflow.start_run(run_name="als_v2") as run:
params = {"factors": 128, "regularization": 0.01, "iterations": 30}
mlflow.log_params(params)
model = AlternatingLeastSquares(**params)
model.fit(matrix)
# ── 4. Évaluation hors-ligne (NDCG@10, Recall@20) ──
test_df = pd.read_parquet("data/raw/clicks_test.parquet")
y_true, y_score = score_test_set(model, test_df, user_idx, art_idx)
ndcg10 = ndcg_score(y_true, y_score, k=10)
recall20 = recall_at_k(y_true, y_score, k=20)
mlflow.log_metrics({"ndcg@10": ndcg10, "recall@20": recall20})
mlflow.sklearn.log_model(model, artifact_path="model",
registered_model_name="reco-articles")
# ── 5. Gate qualité : on ne promeut que si le modèle bat la baseline ──
baseline_ndcg = load_baseline_metric("ndcg@10")
if ndcg10 > baseline_ndcg * 1.01: # +1% requis
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="reco-articles",
version=run.info.run_id,
stage="Staging",
)
print(f"✅ Promu en Staging (NDCG@10 = {ndcg10:.4f})")
else:
print(f"❌ Pas de promotion (NDCG@10 = {ndcg10:.4f} vs baseline {baseline_ndcg:.4f})")
Définition : Métrique standard d'évaluation de recommandations : mesure la qualité du ranking des k premiers items recommandés, en pondérant davantage les bonnes prédictions en tête de liste.
But : Valoriser les recommandations pertinentes placées en haut (où l'utilisateur clique en pratique).
Pourquoi ici : L'accuracy classique est inutile en reco (il y a 10 000 articles, tu en recommandes 10 — 99,9% "d'accuracy" sur les articles non-recommandés n'a aucun sens). NDCG@10 évalue directement ce qui compte : est-ce que les 10 meilleures suggestions sont pertinentes, et dans le bon ordre ?
Définition : Un DAG (Directed Acyclic Graph) est une description Python des étapes d'un pipeline Airflow, de leurs dépendances et de leur planification. Airflow exécute les tâches dans le bon ordre, retry en cas d'échec, et expose une UI de monitoring.
But : Automatiser l'exécution du pipeline avec planification, retry, alerting et observabilité.
Pourquoi ici : Un pipeline de reco se ré-entraîne tous les jours à 3h du matin. Sans orchestrateur, c'est un cron fragile sans logs centralisés. Airflow donne retry automatique, SLAs, alerting Slack et backfill historique en une ligne.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
default_args = {
"owner": "alderi",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"sla": timedelta(hours=2),
}
with DAG(
dag_id="reco_articles_daily",
default_args=default_args,
description="Pipeline MLOps reco — quotidien 03h UTC",
schedule="0 3 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["mlops", "reco"],
) as dag:
ingest = BashOperator(
task_id="ingest_events",
bash_command="python /opt/src/ingestion/extract.py --date {{ ds }}",
)
features = BashOperator(
task_id="materialize_features",
bash_command="cd /opt/feature_repo && feast materialize-incremental {{ ds }}",
)
train = KubernetesPodOperator(
task_id="train_model",
name="reco-train",
image="gcr.io/alderi/reco-train:latest",
cmds=["python", "/opt/src/training/train.py"],
resources={"request_memory": "8Gi", "limit_cpu": "4", "limit_gpu": 1},
is_delete_operator_pod=True,
)
evaluate = PythonOperator(
task_id="evaluate_model",
python_callable=evaluate_against_baseline,
)
deploy = PythonOperator(
task_id="canary_deploy",
python_callable=canary_rollout, # 5% du trafic sur v+1
trigger_rule="all_success",
)
monitor = PythonOperator(
task_id="monitor_drift",
python_callable=check_data_drift,
)
ingest >> features >> train >> evaluate >> deploy >> monitor
train utilise KubernetesPodOperator pour lancer l'entraînement dans un pod dédié avec GPU — Airflow reste léger, le gros calcul est isolé. Chaque tâche a 2 retries automatiques, une alerte email en cas d'échec et un SLA de 2h pour l'ensemble. Le canary_deploy ne route que 5% du trafic vers le nouveau modèle — si les métriques business dérivent, on annule automatiquement.from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
import mlflow.pyfunc
import numpy as np
import os, time, logging
app = FastAPI(title="Reco Articles API", version="2.0")
log = logging.getLogger("reco")
# Chargement du modèle depuis le registre MLflow (stage Production)
MODEL_URI = os.getenv("MODEL_URI", "models:/reco-articles/Production")
model = mlflow.pyfunc.load_model(MODEL_URI)
store = FeatureStore(repo_path="/app/feature_repo")
class RecoRequest(BaseModel):
user_id: str
exclude: list[str] = [] # articles déjà lus
k: int = 10
@app.get("/healthz")
def health():
return {"status": "ok", "model_uri": MODEL_URI}
@app.post("/recommend")
def recommend(req: RecoRequest):
t0 = time.perf_counter()
# 1. Features online (Redis) en < 10 ms
features = store.get_online_features(
features=["user_stats:clicks_7d", "user_stats:top_category"],
entity_rows=[{"user_id": req.user_id}],
).to_dict()
if features["clicks_7d"][0] is None:
# Fallback cold-start : top articles populaires
items = get_popular_articles(k=req.k)
else:
# 2. Inférence modèle — top-K
items = model.predict({
"user_id": req.user_id,
"features": features,
"k": req.k + len(req.exclude),
})
items = [i for i in items if i not in req.exclude][:req.k]
latency_ms = (time.perf_counter() - t0) * 1000
# Prometheus expose reco_latency_ms et reco_requests_total
RECO_LATENCY.observe(latency_ms)
RECO_REQUESTS.inc()
return {"items": items, "latency_ms": round(latency_ms, 2)}
# Stage 1 — build
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --target=/deps -r requirements.txt
# Stage 2 — runtime (image légère)
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /deps /usr/local/lib/python3.11/site-packages
COPY src/ ./src/
COPY feature_repo/ ./feature_repo/
ENV PYTHONUNBUFFERED=1
ENV MODEL_URI="models:/reco-articles/Production"
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=3s CMD curl -f http://localhost:8000/healthz || exit 1
CMD ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
apiVersion: apps/v1
kind: Deployment
metadata:
name: reco-api
labels: {app: reco-api, tier: serving}
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate: {maxSurge: 1, maxUnavailable: 0}
selector:
matchLabels: {app: reco-api}
template:
metadata: {labels: {app: reco-api, version: v2}}
spec:
containers:
- name: api
image: gcr.io/alderi/reco-api:v2.0.0
ports: [{containerPort: 8000}]
resources:
requests: {memory: "512Mi", cpu: "250m"}
limits: {memory: "2Gi", cpu: "1"}
readinessProbe:
httpGet: {path: /healthz, port: 8000}
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet: {path: /healthz, port: 8000}
initialDelaySeconds: 30
periodSeconds: 30
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata: {name: reco-api-hpa}
spec:
scaleTargetRef: {apiVersion: apps/v1, kind: Deployment, name: reco-api}
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target: {type: Utilization, averageUtilization: 70}
/recommend derrière un Deployment Kubernetes avec 3 replicas minimum, autoscale jusqu'à 20 selon le CPU, rolling update zéro downtime, et healthchecks. Le modèle est chargé au démarrage depuis le registre MLflow (stage Production) — changer l'URI suffit à promouvoir une nouvelle version sans rebuild d'image.Définition : Le data drift est une dérive de la distribution des features d'entrée en prod par rapport à l'entraînement (ex. l'audience rajeunit de 3 ans). Le concept drift est une dérive de la relation X → y (ex. suite à un changement éditorial, les utilisateurs cliquent sur d'autres sujets).
But : Détecter une dégradation silencieuse du modèle avant qu'elle n'affecte le business.
Pourquoi ici : La publication d'un nouvel éditorial, un événement médiatique, un changement de saison — tout modifie la distribution. Sans détection automatique, on ne s'aperçoit du problème qu'en voyant le CTR chuter, plusieurs semaines trop tard.
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
import pandas as pd
import requests, os
# Référence : features du dernier entraînement
reference = pd.read_parquet("gs://reco-data/reference/features_v2.parquet")
# Production : features observées hier
current = pd.read_parquet("gs://reco-data/prod/features_last_24h.parquet")
report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
report.run(reference_data=reference, current_data=current)
result = report.as_dict()
drift_share = result["metrics"][0]["result"]["drift_share"]
# ── Règle : si > 30% des features ont dérivé, alerter & déclencher re-train ──
if drift_share > 0.30:
requests.post(
os.getenv("SLACK_WEBHOOK"),
json={"text": f"🚨 Data drift détecté : {drift_share:.0%} des features dérivent. Re-entraînement planifié."},
)
# Trigger un DAG Airflow de re-training manuel anticipé
requests.post("http://airflow:8080/api/v1/dags/reco_articles_daily/dagRuns",
json={"conf": {"triggered_by": "drift_detector"}})
# ── Sauvegarder le rapport HTML pour audit ──
report.save_html(f"reports/drift_{pd.Timestamp.now().date()}.html")
Définition : Le shadow mode route une copie du trafic de prod vers un nouveau modèle sans exposer ses prédictions à l'utilisateur — uniquement pour observer sa latence et ses prédictions. L'A/B testing expose le nouveau modèle à un échantillon (ex. 5 à 50%) d'utilisateurs réels pour mesurer l'impact business.
But : Valider un nouveau modèle sans risque (shadow) puis mesurer son impact réel (A/B).
Pourquoi ici : Les métriques offline (NDCG, Recall) ne reflètent pas toujours le comportement utilisateur réel. Un modèle peut avoir un meilleur NDCG mais un moins bon CTR à cause d'un biais de popularité. L'A/B test est le seul arbitre final.
name: reco-articles-ci-cd
on: {push: {branches: [main]}, pull_request: {}}
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with: {python-version: "3.11"}
- run: pip install -r requirements.txt
- name: Lint
run: ruff check src/ && black --check src/
- name: Type-check
run: mypy src/
- name: Unit tests + coverage
run: pytest tests/ --cov=src --cov-fail-under=80
- name: Data validation (Great Expectations)
run: great_expectations checkpoint run reco_checkpoint
build-and-push:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build Docker image
run: docker build -t gcr.io/alderi/reco-api:${{ github.sha }} .
- name: Scan vulnerabilities (Trivy)
run: trivy image --exit-code 1 --severity HIGH,CRITICAL gcr.io/alderi/reco-api:${{ github.sha }}
- name: Push
run: docker push gcr.io/alderi/reco-api:${{ github.sha }}
deploy-staging:
needs: build-and-push
runs-on: ubuntu-latest
steps:
- name: Deploy to Kubernetes (staging)
run: |
kubectl set image deployment/reco-api api=gcr.io/alderi/reco-api:${{ github.sha }} -n staging
kubectl rollout status deployment/reco-api -n staging --timeout=5m
- name: Smoke test
run: ./scripts/smoke_test.sh https://reco-staging.alderi.kamtchoua.com
dvc repro reconstruit exactement le modèle de prod.Définition : Mécanisme qui renvoie les interactions utilisateur (clics, temps de lecture, skips) dans le pipeline d'entraînement pour que le modèle apprenne continuellement de ses propres recommandations.
But : Maintenir la pertinence du modèle dans le temps, intégrer automatiquement les nouveaux articles et les nouveaux utilisateurs.
Pourquoi ici : Attention au biais de feedback : si le modèle ne recommande jamais certains articles, il n'a pas de signal d'apprentissage dessus → il les recommandera encore moins. Il faut injecter volontairement de l'exploration (ex. ε-greedy, Thompson sampling, bandits contextuels) pour briser la boucle.
Un pipeline MLOps de recommandation d'articles n'est pas un modèle, c'est un système vivant : ingestion continue, features partagées entre train et serve, expérimentations tracées dans MLflow, orchestration Airflow, déploiements Kubernetes progressifs, monitoring de drift et boucle de feedback. Les concepts présentés ici (feature store, quality gate, canary deploy, drift detection) sont directement transposables à n'importe quel cas d'usage ML en production : pricing, fraude, churn, NLP — les briques restent les mêmes.