JWT (JSON Web Token) est le standard pour sécuriser les APIs REST. Ce tutoriel construit une API complète avec inscription, connexion, access tokens, refresh tokens et protection des routes par rôles.
Définition : Un standard (RFC 7519) pour créer des tokens compacts et auto-contenus, composés de trois parties encodées en Base64 séparées par des points : header.payload.signature.
But : Transmettre des informations d'authentification et d'autorisation sans état de session côté serveur. Le client stocke et renvoie le token à chaque requête.
Pourquoi ici : Chaque requête API REST doit pouvoir être servie par n'importe quel serveur—les JWT éliminent le besoin de partager l'état de session.
Définition : Access token = court-vécu (30 min), utilisé pour les requêtes. Refresh token = long-vécu (7 jours), stocké sécurisé, utilisé uniquement pour demander un nouvel access token.
But : Si un access token est volé, il n'est valable que 30 min. Le refresh token reste secret et ne circule pas dans chaque requête (moins de risque).
Pourquoi ici : Pattern standard appelé "token rotation"—si tu ne le fais pas, un token volé = accès permanent à ton compte.
Définition : Un standard d'autorisation (RFC 6749) pour déléguer l'accès aux ressources. FastAPI utilise OAuth2PasswordBearer pour extraire les tokens du header Authorization.
But : Fournir un cadre interopérable pour l'authentification—clients, serveurs et proxies se comprennent.
Pourquoi ici : Les IDEs et outils (Swagger/docs FastAPI) reconnaissent OAuth2 et ajoutent automatiquement les boutons de login.
api-jwt/
├── main.py # Point d'entrée FastAPI
├── database.py # Configuration SQLAlchemy
├── models.py # Modèles ORM
├── schemas.py # Schémas Pydantic (validation)
├── auth.py # Logique JWT
├── routes/
│ ├── auth.py # /auth/register, /auth/login, /auth/refresh
│ └── users.py # /users/me, /users/ (admin)
└── requirements.txt
Définition : Un pattern où les dépendances d'une fonction (BD, token validation, config) sont injectées plutôt que créées à l'intérieur.
But : Rendre le code testable et modulaire. FastAPI utilise Depends() pour injecter automatiquement.
Pourquoi ici : Depends(get_current_user) = FastAPI appelle la fonction et passe le résultat automatiquement—magie du framework.
Définition : Un composant qui intercepte chaque requête avant le handler et chaque réponse avant de la retourner au client.
But : Ajouter du comportement transversal (logging, ajout de headers, timing) sans modifier chaque route.
Pourquoi ici : Le middleware authentification serait un exemple—intercepte tout et vérifie le token.
fastapi==0.111.0
uvicorn[standard]==0.29.0
sqlalchemy==2.0.30
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.9
pydantic[email]==2.7.0
Définition : Hashing = transformation irréversible d'une donnée en empreinte fixe. Bcrypt = algorithm de hashing de mots de passe avec "salt" intégré.
But : Stocker les mots de passe de manière sécurisée—si la BD est volée, les mots de passe ne peuvent pas être récupérés (contrairement à MD5 ou SHA qui sont trop rapides).
Pourquoi ici : MD5/SHA sont obsolètes : on peut tester des milliards de combinaisons par seconde. Bcrypt ajoute un délai intentionnel (cost factor) pour ralentir les attaques brute-force.
Définition : Une chaîne aléatoire ajoutée au mot de passe avant hashing, unique pour chaque utilisateur.
But : Empêcher les rainbow tables—deux utilisateurs avec le même mot de passe n'auront pas le même hash.
Pourquoi ici : Bcrypt gère le salt automatiquement, c'est caché mais c'est essentiel pour la sécurité.
Définition : Une classe pour valider et parser les données (requêtes HTTP, réponses JSON) avec les types Python.
But : Validation automatique (est-ce un email valide ? un nombre ?), conversion de type, génération de documentation OpenAPI.
Pourquoi ici : Si tu déclares email: EmailStr, Pydantic rejette les requêtes avec emails invalides avant que ton code ne les voit.
Définition : ORM (Object-Relational Mapping) = abstraction entre les objets Python et les tables SQL. SQLAlchemy = la plus populaire pour Python.
But : Écrire User.query.filter(...) au lieu de SQL brut—le code devient portable (change de PostgreSQL à MySQL = aucune modification).
Pourquoi ici : Les ORMs préviennent les SQL injections (les requêtes sont paramétrées) et rendent le code plus lisible.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = "sqlite:///./app.db" # PostgreSQL en prod
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
pass
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
yield pour injecter la session dans FastAPI : avant yield = ouverture, après = fermeture garantie.
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Enum
from sqlalchemy.sql import func
from database import Base
import enum
class Role(enum.Enum):
USER = "user"
ADMIN = "admin"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True)
hashed_password = Column(String, nullable=False)
role = Column(Enum(Role), default=Role.USER)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# ── Configuration ──
SECRET_KEY = "votre-cle-secrete-256-bits-minimum" # os.getenv("SECRET_KEY") en prod
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Gère le hashing et la vérification
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") # Extrait le token du header
# ── Gestion des mots de passe ──
def hash_password(password: str) -> str:
return pwd_context.hash(password) # Chaque appel génère un hash différent (salt aléatoire)
def verify_password(plain_password: str, hashed: str) -> bool:
return pwd_context.verify(plain_password, hashed) # Bcrypt extrait le salt du hash et compare
# ── Création de tokens JWT ──
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
payload = data.copy() # Copie pour ne pas modifier l'original
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=30)) # Expire dans 30 min
payload.update({
"exp": expire, # exp = claim standard JWT d'expiration
"type": "access" # Marque ce token comme access (vs refresh)
})
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) # Crée le JWT signé
def create_refresh_token(data: dict) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) # Expire dans 7 jours
payload.update({"exp": expire, "type": "refresh"})
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# ── Vérification du token et récupération de l'utilisateur ──
def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
token_type: str = payload.get("type")
if user_id is None or token_type != "access":
raise credentials_exception
except JWTError:
raise credentials_exception
return {"user_id": user_id, "role": payload.get("role")}
jwt.encode(payload, SECRET_KEY, algorithm="HS256") crée le JWT : le payload est sérialisé en JSON, encodé en Base64, puis signé avec la clé secrète. Le serveur n'a jamais besoin d'une base de données de tokens—il suffit de vérifier la signature. Si quelqu'un modifie le token ou la signature, jwt.decode rejette immédiatement.
jwt.decode(token, SECRET_KEY, algorithms=[...]) vérifie la signature. Si le token a été modifié ou est expiré, une exception JWTError est levée. On vérifie aussi que token_type == "access"—un refresh token ne peut pas être utilisé pour accéder aux routes.
# ── Décorateur de vérification de rôle ──
def require_role(*roles: str):
def checker(current_user = Depends(get_current_user)):
if current_user["role"] not in roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôle requis : {roles}"
)
return current_user
return checker
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from database import get_db
from models import User
from auth import hash_password, verify_password, create_access_token, create_refresh_token
from pydantic import BaseModel, EmailStr
router = APIRouter(prefix="/auth", tags=["Authentification"])
# ── Schémas ──
class RegisterRequest(BaseModel):
username: str
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer" # Standard : le client ajoute "Bearer" avant le token dans les headers
# ── POST /auth/register ──
# Pydantic valide automatiquement : email doit être valide, password présent, etc.
@router.post("/register", status_code=status.HTTP_201_CREATED)
def register(data: RegisterRequest, db: Session = Depends(get_db)):
# Vérifier que l'email n'existe pas déjà
if db.query(User).filter(User.email == data.email).first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email déjà utilisé"
)
user = User(
username=data.username,
email=data.email,
hashed_password=hash_password(data.password)
)
db.add(user)
db.commit()
db.refresh(user) # Actualise l'objet user depuis la BD (récupère l'ID auto-généré)
return {"message": "Compte créé avec succès", "user_id": user.id}
db.query(User).filter(...).first() = requête SQL SELECT. L'ORM SQLAlchemy construit la requête et retourne un objet Python User. db.add(user) prépare l'insertion, db.commit() l'exécute—pattern standard ORM. db.refresh(user) récupère les valeurs auto-générées (notamment id).
# ── POST /auth/login ──
@router.post("/login", response_model=TokenResponse)
def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
user = db.query(User).filter(User.email == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email ou mot de passe incorrect"
)
if not user.is_active:
raise HTTPException(status_code=403, detail="Compte désactivé")
token_data = {"sub": str(user.id), "role": user.role.value} # "sub" = claim standard pour l'ID utilisateur
return TokenResponse(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token(token_data)
)
Le login utilise OAuth2PasswordRequestForm = FastAPI parse le formulaire d'authentification standard (username et password du body). On cherche l'utilisateur par email, vérifie le password avec bcrypt, puis génère les deux tokens. La réponse est sérialisée automatiquement en JSON par Pydantic.
# ── POST /auth/refresh ──
@router.post("/refresh", response_model=TokenResponse)
def refresh_token(refresh_token: str, db: Session = Depends(get_db)):
from jose import JWTError, jwt
from auth import SECRET_KEY, ALGORITHM
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Token invalide")
token_data = {"sub": payload["sub"], "role": payload["role"]}
return TokenResponse(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token(token_data)
)
except JWTError:
raise HTTPException(status_code=401, detail="Refresh token expiré ou invalide")
from fastapi import APIRouter, Depends
from auth import get_current_user, require_role
router = APIRouter(prefix="/users", tags=["Utilisateurs"])
# Route accessible par tout utilisateur connecté
@router.get("/me")
def get_profile(current_user = Depends(get_current_user)):
return {
"user_id": current_user["user_id"],
"role": current_user["role"],
"message": "Vous êtes connecté"
}
# Route réservée aux admins
@router.get("/")
def list_users(current_user = Depends(require_role("admin"))):
return ["Liste de tous les utilisateurs (admin seulement)"]
get_current_user avant d'exécuter le handler, injectant l'utilisateur authentifié. Si le token manque ou est invalide, une exception 401 est retournée avant que le handler ne s'exécute. require_role("admin") est un wrapper : il appelle get_current_user, puis vérifie que le rôle est "admin"—sinon une exception 403.
# 1. Inscription
curl -X POST http://localhost:8000/auth/register \
-H "Content-Type: application/json" \
-d '{"username":"alderi","email":"alderi@example.com","password":"MonMotDePasse123!"}'
# 2. Connexion → récupérer les tokens
TOKEN=$(curl -s -X POST http://localhost:8000/auth/login \
-d "username=alderi@example.com&password=MonMotDePasse123!" \
| python3 -c "import sys,json; print(json.load(sys.stdin)['access_token'])")
# 3. Accéder à une route protégée
curl http://localhost:8000/users/me \
-H "Authorization: Bearer $TOKEN"
# {"user_id":"1","role":"user","message":"Vous êtes connecté"}
# 4. Tester une route admin (rejetée)
curl http://localhost:8000/users/ \
-H "Authorization: Bearer $TOKEN"
# {"detail":"Accès refusé. Rôle requis : ('admin',)"}