import hashlib
import json
import logging
from datetime import timedelta
from typing import Optional
from urllib.parse import urljoin

import requests
from django.db import IntegrityError, transaction
from django.db.models import F
from django.utils import timezone

from account.models import User
from conf.models import JudgeServer
from contest.models import ACMContestRank, ContestRuleType, ContestStatus, OIContestRank
from options.options import SysOptions
from problem.models import Problem, ProblemRuleType
from problem.utils import parse_problem_template
from submission.models import JudgeStatus, Submission, is_accepted
from utils.cache import cache
from utils.constants import CacheKey
from utils.websocket import push_submission_update

logger = logging.getLogger(__name__)


def process_pending_task():
    """Pop one queued judge request from the waiting queue and dispatch it.

    Does nothing when the queue is empty. Entries are JSON objects whose keys
    are passed as keyword arguments to ``judge_task.send``.
    """
    if cache.llen(CacheKey.waiting_queue):
        # Imported here to avoid a circular import.
        from judge.tasks import judge_task

        tmp_data = cache.rpop(CacheKey.waiting_queue)
        if tmp_data:
            data = json.loads(tmp_data.decode("utf-8"))
            judge_task.send(**data)


class ChooseJudgeServer:
    """Context manager that reserves a slot on a judge server.

    On entry it picks the least-loaded eligible server and increments its
    ``task_number``; on exit it decrements the counter again. The managed
    value is the chosen ``JudgeServer`` or ``None`` when none is available.
    """

    def __init__(self):
        self.server = None

    def __enter__(self) -> Optional[JudgeServer]:
        with transaction.atomic():
            # Only servers whose last heartbeat is at most 6 seconds old.
            cutoff = timezone.now() - timedelta(seconds=6)
            server = (
                JudgeServer.objects
                .select_for_update(skip_locked=True)
                .filter(
                    is_disabled=False,
                    last_heartbeat__gte=cutoff,
                    task_number__lte=F("cpu_core") * 2,
                )
                .order_by("task_number")
                .first()
            )
            if server:
                # F() expression so the increment happens in the database.
                server.task_number = F("task_number") + 1
                server.save(update_fields=["task_number"])
                self.server = server
                return server
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.server:
            JudgeServer.objects.filter(id=self.server.id).update(task_number=F("task_number") - 1)


class DispatcherBase(object):
    """Base class holding the hashed judge-server token and the HTTP helper."""

    def __init__(self):
        self.token = hashlib.sha256(SysOptions.judge_server_token.encode("utf-8")).hexdigest()

    def _request(self, url, data=None):
        """POST ``data`` as JSON to ``url`` and return the decoded JSON body.

        Any exception (network failure, invalid JSON, ...) is logged and
        ``None`` is returned instead of propagating.
        """
        kwargs = {"headers": {"X-Judge-Server-Token": self.token}}
        if data:
            kwargs["json"] = data
        try:
            return requests.post(url, **kwargs).json()
        except Exception as e:
            logger.exception(e)
            return None


