Files
cutt/cutt/player.py
2026-01-03 13:59:53 +01:00

344 lines
10 KiB
Python

from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Security, status
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from sqlmodel import Session, select
from cutt.db import Player, PlayerTeamLink, Team, engine
from cutt.security import (
TeamScopedRequest,
change_password,
get_current_active_user,
read_player_me,
verify_team_scope,
)
from cutt.demo import demo_players
# Short alias for the Player model, used in queries and annotations below.
P = Player
# Router that collects the player endpoints of this module under "/player".
player_router = APIRouter(prefix="/player", tags=["player"])
def update_team_manager(scopes_str: str, team_id: int, state: bool = True):
    """Grant or revoke the manager scope of one team in a scope string.

    Args:
        scopes_str: Whitespace-separated scopes, e.g. ``"admin team:3"``.
        team_id: Team whose ``team:<id>`` scope is added or removed.
        state: ``True`` adds the scope, ``False`` removes it.

    Returns:
        The updated scopes, de-duplicated, sorted and joined by single spaces.
    """
    scopes = set(scopes_str.split())
    team_scope = f"team:{team_id}"
    if state:
        scopes.add(team_scope)
    else:
        # discard() rather than remove(): revoking a scope that was never
        # granted is a no-op instead of a KeyError.
        scopes.discard(team_scope)
    return " ".join(sorted(scopes))
# Fields shared by the add-player and modify-player request bodies.
class PlayerRequest(BaseModel):
    display_name: str
    username: str
    # Nullable; the endpoints store an empty value as None.
    gender: str | None
    number: str
    # Nullable; the endpoints store an empty value as None.
    email: str | None
    # modify_player leaves the manager scope untouched when this is None.
    is_manager: bool | None
# Request body for adding a player; adds no fields beyond PlayerRequest.
class AddPlayerRequest(PlayerRequest): ...
# Shared 400 error raised by endpoints that refuse to change the demo team
# (team id 42).
DEMO_TEAM_REQUEST = HTTPException(
    status_code=status.HTTP_400_BAD_REQUEST,
    detail="DEMO Team, nothing happens",
)
def add_player(
    r: AddPlayerRequest,
    request: Annotated[TeamScopedRequest, Depends(verify_team_scope)],
):
    """Create a new player and attach them to the team of the request.

    Args:
        r: Details of the player to create. ``r.is_manager`` is not applied
            by this endpoint.
        request: Team-scoped request produced by ``verify_team_scope``;
            ``request.team_id`` selects the team.

    Returns:
        A plain-text confirmation naming the new player.

    Raises:
        HTTPException: 400 for the demo team, when the username already
            exists, or when the display name is already used on the team.
    """
    # Same guard as the other team-scoped mutations: the demo team (id 42)
    # must not be modified.
    if request.team_id == 42:
        raise DEMO_TEAM_REQUEST
    with Session(engine) as session:
        # first() only asks "is there at least one match"; unlike
        # one_or_none() it cannot fail when several rows match.
        if session.exec(select(P).where(P.username == r.username)).first():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="username not available"
            )
        stmt = (
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == request.team_id, P.display_name == r.display_name)
        )
        if session.exec(stmt).first():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="the name is already taken on this team",
            )
        team = session.exec(select(Team).where(Team.id == request.team_id)).one()
        new_player = Player(
            username=r.username,
            display_name=r.display_name,
            # Empty strings are stored as None.
            gender=r.gender if r.gender else None,
            email=r.email if r.email else None,
            number=r.number,
            disabled=False,
            teams=[team],
        )
        session.add(new_player)
        session.commit()
        # Built while the session is still open: commit() expires the
        # instance, so display_name is re-read from the database here.
        return PlainTextResponse(f"added {new_player.display_name}")
# Request body for modifying a player: the shared fields plus the player's id.
class ModifyPlayerRequest(PlayerRequest):
    id: int
def modify_player(
    r: ModifyPlayerRequest,
    request: Annotated[TeamScopedRequest, Depends(verify_team_scope)],
):
    """Update the editable fields of a player on the team of the request.

    The player is looked up by id *and* username within the team, so the
    username itself is never changed here.

    Args:
        r: New values for the player. String fields are stripped of
            surrounding whitespace; empty ``gender``/``email`` become None.
            ``r.is_manager`` of None leaves the manager scope untouched.
        request: Team-scoped request produced by ``verify_team_scope``.

    Returns:
        A plain-text confirmation.

    Raises:
        HTTPException: 400 for the demo team, 404 when no matching player
            belongs to the team.
    """
    if request.team_id == 42:
        raise DEMO_TEAM_REQUEST
    with Session(engine) as session:
        player = session.exec(
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == request.team_id, P.id == r.id, P.username == r.username)
        ).one_or_none()
        if not player:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="no such player found in your team",
            )
        player.display_name = r.display_name.strip()
        player.number = r.number.strip()
        player.gender = r.gender.strip() if r.gender else None
        player.email = r.email.strip() if r.email else None
        if r.is_manager is not None:
            player.scopes = update_team_manager(
                player.scopes, request.team_id, r.is_manager
            )
        session.add(player)
        session.commit()
        return PlainTextResponse("modification successful")
