This course builds an end-to-end MLOps pipeline for recommending articles on a web platform (blog, editorial e-commerce, or media site). We move from an experimental notebook to a fully deployed, versioned and monitored recommendation service with automated re-training: ingestion, feature store, MLflow experiments, Airflow orchestration, Docker containerisation, FastAPI serving on Kubernetes, drift monitoring and a user-feedback loop.
Definition: MLOps (Machine Learning Operations) is the set of practices that industrialise the full lifecycle of an ML model: data and code versioning, reproducible experiments, continuous deployment, production monitoring and automated retraining. It is the DevOps equivalent for ML systems.
Purpose: Move a model from notebook to production reliably, traceably and automatically, then maintain its quality over time.
Why it matters here: A production recommender is never a frozen model: content changes, users evolve, data distribution drifts. Without MLOps, recommendations silently degrade within weeks. MLOps brings the discipline needed to sustain quality long-term.
                         WEB PLATFORM (articles)

   React Front ──► API Gateway ──► /recommend (FastAPI + K8s)
        │                                      │
        │ events (click, view, dwell)          │ features
        ▼                                      ▼
   Kafka / Pub-Sub ──► Data Lake (GCS)   ◄──  Feature Store
       events          (bronze/silver)        (Feast + Redis)
                              │ ▲
                              ▼ │
                     DVC / data versioning
                              │
                              ▼
   AIRFLOW / Kubeflow — orchestration (daily DAG)
       ingest → features → train (MLflow) → eval → register
              → canary deploy → monitor
                              │
                              ▼
   MLflow Registry (staging / prod) ──► Docker image ──► K8s
Definition: DVC is an open-source tool that extends Git to version data files, models and pipelines too heavy for Git. Binary files stay on a remote store (S3, GCS), Git only tracks light metadata (.dvc).
Purpose: Exactly reproduce a training run by pointing to a Git commit + data state.
Why it matters here: Without DVC you cannot answer "which exact dataset produced this production model?" — essential for audit, debugging and GDPR compliance.
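To make the mechanism concrete, here is a stdlib-only sketch of the idea behind a .dvc stub (illustrative, not DVC's actual code, although real .dvc files do record an md5, a path and a size): Git tracks a tiny metadata dict, and any change to the heavy binary is detected through its content hash.

```python
import hashlib
import os
import tempfile

def file_md5(path: str) -> str:
    """Content hash of a data file — what a .dvc file records."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

def dvc_stub(path: str) -> dict:
    """The lightweight metadata Git tracks instead of the binary itself."""
    return {"path": os.path.basename(path),
            "md5": file_md5(path),
            "size": os.path.getsize(path)}

# Simulate a large data file with a temp file
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
tmp.write(b"day-1 events")
tmp.close()

stub_v1 = dvc_stub(tmp.name)      # committed to Git at training time

with open(tmp.name, "wb") as f:   # new data lands: the hash changes
    f.write(b"day-2 events")

assert dvc_stub(tmp.name)["md5"] != stub_v1["md5"]  # change detected
os.unlink(tmp.name)
```

The binary itself lives on the S3/GCS remote under its hash; checking out an old commit restores the matching stub, and a pull fetches exactly that data state.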
reco-articles/
├── .github/workflows/ci-cd.yml # GitHub Actions CI/CD
├── airflow/dags/reco_daily.py # orchestration DAG
├── data/ # versioned with DVC, not Git
│ ├── raw/events.parquet.dvc
│ └── processed/features.parquet.dvc
├── feature_repo/ # Feast feature store
│ ├── feature_store.yaml
│ └── features.py
├── src/
│ ├── ingestion/kafka_consumer.py
│ ├── features/build_features.py
│ ├── training/train.py # MLflow
│ ├── serving/app.py # FastAPI
│ └── monitoring/drift.py # Evidently
├── k8s/
│ ├── deployment.yaml
│ ├── service.yaml
│ └── hpa.yaml # Horizontal Pod Autoscaler
├── Dockerfile
├── dvc.yaml # reproducible DVC pipeline
├── mlflow.yaml
├── requirements.txt
└── README.md
stages:
  ingest:
    cmd: python src/ingestion/extract.py
    deps: [src/ingestion/extract.py]
    outs: [data/raw/events.parquet]
  features:
    cmd: python src/features/build_features.py
    deps:
      - data/raw/events.parquet
      - src/features/build_features.py
    outs: [data/processed/features.parquet]
  train:
    cmd: python src/training/train.py
    deps:
      - data/processed/features.parquet
      - src/training/train.py
    params: [train.lr, train.n_factors, train.epochs]
    outs: [models/reco_model.pkl]
    metrics: [metrics/train_metrics.json]
This is dvc.yaml. Each stage declares its dependencies (code + input data), its outputs and its parameters. dvc repro re-runs only the stages whose dependencies changed — a bit like make, but data-aware. Combined with Git and an S3/GCS remote, you get a tight contract between code, data, parameters and the resulting model.

Definition: A Feature Store is a specialised database that stores, versions and serves ML features. It exposes the same features both offline (batch SQL, for training) and online (Redis, < 10 ms, for inference).
Purpose: Eliminate train-serving skew (mismatch between training features and production features) by guaranteeing a single source of truth.
Why it matters here: In a recommender system, a large share of production bugs stems from train-serving skew (e.g. timestamps in UTC during training but local at serve time). A feature store defines a feature once, and both environments consume the same definition.
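A toy, stdlib-only illustration of the UTC-vs-local skew mentioned above: the same click produces two different "hour of day" feature values depending on the timezone convention — exactly the mismatch a single shared feature definition prevents.

```python
from datetime import datetime, timezone, timedelta

click_ts = datetime(2026, 3, 1, 23, 30, tzinfo=timezone.utc)  # click at 23:30 UTC

# Training job computes the feature in UTC
hour_train = click_ts.hour  # 23

# Serving host sits in UTC+2 and uses local time
hour_serve = click_ts.astimezone(timezone(timedelta(hours=2))).hour  # 1

assert hour_train == 23 and hour_serve == 1  # same event, two feature values
```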
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String
# ── Entities (primary keys of features) ──
user = Entity(name="user_id", value_type=ValueType.STRING)
article = Entity(name="article_id", value_type=ValueType.STRING)
# ── Data source (batch, parquet on GCS) ──
user_stats_src = FileSource(
    path="gs://reco-data/features/user_stats.parquet",
    timestamp_field="event_ts",
)

# ── Feature View: user features ──
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=7),  # online expiration
    schema=[
        Field(name="clicks_7d", dtype=Int64),
        Field(name="read_ratio_7d", dtype=Float32),
        Field(name="avg_session_len_s", dtype=Float32),
        Field(name="top_category", dtype=String),
    ],
    source=user_stats_src,
    online=True,  # served via Redis at inference time
)

# ── Feature View: article popularity ──
article_stats_src = FileSource(
    path="gs://reco-data/features/article_stats.parquet",
    timestamp_field="event_ts",
)

article_stats_fv = FeatureView(
    name="article_stats",
    entities=[article],
    ttl=timedelta(days=30),
    schema=[
        Field(name="views_24h", dtype=Int64),
        Field(name="ctr_24h", dtype=Float32),
        Field(name="avg_read_time_s", dtype=Float32),
        Field(name="category", dtype=String),
    ],
    source=article_stats_src,
    online=True,
)
Definition: A point-in-time join reconstructs feature state as of a past timestamp — e.g., a user's features at the instant they clicked an article, not today's features.
Purpose: Avoid temporal leakage that would make training overly optimistic.
Why it matters here: If you train with today's features on a 6-month-old event, the model "sees the future" — its production predictions then collapse. Feast handles this temporal join automatically via get_historical_features.
Definition: MLflow is an open-source platform that records every training run (params, metrics, artifacts) and publishes models to a registry with stages (None → Staging → Production → Archived).
Purpose: Full traceability of experiments and formal promotion of a model to production.
Why it matters here: In recsys you test dozens of variants (matrix factorization, LightFM, two-tower, transformers). Without tracking you lose the hyperparameters that gave the best NDCG. The registry also prevents the classic "push the .pkl by hand to prod".
import mlflow
import mlflow.sklearn
from feast import FeatureStore
from implicit.als import AlternatingLeastSquares
from sklearn.metrics import ndcg_score
import pandas as pd
import scipy.sparse as sp
import numpy as np
mlflow.set_tracking_uri("http://mlflow.ml-platform.svc:5000")
mlflow.set_experiment("reco-articles")
# ── 1. Fetch features from Feast (offline) ──
store = FeatureStore(repo_path="feature_repo/")
entity_df = pd.read_parquet("data/raw/clicks.parquet") # user_id, article_id, event_ts, label
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:clicks_7d",
        "user_stats:read_ratio_7d",
        "user_stats:top_category",
        "article_stats:ctr_24h",
        "article_stats:avg_read_time_s",
        "article_stats:category",
    ],
).to_df()
# ── 2. User × article interaction matrix (implicit feedback) ──
user_idx = {u: i for i, u in enumerate(training_df["user_id"].unique())}
art_idx = {a: i for i, a in enumerate(training_df["article_id"].unique())}
rows = training_df["user_id"].map(user_idx)
cols = training_df["article_id"].map(art_idx)
vals = training_df["label"].astype("float32") # 1 = click, 0 = skip
matrix = sp.coo_matrix((vals, (rows, cols))).tocsr()
# ── 3. ALS training with MLflow tracking ──
with mlflow.start_run(run_name="als_v2") as run:
    params = {"factors": 128, "regularization": 0.01, "iterations": 30}
    mlflow.log_params(params)
    model = AlternatingLeastSquares(**params)
    model.fit(matrix)

    # ── 4. Offline evaluation (NDCG@10, Recall@20) ──
    # score_test_set and recall_at_k are project helpers (not shown here)
    test_df = pd.read_parquet("data/raw/clicks_test.parquet")
    y_true, y_score = score_test_set(model, test_df, user_idx, art_idx)
    ndcg10 = ndcg_score(y_true, y_score, k=10)
    recall20 = recall_at_k(y_true, y_score, k=20)
    mlflow.log_metrics({"ndcg@10": ndcg10, "recall@20": recall20})

    # NB: implicit's ALS is not a sklearn estimator — in a real project,
    # wrap it in an mlflow.pyfunc.PythonModel; the sklearn flavor is kept here for brevity
    mlflow.sklearn.log_model(model, artifact_path="model",
                             registered_model_name="reco-articles")

    # ── 5. Quality gate: promote only if the new model beats the baseline ──
    baseline_ndcg = load_baseline_metric("ndcg@10")  # project helper (not shown)
    if ndcg10 > baseline_ndcg * 1.01:  # +1% improvement required
        client = mlflow.tracking.MlflowClient()
        # Promote the model version registered by this run (a version number, not the run id)
        version = client.get_latest_versions("reco-articles", stages=["None"])[0].version
        client.transition_model_version_stage(
            name="reco-articles",
            version=version,
            stage="Staging",
        )
        print(f"✅ Promoted to Staging (NDCG@10 = {ndcg10:.4f})")
    else:
        print(f"❌ No promotion (NDCG@10 = {ndcg10:.4f} vs baseline {baseline_ndcg:.4f})")
Definition: NDCG@k (Normalized Discounted Cumulative Gain) is the standard recsys evaluation metric: it measures the quality of the ranking of the top-k recommended items, giving more weight to correct predictions at the top of the list.
Purpose: Reward relevant recommendations placed near the top (where users actually click).
Why it matters here: Classical accuracy is useless in recsys (there are 10,000 articles, you recommend 10 — 99.9% "accuracy" on non-recommended items means nothing). NDCG@10 directly evaluates what matters: are the top 10 suggestions relevant, and in the right order?
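NDCG@k fits in a few lines of stdlib Python (this is the standard formula, shown from scratch rather than via sklearn): DCG discounts each item's relevance by log2(rank + 1), and NDCG normalises by the DCG of the ideal ordering.

```python
import math

def dcg_at_k(relevances: list[float], k: int) -> float:
    """Discounted cumulative gain of the first k items, ranks starting at 1."""
    return sum(rel / math.log2(rank + 1)
               for rank, rel in enumerate(relevances[:k], start=1))

def ndcg_at_k(relevances: list[float], k: int) -> float:
    """DCG of the predicted order divided by DCG of the ideal (sorted) order."""
    ideal = dcg_at_k(sorted(relevances, reverse=True), k)
    return dcg_at_k(relevances, k) / ideal if ideal > 0 else 0.0

# Relevance of 5 articles, in the order the model ranked them
assert ndcg_at_k([3, 2, 3, 0, 1], k=5) < 1.0   # imperfect ordering is penalised
assert ndcg_at_k([3, 3, 2, 1, 0], k=5) == 1.0  # the ideal ordering scores exactly 1
```

Swapping two items near the top of the list hurts the score far more than the same swap near the bottom — which is exactly the behaviour you want for a recommendation slate.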
Definition: A DAG (Directed Acyclic Graph) is a Python description of an Airflow pipeline — its steps, their dependencies, their schedule. Airflow runs tasks in the right order, retries on failure, and exposes a monitoring UI.
Purpose: Automate pipeline execution with scheduling, retries, alerting and observability.
Why it matters here: A recsys pipeline retrains every day at 3am. Without an orchestrator, you have a fragile cron with scattered logs. Airflow gives you automatic retries, SLAs, Slack alerting and historical backfill in one file.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

default_args = {
    "owner": "alderi",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "sla": timedelta(hours=2),
}

with DAG(
    dag_id="reco_articles_daily",
    default_args=default_args,
    description="Reco MLOps pipeline — daily 03:00 UTC",
    schedule="0 3 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["mlops", "reco"],
) as dag:
    ingest = BashOperator(
        task_id="ingest_events",
        bash_command="python /opt/src/ingestion/extract.py --date {{ ds }}",
    )
    features = BashOperator(
        task_id="materialize_features",
        bash_command="cd /opt/feature_repo && feast materialize-incremental {{ ds }}",
    )
    train = KubernetesPodOperator(
        task_id="train_model",
        name="reco-train",
        image="gcr.io/alderi/reco-train:latest",
        cmds=["python", "/opt/src/training/train.py"],
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "8Gi"},
            limits={"cpu": "4", "nvidia.com/gpu": "1"},
        ),
        is_delete_operator_pod=True,
    )
    # evaluate_against_baseline, canary_rollout and check_data_drift are
    # project callables imported from src/ (not shown here)
    evaluate = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_against_baseline,
    )
    deploy = PythonOperator(
        task_id="canary_deploy",
        python_callable=canary_rollout,  # 5% of traffic on v+1
        trigger_rule="all_success",
    )
    monitor = PythonOperator(
        task_id="monitor_drift",
        python_callable=check_data_drift,
    )

    ingest >> features >> train >> evaluate >> deploy >> monitor
The train step uses KubernetesPodOperator to launch training in a dedicated pod with a GPU — Airflow stays lightweight while heavy compute is isolated. Each task gets 2 automatic retries and an email alert on failure, and the DAG carries a 2 h SLA. The canary_deploy task routes only 5% of traffic to the new model — if business metrics regress, it rolls back automatically.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
from prometheus_client import Counter, Histogram
import mlflow.pyfunc
import numpy as np
import os, time, logging

app = FastAPI(title="Reco Articles API", version="2.0")
log = logging.getLogger("reco")

# Prometheus exposes reco_latency_ms and reco_requests_total
RECO_LATENCY = Histogram("reco_latency_ms", "Recommendation latency in ms")
RECO_REQUESTS = Counter("reco_requests_total", "Total recommendation requests")

# Load the model from the MLflow registry (Production stage)
MODEL_URI = os.getenv("MODEL_URI", "models:/reco-articles/Production")
model = mlflow.pyfunc.load_model(MODEL_URI)
store = FeatureStore(repo_path="/app/feature_repo")

class RecoRequest(BaseModel):
    user_id: str
    exclude: list[str] = []  # already-read articles
    k: int = 10

@app.get("/healthz")
def health():
    return {"status": "ok", "model_uri": MODEL_URI}

@app.post("/recommend")
def recommend(req: RecoRequest):
    t0 = time.perf_counter()
    # 1. Online features (Redis) in < 10 ms
    features = store.get_online_features(
        features=["user_stats:clicks_7d", "user_stats:top_category"],
        entity_rows=[{"user_id": req.user_id}],
    ).to_dict()
    if features["clicks_7d"][0] is None:
        # Cold-start fallback: popular articles (project helper, not shown)
        items = get_popular_articles(k=req.k)
    else:
        # 2. Model inference — top-K
        items = model.predict({
            "user_id": req.user_id,
            "features": features,
            "k": req.k + len(req.exclude),
        })
        items = [i for i in items if i not in req.exclude][:req.k]
    latency_ms = (time.perf_counter() - t0) * 1000
    RECO_LATENCY.observe(latency_ms)
    RECO_REQUESTS.inc()
    return {"items": items, "latency_ms": round(latency_ms, 2)}
# Stage 1 — build
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --target=/deps -r requirements.txt
# Stage 2 — runtime (lightweight image)
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /deps /usr/local/lib/python3.11/site-packages
COPY src/ ./src/
COPY feature_repo/ ./feature_repo/
ENV PYTHONUNBUFFERED=1
ENV MODEL_URI="models:/reco-articles/Production"
EXPOSE 8000
# python:3.11-slim ships without curl, so probe the endpoint with the stdlib
HEALTHCHECK --interval=30s --timeout=3s \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')" || exit 1
CMD ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: reco-api
  labels: {app: reco-api, tier: serving}
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate: {maxSurge: 1, maxUnavailable: 0}
  selector:
    matchLabels: {app: reco-api}
  template:
    metadata: {labels: {app: reco-api, version: v2}}
    spec:
      containers:
        - name: api
          image: gcr.io/alderi/reco-api:v2.0.0
          ports: [{containerPort: 8000}]
          resources:
            requests: {memory: "512Mi", cpu: "250m"}
            limits: {memory: "2Gi", cpu: "1"}
          readinessProbe:
            httpGet: {path: /healthz, port: 8000}
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet: {path: /healthz, port: 8000}
            initialDelaySeconds: 30
            periodSeconds: 30
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata: {name: reco-api-hpa}
spec:
  scaleTargetRef: {apiVersion: apps/v1, kind: Deployment, name: reco-api}
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target: {type: Utilization, averageUtilization: 70}
The /recommend service runs behind a Kubernetes Deployment with a minimum of 3 replicas, CPU-based autoscaling up to 20, zero-downtime rolling updates and health checks. The model is loaded at startup from the MLflow registry (Production stage) — changing the URI is enough to promote a new version without rebuilding the image.

Definition: Data drift is a shift in the input-feature distribution in production vs training (e.g. the audience skews three years younger). Concept drift is a shift in the X → y relationship (e.g. after an editorial shift, users click on different topics).
Purpose: Catch a silent model degradation before it hits business KPIs.
Why it matters here: A new editorial line, a news event, a season change — all modify the distribution. Without automated detection, you only realise it once CTR collapses, several weeks too late.
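Evidently's presets wrap a battery of statistical tests, but the core idea can be shown from scratch with the Population Stability Index, a common drift statistic: compare the share of observations per bucket between the training window and the last 24 h.

```python
import math

def psi(ref_counts: list[int], cur_counts: list[int], eps: float = 1e-6) -> float:
    """Population Stability Index over pre-bucketed counts.
    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate drift, > 0.25 strong drift."""
    ref_total, cur_total = sum(ref_counts), sum(cur_counts)
    score = 0.0
    for r, c in zip(ref_counts, cur_counts):
        p = max(r / ref_total, eps)  # share in the training window
        q = max(c / cur_total, eps)  # share in the last 24 h
        score += (q - p) * math.log(q / p)
    return score

# Bucketed 'top_category' counts: training window vs yesterday
reference = [500, 300, 200]   # sport, tech, culture
stable    = [480, 310, 210]
shifted   = [200, 300, 500]   # editorial shift toward culture

assert psi(reference, stable) < 0.1    # noise, no action
assert psi(reference, shifted) > 0.25  # strong drift -> alert / retrain
```

In production you would compute this per feature and alert when too many features cross the threshold — which is essentially what the drift_share rule below does.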
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
import pandas as pd
import requests, os

# Reference: features from the latest training run
reference = pd.read_parquet("gs://reco-data/reference/features_v2.parquet")
# Current: features observed yesterday in production
current = pd.read_parquet("gs://reco-data/prod/features_last_24h.parquet")

report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
report.run(reference_data=reference, current_data=current)
result = report.as_dict()
drift_share = result["metrics"][0]["result"]["drift_share"]

# ── Rule: if more than 30% of features drifted, alert & trigger retrain ──
if drift_share > 0.30:
    requests.post(
        os.getenv("SLACK_WEBHOOK"),
        json={"text": f"🚨 Data drift detected: {drift_share:.0%} of features drifting. Retrain scheduled."},
    )
    # Trigger an ad-hoc Airflow DAG run for retraining
    requests.post(
        "http://airflow:8080/api/v1/dags/reco_articles_daily/dagRuns",
        json={"conf": {"triggered_by": "drift_detector"}},
    )

# ── Save HTML report for audit ──
report.save_html(f"reports/drift_{pd.Timestamp.now().date()}.html")
Definition: Shadow mode routes a copy of production traffic to a new model without exposing its predictions to users — we only observe latency and predictions. A/B testing exposes the new model to a sample of real users (e.g. 5–50%) to measure business impact.
Purpose: Validate a new model risk-free (shadow) then measure its actual impact (A/B).
Why it matters here: Offline metrics (NDCG, Recall) don't always reflect real user behaviour. A model can have better NDCG but worse CTR due to a popularity bias. The A/B test is the only final arbiter.
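Deterministic user bucketing is a simple way to implement the 5% split (a common pattern, sketched here with stdlib hashing; the function name is ours): each user_id always hashes to the same bucket, so a user keeps the same variant across requests.

```python
import hashlib

def ab_variant(user_id: str, treatment_share: float = 0.05) -> str:
    """Hash the user id into [0, 1) and assign a stable A/B variant."""
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # roughly uniform in [0, 1]
    return "treatment" if bucket < treatment_share else "control"

# Assignment is deterministic: same user, same variant on every request
assert ab_variant("user-42") == ab_variant("user-42")

# Roughly 5% of a large population lands in treatment
share = sum(ab_variant(f"user-{i}") == "treatment" for i in range(20_000)) / 20_000
assert 0.03 < share < 0.07
```

For shadow mode the same split logic applies, except the treatment model's output is logged instead of returned to the user.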
name: reco-articles-ci-cd
on: {push: {branches: [main]}, pull_request: {}}
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.11"}
      - run: pip install -r requirements.txt
      - name: Lint
        run: ruff check src/ && black --check src/
      - name: Type-check
        run: mypy src/
      - name: Unit tests + coverage
        run: pytest tests/ --cov=src --cov-fail-under=80
      - name: Data validation (Great Expectations)
        run: great_expectations checkpoint run reco_checkpoint
  build-and-push:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -t gcr.io/alderi/reco-api:${{ github.sha }} .
      - name: Scan vulnerabilities (Trivy)
        run: trivy image --exit-code 1 --severity HIGH,CRITICAL gcr.io/alderi/reco-api:${{ github.sha }}
      - name: Push
        run: docker push gcr.io/alderi/reco-api:${{ github.sha }}
  deploy-staging:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes (staging)
        run: |
          kubectl set image deployment/reco-api api=gcr.io/alderi/reco-api:${{ github.sha }} -n staging
          kubectl rollout status deployment/reco-api -n staging --timeout=5m
      - name: Smoke test
        run: ./scripts/smoke_test.sh https://reco-staging.alderi.kamtchoua.com
Combined with DVC, checking out any Git commit and running dvc repro rebuilds the exact production model.

Definition: A feedback loop is a mechanism that feeds user interactions (clicks, read time, skips) back into the training pipeline so the model keeps learning from its own recommendations.
Purpose: Keep the model relevant over time and integrate new articles and new users automatically.
Why it matters here: Watch out for the feedback bias: if the model never recommends certain articles, it never gets a training signal on them → it will recommend them even less. You must inject exploration (ε-greedy, Thompson sampling, contextual bandits) to break the loop.
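A minimal ε-greedy re-ranker (an illustrative sketch; the function name and shapes are ours) that injects exploration into the served list: with probability ε, a random never-exposed article replaces one model pick, so every article eventually collects a feedback signal.

```python
import random

def epsilon_greedy_rerank(model_top_k, candidate_pool, epsilon=0.1, rng=None):
    """Replace each model pick with a random unexposed candidate with probability epsilon."""
    rng = rng or random.Random()
    unexposed = [a for a in candidate_pool if a not in model_top_k]
    out = []
    for item in model_top_k:
        if unexposed and rng.random() < epsilon:
            out.append(unexposed.pop(rng.randrange(len(unexposed))))  # explore
        else:
            out.append(item)                                          # exploit
    return out

model_top_k = ["a1", "a2", "a3", "a4", "a5"]
pool = [f"a{i}" for i in range(1, 101)]  # 100 candidate articles

served = epsilon_greedy_rerank(model_top_k, pool, epsilon=0.2, rng=random.Random(0))
assert len(served) == 5
assert len(set(served)) == 5  # no duplicates in the served list
```

Thompson sampling or contextual bandits do the same job more efficiently by spending exploration where uncertainty is highest, but ε-greedy is often good enough to break the loop.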
An MLOps pipeline for article recommendation is not a model — it is a living system: continuous ingestion, shared features between train and serve, experiments tracked in MLflow, Airflow orchestration, progressive Kubernetes rollouts, drift monitoring and a feedback loop. The concepts covered here (feature store, quality gate, canary deploy, drift detection) transfer directly to any production ML use case: pricing, fraud, churn, NLP — the building blocks remain the same.