cutt/cutt/security.py
2025-03-26 17:37:02 +01:00

399 lines
12 KiB
Python

from datetime import timedelta, timezone, datetime
from typing import Annotated
from fastapi import Depends, HTTPException, Request, Response, status
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, ValidationError
import jwt
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
from sqlmodel import Session, select
from cutt.db import PlayerTeamLink, Team, TokenDB, engine, Player
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from pydantic_settings import BaseSettings, SettingsConfigDict
from passlib.context import CryptContext
from sqlalchemy.exc import OperationalError
# Short alias for the Player model; keeps the query expressions in register() compact.
P = Player
class Config(BaseSettings):
    """Security settings for this module, with ``.env`` configured as the env file."""

    # Key used to sign and verify every JWT issued or checked in this module.
    # NOTE(review): defaults to an empty string when unset -- confirm that
    # every deployment overrides it.
    secret_key: str = ""

    # Lifetime, in minutes, of a login access token and of its cookie.
    access_token_expire_minutes: int = 15

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )
# Module-wide settings instance, created once when the module is imported.
config = Config()
class Token(BaseModel):
    """Response body returned by ``login_for_access_token``."""

    # The encoded JWT.
    access_token: str
class TokenData(BaseModel):
    """Claims pulled out of a decoded access token by ``get_current_user``."""

    # Value of the token's "sub" claim.
    username: str | None = None

    # Value of the token's "scopes" claim; empty when the claim is absent.
    scopes: list[str] = []
# Shared password-hashing context; argon2 is the only configured scheme.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
class CookieOAuth2(OAuth2PasswordBearer):
    """Password-bearer scheme that also accepts the token from a cookie.

    The ``access_token`` cookie is consulted first; the parent class's own
    lookup is only used when the cookie is missing or empty.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def __call__(self, request: Request):
        """Return the raw token carried by ``request``.

        Raises:
            HTTPException: 401 when neither the cookie nor the parent
                scheme yields a token.
        """
        token = request.cookies.get("access_token")
        if not token:
            # No usable cookie: fall back to the parent scheme.
            token = await super().__call__(request)
        if not token:
            raise HTTPException(status_code=401)
        return token
# Shared dependency used by get_current_user to obtain the raw token.
# The mapping lists the scope names this API declares, with descriptions.
oauth2_scheme = CookieOAuth2(
    tokenUrl="api/token",
    scopes={
        "analysis": "Access the results.",
        "admin": "Maintain DB etc.",
    },
)
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash ``password`` with ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def get_user(username: str | None):
    """Look up the player whose ``username`` matches exactly.

    Returns:
        The matching ``Player``, or ``None`` when ``username`` is empty,
        when no player matches, or when the database raises
        ``OperationalError``.
    """
    if not username:
        return None

    query = select(Player).where(Player.username == username)
    try:
        with Session(engine) as db:
            player = db.exec(query).one_or_none()
    except OperationalError:
        # Swallowed: callers see a database error as "no such user".
        return None
    return player
def authenticate_user(username: str, password: str):
    """Return the player for a valid username/password pair, else ``False``."""
    player = get_user(username)
    if player and verify_password(password, player.hashed_password):
        return player
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode ``data`` plus an ``exp`` claim as an HS256-signed JWT.

    Args:
        data: Claims to embed. The dict is copied, not mutated.
        expires_delta: Token lifetime. When falsy,
            ``config.access_token_expire_minutes`` is used instead.

    Returns:
        The encoded token.
    """
    lifetime = expires_delta or timedelta(
        minutes=config.access_token_expire_minutes
    )
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, config.secret_key, algorithm="HS256")
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    security_scopes: SecurityScopes,
):
    """Resolve the request's access token to a ``Player``.

    The token is decoded with ``config.secret_key``, the player named by its
    ``sub`` claim is loaded, and every scope required by the route must be
    present both in the player's stored scopes and in the token itself.

    Raises:
        HTTPException: 401 when the token is missing, expired or invalid,
            when the player is not found, or when a required scope is
            missing.
    """
    challenge = "Bearer"
    if security_scopes.scopes:
        challenge = f'Bearer scope="{security_scopes.scope_str}"'

    def unauthorized(detail: str) -> HTTPException:
        # Every failure in this dependency is a 401 carrying the same challenge.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": challenge},
        )

    if not token:
        raise unauthorized("Could not validate credentials")

    try:
        claims = jwt.decode(token, config.secret_key, algorithms=["HS256"])
        subject = claims.get("sub")
        if subject is None:
            raise unauthorized("Could not validate credentials")
        token_data = TokenData(
            username=subject, scopes=claims.get("scopes", [])
        )
    except ExpiredSignatureError:
        # Handled before InvalidTokenError so expiry gets its own message.
        raise unauthorized("Access token expired")
    except (InvalidTokenError, ValidationError):
        raise unauthorized("Could not validate credentials")

    user = get_user(username=token_data.username)
    if user is None:
        raise unauthorized("Could not validate credentials")

    granted = set(user.scopes.split())
    carried = token_data.scopes
    lacks_scope = any(
        needed not in granted or needed not in carried
        for needed in security_scopes.scopes
    )
    if lacks_scope:
        raise unauthorized("Not enough permissions")
    return user