# Request body that identifies one player by id; used by
# remove_player_from_team and disable_player_team.
class DisablePlayerRequest(BaseModel):
    player_id: int
def remove_player_from_team(
    r: DisablePlayerRequest,
    request: Annotated[TeamScopedRequest, Depends(verify_team_scope)],
):
    """Detach a player from the team of the request without deleting them.

    Args:
        r: Identifies the player via ``r.player_id``.
        request: Team-scoped request produced by ``verify_team_scope``.

    Returns:
        A plain-text confirmation naming the player and the team.

    Raises:
        HTTPException: 400 for the demo team, 404 when the player is not a
            member of the team.
    """
    if request.team_id == 42:
        raise DEMO_TEAM_REQUEST
    with Session(engine) as session:
        member_query = (
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == request.team_id, P.id == r.player_id)
        )
        player = session.exec(member_query).one_or_none()
        if not player:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="no such player found in your team",
            )
        team_query = select(Team).where(Team.id == request.team_id)
        team = session.exec(team_query).one()
        # Only the link between player and team is dropped.
        player.teams.remove(team)
        session.add(team)
        session.commit()
        return PlainTextResponse(f"removed {player.display_name} from {team.name}.")
def disable_player_team(
    r: DisablePlayerRequest,
    request: Annotated[TeamScopedRequest, Depends(verify_team_scope)],
):
    """Mark a member of the request's team as disabled.

    Args:
        r: Identifies the player via ``r.player_id``.
        request: Team-scoped request produced by ``verify_team_scope``.

    Returns:
        A plain-text confirmation naming the player.

    Raises:
        HTTPException: 400 for the demo team, 404 when the player is not a
            member of the team.
    """
    if request.team_id == 42:
        raise DEMO_TEAM_REQUEST
    with Session(engine) as session:
        member_query = (
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == request.team_id, P.id == r.player_id)
        )
        player = session.exec(member_query).one_or_none()
        if not player:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="no such player found in your team",
            )
        player.disabled = True
        session.add(player)
        session.commit()
        return PlainTextResponse(f"disabled {player.display_name}")
def disable_player(player_id: int):
    """Mark any player as disabled, regardless of team membership.

    Args:
        player_id: Id of the player to disable.

    Returns:
        A plain-text confirmation naming the player.

    Raises:
        HTTPException: 404 when no player has this id.
    """
    with Session(engine) as session:
        by_id = select(P).where(P.id == player_id)
        player = session.exec(by_id).one_or_none()
        if not player:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="no such player found in your team",
            )
        player.disabled = True
        session.add(player)
        session.commit()
        return PlainTextResponse(f"disabled {player.display_name}")
def add_player_to_team(player_id: int, team_id: int):
    """Attach an existing player to an existing team.

    Args:
        player_id: Id of the player to add.
        team_id: Id of the team that receives the player.

    Returns:
        A plain-text confirmation naming the player and the team.

    Raises:
        HTTPException: 404 when the player or the team does not exist,
            400 when the player already belongs to the team.
    """
    with Session(engine) as session:
        # one_or_none() lets unknown ids be reported as 404 instead of
        # surfacing NoResultFound as a server error.
        player = session.exec(select(P).where(P.id == player_id)).one_or_none()
        if player is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="no such player found",
            )
        team = session.exec(select(Team).where(Team.id == team_id)).one_or_none()
        if team is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="no such team found",
            )
        # Appending an existing member would create a second link entry.
        if player in team.players:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="player is already in this team",
            )
        team.players.append(player)
        session.add(team)
        session.commit()
        return PlainTextResponse(
            f"added {player.display_name} ({player.username}) to {team.name}"
        )
def add_players(players: list[P]):
    """Persist a batch of player objects.

    Args:
        players: Player instances to store.
    """
    with Session(engine) as session:
        # Stage every player, then write them with a single commit.
        session.add_all(players)
        session.commit()
async def list_all_players():
    """Return every player row, without any team or status filter."""
    everyone = select(P)
    with Session(engine) as session:
        rows = session.exec(everyone)
        return rows.all()
