feat: roll back refresh tokens, use single token only

This commit is contained in:
2025-03-07 18:24:25 +01:00
parent 8b092fed51
commit d3f5c3cb82
5 changed files with 62 additions and 71 deletions

View File

@@ -1,9 +1,9 @@
from datetime import timedelta, timezone, datetime
from typing import Annotated
from fastapi import Depends, HTTPException, Response, status
from fastapi import Depends, HTTPException, Request, Response, status
from pydantic import BaseModel, ValidationError
import jwt
from jwt.exceptions import InvalidTokenError
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
from sqlmodel import Session, select
from db import engine, User
from fastapi.security import (
@@ -18,7 +18,7 @@ from sqlalchemy.exc import OperationalError
class Config(BaseSettings):
secret_key: str = ""
access_token_expire_minutes: int = 30
access_token_expire_minutes: int = 15
model_config = SettingsConfigDict(
env_file=".env", env_file_encoding="utf-8", extra="ignore"
)
@@ -29,7 +29,6 @@ config = Config()
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
@@ -81,15 +80,15 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
expire = datetime.now(timezone.utc) + timedelta(
seconds=config.access_token_expire_minutes
)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, config.secret_key, algorithm="HS256")
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
async def get_current_user(security_scopes: SecurityScopes, request: Request):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
@@ -99,13 +98,22 @@ async def get_current_user(
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
access_token = request.cookies.get("access_token")
if not access_token:
raise credentials_exception
try:
payload = jwt.decode(token, config.secret_key, algorithms=["HS256"])
payload = jwt.decode(access_token, config.secret_key, algorithms=["HS256"])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(username=username, scopes=token_scopes)
except ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
headers={"WWW-Authenticate": authenticate_value},
)
except (InvalidTokenError, ValidationError):
raise credentials_exception
user = get_user(username=token_data.username)
@@ -139,17 +147,24 @@ async def login_for_access_token(
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=config.access_token_expire_minutes)
allowed_scopes = set(user.scopes.split())
requested_scopes = set(form_data.scopes)
access_token = create_access_token(
data={"sub": user.username, "scopes": list(allowed_scopes)},
expires_delta=access_token_expires,
data={"sub": user.username, "scopes": list(allowed_scopes)}
)
response.set_cookie(
"Authorization", value=f"Bearer {access_token}", httponly=True, samesite="none"
"access_token",
value=access_token,
httponly=True,
samesite="strict",
max_age=15,
)
return Token(access_token=access_token, token_type="bearer")
return Token(access_token=access_token)
async def logout(response: Response):
response.set_cookie("access_token", "", expires=0, httponly=True, samesite="strict")
return
async def read_users_me(