class JudgeDispatcher(DispatcherBase):
    """Sends one submission to a judge server and records the outcome."""

    def __init__(self, submission_id, problem_id):
        super().__init__()
        self.submission = Submission.objects.get(id=submission_id)
        self.contest_id = self.submission.contest_id
        # A submission that already carries judge info is being rejudged;
        # remember its previous result so the counters can be corrected.
        self.last_result = self.submission.result if self.submission.info else None

        if self.contest_id:
            self.problem = Problem.objects.select_related("contest").get(id=problem_id, contest_id=self.contest_id)
            self.contest = self.problem.contest
        else:
            self.problem = Problem.objects.get(id=problem_id)

    def _push_status(self, result, status, **extra):
        """Best-effort push of a submission status update to the websocket layer.

        Failures are logged and swallowed so that a notification problem never
        interrupts judging.
        """
        payload = {
            "type": "submission_update",
            "submission_id": str(self.submission.id),
            "result": result,
            "status": status,
        }
        payload.update(extra)
        try:
            push_submission_update(
                submission_id=str(self.submission.id),
                user_id=self.submission.user_id,
                data=payload,
            )
        except Exception as e:
            logger.error("Failed to push submission update: %s", e)

    def _compute_statistic_info(self, resp_data):
        """Fill ``submission.statistic_info`` from the per-test-case results.

        ``resp_data`` is the list of test case results; each entry is read for
        ``cpu_time``, ``memory`` and ``result`` and, for OI problems, gets a
        ``score`` key written back.
        """
        # Time and memory cost are the maximum over all test cases.
        self.submission.statistic_info["time_cost"] = max(x["cpu_time"] for x in resp_data)
        self.submission.statistic_info["memory_cost"] = max(x["memory"] for x in resp_data)

        # Sum up the score in OI mode.
        if self.problem.rule_type == ProblemRuleType.OI:
            score = 0
            try:
                for i, case in enumerate(resp_data):
                    if case["result"] == JudgeStatus.ACCEPTED:
                        case["score"] = self.problem.test_case_score[i]["score"]
                        score += case["score"]
                    else:
                        case["score"] = 0
            except IndexError:
                # More results than configured test case scores.
                logger.error(f"Index Error raised when summing up the score in problem {self.problem.id}")
                self.submission.statistic_info["score"] = 0
                return
            self.submission.statistic_info["score"] = score

    def judge(self):
        """Judge the submission and update every dependent statistic.

        If no judge server is available the request is pushed onto the waiting
        queue and the method returns. Otherwise the code is sent to the server,
        the result is stored on the submission, and problem / profile / contest
        rank counters are updated. Finally one pending task is dispatched.
        """
        language = self.submission.language
        sub_config = [item for item in SysOptions.languages if item["name"] == language][0]

        if language in self.problem.template:
            template = parse_problem_template(self.problem.template[language])
            code = f"{template['prepend']}\n{self.submission.code}\n{template['append']}"
        else:
            code = self.submission.code

        data = {
            "language_config": sub_config["config"],
            "src": code,
            "max_cpu_time": self.problem.time_limit,
            "max_memory": 1024 * 1024 * self.problem.memory_limit,
            "test_case_id": self.problem.test_case_id,
            "output": False,
            "io_mode": self.problem.io_mode
        }

        with ChooseJudgeServer() as server:
            if not server:
                # No free server: queue the task and tell the client it is pending.
                queued = {"submission_id": self.submission.id, "problem_id": self.problem.id}
                cache.lpush(CacheKey.waiting_queue, json.dumps(queued))
                self._push_status(JudgeStatus.PENDING, "pending")
                return

            Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.JUDGING)
            self._push_status(JudgeStatus.JUDGING, "judging")
            resp = self._request(urljoin(server.service_url, "/judge"), data=data)

        if not resp:
            Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.SYSTEM_ERROR)
            self._push_status(JudgeStatus.SYSTEM_ERROR, "error")
            return

        if resp["err"]:
            self.submission.result = JudgeStatus.COMPILE_ERROR
            self.submission.statistic_info["err_info"] = resp["data"]
            self.submission.statistic_info["score"] = 0
        else:
            resp["data"].sort(key=lambda x: int(x["test_case"]))
            self.submission.info = resp
            self._compute_statistic_info(resp["data"])
            error_test_case = [case for case in resp["data"] if case["result"] != 0]
            # ACM mode: AC if every test case passes, otherwise the status of
            # the first failing test case.
            # OI mode: AC if every test case passes; if all fail, the status of
            # the first failing test case; otherwise partially accepted.
            if not error_test_case:
                self.submission.result = JudgeStatus.ACCEPTED
            elif self.problem.rule_type == ProblemRuleType.ACM or len(error_test_case) == len(resp["data"]):
                self.submission.result = error_test_case[0]["result"]
            else:
                self.submission.result = JudgeStatus.PARTIALLY_ACCEPTED

        if self.submission.result == JudgeStatus.ACCEPTED:
            # Accepted code may still have to satisfy per-language AST rules.
            ast_rules = self.problem.ast_rules
            if ast_rules and language in ast_rules:
                from ast_checker.checker import check_ast

                passed, results = check_ast(self.submission.code, language, ast_rules[language])
                if not passed:
                    self.submission.result = JudgeStatus.AST_CHECK_FAILED
                    self.submission.statistic_info["ast_results"] = results

        self.submission.save(update_fields=["result", "info", "statistic_info"])

        self._push_status(
            self.submission.result,
            "finished",
            time_cost=self.submission.statistic_info.get("time_cost"),
            memory_cost=self.submission.statistic_info.get("memory_cost"),
            score=self.submission.statistic_info.get("score", 0),
        )

        if self.contest_id:
            if self.contest.status != ContestStatus.CONTEST_UNDERWAY or \
                    User.objects.get(id=self.submission.user_id).is_contest_admin(self.contest):
                # Lazy %-formatting also works when the id is not a str.
                logger.info(
                    "Contest debug mode, id: %s, submission id: %s", self.contest_id, self.submission.id)
                return
            with transaction.atomic():
                self.update_contest_problem_status()
                self.update_contest_rank()
        else:
            # ``is not None`` rather than truthiness: a falsy previous result
            # (accepted results are compared against 0 above) must still take
            # the rejudge path instead of being counted as a new submission.
            if self.last_result is not None:
                self.update_problem_status_rejudge()
            else:
                self.update_problem_status()

        # Judging is done; try to process one of the remaining queued tasks.
        process_pending_task()

    def update_problem_status_rejudge(self):
        """Correct problem and profile counters after a rejudge.

        Moves one count from the previous result bucket to the new one and
        updates the user's per-problem status. Raises ``KeyError`` if the
        profile has no entry for this problem.
        """
        result = str(self.submission.result)
        # statistic_info is keyed by the string form of the result everywhere
        # else, so the previous result must be stringified as well; otherwise
        # the old bucket is never decremented.
        last_result = str(self.last_result)
        problem_id = str(self.problem.id)
        with transaction.atomic():
            # update problem status
            problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
            if not is_accepted(self.last_result) and is_accepted(self.submission.result):
                problem.accepted_number = F("accepted_number") + 1
            problem_info = problem.statistic_info
            problem_info[last_result] = problem_info.get(last_result, 1) - 1
            problem_info[result] = problem_info.get(result, 0) + 1
            problem.save(update_fields=["accepted_number", "statistic_info"])

            profile = User.objects.select_for_update().get(id=self.submission.user_id).userprofile
            new_status = JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result
            if problem.rule_type == ProblemRuleType.ACM:
                acm_problems_status = profile.acm_problems_status.get("problems", {})
                # An already accepted problem is never downgraded.
                if not is_accepted(acm_problems_status[problem_id]["status"]):
                    acm_problems_status[problem_id]["status"] = new_status
                    if is_accepted(self.submission.result):
                        profile.accepted_number += 1
                profile.acm_problems_status["problems"] = acm_problems_status
                profile.save(update_fields=["accepted_number", "acm_problems_status"])
            else:
                oi_problems_status = profile.oi_problems_status.get("problems", {})
                score = self.submission.statistic_info["score"]
                if not is_accepted(oi_problems_status[problem_id]["status"]):
                    # Subtract the previous score, add the current one.
                    profile.add_score(this_time_score=score,
                                      last_time_score=oi_problems_status[problem_id]["score"])
                    oi_problems_status[problem_id]["score"] = score
                    oi_problems_status[problem_id]["status"] = new_status
                    if is_accepted(self.submission.result):
                        profile.accepted_number += 1
                profile.oi_problems_status["problems"] = oi_problems_status
                profile.save(update_fields=["accepted_number", "oi_problems_status"])

    def update_problem_status(self):
        """Update problem and profile counters for a first-time judgement."""
        result = str(self.submission.result)
        problem_id = str(self.problem.id)
        with transaction.atomic():
            # update problem status
            problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
            problem.submission_number = F("submission_number") + 1
            if is_accepted(self.submission.result):
                problem.accepted_number = F("accepted_number") + 1
            problem_info = problem.statistic_info
            problem_info[result] = problem_info.get(result, 0) + 1
            problem.save(update_fields=["accepted_number", "submission_number", "statistic_info"])

            # update user profile
            user = User.objects.select_for_update().get(id=self.submission.user_id)
            user_profile = user.userprofile
            user_profile.submission_number = F("submission_number") + 1
            profile_status = JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result
            if problem.rule_type == ProblemRuleType.ACM:
                acm_problems_status = user_profile.acm_problems_status.get("problems", {})
                if problem_id not in acm_problems_status:
                    acm_problems_status[problem_id] = {"status": profile_status, "_id": self.problem._id}
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                elif not is_accepted(acm_problems_status[problem_id]["status"]):
                    acm_problems_status[problem_id]["status"] = profile_status
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                user_profile.acm_problems_status["problems"] = acm_problems_status
                user_profile.save(update_fields=["submission_number", "accepted_number", "acm_problems_status"])
            else:
                oi_problems_status = user_profile.oi_problems_status.get("problems", {})
                score = self.submission.statistic_info["score"]
                if problem_id not in oi_problems_status:
                    user_profile.add_score(score)
                    oi_problems_status[problem_id] = {"status": profile_status,
                                                      "_id": self.problem._id,
                                                      "score": score}
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                elif not is_accepted(oi_problems_status[problem_id]["status"]):
                    # Subtract the previous score, add the current one.
                    user_profile.add_score(this_time_score=score,
                                           last_time_score=oi_problems_status[problem_id]["score"])
                    oi_problems_status[problem_id]["score"] = score
                    oi_problems_status[problem_id]["status"] = profile_status
                    if is_accepted(self.submission.result):
                        user_profile.accepted_number += 1
                user_profile.oi_problems_status["problems"] = oi_problems_status
                user_profile.save(update_fields=["submission_number", "accepted_number", "oi_problems_status"])

    def update_contest_problem_status(self):
        """Update the user's contest problem status and the problem counters.

        In ACM contests a problem the user has already solved is skipped
        entirely, so no counter is touched.
        """
        with transaction.atomic():
            user = User.objects.select_for_update().get(id=self.submission.user_id)
            user_profile = user.userprofile
            problem_id = str(self.problem.id)
            profile_status = JudgeStatus.ACCEPTED if is_accepted(self.submission.result) else self.submission.result
            if self.contest.rule_type == ContestRuleType.ACM:
                contest_problems_status = user_profile.acm_problems_status.get("contest_problems", {})
                if problem_id not in contest_problems_status:
                    contest_problems_status[problem_id] = {"status": profile_status, "_id": self.problem._id}
                elif not is_accepted(contest_problems_status[problem_id]["status"]):
                    contest_problems_status[problem_id]["status"] = profile_status
                else:
                    # Already accepted: skip, do not touch any counter.
                    return
                user_profile.acm_problems_status["contest_problems"] = contest_problems_status
                user_profile.save(update_fields=["acm_problems_status"])
            elif self.contest.rule_type == ContestRuleType.OI:
                contest_problems_status = user_profile.oi_problems_status.get("contest_problems", {})
                score = self.submission.statistic_info["score"]
                if problem_id not in contest_problems_status:
                    contest_problems_status[problem_id] = {"status": profile_status,
                                                           "_id": self.problem._id,
                                                           "score": score}
                else:
                    contest_problems_status[problem_id]["score"] = score
                    contest_problems_status[problem_id]["status"] = profile_status
                user_profile.oi_problems_status["contest_problems"] = contest_problems_status
                user_profile.save(update_fields=["oi_problems_status"])

            problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
            result = str(self.submission.result)
            problem_info = problem.statistic_info
            problem_info[result] = problem_info.get(result, 0) + 1
            problem.submission_number = F("submission_number") + 1
            if is_accepted(self.submission.result):
                problem.accepted_number = F("accepted_number") + 1
            problem.save(update_fields=["submission_number", "accepted_number", "statistic_info"])

    def update_contest_rank(self):
        """Fetch (or create) the user's contest rank row and update it.

        Uses ``select_for_update``, so it must be called inside a transaction,
        as ``judge`` does.
        """
        if self.contest.rule_type == ContestRuleType.OI or self.contest.real_time_rank:
            cache.delete(f"{CacheKey.contest_rank_cache}:{self.contest.id}")

        def get_rank(model):
            return model.objects.select_for_update().get(user_id=self.submission.user_id, contest=self.contest)

        if self.contest.rule_type == ContestRuleType.ACM:
            model = ACMContestRank
            func = self._update_acm_contest_rank
        else:
            model = OIContestRank
            func = self._update_oi_contest_rank

        try:
            rank = get_rank(model)
        except model.DoesNotExist:
            try:
                # Nested atomic() creates a savepoint, so an IntegrityError
                # from a concurrent insert rolls back only the create and
                # leaves the surrounding transaction usable.
                with transaction.atomic():
                    model.objects.create(user_id=self.submission.user_id, contest=self.contest)
            except IntegrityError:
                # Another worker created the row first; fall through and read it.
                pass
            rank = get_rank(model)
        func(rank)

    def _update_acm_contest_rank(self, rank):
        """Apply the current submission to an ACM contest rank row."""
        info = rank.submission_info.get(str(self.submission.problem_id))
        # The problem was modified earlier, so it has to be fetched again.
        problem = Problem.objects.select_for_update().get(contest_id=self.contest_id, id=self.problem.id)
        if info:
            # The user has submitted this problem before.
            if info["is_ac"]:
                return

            rank.submission_number += 1
            if is_accepted(self.submission.result):
                rank.accepted_number += 1
                info["is_ac"] = True
                info["ac_time"] = (self.submission.create_time - self.contest.start_time).total_seconds()
                # Each earlier wrong submission adds 20 * 60 to the total
                # (presumably a 20-minute penalty expressed in seconds).
                rank.total_time += info["ac_time"] + info["error_number"] * 20 * 60

                if problem.accepted_number == 1:
                    info["is_first_ac"] = True
            elif self.submission.result != JudgeStatus.COMPILE_ERROR:
                # Compile errors are not counted as wrong attempts.
                info["error_number"] += 1
        else:
            # First submission for this problem.
            rank.submission_number += 1
            info = {"is_ac": False, "ac_time": 0, "error_number": 0, "is_first_ac": False}
            if is_accepted(self.submission.result):
                rank.accepted_number += 1
                info["is_ac"] = True
                info["ac_time"] = (self.submission.create_time - self.contest.start_time).total_seconds()
                rank.total_time += info["ac_time"]

                if problem.accepted_number == 1:
                    info["is_first_ac"] = True
            elif self.submission.result != JudgeStatus.COMPILE_ERROR:
                info["error_number"] = 1

        rank.submission_info[str(self.submission.problem_id)] = info
        rank.save(update_fields=["submission_info", "total_time", "accepted_number", "submission_number"])

    def _update_oi_contest_rank(self, rank):
        """Apply the current submission's score to an OI contest rank row.

        The score stored for this problem is replaced by the current one and
        ``total_score`` is adjusted by the difference.
        """
        problem_id = str(self.submission.problem_id)
        current_score = self.submission.statistic_info["score"]
        last_score = rank.submission_info.get(problem_id)
        if last_score:
            rank.total_score = rank.total_score - last_score + current_score
        else:
            rank.total_score = rank.total_score + current_score
        rank.submission_info[problem_id] = current_score
        rank.save(update_fields=["submission_info", "total_score", "submission_number"])