async def list_players(
    team_id: int, user: Annotated[Player, Depends(get_current_active_user)]
):
    """List the non-disabled players of a team, ordered by display name.

    For the demo team (id 42) the caller's own entry followed by
    ``demo_players`` is returned without touching the database.

    Args:
        team_id: Team whose players are listed.
        user: The authenticated caller.

    Returns:
        A list of dicts with ``id``, ``display_name``, ``username``,
        ``gender`` and ``number``. Callers holding the ``team:<team_id>``
        scope additionally receive ``email``. Players who hold that scope
        themselves carry ``is_manager: True``. The list is empty when the
        team has no active players.

    Raises:
        HTTPException: 400 when the caller is neither an active member nor
            a manager of the team.
    """
    if team_id == 42:
        return [
            user.model_dump(
                include={"id", "display_name", "gender", "username", "number", "email"}
            )
        ] + demo_players
    team_manager_scope = f"team:{team_id}"
    is_team_manager = team_manager_scope in set(user.scopes.split())
    with Session(engine) as session:
        # "== False" builds the SQL comparison; it is not a Python
        # truthiness test and must stay spelled this way.
        current_user = session.exec(
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == team_id, P.disabled == False, P.id == user.id)  # noqa: E712
        ).one_or_none()
        if not current_user and not is_team_manager:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="you're not in this team",
            )
        players = session.exec(
            select(P)
            .join(PlayerTeamLink)
            .join(Team)
            .where(Team.id == team_id, P.disabled == False)  # noqa: E712
            .order_by(P.display_name)
        ).all()
        # Disabled players are already excluded by the query above.
        players_dump = []
        for player in players:
            player_dump = player.model_dump(
                include={
                    "id",
                    "display_name",
                    "username",
                    "gender",
                    "number",
                }
            )
            if is_team_manager:
                player_dump["email"] = player.email
            # Compare whole scope tokens: a substring test on the raw string
            # would let "team:1" match "team:12".
            if team_manager_scope in player.scopes.split():
                player_dump["is_manager"] = True
            players_dump.append(player_dump)
        return players_dump
def read_teams_me(user: Annotated[P, Depends(get_current_active_user)]):
    """Return the teams the caller belongs to or manages, plus the demo team.

    Args:
        user: The authenticated caller.

    Returns:
        The teams the caller is a member of, then the teams referenced only
        by a ``team:<id>`` scope, then a fixed dict describing the DEMO team
        (id 42).
    """
    # Collect the team ids named by the caller's "team:<id>" scopes.
    managed_ids = set()
    for scope in set(user.scopes.split()):
        if scope.startswith("team:"):
            managed_ids.add(int(scope.split(":")[1]))
    with Session(engine) as session:
        caller_rows = session.exec(select(P).where(P.id == user.id)).all()
        member_in = caller_rows[0].teams
        # Teams the caller is already a member of are not listed twice.
        managed_ids.difference_update({team.id for team in member_in})
        managed_query = select(Team).where(Team.id.in_(managed_ids))
        team_manager_in = session.exec(managed_query).all()
        demo_team = {
            "country": "nowhere",
            "id": 42,
            "location": "everywhere",
            "name": "DEMO",
        }
        return [*member_in, *team_manager_in, demo_team]
# Route registration.
#
# Routes are matched in registration order, so every fixed path is registered
# before the parameterised "/{team_id}" routes. Registered the other way
# round, POST "/change_password" is captured by POST "/{team_id}" with
# team_id="change_password" and never reaches change_password.

# Endpoints for the authenticated caller.
player_router.add_api_route("/me", endpoint=read_player_me, methods=["GET"])
player_router.add_api_route("/me/teams", endpoint=read_teams_me, methods=["GET"])
player_router.add_api_route(
    "/change_password", endpoint=change_password, methods=["POST"]
)

# Endpoints that require the "admin" scope.
player_router.add_api_route(
    "/list",
    endpoint=list_all_players,
    methods=["GET"],
    dependencies=[Security(get_current_active_user, scopes=["admin"])],
)
player_router.add_api_route(
    "/add/{team_id}/{player_id}",
    endpoint=add_player_to_team,
    methods=["GET"],
    dependencies=[Security(get_current_active_user, scopes=["admin"])],
)
player_router.add_api_route(
    "/disable/{player_id}",
    endpoint=disable_player,
    methods=["DELETE"],
    dependencies=[Security(get_current_active_user, scopes=["admin"])],
)

# Team-scoped endpoints; these catch any single path segment and therefore
# come last.
player_router.add_api_route(
    "/{team_id}",
    endpoint=add_player,
    methods=["POST"],
)
player_router.add_api_route(
    "/{team_id}",
    endpoint=modify_player,
    methods=["PUT"],
)
player_router.add_api_route(
    "/{team_id}",
    endpoint=remove_player_from_team,
    methods=["DELETE"],
)
player_router.add_api_route(
    "/{team_id}/list",
    endpoint=list_players,
    methods=["GET"],
)