diff --git a/.env b/.env deleted file mode 100644 index 585477b..0000000 --- a/.env +++ /dev/null @@ -1 +0,0 @@ -VITE_BASE_URL= diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ea73ad1 --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +VITE_BASE_URL= +SECRET_KEY="tg07Cp0daCpcbeeFw+lI4RG2lEuT8oq5xOwB5ETs9YaJTylG1euC4ogjTK4zym/d" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 diff --git a/analysis.py b/analysis.py index 38f7b2d..9f0d88e 100644 --- a/analysis.py +++ b/analysis.py @@ -1,20 +1,130 @@ -from datetime import datetime +from datetime import timedelta, timezone, datetime import io +from typing import Annotated import base64 -from fastapi import APIRouter +from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import JSONResponse from pydantic import BaseModel, Field +import jwt +from jwt.exceptions import InvalidTokenError from sqlmodel import Session, func, select from sqlmodel.sql.expression import SelectOfScalar -from db import Chemistry, Player, engine +from db import Chemistry, Player, engine, User import networkx as nx - +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from pydantic_settings import BaseSettings, SettingsConfigDict +from passlib.context import CryptContext import matplotlib matplotlib.use("agg") import matplotlib.pyplot as plt -analysis_router = APIRouter(prefix="/analysis") + +class Config(BaseSettings): + secret_key: str = "" + access_token_expire_minutes: int = 30 + model_config = SettingsConfigDict( + env_file=".env", env_file_encoding="utf-8", extra="ignore" + ) + + +config = Config() + + +class Token(BaseModel): + access_token: str + token_type: str + + +class TokenData(BaseModel): + username: str | None = None + + +pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") + + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +analysis_router = APIRouter(prefix="/analysis", dependencies=[Depends(oauth2_scheme)]) + + +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + + +def get_password_hash(password): + return pwd_context.hash(password) + + +def get_user(username: str): + with Session(engine) as session: + return session.exec(select(User).where(User.username == username)).one_or_none() + + +def authenticate_user(username: str, password: str): + user = get_user(username) + if not user: + return False + if not verify_password(password, user.hashed_password): + return False + return user + + +def create_access_token(data: dict, expires_delta: timedelta | None = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.now(timezone.utc) + expires_delta + else: + expire = datetime.now(timezone.utc) + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, config.secret_key, algorithm="HS256") + return encoded_jwt + + +async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, config.secret_key, algorithms=["HS256"]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) + except InvalidTokenError: + raise credentials_exception + user = get_user(fake_users_db, username=token_data.username) + if user is None: + raise credentials_exception + return user + + +async def get_current_active_user( + current_user: Annotated[User, Depends(get_current_user)], +): + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive user") + return current_user + + +@analysis_router.post("/token") +async def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], +) -> Token: + user = authenticate_user(form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=config.access_token_expire_minutes) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + return Token(access_token=access_token, token_type="bearer") C = Chemistry diff --git a/db.py b/db.py index 2ba4ce4..c5d9c2b 100644 --- a/db.py +++ b/db.py @@ -1,5 +1,13 @@ from datetime import datetime, timezone -from sqlmodel import ARRAY, Column, Relationship, SQLModel, Field, create_engine, String +from sqlmodel import ( + ARRAY, + Column, + Relationship, + SQLModel, + Field, + create_engine, + String, +) with open("db.secrets", "r") as f: db_secrets = f.readline().strip() @@ -54,4 +62,13 @@ class MVPRanking(SQLModel, table=True): mvps: list[str] = Field(sa_column=Column(ARRAY(String))) +class User(SQLModel, table=True): + username: str = Field(default=None, primary_key=True) + email: str | None = None + full_name: str | None = None + disabled: bool | None = None + hashed_password: str | None = None + player_id: int | None = Field(default=None, foreign_key="player.id") + + SQLModel.metadata.create_all(engine) diff --git a/main.py b/main.py index 1cc2085..301239f 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from sqlmodel import ( ) from fastapi.middleware.cors import CORSMiddleware from analysis import analysis_router +from security import login_for_access_token, read_users_me, read_own_items app = FastAPI(title="cutt") @@ -92,5 +93,8 @@ class SPAStaticFiles(StaticFiles): api_router.include_router(player_router) api_router.include_router(team_router) api_router.include_router(analysis_router) +api_router.add_api_route("/token", endpoint=login_for_access_token, methods=["POST"]) +api_router.add_api_route("/users/me/", endpoint=read_users_me, methods=["GET"]) +api_router.add_api_route("/users/me/items/", endpoint=read_own_items, methods=["GET"]) app.include_router(api_router) app.mount("/", SPAStaticFiles(directory="dist", html=True), name="site") diff --git a/pyproject.toml b/pyproject.toml index c0cfe4b..c4c068c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,11 +6,14 @@ author = "julius" readme = "README.md" requires-python = ">=3.13" dependencies = [ + "argon2-cffi>=23.1.0", "fastapi[standard]>=0.115.7", "matplotlib>=3.10.0", "networkx>=3.4.2", + "passlib>=1.7.4", "psycopg>=3.2.4", "pydantic-settings>=2.7.1", + "pyjwt>=2.10.1", "pyqt6>=6.8.0", "sqlmodel>=0.0.22", "uvicorn>=0.34.0", diff --git a/security.py b/security.py new file mode 100644 index 0000000..df250a1 --- /dev/null +++ b/security.py @@ -0,0 +1,130 @@ +from datetime import timedelta, timezone, datetime +from typing import Annotated +from fastapi import Depends, HTTPException, status +from pydantic import BaseModel +import jwt +from jwt.exceptions import InvalidTokenError +from sqlmodel import Session, select +from db import engine, User +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from pydantic_settings import BaseSettings, SettingsConfigDict +from passlib.context import CryptContext + + +class Config(BaseSettings): + secret_key: str = "" + access_token_expire_minutes: int = 30 + model_config = SettingsConfigDict( + env_file=".env", env_file_encoding="utf-8", extra="ignore" + ) + + +config = Config() + + +class Token(BaseModel): + access_token: str + token_type: str + + +class TokenData(BaseModel): + username: str | None = None + + +pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") + + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/token") + + +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + + +def get_password_hash(password): + return pwd_context.hash(password) + + +def get_user(username: str | None): + if username: + with Session(engine) as session: + return session.exec( + select(User).where(User.username == username) + ).one_or_none() + + +def authenticate_user(username: str, password: str): + user = get_user(username) + if not user: + return False + if not verify_password(password, user.hashed_password): + return False + return user + + +def create_access_token(data: dict, expires_delta: timedelta | None = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.now(timezone.utc) + expires_delta + else: + expire = datetime.now(timezone.utc) + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, config.secret_key, algorithm="HS256") + return encoded_jwt + + +async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, config.secret_key, algorithms=["HS256"]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) + except InvalidTokenError: + raise credentials_exception + user = get_user(username=token_data.username) + if user is None: + raise credentials_exception + return user + + +async def get_current_active_user( + current_user: Annotated[User, Depends(get_current_user)], +): + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive user") + return current_user + + +async def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], +) -> Token: + user = authenticate_user(form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=config.access_token_expire_minutes) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + return Token(access_token=access_token, token_type="bearer") + + +async def read_users_me( + current_user: Annotated[User, Depends(get_current_active_user)], +): + return current_user + + +async def read_own_items( + current_user: Annotated[User, Depends(get_current_active_user)], +): + return [{"item_id": "Foo", "owner": current_user.username}]