import hashlib
import json
import logging
from datetime import timedelta
from urllib.parse import urljoin

import requests
from django.db import IntegrityError, transaction
from django.db.models import F
from django.utils import timezone

from account.models import User
from conf.models import JudgeServer
from contest.models import ACMContestRank, ContestStatus
from options.options import SysOptions
from problem.models import Problem, ProblemRuleType
from problem.utils import parse_problem_template
from submission.models import JudgeStatus, Submission, is_accepted
from utils.cache import cache
from utils.constants import CacheKey
from utils.websocket import push_submission_update

logger = logging.getLogger(__name__)


def process_pending_task():
    """Pop one task from the waiting queue, if any, and dispatch it.

    The queue holds JSON-encoded keyword arguments for ``judge_task``
    (they are pushed by ``JudgeDispatcher.judge`` when no server is free).
    """
    if cache.llen(CacheKey.waiting_queue):
        # Imported lazily to avoid a circular import.
        from judge.tasks import judge_task

        tmp_data = cache.rpop(CacheKey.waiting_queue)
        # The queue may have been drained between llen() and rpop().
        if tmp_data:
            data = json.loads(tmp_data.decode("utf-8"))
            judge_task.send(**data)


class ChooseJudgeServer:
    """Context manager that reserves a judge server for one task.

    On enter it picks the enabled server with the fewest running tasks whose
    heartbeat is at most 6 seconds old and whose ``task_number`` does not
    exceed ``cpu_core * 2``, and increments its ``task_number``. On exit the
    counter is decremented again. ``None`` is yielded when no server matches.
    """

    def __init__(self):
        self.server = None

    def __enter__(self) -> "JudgeServer | None":
        with transaction.atomic():
            cutoff = timezone.now() - timedelta(seconds=6)
            server = (
                JudgeServer.objects
                # Rows locked by a concurrent chooser are skipped, not waited on.
                .select_for_update(skip_locked=True)
                .filter(
                    is_disabled=False,
                    last_heartbeat__gte=cutoff,
                    task_number__lte=F("cpu_core") * 2,
                )
                .order_by("task_number")
                .first()
            )
            if server:
                # F() expression so the increment is applied in the database.
                server.task_number = F("task_number") + 1
                server.save(update_fields=["task_number"])
                self.server = server
                return server
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release the slot taken in __enter__; exceptions are not suppressed.
        if self.server:
            JudgeServer.objects.filter(id=self.server.id).update(task_number=F("task_number") - 1)


class DispatcherBase(object):
    """Base class holding the judge-server token and the HTTP helper."""

    def __init__(self):
        # The token is sent as the SHA-256 hex digest of the configured value.
        self.token = hashlib.sha256(SysOptions.judge_server_token.encode("utf-8")).hexdigest()

    def _request(self, url, data=None):
        """POST ``data`` as JSON to ``url`` and return the decoded JSON reply.

        Any exception (network failure, invalid JSON, ...) is logged and
        ``None`` is returned instead, so callers must check for a falsy result.

        NOTE(review): the request has no timeout, so an unresponsive judge
        server blocks this worker indefinitely. Confirm the longest expected
        judging time before adding one.
        """
        kwargs = {"headers": {"X-Judge-Server-Token": self.token}}
        if data:
            kwargs["json"] = data
        try:
            return requests.post(url, **kwargs).json()
        except Exception as e:
            logger.exception(e)
        return None


