Files
OnlineJudge/contest/views/oj.py
2026-06-04 05:31:23 -06:00

205 lines
7.8 KiB
Python

import io
import xlsxwriter
from asgiref.sync import sync_to_async
from django.http import HttpResponse
from django.utils.timezone import now
from account.decorators import (
check_contest_password,
check_contest_permission,
login_required,
)
from account.models import AdminType
from problem.models import Problem
from utils.api import AsyncAPIView, validate_serializer
from utils.constants import CONTEST_PASSWORD_SESSION_KEY, ContestStatus
from utils.shortcuts import check_is_id, datetime2str
from ..models import ACMContestRank, Contest, ContestAnnouncement
from ..serializers import ACMContestRankSerializer, ContestAnnouncementSerializer, ContestPasswordVerifySerializer, ContestSerializer
# DEPRECATED: not called by the frontend (2026-05-26)
class ContestAnnouncementListAPI(AsyncAPIView):
    """List the visible announcements of one contest.

    Query parameters:
        contest_id: required; announcements are filtered by this contest id.
        max_id: optional integer; when given, only announcements whose id is
            strictly greater than it are returned.
    """

    @check_contest_permission(check_type="announcements")
    async def get(self, request):
        """Return the serialized announcements, or an error response for bad parameters."""
        contest_id = request.GET.get("contest_id")
        if not contest_id:
            return self.error("Invalid parameter, contest_id is required")

        announcements = ContestAnnouncement.objects.select_related("created_by").filter(
            contest_id=contest_id, visible=True
        )

        max_id = request.GET.get("max_id")
        if max_id:
            # Parse the value up front: a non-numeric string would otherwise
            # raise an unhandled ValueError when the id lookup is built.
            try:
                max_id = int(max_id)
            except ValueError:
                return self.error("Invalid parameter, max_id must be an integer")
            announcements = announcements.filter(id__gt=max_id)

        # Evaluate the queryset asynchronously before handing it to the serializer.
        items = [item async for item in announcements]
        data = await self.async_serialize_data(
            ContestAnnouncementSerializer, items, many=True
        )
        return self.success(data)
class ContestAPI(AsyncAPIView):
    """Detail endpoint for a single visible contest."""

    async def get(self, request):
        """Return the serialized contest named by the ``id`` query parameter.

        The response payload also carries the current server time under the
        ``now`` key. An error response is returned when the id is missing,
        fails ``check_is_id``, or matches no visible contest.
        """
        contest_id = request.GET.get("id")
        if not (contest_id and check_is_id(contest_id)):
            return self.error("Invalid parameter, id is required")

        visible_contests = Contest.objects.select_related("created_by").filter(
            id=contest_id, visible=True
        )
        contest = await visible_contests.afirst()
        if contest is None:
            return self.error("Contest does not exist")

        payload = await self.async_serialize_data(ContestSerializer, contest)
        payload["now"] = datetime2str(now())
        return self.success(payload)
class ContestListAPI(AsyncAPIView):
    """Paginated listing of visible contests with optional filters."""

    async def get(self, request):
        """Apply the ``keyword``, ``tag`` and ``status`` query filters, then paginate.

        ``keyword`` is a case-insensitive substring match on the title, ``tag``
        is an exact match, and ``status`` narrows the list by start/end time
        relative to the current time.
        """
        params = request.GET
        queryset = Contest.objects.select_related("created_by").filter(visible=True)

        title_keyword = params.get("keyword")
        if title_keyword:
            queryset = queryset.filter(title__icontains=title_keyword)

        contest_tag = params.get("tag")
        if contest_tag:
            queryset = queryset.filter(tag=contest_tag)

        contest_status = params.get("status")
        if contest_status:
            current_time = now()
            if contest_status == ContestStatus.CONTEST_NOT_START:
                time_window = {"start_time__gt": current_time}
            elif contest_status == ContestStatus.CONTEST_ENDED:
                time_window = {"end_time__lt": current_time}
            else:
                # Any other non-empty status selects contests running right now.
                time_window = {
                    "start_time__lte": current_time,
                    "end_time__gte": current_time,
                }
            queryset = queryset.filter(**time_window)

        page = await self.async_paginate_data(request, queryset, ContestSerializer)
        return self.success(page)
class ContestPasswordVerifyAPI(AsyncAPIView):
    """Check a submitted contest password and remember it in the session."""

    @login_required
    @validate_serializer(ContestPasswordVerifySerializer)
    async def post(self, request):
        """Verify ``password`` for ``contest_id`` and store it in the session on success.

        Only visible contests that have a password set are considered. Returns
        a success response with ``True`` when the password is accepted,
        otherwise an error response.
        """
        payload = request.data
        try:
            contest = await Contest.objects.aget(
                id=payload["contest_id"], visible=True, password__isnull=False
            )
        except Contest.DoesNotExist:
            return self.error("Contest does not exist")

        submitted_password = payload["password"]
        if not check_contest_password(submitted_password, contest.password):
            return self.error("Wrong password or password expired")

        # Keep the accepted password in a per-contest mapping inside the session.
        session = request.session
        if CONTEST_PASSWORD_SESSION_KEY not in session:
            session[CONTEST_PASSWORD_SESSION_KEY] = {}
        session[CONTEST_PASSWORD_SESSION_KEY][contest.id] = submitted_password
        # Only the nested dict was mutated, so flag the session as changed explicitly.
        session.modified = True
        return self.success(True)
