bug fixes

This commit is contained in:
virusdefender
2017-10-06 17:46:14 +08:00
committed by zema1
parent a324d55364
commit 93bd77d8d8
16 changed files with 91 additions and 94 deletions

View File

@@ -14,7 +14,7 @@ from judge.languages import languages
from options.options import SysOptions
from problem.models import Problem, ProblemRuleType
from submission.models import JudgeStatus, Submission
from utils.cache import judge_cache, default_cache
from utils.cache import cache
from utils.constants import CacheKey
logger = logging.getLogger(__name__)
@@ -22,31 +22,28 @@ logger = logging.getLogger(__name__)
# 继续处理在队列中的问题
def process_pending_task():
if judge_cache.llen(CacheKey.waiting_queue):
if cache.llen(CacheKey.waiting_queue):
# 防止循环引入
from judge.tasks import judge_task
data = json.loads(judge_cache.rpop(CacheKey.waiting_queue).decode("utf-8"))
data = json.loads(cache.rpop(CacheKey.waiting_queue).decode("utf-8"))
judge_task.delay(**data)
class JudgeDispatcher(object):
def __init__(self, submission_id, problem_id):
self.token = hashlib.sha256(SysOptions.judge_server_token.encode("utf-8")).hexdigest()
self.redis_conn = judge_cache
self.submission = Submission.objects.get(pk=submission_id)
self.submission = Submission.objects.get(id=submission_id)
self.contest_id = self.submission.contest_id
if self.contest_id:
self.problem = Problem.objects.select_related("contest") \
.get(id=problem_id, contest_id=self.contest_id)
self.problem = Problem.objects.select_related("contest").get(id=problem_id, contest_id=self.contest_id)
self.contest = self.problem.contest
else:
self.problem = Problem.objects.get(id=problem_id)
def _request(self, url, data=None):
kwargs = {"headers": {"X-Judge-Server-Token": self.token,
"Content-Type": "application/json"}}
kwargs = {"headers": {"X-Judge-Server-Token": self.token}}
if data:
kwargs["data"] = json.dumps(data)
kwargs["json"] = data
try:
return requests.post(url, **kwargs).json()
except Exception as e:
@@ -55,7 +52,6 @@ class JudgeDispatcher(object):
@staticmethod
def choose_judge_server():
with transaction.atomic():
# TODO: use more reasonable way
servers = JudgeServer.objects.select_for_update().all().order_by("task_number")
servers = [s for s in servers if s.status == "normal"]
if servers:
@@ -65,10 +61,10 @@ class JudgeDispatcher(object):
return server
@staticmethod
def release_judge_res(judge_server_id):
def release_judge_server(judge_server_id):
with transaction.atomic():
# 使用原子操作, 同时因为use和release中间间隔了判题过程,需要重新查询一下
server = JudgeServer.objects.select_for_update().get(id=judge_server_id)
server = JudgeServer.objects.get(id=judge_server_id)
server.used_instance_number = F("task_number") - 1
server.save()
@@ -94,7 +90,7 @@ class JudgeDispatcher(object):
server = self.choose_judge_server()
if not server:
data = {"submission_id": self.submission.id, "problem_id": self.problem.id}
self.redis_conn.lpush(CacheKey.waiting_queue, json.dumps(data))
cache.lpush(CacheKey.waiting_queue, json.dumps(data))
return
sub_config = list(filter(lambda item: self.submission.language == item["name"], languages))[0]
@@ -138,7 +134,7 @@ class JudgeDispatcher(object):
else:
self.submission.result = JudgeStatus.PARTIALLY_ACCEPTED
self.submission.save()
self.release_judge_res(server.id)
self.release_judge_server(server.id)
self.update_problem_status()
if self.contest_id:
@@ -223,7 +219,7 @@ class JudgeDispatcher(object):
if self.contest_id and self.contest.status != ContestStatus.CONTEST_UNDERWAY:
return
if self.contest.real_time_rank:
default_cache.delete(CacheKey.contest_rank_cache + str(self.contest_id))
cache.delete(CacheKey.contest_rank_cache + str(self.contest_id))
with transaction.atomic():
if self.contest.rule_type == ContestRuleType.ACM:
acm_rank, _ = ACMContestRank.objects.select_for_update(). \