async def get_current_active_user(
    current_user: Annotated[Player, Depends(get_current_user)],
):
    """Return the authenticated player, rejecting disabled accounts.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
class TeamScopedRequest(BaseModel):
    """Result of ``verify_team_scope``: a player plus the team they may act on."""

    # The authenticated, active player.
    user: Player

    # The team id that was checked against the player's scopes.
    team_id: int
async def verify_team_scope(
    team_id: int, user: Annotated[Player, Depends(get_current_active_user)]
):
    """Ensure ``user`` holds the ``team:<team_id>`` scope.

    Returns:
        A ``TeamScopedRequest`` bundling the player and the team id.

    Raises:
        HTTPException: 401 when the scope is not among the player's scopes.
    """
    required_scope = f"team:{team_id}"
    if required_scope in user.scopes.split():
        return TeamScopedRequest(user=user, team_id=team_id)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not enough permissions",
    )
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()], response: Response
) -> Token:
    """Authenticate a username/password form and issue an access token.

    The token is returned in the response body and also set as an HTTP-only,
    same-site ``access_token`` cookie whose ``max_age`` matches the configured
    token lifetime.

    Args:
        form_data: The submitted OAuth2 password form.
        response: Response object on which the cookie is set.

    Returns:
        A ``Token`` wrapping the encoded JWT.

    Raises:
        HTTPException: 401 when the credentials are not accepted.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE(review): the scopes requested in the form are not used to narrow
    # the grant; the token always carries every scope stored on the user.
    # Sorted so the claim order does not depend on set iteration order.
    granted_scopes = sorted(set(user.scopes.split()))
    access_token = create_access_token(
        data={"sub": user.username, "scopes": granted_scopes}
    )
    response.set_cookie(
        "access_token",
        value=access_token,
        httponly=True,
        samesite="strict",
        max_age=config.access_token_expire_minutes * 60,
    )
    return Token(access_token=access_token)
async def logout(response: Response):
    """Overwrite the ``access_token`` cookie with an empty value and ``expires=0``."""
    response.set_cookie(
        "access_token",
        value="",
        expires=0,
        httponly=True,
        samesite="strict",
    )
    return {"message": "Successfully logged out"}
def set_password_token(username: str):
    """Create a 30-day "set password" token for ``username``.

    Returns:
        The encoded token, or ``None`` when ``get_user`` finds no such user.
    """
    user = get_user(username)
    if not user:
        return None

    claims = {
        "sub": "set password",
        "username": username,
        "name": user.display_name,
    }
    return create_access_token(data=claims, expires_delta=timedelta(days=30))
def register_token(team_id: int):
    """Create a 30-day "register" token bound to the team ``team_id``.

    Returns:
        The encoded token, or ``None`` when no team has that id.
    """
    with Session(engine) as session:
        # one_or_none() rather than one(): a missing team must reach the
        # guard below instead of raising before it.
        team = session.exec(select(Team).where(Team.id == team_id)).one_or_none()
        if team is None:
            return None
        # Read the name while the session is still open.
        team_name = team.name

    return create_access_token(
        data={"sub": "register", "team_id": team_id, "name": team_name},
        expires_delta=timedelta(days=30),
    )
def verify_one_time_token(token: str):
    """Validate a one-time token against ``TokenDB`` and decode it.

    Returns:
        The decoded JWT payload when the token is stored and still unused.

    Raises:
        HTTPException: 401 when the token is unknown, already used, expired
            or otherwise invalid.
    """
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="could not validate token",
    )
    # "== False" / "== True" build SQL expressions here; they are not
    # Python truthiness comparisons.
    stored = select(TokenDB).where(TokenDB.token == token)
    unused_query = stored.where(TokenDB.used == False)  # noqa: E712
    used_query = stored.where(TokenDB.used == True)  # noqa: E712

    with Session(engine) as session:
        if session.exec(unused_query).one_or_none():
            try:
                return jwt.decode(token, config.secret_key, algorithms=["HS256"])
            except ExpiredSignatureError:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="access token expired",
                )
            except (InvalidTokenError, ValidationError):
                raise invalid_token

        # Only reached when no unused row matched.
        if session.exec(used_query).one_or_none():
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="token already used",
            )

    raise invalid_token