class JudgeDispatcher(DispatcherBase):
    """Judge one submission and update problem/user/contest statistics."""

    def __init__(self, submission_id, problem_id):
        """Load the submission and its problem (and contest, if any).

        Raises whatever ``Model.objects.get`` raises when a row is missing.
        """
        super().__init__()
        self.submission = Submission.objects.get(id=submission_id)
        self.contest_id = self.submission.contest_id
        # Result of a previous judge run, or None if the submission has no
        # stored judge info yet (i.e. this is not a rejudge).
        self.last_result = self.submission.result if self.submission.info else None

        if self.contest_id:
            self.problem = Problem.objects.select_related("contest").get(id=problem_id, contest_id=self.contest_id)
            self.contest = self.problem.contest
        else:
            self.problem = Problem.objects.get(id=problem_id)

    def _push_status(self, result, status, **extra):
        """Push a ``submission_update`` message; failures are only logged.

        ``extra`` entries are appended to the payload after the fixed keys.
        """
        submission_id = str(self.submission.id)
        payload = {
            "type": "submission_update",
            "submission_id": submission_id,
            "result": result,
            "status": status,
        }
        payload.update(extra)
        try:
            push_submission_update(
                submission_id=submission_id,
                user_id=self.submission.user_id,
                data=payload,
            )
        except Exception as e:
            # Pushing is best effort; it must never abort judging.
            logger.error(f"Failed to push submission update: {str(e)}")

    def _compute_statistic_info(self, resp_data):
        """Fill ``submission.statistic_info`` from the per-test-case results.

        ``resp_data`` is the list of test case dicts returned by the judge
        server; each entry is read for ``cpu_time``, ``memory`` and ``result``.
        In OI mode every entry also gets a ``score`` key written into it.
        """
        # Time and memory cost are the maximum over all test cases.
        self.submission.statistic_info["time_cost"] = max(x["cpu_time"] for x in resp_data)
        self.submission.statistic_info["memory_cost"] = max(x["memory"] for x in resp_data)

        # Sum up the score in OI mode.
        if self.problem.rule_type == ProblemRuleType.OI:
            score = 0
            try:
                for i, case in enumerate(resp_data):
                    if case["result"] == JudgeStatus.ACCEPTED:
                        case["score"] = self.problem.test_case_score[i]["score"]
                        score += case["score"]
                    else:
                        case["score"] = 0
            except IndexError:
                # Fewer configured scores than returned test cases.
                logger.error(f"Index Error raised when summing up the score in problem {self.problem.id}")
                self.submission.statistic_info["score"] = 0
                return
            self.submission.statistic_info["score"] = score

    def judge(self):
        """Send the submission to a judge server and record the outcome.

        If no server is available the task is pushed onto the waiting queue
        and the method returns. Otherwise the result is saved, pushed to the
        client and the problem / profile / contest statistics are updated.
        """
        language = self.submission.language
        sub_config = [item for item in SysOptions.languages if item["name"] == language][0]

        if language in self.problem.template:
            template = parse_problem_template(self.problem.template[language])
            code = f"{template['prepend']}\n{self.submission.code}\n{template['append']}"
        else:
            code = self.submission.code

        data = {
            "language_config": sub_config["config"],
            "src": code,
            "max_cpu_time": self.problem.time_limit,
            "max_memory": 1024 * 1024 * self.problem.memory_limit,
            "test_case_id": self.problem.test_case_id,
            "output": False,
            "io_mode": self.problem.io_mode
        }

        with ChooseJudgeServer() as server:
            if not server:
                # No free server: queue the task and report the pending state.
                task = {"submission_id": self.submission.id, "problem_id": self.problem.id}
                cache.lpush(CacheKey.waiting_queue, json.dumps(task))
                self._push_status(JudgeStatus.PENDING, "pending")
                return
            Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.JUDGING)
            self._push_status(JudgeStatus.JUDGING, "judging")
            resp = self._request(urljoin(server.service_url, "/judge"), data=data)

        if not resp:
            # _request() returned None (or an empty reply): system error.
            Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.SYSTEM_ERROR)
            self._push_status(JudgeStatus.SYSTEM_ERROR, "error")
            return

        if resp["err"]:
            self.submission.result = JudgeStatus.COMPILE_ERROR
            self.submission.statistic_info["err_info"] = resp["data"]
            self.submission.statistic_info["score"] = 0
        else:
            resp["data"].sort(key=lambda x: int(x["test_case"]))
            self.submission.info = resp
            self._compute_statistic_info(resp["data"])
            error_test_case = [case for case in resp["data"] if case["result"] != 0]
            # ACM mode: AC if every test case passed, otherwise the status of
            # the first failed test case.
            # OI mode: AC if every test case passed, the status of the first
            # failed test case if all failed, otherwise partially accepted.
            if not error_test_case:
                self.submission.result = JudgeStatus.ACCEPTED
            elif self.problem.rule_type == ProblemRuleType.ACM or len(error_test_case) == len(resp["data"]):
                self.submission.result = error_test_case[0]["result"]
            else:
                self.submission.result = JudgeStatus.PARTIALLY_ACCEPTED

        # Accepted code may additionally have to satisfy per-language AST rules.
        if self.submission.result == JudgeStatus.ACCEPTED:
            ast_rules = self.problem.ast_rules
            if ast_rules and language in ast_rules:
                from ast_checker.checker import check_ast

                passed, results = check_ast(self.submission.code, language, ast_rules[language])
                if not passed:
                    self.submission.result = JudgeStatus.AST_CHECK_FAILED
                    self.submission.statistic_info["ast_results"] = results

        self.submission.save(update_fields=["result", "info", "statistic_info"])

        # Report the final state to the client.
        self._push_status(
            self.submission.result,
            "finished",
            time_cost=self.submission.statistic_info.get("time_cost"),
            memory_cost=self.submission.statistic_info.get("memory_cost"),
            score=self.submission.statistic_info.get("score", 0),
        )

        if self.contest_id:
            if self.contest.status != ContestStatus.CONTEST_UNDERWAY or \
                    User.objects.get(id=self.submission.user_id).is_contest_admin(self.contest):
                # Lazy %-args: works for any id type, unlike str concatenation.
                logger.info(
                    "Contest debug mode, id: %s, submission id: %s", self.contest_id, self.submission.id)
                return
            with transaction.atomic():
                self.update_contest_problem_status()
                self.update_contest_rank()
        else:
            # Compare against None explicitly: last_result is a status code and
            # a falsy code (test cases use 0 for success) must still be treated
            # as "already judged", otherwise a rejudge is counted as a new
            # submission.
            if self.last_result is not None:
                self.update_problem_status_rejudge()
            else:
                self.update_problem_status()

        # Judging is done; try to process a task left in the waiting queue.
        process_pending_task()

    def update_problem_status_rejudge(self):
        """Update problem and profile statistics after a rejudge.

        Moves one count from the previous result bucket to the new one and
        upgrades the user's stored status for the problem if it was not
        accepted before. ``submission_number`` is left untouched.
        """
        result = str(self.submission.result)
        # statistic_info is keyed by str(result) (see update_problem_status),
        # so the previous result has to be stringified as well.
        last_result = str(self.last_result)
        problem_id = str(self.problem.id)
        with transaction.atomic():
            # Update problem status.
            problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
            if not is_accepted(self.last_result) and is_accepted(self.submission.result):
                problem.accepted_number = F("accepted_number") + 1
            problem_info = problem.statistic_info
            problem_info[last_result] = problem_info.get(last_result, 1) - 1
            problem_info[result] = problem_info.get(result, 0) + 1
            problem.save(update_fields=["accepted_number", "statistic_info"])

            profile = User.objects.select_for_update().get(id=self.submission.user_id).userprofile
            if problem.rule_type == ProblemRuleType.ACM:
                acm_problems_status = profile.acm_problems_status.get("problems", {})
                # An already accepted problem is never downgraded.
                if not is_accepted(acm_problems_status[problem_id]["status"]):
                    acm_problems_status[problem_id]["status"] = \
                        JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result
                    if is_accepted(self.submission.result):
                        profile.accepted_number += 1
                profile.acm_problems_status["problems"] = acm_problems_status
                profile.save(update_fields=["accepted_number", "acm_problems_status"])
            else:
                oi_problems_status = profile.oi_problems_status.get("problems", {})
                score = self.submission.statistic_info["score"]
                if not is_accepted(oi_problems_status[problem_id]["status"]):
                    # Subtract the previous score, add the new one.
                    profile.add_score(this_time_score=score,
                                      last_time_score=oi_problems_status[problem_id]["score"])
                    oi_problems_status[problem_id]["score"] = score
                    oi_problems_status[problem_id]["status"] = \
                        JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result
                    if is_accepted(self.submission.result):
                        profile.accepted_number += 1
                profile.oi_problems_status["problems"] = oi_problems_status
                profile.save(update_fields=["accepted_number", "oi_problems_status"])

    def update_problem_status(self):
        """Update problem and profile statistics for a first-time judge run."""
        result = str(self.submission.result)
        problem_id = str(self.problem.id)
        with transaction.atomic():
            # Update problem status.
            problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
            problem.submission_number = F("submission_number") + 1
            if is_accepted(self.submission.result):
                problem.accepted_number = F("accepted_number") + 1
            problem_info = problem.statistic_info
            problem_info[result] = problem_info.get(result, 0) + 1
            problem.save(update_fields=["accepted_number", "submission_number", "statistic_info"])

            # Update the user profile.
            user = User.objects.select_for_update().get(id=self.submission.user_id)
            user_profile = user.userprofile
            user_profile.submission_number = F("submission_number") + 1
            # Every accepted-like result is stored as plain ACCEPTED.
            profile_status = JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result
            if problem.rule_type == ProblemRuleType.ACM:
                acm_problems_status = user_profile.acm_problems_status.get("problems", {})
                if problem_id not in acm_problems_status:
                    acm_problems_status[problem_id] = {"status": profile_status, "_id": self.problem._id}
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                elif not is_accepted(acm_problems_status[problem_id]["status"]):
                    acm_problems_status[problem_id]["status"] = profile_status
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                user_profile.acm_problems_status["problems"] = acm_problems_status
                user_profile.save(update_fields=["submission_number", "accepted_number", "acm_problems_status"])
            else:
                oi_problems_status = user_profile.oi_problems_status.get("problems", {})
                score = self.submission.statistic_info["score"]
                if problem_id not in oi_problems_status:
                    user_profile.add_score(score)
                    oi_problems_status[problem_id] = {"status": profile_status,
                                                      "_id": self.problem._id,
                                                      "score": score}
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                elif not is_accepted(oi_problems_status[problem_id]["status"]):
                    # Subtract the previous score, add the new one.
                    user_profile.add_score(this_time_score=score,
                                           last_time_score=oi_problems_status[problem_id]["score"])
                    oi_problems_status[problem_id]["score"] = score
                    oi_problems_status[problem_id]["status"] = profile_status
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                user_profile.oi_problems_status["problems"] = oi_problems_status
                user_profile.save(update_fields=["submission_number", "accepted_number", "oi_problems_status"])

    def update_contest_problem_status(self):
        """Update the user's contest status and the contest problem counters.

        Nothing is counted when the user has already solved the problem.
        """
        with transaction.atomic():
            user = User.objects.select_for_update().get(id=self.submission.user_id)
            user_profile = user.userprofile
            problem_id = str(self.problem.id)
            profile_status = JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result

            contest_problems_status = user_profile.acm_problems_status.get("contest_problems", {})
            if problem_id not in contest_problems_status:
                contest_problems_status[problem_id] = {"status": profile_status, "_id": self.problem._id}
            elif not is_accepted(contest_problems_status[problem_id]["status"]):
                contest_problems_status[problem_id]["status"] = profile_status
            else:
                # Already accepted: skip, no counter is touched.
                return
            user_profile.acm_problems_status["contest_problems"] = contest_problems_status
            user_profile.save(update_fields=["acm_problems_status"])

            problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
            result = str(self.submission.result)
            problem_info = problem.statistic_info
            problem_info[result] = problem_info.get(result, 0) + 1
            problem.submission_number = F("submission_number") + 1
            if is_accepted(self.submission.result):
                problem.accepted_number = F("accepted_number") + 1
            problem.save(update_fields=["submission_number", "accepted_number", "statistic_info"])

    def update_contest_rank(self):
        """Fetch (creating it if needed) the user's rank row and update it.

        Uses ``select_for_update``; ``judge`` calls this inside
        ``transaction.atomic()``.
        """

        def get_rank():
            return ACMContestRank.objects.select_for_update().get(user_id=self.submission.user_id,
                                                                  contest=self.contest)

        try:
            rank = get_rank()
        except ACMContestRank.DoesNotExist:
            try:
                ACMContestRank.objects.create(user_id=self.submission.user_id, contest=self.contest)
                rank = get_rank()
            except IntegrityError:
                # A concurrent worker created the row first; just load it.
                rank = get_rank()
        self._update_acm_contest_rank(rank)

    def _update_acm_contest_rank(self, rank):
        """Apply this submission to ``rank`` and save it.

        ``rank.submission_info`` maps ``str(problem_id)`` to a dict with the
        keys ``is_ac``, ``ac_time``, ``error_number`` and ``is_first_ac``.
        Compile errors do not increase ``error_number``.
        """
        info = rank.submission_info.get(str(self.submission.problem_id))
        # The problem was modified earlier, so it has to be fetched again.
        problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
        if info:
            # The user has submitted this problem before.
            if info["is_ac"]:
                return

            rank.submission_number += 1
            if is_accepted(self.submission.result):
                rank.accepted_number += 1
                info["is_ac"] = True
                info["ac_time"] = (self.submission.create_time - self.contest.start_time).total_seconds()
                # 20 minutes (in seconds) of penalty per earlier wrong attempt.
                rank.total_time += info["ac_time"] + info["error_number"] * 20 * 60

                if problem.accepted_number == 1:
                    info["is_first_ac"] = True
            elif self.submission.result != JudgeStatus.COMPILE_ERROR:
                info["error_number"] += 1
        else:
            # First submission for this problem.
            rank.submission_number += 1
            info = {"is_ac": False, "ac_time": 0, "error_number": 0, "is_first_ac": False}
            if is_accepted(self.submission.result):
                rank.accepted_number += 1
                info["is_ac"] = True
                info["ac_time"] = (self.submission.create_time - self.contest.start_time).total_seconds()
                rank.total_time += info["ac_time"]

                if problem.accepted_number == 1:
                    info["is_first_ac"] = True
            elif self.submission.result != JudgeStatus.COMPILE_ERROR:
                info["error_number"] = 1

        rank.submission_info[str(self.submission.problem_id)] = info
        rank.save(update_fields=["submission_info", "total_time", "accepted_number", "submission_number"])