class ContestAccessAPI(AsyncAPIView):
    """Report whether the session already holds an accepted password for a contest."""

    @login_required
    async def get(self, request):
        """Return ``{"access": ...}`` for the password-protected contest ``contest_id``.

        The ``access`` value is the result of ``check_contest_password`` applied
        to the password stored in the session (if any) and the contest's
        password. An error response is returned when ``contest_id`` is missing
        or malformed, or when no visible password-protected contest matches.
        """
        contest_id = request.GET.get("contest_id")
        # Validate the id with check_is_id, as ContestAPI does, so a malformed
        # value gets an error response instead of reaching the ORM lookup.
        if not contest_id or not check_is_id(contest_id):
            return self.error()

        try:
            contest = await Contest.objects.aget(
                id=contest_id, visible=True, password__isnull=False
            )
        except Contest.DoesNotExist:
            return self.error("Contest does not exist")

        stored_passwords = request.session.get(CONTEST_PASSWORD_SESSION_KEY, {})
        session_pass = stored_passwords.get(contest.id)
        return self.success(
            {"access": check_contest_password(session_pass, contest.password)}
        )
class ContestRankAPI(AsyncAPIView):
    """Contest ranking endpoint: paginated JSON, or an XLSX export of the full ranking."""

    def get_rank(self):
        """Build the ranking queryset for ``self.contest``.

        Only non-disabled users whose admin type is ``REGULAR_USER`` or
        ``STUDENT_ADMIN`` are included. Rows are ordered by accepted count
        (descending), then by total time (ascending).
        """
        return (
            ACMContestRank.objects.filter(
                contest=self.contest,
                user__admin_type__in=[AdminType.REGULAR_USER, AdminType.STUDENT_ADMIN],
                user__is_disabled=False,
            )
            .select_related("user")
            .order_by("-accepted_number", "total_time")
        )

    def column_string(self, n):
        """Convert a 1-based column number to spreadsheet letters (1 -> "A", 27 -> "AA")."""
        letters = ""
        while n > 0:
            n, remainder = divmod(n - 1, 26)
            letters = chr(65 + remainder) + letters
        return letters

    def _build_xlsx(self, data, contest_problems):
        """Render the ranking into an XLSX workbook and return its bytes.

        Args:
            data: serialized rank rows; each row is read for the keys "user"
                (with "id", "username", "real_name"), "accepted_number",
                "submission_number", "total_time" and "submission_info".
            contest_problems: problem objects (``id`` and ``title`` are read);
                each gets one column after the six fixed columns, in order.

        Returns:
            The workbook content as ``bytes``.
        """
        fixed_headers = (
            "User ID",
            "Username",
            "Real Name",
            "AC",
            "Total Submission",
            "Total Time",
        )
        first_problem_col = len(fixed_headers)
        problem_id_to_col = {p.id: i for i, p in enumerate(contest_problems)}

        buffer = io.BytesIO()
        # The context manager closes the workbook even if a write below fails.
        with xlsxwriter.Workbook(buffer) as workbook:
            worksheet = workbook.add_worksheet()

            for col, header in enumerate(fixed_headers):
                worksheet.write_string(0, col, header)
            for i, problem in enumerate(contest_problems):
                # write_string keeps the title literal; the generic write()
                # would turn a title starting with "=" into a formula.
                worksheet.write_string(0, first_problem_col + i, problem.title)

            for row, item in enumerate(data, start=1):
                user = item["user"]
                worksheet.write_string(row, 0, str(user["id"]))
                worksheet.write_string(row, 1, user["username"])
                worksheet.write_string(row, 2, user["real_name"] or "")
                worksheet.write_string(row, 3, str(item["accepted_number"]))
                worksheet.write_string(row, 4, str(item["submission_number"]))
                worksheet.write_string(row, 5, str(item["total_time"]))
                for problem_id, info in item["submission_info"].items():
                    col = problem_id_to_col.get(int(problem_id))
                    if col is None:
                        # No column exists for this problem (for example one
                        # that is no longer visible); skip it rather than fail
                        # the whole export with a KeyError.
                        continue
                    worksheet.write_string(
                        row, first_problem_col + col, str(info["is_ac"])
                    )
        return buffer.getvalue()

    @check_contest_permission(check_type="ranks")
    async def get(self, request):
        """Return the contest ranking.

        When the ``download_csv`` query parameter is non-empty, the full
        ranking is returned as an XLSX attachment (despite the parameter
        name). Otherwise a paginated, serialized ranking is returned.
        """
        download_csv = request.GET.get("download_csv")
        is_contest_admin = (
            request.user.is_authenticated
            and request.user.is_contest_admin(self.contest)
        )
        qs = self.get_rank()

        if download_csv:
            rank_list = [item async for item in qs]
            data = await self.async_serialize_data(
                ACMContestRankSerializer,
                rank_list,
                many=True,
                is_contest_admin=is_contest_admin,
            )
            # The problem query and workbook rendering are synchronous, so
            # both are run through sync_to_async.
            contest_problems = await sync_to_async(
                lambda: list(
                    Problem.objects.filter(
                        contest=self.contest, visible=True
                    ).order_by("_id")
                )
            )()
            xlsx_bytes = await sync_to_async(self._build_xlsx)(data, contest_problems)

            response = HttpResponse(xlsx_bytes)
            response["Content-Disposition"] = (
                f"attachment; filename=content-{self.contest.id}-rank.xlsx"
            )
            response["Content-Type"] = "application/xlsx"
            return response

        page_qs = await self.async_paginate_data(request, qs)
        page_qs["results"] = await self.async_serialize_data(
            ACMContestRankSerializer,
            page_qs["results"],
            many=True,
            is_contest_admin=is_contest_admin,
        )
        return self.success(page_qs)