def invalidate_one_time_token(token: str):
    """Mark the stored row for ``token`` as used and commit the change."""
    lookup = select(TokenDB).where(TokenDB.token == token)
    with Session(engine) as db:
        record = db.exec(lookup).one()
        record.used = True
        db.add(record)
        db.commit()
class FirstPassword(BaseModel):
    """Request body for ``set_first_password``."""

    # One-time token; its "sub" claim must be "set password".
    token: str

    # Plain-text password to hash and store for the user named in the token.
    password: str
async def set_first_password(req: FirstPassword):
    """Set a user's password using a one-time "set password" token.

    Args:
        req: The one-time token and the new plain-text password.

    Returns:
        A 200 response with a confirmation message.

    Raises:
        HTTPException: 401 when the token fails ``verify_one_time_token`` or
            is not a "set password" token; 404 when the user named in the
            token does not exist.
    """
    payload = verify_one_time_token(req.token)
    if payload.get("sub") != "set password":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="wrong type of token.",
        )

    user = get_user(payload.get("username"))
    if not user:
        # Fail explicitly instead of reporting success without storing
        # anything; the token is left unused in this case.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="user not found",
        )

    user.hashed_password = get_password_hash(req.password)
    with Session(engine) as session:
        session.add(user)
        session.commit()

    # Consume the token only after the password has been committed.
    invalidate_one_time_token(req.token)
    return Response("password set successfully", status_code=status.HTTP_200_OK)
class ChangedPassword(BaseModel):
    """Request body for ``change_password``."""

    # Verified against the stored hash before anything is changed.
    current_password: str

    # Replacement password; an empty value is rejected by change_password.
    new_password: str
async def change_password(
    request: ChangedPassword,
    user: Annotated[Player, Depends(get_current_active_user)],
):
    """Replace the authenticated player's password.

    Raises:
        HTTPException: 400 when the new password is empty, the player has no
            stored hash, or the current password does not verify.
    """
    accepted = (
        request.new_password
        and user.hashed_password
        and verify_password(request.current_password, user.hashed_password)
    )
    if not accepted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="wrong password",
        )

    user.hashed_password = get_password_hash(request.new_password)
    with Session(engine) as db:
        db.add(user)
        db.commit()

    return PlainTextResponse(
        "password changed successfully",
        status_code=status.HTTP_200_OK,
        media_type="text/plain",
    )
class RegisterRequest(BaseModel):
    """Request body for ``register``."""

    # One-time token; its "sub" claim must be "register".
    token: str

    # Must equal the "team_id" claim carried by the token.
    team_id: int

    # Must not already be used by a player on the same team.
    display_name: str

    # Must not already be used by any player.
    username: str

    password: str

    # Both fields must be present in the body but may be null.
    email: str | None
    number: str | None
async def register(req: RegisterRequest):
    """Create a new player on a team using a one-time "register" token.

    Args:
        req: Registration data, including the one-time token.

    Returns:
        A plain-text confirmation naming the new player.

    Raises:
        HTTPException: 401 when the token fails ``verify_one_time_token``, is
            not a "register" token, or was issued for a different team; 400
            when the username exists or the display name is already taken on
            the team.
    """
    payload = verify_one_time_token(req.token)
    if payload.get("sub") != "register":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="wrong type of token.",
        )

    team_id = payload.get("team_id")
    if team_id != req.team_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="wrong team",
        )

    with Session(engine) as session:
        if session.exec(select(P).where(P.username == req.username)).one_or_none():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="username exists",
            )

        same_name_on_team = (
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == team_id, P.display_name == req.display_name)
        )
        if session.exec(same_name_on_team).one_or_none():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="the name is already taken on this team",
            )

        team = session.exec(select(Team).where(Team.id == team_id)).one()
        new_player = Player(
            username=req.username,
            display_name=req.display_name,
            # Store the hash of the submitted password; without it the new
            # account has no credentials and cannot log in.
            hashed_password=get_password_hash(req.password),
            email=req.email or None,
            number=req.number,
            disabled=False,
            teams=[team],
        )
        session.add(new_player)
        session.commit()

    # Consume the token only after the player has been committed.
    invalidate_one_time_token(req.token)
    # Use the request value: attributes of new_player are expired by the
    # commit and the instance is detached once the session has closed.
    return PlainTextResponse(f"added {req.display_name}")
async def read_player_me(
    current_user: Annotated[Player, Depends(get_current_active_user)],
):
    """Return the authenticated player's fields, minus the excluded ones."""
    hidden_fields = {"hashed_password", "disabled"}
    return current_user.model_dump(exclude=hidden_fields)
async def read_own_items(
    current_user: Annotated[Player, Depends(get_current_active_user)],
):
    """Return a fixed one-item list owned by the authenticated player."""
    item = {"item_id": "Foo", "owner": current_user.username}
    return [item]