Files
cutt/cutt/analysis.py

518 lines
17 KiB
Python

import io
import itertools
import random
import base64
from typing import Annotated
from fastapi import APIRouter, HTTPException, Security, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from sqlmodel import Session, func, select
from sqlmodel.sql.expression import SelectOfScalar
from cutt.db import (
Chemistry,
MVPRanking,
Player,
PlayerTeamLink,
PlayerType,
Team,
engine,
)
import networkx as nx
import numpy as np
import matplotlib
from cutt.security import TeamScopedRequest, verify_team_scope
from cutt.demo import demo_players
matplotlib.use("agg")
import matplotlib.pyplot as plt
analysis_router = APIRouter(prefix="/analysis", tags=["analysis"])
C = Chemistry
R = MVPRanking
PT = PlayerType
P = Player
def sociogram_json():
nodes = []
necessary_nodes = set()
edges = []
players = {}
with Session(engine) as session:
for p in session.exec(select(P)).fetchall():
nodes.append({"id": p.display_name, "label": p.display_name})
players[p.id] = p.display_name
subquery = (
select(C.user, func.max(C.time).label("latest")).group_by(C.user).subquery()
)
statement2 = select(C).join(
subquery, (C.user == subquery.c.user) & (C.time == subquery.c.latest)
)
for c in session.exec(statement2):
# G.add_node(c.user)
necessary_nodes.add(c.user)
for p in [players[p_id] for p_id in c.love]:
# G.add_edge(c.user, p)
# p_id = session.exec(select(P.id).where(P.name == p)).one()
necessary_nodes.add(p)
edges.append({"from": players[c.user], "to": p, "relation": "likes"})
for p in [players[p_id] for p_id in c.hate]:
edges.append({"from": players[c.user], "to": p, "relation": "dislikes"})
# nodes = [n for n in nodes if n["name"] in necessary_nodes]
return JSONResponse({"nodes": nodes, "edges": edges})
def graph_json(
request: Annotated[TeamScopedRequest, Security(verify_team_scope)],
networkx_graph: bool = False,
):
nodes = []
edges = []
player_map = {}
if request.team_id == 42:
players = [request.user] + demo_players
random.seed(42)
for p in players:
nodes.append({"id": p.display_name, "label": p.display_name})
for p, other in itertools.permutations(players, 2):
value = random.random()
if value > 0.5:
edges.append(
{
"id": f"{p.display_name}->{other.display_name}",
"source": p.display_name,
"target": other.display_name,
"size": max(value, 0.3),
"data": {
"relation": 2,
"origSize": max(value, 0.3),
"origFill": "#bed4ff",
},
}
)
elif value < 0.1:
edges.append(
{
"id": f"{p.display_name}-x>{other.display_name}",
"source": p.display_name,
"target": other.display_name,
"size": 0.3,
"data": {"relation": 0, "origSize": 0.3, "origFill": "#ff7c7c"},
"fill": "#ff7c7c",
}
)
G = nx.DiGraph()
G.add_nodes_from([n["id"] for n in nodes])
G.add_weighted_edges_from(
[
(
e["source"],
e["target"],
e["size"] if e["data"]["relation"] == 2 else -e["size"],
)
for e in edges
]
)
in_degrees = G.in_degree(weight="weight")
nodes = [
dict(node, **{"data": {"inDegree": in_degrees[node["id"]]}})
for node in nodes
]
if networkx_graph:
return G
return JSONResponse({"nodes": nodes, "edges": edges})
with Session(engine) as session:
players = session.exec(
select(P)
.join(PlayerTeamLink)
.join(Team)
.where(Team.id == request.team_id, P.disabled == False)
).all()
if not players:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="no players found in your team",
)
for p in players:
player_map[p.id] = p.display_name
nodes.append({"id": p.display_name, "label": p.display_name})
subquery = (
select(C.user, func.max(C.time).label("latest"))
.where(C.team == request.team_id)
.group_by(C.user)
.subquery()
)
statement2 = select(C).join(
subquery, (C.user == subquery.c.user) & (C.time == subquery.c.latest)
)
for c in session.exec(statement2):
if c.user not in player_map:
continue
user = player_map[c.user]
for i, p_id in enumerate(c.love):
if p_id not in player_map:
continue
p = player_map[p_id]
weight = 0.9**i
edges.append(
{
"id": f"{user}->{p}",
"source": user,
"target": p,
"size": weight,
"data": {
"relation": 2,
"origSize": weight,
"origFill": "#bed4ff",
},
}
)
for p_id in c.hate:
if p_id not in player_map:
continue
p = player_map[p_id]
edges.append(
{
"id": f"{user}-x>{p}",
"source": user,
"target": p,
"size": 0.5,
"data": {"relation": 0, "origSize": 0.3, "origFill": "#ff7c7c"},
"fill": "#ff7c7c",
}
)
if not edges:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="no entries found"
)
G = nx.DiGraph()
G.add_nodes_from([n["id"] for n in nodes])
G.add_weighted_edges_from(
[
(
e["source"],
e["target"],
e["size"] if e["data"]["relation"] == 2 else -e["size"],
)
for e in edges
]
)
in_degrees = G.in_degree(weight="weight")
nodes = [
dict(node, **{"data": {"inDegree": in_degrees[node["id"]]}}) for node in nodes
]
if networkx_graph:
return G
return JSONResponse({"nodes": nodes, "edges": edges})
def sociogram_data(show: int | None = 2):
G = nx.DiGraph()
with Session(engine) as session:
players = {}
for p in session.exec(select(P)).fetchall():
G.add_node(p.display_name)
players[p.id] = p.display_name
subquery = (
select(C.user, func.max(C.time).label("latest")).group_by(C.user).subquery()
)
statement2 = (
select(C)
# .where(C.user.in_(["Kruse", "Franz", "ck"]))
.join(subquery, (C.user == subquery.c.user) & (C.time == subquery.c.latest))
)
for c in session.exec(statement2):
if show >= 1:
for i, p_id in enumerate(c.love):
p = players[p_id]
G.add_edge(c.user, p, group="love", rank=i, popularity=1 - 0.08 * i)
if show <= 1:
for i, p_id in enumerate(c.hate):
p = players[p_id]
G.add_edge(c.user, p, group="hate", rank=8, popularity=-0.16)
return G
class Params(BaseModel):
node_size: int | None = Field(default=2400, alias="nodeSize")
font_size: int | None = Field(default=10, alias="fontSize")
arrow_size: int | None = Field(default=20, alias="arrowSize")
edge_width: float | None = Field(default=1, alias="edgeWidth")
distance: float | None = 0.2
weighting: bool | None = True
popularity: bool | None = True
show: int | None = 2
ARROWSTYLE = {"love": "-|>", "hate": "-|>"}
EDGESTYLE = {"love": "-", "hate": ":"}
EDGECOLOR = {"love": "#404040", "hate": "#cc0000"}
async def render_sociogram(params: Params):
plt.figure(figsize=(16, 10), facecolor="none")
ax = plt.gca()
ax.set_facecolor("none") # Set the axis face color to none (transparent)
ax.axis("off") # Turn off axis ticks and frames
G = sociogram_data(show=params.show)
pos = nx.spring_layout(G, scale=2, k=params.distance, iterations=50, seed=None)
nodes = nx.draw_networkx_nodes(
G,
pos,
node_color=[
v for k, v in G.in_degree(weight="popularity" if params.weighting else None)
]
if params.popularity
else "#99ccff",
edgecolors="#404040",
linewidths=0,
# node_shape="8",
node_size=params.node_size,
cmap="coolwarm",
alpha=0.86,
)
if params.popularity:
cbar = plt.colorbar(nodes)
cbar.ax.set_xlabel("popularity")
nx.draw_networkx_labels(G, pos, font_size=params.font_size)
nx.draw_networkx_edges(
G,
pos,
arrows=True,
edge_color=[EDGECOLOR[G.edges()[*edge]["group"]] for edge in G.edges()],
arrowsize=params.arrow_size,
node_size=params.node_size,
width=params.edge_width,
style=[EDGESTYLE[G.edges()[*edge]["group"]] for edge in G.edges()],
arrowstyle=[ARROWSTYLE[G.edges()[*edge]["group"]] for edge in G.edges()],
connectionstyle="arc3,rad=0.12",
alpha=[1 - 0.08 * G.edges()[*edge]["rank"] for edge in G.edges()]
if params.weighting
else 1,
)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight", dpi=300, transparent=True)
buf.seek(0)
encoded_image = base64.b64encode(buf.read()).decode("UTF-8")
plt.close()
return {"image": encoded_image}
translate_tablename = {
R.__tablename__: "🏆",
C.__tablename__: "🧪",
PT.__tablename__: "🃏",
}
def last_submissions(
request: Annotated[TeamScopedRequest, Security(verify_team_scope)],
):
times = {}
with Session(engine) as session:
player_ids = session.exec(
select(P.id)
.join(PlayerTeamLink)
.join(Team)
.where(Team.id == request.team_id, P.disabled == False)
).all()
for survey in [C, PT, R]:
subquery = (
select(survey.user, func.max(survey.time).label("latest"))
.where(survey.team == request.team_id)
.where(survey.user.in_(player_ids))
.group_by(survey.user)
.subquery()
)
statement2 = select(survey).join(
subquery,
(survey.user == subquery.c.user) & (survey.time == subquery.c.latest),
)
for r in session.exec(statement2):
if r.time.date() not in times:
times[r.time.date()] = {}
times[r.time.date()][r.user] = (
times[r.time.date()].get(r.user, "")
+ " "
+ translate_tablename[survey.__tablename__]
)
return times
def mvp(
request: Annotated[TeamScopedRequest, Security(verify_team_scope)], mixed=False
):
if request.team_id == 42:
ranks = {}
random.seed(42)
players = [request.user] + demo_players
for p in players:
random.shuffle(players)
for i, p in enumerate(players):
ranks[p.id] = ranks.get(p.id, []) + [i + 1]
return [
[
{
"p_id": p_id,
"rank": f"{np.mean(v):.02f}",
"std": f"{np.std(v):.02f}",
"n": len(v),
}
for i, (p_id, v) in enumerate(ranks.items())
]
]
with Session(engine) as session:
players = session.exec(
select(P)
.join(PlayerTeamLink)
.join(Team)
.where(Team.id == request.team_id, P.disabled == False)
).all()
if not players:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
player_map = {p.id: p for p in players}
subquery = (
select(R.user, func.max(R.time).label("latest"))
.where(R.team == request.team_id)
.group_by(R.user)
.subquery()
)
statement2 = select(R).join(
subquery, (R.user == subquery.c.user) & (R.time == subquery.c.latest)
)
if mixed:
all_ranks = []
for gender in ["fmp", "mmp"]:
ranks = {}
for r in session.exec(statement2):
mvps = [
p_id
for p_id in r.mvps
if p_id in player_map and player_map[p_id].gender == gender
]
for i, p_id in enumerate(mvps):
p = player_map[p_id]
ranks[p_id] = ranks.get(p_id, []) + [i + 1]
all_ranks.append(ranks)
else:
ranks = {}
for r in session.exec(statement2):
for i, p_id in enumerate(r.mvps):
if p_id not in player_map:
continue
p = player_map[p_id]
ranks[p_id] = ranks.get(p_id, []) + [i + 1]
all_ranks = [ranks]
if not all_ranks:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="no entries found"
)
return [
[
{
"p_id": p_id,
"rank": f"{np.mean(v):.02f}",
"std": f"{np.std(v):.02f}",
"n": len(v),
}
for p_id, v in ranks.items()
]
for ranks in all_ranks
]
async def turnout(
request: Annotated[
TeamScopedRequest, Security(verify_team_scope, scopes=["analysis"])
],
):
player_map = {}
with Session(engine) as session:
players = session.exec(
select(P)
.join(PlayerTeamLink)
.join(Team)
.where(Team.id == request.team_id, P.disabled == False)
).all()
if not players:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
for p in players:
player_map[p.id] = p.display_name
subquery = (
select(C.user, func.max(C.time).label("latest"))
.where(C.team == request.team_id)
.group_by(C.user)
.subquery()
)
statement2 = select(C.user).join(
subquery, (C.user == subquery.c.user) & (C.time == subquery.c.latest)
)
chemistry_turnout = session.exec(statement2).all()
chemistry_missing = set(player_map) - set(chemistry_turnout)
chemistry_missing = [player_map[i] for i in chemistry_missing]
subquery = (
select(R.user, func.max(R.time).label("latest"))
.where(R.team == request.team_id)
.group_by(R.user)
.subquery()
)
statement2 = select(R.user).join(
subquery, (R.user == subquery.c.user) & (R.time == subquery.c.latest)
)
mvp_turnout = session.exec(statement2).all()
mvp_missing = set(player_map) - set(mvp_turnout)
mvp_missing = [player_map[i] for i in mvp_missing]
return JSONResponse(
{
"players": len(player_map),
"chemistry": {
"turnout": len(chemistry_turnout),
"missing": sorted(list(chemistry_missing)),
},
"MVP": {
"turnout": len(mvp_turnout),
"missing": sorted(list(mvp_missing)),
},
}
)
# analysis_router.add_api_route("/json", endpoint=sociogram_json, methods=["GET"])
analysis_router.add_api_route(
"/graph_json/{team_id}", endpoint=graph_json, methods=["GET"]
)
# analysis_router.add_api_route("/image", endpoint=render_sociogram, methods=["POST"])
analysis_router.add_api_route(
"/mvp/{team_id}",
endpoint=mvp,
methods=["GET"],
name="MVPs",
description="Request Most Valuable Players stats",
)
analysis_router.add_api_route("/turnout/{team_id}", endpoint=turnout, methods=["GET"])
analysis_router.add_api_route(
"/times/{team_id}", endpoint=last_submissions, methods=["GET"]
)
if __name__ == "__main__":
with Session(engine) as session:
statement: SelectOfScalar[P] = select(func.count(P.id))
print("players in DB: ", session.exec(statement).first())
G = sociogram_data()
pos = nx.spring_layout(G, scale=1, k=2, iterations=50, seed=42)