Files
OnlineJudge/problem/views/oj.py
2026-05-26 02:19:26 -06:00

340 lines
12 KiB
Python

import random
from datetime import datetime
from django.core.cache import cache
from django.db.models import BooleanField, Case, Count, Q, Value, When
from django.db.models.functions import ExtractYear
from account.decorators import check_contest_permission
from account.models import User
from contest.models import ContestRuleType
from submission.models import JudgeStatus, Submission
from utils.api import APIView
from utils.constants import CacheKey
from ..models import Problem, ProblemTag
from ..serializers import (
ProblemListSerializer,
ProblemSafeSerializer,
ProblemSerializer,
TagSerializer,
)
class ProblemTagAPI(APIView):
def get(self, request):
keyword = request.GET.get("keyword", "")
cache_key = f"{CacheKey.problem_tags}:{keyword}"
cached = cache.get(cache_key)
if cached is not None:
return self.success(cached)
qs = ProblemTag.objects
if keyword:
qs = ProblemTag.objects.filter(name__icontains=keyword)
tags = qs.annotate(problem_count=Count("problem")).filter(problem_count__gt=0)
data = TagSerializer(tags, many=True).data
cache.set(cache_key, data, 3600)
return self.success(data)
class PickOneAPI(APIView):
def get(self, request):
problems = Problem.objects.filter(contest_id__isnull=True, visible=True)
count = problems.count()
if count == 0:
return self.error("No problem to pick")
return self.success(problems[random.randint(0, count - 1)]._id)
class ProblemAPI(APIView):
@staticmethod
def _add_problem_status(request, queryset_values):
if request.user.is_authenticated:
profile = request.user.userprofile
acm_problems_status = profile.acm_problems_status.get("problems", {})
# paginate data
results = queryset_values.get("results")
if results is not None:
problems = results
else:
problems = [queryset_values]
for problem in problems:
problem["my_status"] = acm_problems_status.get(
str(problem["id"]), {}
).get("status")
def get(self, request):
# 问题详情页
problem_id = request.GET.get("problem_id")
if problem_id:
try:
problem = Problem.objects.select_related("created_by").get(
_id__iexact=problem_id, contest_id__isnull=True, visible=True
)
problem_data = ProblemSerializer(problem).data
self._add_problem_status(request, problem_data)
if request.user.is_authenticated:
failed_statuses = [
JudgeStatus.WRONG_ANSWER,
JudgeStatus.CPU_TIME_LIMIT_EXCEEDED,
JudgeStatus.REAL_TIME_LIMIT_EXCEEDED,
JudgeStatus.MEMORY_LIMIT_EXCEEDED,
JudgeStatus.RUNTIME_ERROR,
JudgeStatus.COMPILE_ERROR,
]
problem_data["my_failed_count"] = Submission.objects.filter(
user_id=request.user.id,
problem_id=problem.id,
result__in=failed_statuses,
).count()
else:
problem_data["my_failed_count"] = 0
return self.success(problem_data)
except Problem.DoesNotExist:
return self.error("Problem does not exist")
limit = request.GET.get("limit")
if not limit:
return self.error("Limit is needed")
problems = (
Problem.objects.select_related("created_by")
.prefetch_related("tags")
.filter(contest_id__isnull=True, visible=True)
.order_by("-create_time")
)
author = request.GET.get("author")
if author:
problems = problems.filter(created_by__username=author)
# 按照标签筛选
tag_text = request.GET.get("tag")
if tag_text:
problems = problems.filter(tags__name=tag_text)
# 搜索的情况
keyword = request.GET.get("keyword", "").strip()
if keyword:
problems = problems.filter(
Q(title__icontains=keyword) | Q(_id__icontains=keyword)
)
# 难度筛选
difficulty = request.GET.get("difficulty")
if difficulty:
problems = problems.filter(difficulty=difficulty)
# 排序
sort = request.GET.get("sort")
if sort == "flowchart":
problems = problems.order_by("-allow_flowchart", "-show_flowchart", "-create_time")
elif sort == "ast":
problems = problems.annotate(
_has_ast=Case(
When(ast_rules__isnull=False, then=Value(True)),
default=Value(False),
output_field=BooleanField(),
)
).order_by("-_has_ast", "-create_time")
elif sort:
problems = problems.order_by(sort)
# 根据profile 为做过的题目添加标记
data = self.paginate_data(request, problems, ProblemListSerializer)
self._add_problem_status(request, data)
return self.success(data)
class ContestProblemAPI(APIView):
def _add_problem_status(self, request, queryset_values):
if request.user.is_authenticated:
profile = request.user.userprofile
if self.contest.rule_type == ContestRuleType.ACM:
problems_status = profile.acm_problems_status.get(
"contest_problems", {}
)
else:
problems_status = profile.oi_problems_status.get("contest_problems", {})
for problem in queryset_values:
problem["my_status"] = problems_status.get(str(problem["id"]), {}).get(
"status"
)
@check_contest_permission(check_type="problems")
def get(self, request):
problem_id = request.GET.get("problem_id")
if problem_id:
try:
problem = Problem.objects.select_related("created_by").get(
_id__iexact=problem_id, contest=self.contest, visible=True
)
except Problem.DoesNotExist:
return self.error("Problem does not exist.")
if self.contest.problem_details_permission(request.user):
problem_data = ProblemSerializer(problem).data
self._add_problem_status(
request,
[
problem_data,
],
)
else:
problem_data = ProblemSafeSerializer(problem).data
return self.success(problem_data)
contest_problems = Problem.objects.select_related("created_by").prefetch_related("tags").filter(
contest=self.contest, visible=True
)
if self.contest.problem_details_permission(request.user):
data = ProblemListSerializer(contest_problems, many=True).data
self._add_problem_status(request, data)
else:
data = ProblemSafeSerializer(contest_problems, many=True).data
return self.success(data)
class ProblemSolvedPeopleCount(APIView):
def get(self, request):
problem_id = request.GET.get("problem_id")
rate = "0"
if not request.user.is_authenticated:
return self.success(rate)
submission_count = Submission.objects.filter(
user_id=request.user.id,
problem_id=problem_id,
result__in=[JudgeStatus.ACCEPTED, JudgeStatus.AST_CHECK_FAILED],
).count()
if submission_count == 0:
return self.success(rate)
today = datetime.today()
years_ago = datetime(today.year - 2, today.month, today.day, 0, 0)
total_count = User.objects.filter(
is_disabled=False, last_login__gte=years_ago
).count()
accepted_count = Submission.objects.filter(
problem_id=problem_id,
result__in=[JudgeStatus.ACCEPTED, JudgeStatus.AST_CHECK_FAILED],
create_time__gte=years_ago,
).aggregate(user_count=Count("user_id", distinct=True))["user_count"]
if accepted_count < total_count:
rate = "%.2f" % ((total_count - accepted_count) / total_count * 100)
else:
rate = "0"
return self.success(rate)
class SimilarProblemAPI(APIView):
def get(self, request):
problem_display_id = request.GET.get("problem_id")
if not problem_display_id:
return self.error("problem_id is required")
try:
problem = Problem.objects.get(_id__iexact=problem_display_id, contest__isnull=True)
except Problem.DoesNotExist:
return self.error("Problem not found")
tag_ids = list(problem.tags.values_list("id", flat=True))
if not tag_ids:
return self.success([])
exclude_ids = [problem_display_id]
if request.user.is_authenticated:
profile = request.user.userprofile
ac_display_ids = [
v["_id"]
for v in profile.acm_problems_status.get("problems", {}).values()
if v.get("status") == JudgeStatus.ACCEPTED
]
exclude_ids.extend(ac_display_ids)
similar = (
Problem.objects.select_related("created_by")
.prefetch_related("tags")
.filter(tags__in=tag_ids, visible=True, contest__isnull=True)
.exclude(_id__in=exclude_ids)
.distinct()
.order_by("difficulty")[:5]
)
return self.success(ProblemListSerializer(similar, many=True).data)
class ProblemAuthorAPI(APIView):
def get(self, request):
show_all = request.GET.get("all", "0") == "1"
cache_key = f"{CacheKey.problem_authors}{'_all' if show_all else '_only_visible'}"
cached_data = cache.get(cache_key)
if cached_data:
return self.success(cached_data)
problem_filter = {"contest_id__isnull": True, "created_by__is_disabled": False}
if not show_all:
problem_filter["visible"] = True
authors = (
Problem.objects.filter(**problem_filter)
.values("created_by__username")
.annotate(problem_count=Count("id"))
.order_by("-problem_count")
)
result = [
{
"username": author["created_by__username"],
"problem_count": author["problem_count"],
}
for author in authors
]
cache.set(cache_key, result, 7200)
class ProblemYearlyACRateAPI(APIView):
def get(self, request):
problem_id = request.GET.get("problem_id")
if not problem_id:
return self.error("problem_id is required")
cache_key = f"{CacheKey.problem_yearly_ac}:{problem_id}"
cached = cache.get(cache_key)
if cached is not None:
return self.success(cached)
try:
problem = Problem.objects.get(
_id__iexact=problem_id, contest_id__isnull=True, visible=True
)
except Problem.DoesNotExist:
return self.error("Problem does not exist")
rows = (
Submission.objects.filter(
problem_id=problem.id,
contest_id__isnull=True,
)
.exclude(result__in=[JudgeStatus.PENDING, JudgeStatus.JUDGING])
.annotate(year=ExtractYear("create_time"))
.values("year")
.annotate(
total=Count("id"),
accepted=Count("id", filter=Q(result__in=[JudgeStatus.ACCEPTED, JudgeStatus.AST_CHECK_FAILED])),
)
.order_by("year")
)
data = [
{
"year": row["year"],
"total": row["total"],
"accepted": row["accepted"],
"ac_rate": round(row["accepted"] / row["total"] * 100, 2)
if row["total"] > 0
else 0.0,
}
for row in rows
]
cache.set(cache_key, data, 3600)
return self.success(data)