# cutt/security.py
# Password hashing, JWT access tokens, and FastAPI authentication dependencies.
from datetime import timedelta, timezone, datetime
from typing import Annotated
from fastapi import Depends, HTTPException, Response, status
2025-02-14 20:10:21 +01:00
from pydantic import BaseModel
import jwt
from jwt.exceptions import InvalidTokenError
from sqlmodel import Session, select
from db import engine, User
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic_settings import BaseSettings, SettingsConfigDict
from passlib.context import CryptContext
# Application settings, loaded by pydantic-settings (optionally from ".env").
class Config(BaseSettings):
    # Source configuration: read ".env" as UTF-8 and ignore unknown keys.
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Key used to sign and verify the HS256 access tokens in this module.
    # NOTE(review): the empty default means tokens are signed with "" unless a
    # value is supplied -- confirm every deployment sets it.
    secret_key: str = ""

    # Lifetime, in minutes, of tokens issued by login_for_access_token.
    access_token_expire_minutes: int = 30
# Module-wide settings instance, created once when this module is imported.
config = Config()
# Return type of login_for_access_token.
class Token(BaseModel):
    # The encoded JWT.
    access_token: str

    # Token scheme label; login_for_access_token sets it to "bearer".
    token_type: str
# Identity data pulled out of a decoded access token.
class TokenData(BaseModel):
    # Value of the token's "sub" claim, as set by get_current_user.
    username: str | None = None
# Password hashing context; argon2 is the only configured scheme.
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
)

# Bearer-token dependency used by get_current_user; tokens are issued at
# "api/token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/token")
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Delegates to ``pwd_context.verify`` and returns its result unchanged.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def get_password_hash(password):
    """Hash ``password`` with the module's ``pwd_context`` and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
def get_user(username: str | None):
    """Fetch the user whose ``username`` column equals the given value.

    Returns ``None`` when ``username`` is empty or ``None`` (no query is
    issued) or when no row matches; otherwise returns the matching ``User``.
    """
    if not username:
        return None

    query = select(User).where(User.username == username)
    with Session(engine) as db:
        return db.exec(query).one_or_none()
def authenticate_user(username: str, password: str):
    """Validate a username/password pair.

    Args:
        username: Login name to look up via ``get_user``.
        password: Plaintext password to check against the stored hash.

    Returns:
        The matching ``User`` when the credentials are valid, otherwise
        ``False`` (unknown user or wrong password).
    """
    user = get_user(username)
    if not user:
        # Spend the same hashing time as a real check so that response timing
        # does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Build a signed HS256 JWT from ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the mapping is copied, not mutated.
        expires_delta: Lifetime of the token. Defaults to 15 minutes when
            omitted (``None``).

    Returns:
        The encoded token produced by ``jwt.encode`` with ``config.secret_key``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently replace a zero lifetime with the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    claims = {**data, "exp": datetime.now(timezone.utc) + expires_delta}
    return jwt.encode(claims, config.secret_key, algorithm="HS256")
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer token to a ``User``.

    Args:
        token: Raw token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The ``User`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token fails to decode, carries no usable
            ``sub`` claim, or names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise InvalidTokenError, so keep the try narrow.
    try:
        payload = jwt.decode(token, config.secret_key, algorithms=["HS256"])
    except InvalidTokenError as err:
        raise credentials_exception from err

    username = payload.get("sub")
    # Reject a missing or non-string subject here; otherwise a non-string
    # value would fail TokenData validation and surface as a server error
    # instead of a 401.
    if not isinstance(username, str) or not username:
        raise credentials_exception

    token_data = TokenData(username=username)
    user = get_user(username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the authenticated user unless the account is disabled.

    Raises:
        HTTPException: 400 with detail "Inactive user" when
            ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()], response: Response
) -> Token:
    """Exchange a username/password form for an access token.

    The token is returned in the response body and also set as an
    ``Authorization`` cookie on ``response``.

    Args:
        form_data: Submitted OAuth2 password form (username and password).
        response: Outgoing response on which the cookie is set.

    Returns:
        A ``Token`` carrying the encoded JWT and the type ``"bearer"``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=config.access_token_expire_minutes)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    # Browsers reject SameSite=None cookies that are not also marked Secure,
    # so the two attributes must be set together for the cookie to be stored.
    response.set_cookie(
        "Authorization",
        value=f"Bearer {access_token}",
        httponly=True,
        secure=True,
        samesite="none",
    )
    return Token(access_token=access_token, token_type="bearer")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return the user resolved by the ``get_current_active_user`` dependency."""
    me = current_user
    return me
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Return a one-element list with a fixed item owned by the current user.

    The item id is the literal ``"Foo"``; only ``owner`` varies, taken from
    ``current_user.username``.
    """
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]