From d8966ed48af556a4234f54f4c6e637c84ed2830f Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sat, 28 Nov 2015 20:32:49 +0800 Subject: [PATCH 01/24] dev test --- judge/judger/__init__.py => db1.sqlite3 | 0 judge/__init__.py | 1 - judge/{judger => }/client.py | 0 judge/{judger => }/compiler.py | 0 judge/{judger => }/judge_exceptions.py | 0 judge/judger/run.py | 94 ------------------- judge/judger_controller/README.md | 1 - judge/judger_controller/celery.py | 9 -- judge/judger_controller/settings.py | 39 -------- judge/judger_controller/tasks.py | 45 --------- judge/{judger => }/language.py | 0 judge/{judger => }/logger.py | 0 judge/{judger => }/result.py | 0 judge/run.py | 52 ++++++++++ .../__init__.py => server.py} | 0 judge/{judger => }/settings.py | 0 judge/{judger => }/utils.py | 0 judge_dispatcher/__init__.py | 1 + judge_dispatcher/judge.py | 16 ++++ judge_dispatcher/models.py | 15 +++ judge_dispatcher/rpc_client.py | 24 +++++ judge_dispatcher/settings.py | 9 ++ 22 files changed, 117 insertions(+), 189 deletions(-) rename judge/judger/__init__.py => db1.sqlite3 (100%) rename judge/{judger => }/client.py (100%) rename judge/{judger => }/compiler.py (100%) rename judge/{judger => }/judge_exceptions.py (100%) delete mode 100644 judge/judger/run.py delete mode 100644 judge/judger_controller/README.md delete mode 100644 judge/judger_controller/celery.py delete mode 100644 judge/judger_controller/settings.py delete mode 100644 judge/judger_controller/tasks.py rename judge/{judger => }/language.py (100%) rename judge/{judger => }/logger.py (100%) rename judge/{judger => }/result.py (100%) create mode 100644 judge/run.py rename judge/{judger_controller/__init__.py => server.py} (100%) rename judge/{judger => }/settings.py (100%) rename judge/{judger => }/utils.py (100%) create mode 100644 judge_dispatcher/__init__.py create mode 100644 judge_dispatcher/judge.py create mode 100644 judge_dispatcher/models.py create mode 100644 judge_dispatcher/rpc_client.py create mode 100644 judge_dispatcher/settings.py diff --git a/judge/judger/__init__.py b/db1.sqlite3 similarity index 100% rename from judge/judger/__init__.py rename to db1.sqlite3 diff --git a/judge/__init__.py b/judge/__init__.py index 9bad579..e69de29 100644 --- a/judge/__init__.py +++ b/judge/__init__.py @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/judge/judger/client.py b/judge/client.py similarity index 100% rename from judge/judger/client.py rename to judge/client.py diff --git a/judge/judger/compiler.py b/judge/compiler.py similarity index 100% rename from judge/judger/compiler.py rename to judge/compiler.py diff --git a/judge/judger/judge_exceptions.py b/judge/judge_exceptions.py similarity index 100% rename from judge/judger/judge_exceptions.py rename to judge/judge_exceptions.py diff --git a/judge/judger/run.py b/judge/judger/run.py deleted file mode 100644 index 8aeb547..0000000 --- a/judge/judger/run.py +++ /dev/null @@ -1,94 +0,0 @@ -# coding=utf-8 -import sys -import json -import MySQLdb - -from client import JudgeClient -from language import languages -from compiler import compile_ -from result import result -from settings import judger_workspace, submission_db -from logger import logger - - -# 简单的解析命令行参数 -# 参数有 -solution_id -time_limit -memory_limit -test_case_id -# 获取到的值是['xxx.py', '-solution_id', '1111', '-time_limit', '1000', '-memory_limit', '100', '-test_case_id', 'aaaa'] -args = sys.argv -submission_id = args[2] -time_limit = args[4] -memory_limit = args[6] -test_case_id = args[8] - - -def db_conn(): - return MySQLdb.connect(db=submission_db["db"], - user=submission_db["user"], - passwd=submission_db["password"], - host=submission_db["host"], - port=submission_db["port"], charset="utf8") - - -conn = db_conn() -cur = conn.cursor() -cur.execute("select language, code from submission where id = %s", (submission_id,)) -data = cur.fetchall() -if not data: - exit() -language_code = data[0][0] -code = data[0][1] - -conn.close() - -# 将代码写入文件 -language = languages[language_code] -src_path = judger_workspace + "run/" + language["src_name"] -f = open(src_path, "w") -f.write(code.encode("utf8")) -f.close() - -# 编译 -try: - exe_path = compile_(language, src_path, judger_workspace + "run/") -except Exception as e: - print e - conn = db_conn() - cur = conn.cursor() - cur.execute("update submission set result=%s, info=%s where id=%s", - (result["compile_error"], str(e), submission_id)) - conn.commit() - exit() - -# 运行 -try: - client = JudgeClient(language_code=language_code, - exe_path=exe_path, - max_cpu_time=int(time_limit), - max_real_time=int(time_limit) * 2, - max_memory=int(memory_limit), - test_case_dir=judger_workspace + "test_case/" + test_case_id + "/") - judge_result = {"result": result["accepted"], "info": client.run(), "accepted_answer_time": None} - - for item in judge_result["info"]: - if item["result"]: - judge_result["result"] = item["result"] - break - else: - l = sorted(judge_result["info"], key=lambda k: k["cpu_time"]) - judge_result["accepted_answer_time"] = l[-1]["cpu_time"] - -except Exception as e: - logger.error(e) - conn = db_conn() - cur = conn.cursor() - cur.execute("update submission set result=%s, info=%s where id=%s", (result["system_error"], str(e), submission_id)) - conn.commit() - exit() - -conn = db_conn() -cur = conn.cursor() -cur.execute("update submission set result=%s, info=%s, accepted_answer_time=%s where id=%s", - (judge_result["result"], json.dumps(judge_result["info"]), judge_result["accepted_answer_time"], - submission_id)) -conn.commit() -conn.close() diff --git a/judge/judger_controller/README.md b/judge/judger_controller/README.md deleted file mode 100644 index 9dd5603..0000000 --- a/judge/judger_controller/README.md +++ /dev/null @@ -1 +0,0 @@ -celery -A judge.controller worker -l DEBUG \ No newline at end of file diff --git a/judge/judger_controller/celery.py b/judge/judger_controller/celery.py deleted file mode 100644 index 4c64ab0..0000000 --- a/judge/judger_controller/celery.py +++ /dev/null @@ -1,9 +0,0 @@ -# coding=utf-8 -from __future__ import absolute_import -from celery import Celery, platforms -from .settings import redis_config - -app = Celery("judge", broker='redis://%s:%s/%s' % (redis_config["host"], redis_config["port"], redis_config["db"]), - include=["judge.judger_controller.tasks"]) - -platforms.C_FORCE_ROOT =True diff --git a/judge/judger_controller/settings.py b/judge/judger_controller/settings.py deleted file mode 100644 index 4d48340..0000000 --- a/judge/judger_controller/settings.py +++ /dev/null @@ -1,39 +0,0 @@ -# coding=utf-8 -""" -注意: -此文件包含 celery 的部分配置,但是 celery 并不是运行在docker 中的,所以本配置文件中的 redis和 MySQL 的地址就应该是 -运行 redis 和 MySQL 的 docker 容器的地址了。怎么获取这个地址见帮助文档。测试用例的路径和源代码路径同理。 -""" -import os -# 这个redis 是 celery 使用的,包括存储队列信息还有部分统计信息 -redis_config = { - "host": os.environ.get("REDIS_PORT_6379_TCP_ADDR"), - "port": 6379, - "db": 0 -} - - -# 判题的 docker 容器的配置参数 -docker_config = { - "image_name": "judger", - "docker_path": "docker", - "shell": True -} - - -# 测试用例的路径,是主机上的实际路径 -test_case_dir = "/root/test_case/" -# 源代码路径,也就是 manage.py 所在的实际路径 -source_code_dir = "/root/qduoj/" -# 日志文件夹路径 -log_dir = "/root/log/" - - -# 存储提交信息的数据库,是 celery 使用的,与 oj.settings/local_settings 等区分,那是 web 服务器访问的地址 -submission_db = { - "host": os.environ.get("submission_db_host"), - "port": 3306, - "db": "oj_submission", - "user": "root", - "password": "root" -} diff --git a/judge/judger_controller/tasks.py b/judge/judger_controller/tasks.py deleted file mode 100644 index 2219d96..0000000 --- a/judge/judger_controller/tasks.py +++ /dev/null @@ -1,45 +0,0 @@ -# coding=utf-8 -import json -import redis -import MySQLdb -import subprocess -from ..judger.result import result -from ..judger_controller.celery import app -from settings import docker_config, source_code_dir, test_case_dir, log_dir, submission_db, redis_config - - -@app.task -def judge(submission_id, time_limit, memory_limit, test_case_id): - try: - command = "%s run --privileged --rm " \ - "--link mysql " \ - "-v %s:/var/judger/test_case/:ro " \ - "-v %s:/var/judger/code/:ro " \ - "-v %s:/var/judger/code/log/ " \ - "--device /dev/null:/dev/null " \ - "%s " \ - "python judge/judger/run.py " \ - "--solution_id %s --time_limit %s --memory_limit %s --test_case_id %s" % \ - (docker_config["docker_path"], - test_case_dir, - source_code_dir, - log_dir, - docker_config["image_name"], - submission_id, str(time_limit), str(memory_limit), test_case_id) - subprocess.call(command, shell=docker_config["shell"]) - except Exception as e: - conn = MySQLdb.connect(db=submission_db["db"], - user=submission_db["user"], - passwd=submission_db["password"], - host=submission_db["host"], - port=submission_db["port"], - charset="utf8") - - cur = conn.cursor() - cur.execute("update submission set result=%s, info=%s where id=%s", - (result["system_error"], str(e), submission_id)) - conn.commit() - conn.close() - r = redis.Redis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) - r.decr("judge_queue_length") - r.lpush("queue", submission_id) diff --git a/judge/judger/language.py b/judge/language.py similarity index 100% rename from judge/judger/language.py rename to judge/language.py diff --git a/judge/judger/logger.py b/judge/logger.py similarity index 100% rename from judge/judger/logger.py rename to judge/logger.py diff --git a/judge/judger/result.py b/judge/result.py similarity index 100% rename from judge/judger/result.py rename to judge/result.py diff --git a/judge/run.py b/judge/run.py new file mode 100644 index 0000000..2118521 --- /dev/null +++ b/judge/run.py @@ -0,0 +1,52 @@ +# coding=utf-8 +import sys +import json +import MySQLdb + +from client import JudgeClient +from language import languages +from compiler import compile_ +from result import result +from settings import judger_workspace, submission_db +from logger import logger + + +class JudgeInstanceRunner(object): + def __init__(self): + pass + + def run(self, submission_id, language_code, code, time_limit, memory_limit, test_case_id): + language = languages[language_code] + # 将代码写入文件 + src_path = judger_workspace + "run/" + submission_id + "/" + language["src_name"] + f = open(src_path, "w") + f.write(code.encode("utf8")) + f.close() + + # 编译 + try: + exe_path = compile_(language, src_path, judger_workspace + "run/" + submission_id + "/") + except Exception as e: + return {"code": 1, "data": str(e)} + + # 运行 + try: + client = JudgeClient(language_code=language_code, + exe_path=exe_path, + max_cpu_time=int(time_limit), + max_real_time=int(time_limit) * 2, + max_memory=int(memory_limit), + test_case_dir=judger_workspace + "test_case/" + test_case_id + "/") + judge_result = {"result": result["accepted"], "info": client.run(), "accepted_answer_time": None} + + for item in judge_result["info"]: + if item["result"]: + judge_result["result"] = item["result"] + break + else: + l = sorted(judge_result["info"], key=lambda k: k["cpu_time"]) + judge_result["accepted_answer_time"] = l[-1]["cpu_time"] + return {"code": 0, "data": judge_result} + + except Exception as e: + return {"code": 1, "data": str(e)} \ No newline at end of file diff --git a/judge/judger_controller/__init__.py b/judge/server.py similarity index 100% rename from judge/judger_controller/__init__.py rename to judge/server.py diff --git a/judge/judger/settings.py b/judge/settings.py similarity index 100% rename from judge/judger/settings.py rename to judge/settings.py diff --git a/judge/judger/utils.py b/judge/utils.py similarity index 100% rename from judge/judger/utils.py rename to judge/utils.py diff --git a/judge_dispatcher/__init__.py b/judge_dispatcher/__init__.py new file mode 100644 index 0000000..9bad579 --- /dev/null +++ b/judge_dispatcher/__init__.py @@ -0,0 +1 @@ +# coding=utf-8 diff --git a/judge_dispatcher/judge.py b/judge_dispatcher/judge.py new file mode 100644 index 0000000..8e4e1dd --- /dev/null +++ b/judge_dispatcher/judge.py @@ -0,0 +1,16 @@ +# coding=utf-8 +import socket +import redis + +from .rpc_client import TimeoutServerProxy +from .settings import redis_config +from .models import JudgeServer + + +class JudgeDispatcher(object): + def __init__(self): + self.redis = redis.StrictRedis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) + + def judge(self): + pass + diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py new file mode 100644 index 0000000..29f5226 --- /dev/null +++ b/judge_dispatcher/models.py @@ -0,0 +1,15 @@ +# coding=utf-8 +from django.db import models + + +class JudgeServer(models.Model): + ip = models.IPAddressField() + port = models.IntegerField() + # 这个服务器最大可能运行的判题实例数量 + max_instance_number = models.IntegerField() + left_instance_number = models.IntegerField() + # status 为 false 的时候代表不使用这个服务器 + status = models.BooleanField(default=True) + + class Meta: + db_table = "judge_server" diff --git a/judge_dispatcher/rpc_client.py b/judge_dispatcher/rpc_client.py new file mode 100644 index 0000000..c095cb5 --- /dev/null +++ b/judge_dispatcher/rpc_client.py @@ -0,0 +1,24 @@ +# coding=utf-8 +import xmlrpclib +import httplib + + +class TimeoutHTTPConnection(httplib.HTTPConnection): + def __init__(self, host, timeout=10): + httplib.HTTPConnection.__init__(self, host, timeout=timeout) + + +class TimeoutTransport(xmlrpclib.Transport): + def __init__(self, timeout=10, *args, **kwargs): + xmlrpclib.Transport.__init__(self, *args, **kwargs) + self.timeout = timeout + + def make_connection(self, host): + conn = TimeoutHTTPConnection(host, self.timeout) + return conn + + +class TimeoutServerProxy(xmlrpclib.ServerProxy): + def __init__(self, uri, timeout=10, *args, **kwargs): + kwargs['transport'] = TimeoutTransport(timeout=timeout, use_datetime=kwargs.get('use_datetime', 0)) + xmlrpclib.ServerProxy.__init__(self, uri, *args, **kwargs) diff --git a/judge_dispatcher/settings.py b/judge_dispatcher/settings.py new file mode 100644 index 0000000..332cefe --- /dev/null +++ b/judge_dispatcher/settings.py @@ -0,0 +1,9 @@ +# coding=utf-8 +import os + + +redis_config = { + "host": os.environ.get("REDIS_PORT_6379_TCP_ADDR"), + "port": 6379, + "db": 0 +} \ No newline at end of file From 3e3770f6695ff14e7137371c3b9d9eea7ba587d8 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 29 Nov 2015 01:05:29 +0800 Subject: [PATCH 02/24] =?UTF-8?q?=E5=AE=8C=E5=96=84=20rpc=20=E9=80=9A?= =?UTF-8?q?=E4=BF=A1=20server=EF=BC=9B=E5=B0=86=E5=88=A4=E9=A2=98=E9=99=90?= =?UTF-8?q?=E5=88=B6=E5=9C=A8=E4=B8=80=E4=B8=AA=E5=AE=B9=E5=99=A8=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge/{run.py => runner.py} | 35 ++++++++++++++++++++++------------- judge/server.py | 12 ++++++++++++ 2 files changed, 34 insertions(+), 13 deletions(-) rename judge/{run.py => runner.py} (64%) diff --git a/judge/run.py b/judge/runner.py similarity index 64% rename from judge/run.py rename to judge/runner.py index 2118521..49b1ea9 100644 --- a/judge/run.py +++ b/judge/runner.py @@ -1,14 +1,12 @@ # coding=utf-8 -import sys -import json -import MySQLdb +import os +import shutil from client import JudgeClient from language import languages from compiler import compile_ from result import result -from settings import judger_workspace, submission_db -from logger import logger +from settings import judger_workspace class JudgeInstanceRunner(object): @@ -17,16 +15,26 @@ class JudgeInstanceRunner(object): def run(self, submission_id, language_code, code, time_limit, memory_limit, test_case_id): language = languages[language_code] - # 将代码写入文件 - src_path = judger_workspace + "run/" + submission_id + "/" + language["src_name"] - f = open(src_path, "w") - f.write(code.encode("utf8")) - f.close() + + judge_base_path = os.path.join(judger_workspace, "run", submission_id) + + try: + os.mkdir(judge_base_path) + + # 将代码写入文件 + src_path = os.path.join(judge_base_path, language["src_name"]) + f = open(src_path, "w") + f.write(code.encode("utf8")) + f.close() + except Exception as e: + shutil.rmtree(judge_base_path, ignore_errors=True) + return {"code": 2, "data": str(e)} # 编译 try: - exe_path = compile_(language, src_path, judger_workspace + "run/" + submission_id + "/") + exe_path = compile_(language, src_path, judge_base_path) except Exception as e: + shutil.rmtree(judge_base_path, ignore_errors=True) return {"code": 1, "data": str(e)} # 运行 @@ -47,6 +55,7 @@ class JudgeInstanceRunner(object): l = sorted(judge_result["info"], key=lambda k: k["cpu_time"]) judge_result["accepted_answer_time"] = l[-1]["cpu_time"] return {"code": 0, "data": judge_result} - except Exception as e: - return {"code": 1, "data": str(e)} \ No newline at end of file + return {"code": 2, "data": str(e)} + finally: + shutil.rmtree(judge_base_path, ignore_errors=True) \ No newline at end of file diff --git a/judge/server.py b/judge/server.py index 9bad579..477cc1e 100644 --- a/judge/server.py +++ b/judge/server.py @@ -1 +1,13 @@ # coding=utf-8 +import SocketServer +from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler +from runner import JudgeInstanceRunner + + +class AsyncXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer): + pass + + +server = AsyncXMLRPCServer(('0.0.0.0', 8080), SimpleXMLRPCRequestHandler, allow_none=True) +server.register_instance(JudgeInstanceRunner()) +server.serve_forever() \ No newline at end of file From 236102b6accc56a80df1e81b03429c043a8446c3 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 29 Nov 2015 01:24:15 +0800 Subject: [PATCH 03/24] =?UTF-8?q?=E5=88=A4=E9=A2=98=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E4=B8=AD=E8=BF=94=E5=9B=9E=20server=20=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=E4=BE=BF=E4=BA=8E=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge/runner.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/judge/runner.py b/judge/runner.py index 49b1ea9..4b5349e 100644 --- a/judge/runner.py +++ b/judge/runner.py @@ -1,5 +1,6 @@ # coding=utf-8 import os +import socket import shutil from client import JudgeClient @@ -15,7 +16,7 @@ class JudgeInstanceRunner(object): def run(self, submission_id, language_code, code, time_limit, memory_limit, test_case_id): language = languages[language_code] - + host_name = socket.gethostname() judge_base_path = os.path.join(judger_workspace, "run", submission_id) try: @@ -28,14 +29,14 @@ class JudgeInstanceRunner(object): f.close() except Exception as e: shutil.rmtree(judge_base_path, ignore_errors=True) - return {"code": 2, "data": str(e)} + return {"code": 2, "data": {"error": str(e), "server": host_name}} # 编译 try: exe_path = compile_(language, src_path, judge_base_path) except Exception as e: shutil.rmtree(judge_base_path, ignore_errors=True) - return {"code": 1, "data": str(e)} + return {"code": 1, "data": {"error": str(e), "server": host_name}} # 运行 try: @@ -45,7 +46,8 @@ class JudgeInstanceRunner(object): max_real_time=int(time_limit) * 2, max_memory=int(memory_limit), test_case_dir=judger_workspace + "test_case/" + test_case_id + "/") - judge_result = {"result": result["accepted"], "info": client.run(), "accepted_answer_time": None} + judge_result = {"result": result["accepted"], "info": client.run(), + "accepted_answer_time": None, "server": host_name} for item in judge_result["info"]: if item["result"]: @@ -56,6 +58,6 @@ class JudgeInstanceRunner(object): judge_result["accepted_answer_time"] = l[-1]["cpu_time"] return {"code": 0, "data": judge_result} except Exception as e: - return {"code": 2, "data": str(e)} + return {"code": 2, "data": {"error": str(e), "server": host_name}} finally: shutil.rmtree(judge_base_path, ignore_errors=True) \ No newline at end of file From 3311a4c8990faeab67241de2a8ab01d5a8f29a58 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 29 Nov 2015 21:29:26 +0800 Subject: [PATCH 04/24] =?UTF-8?q?rpc=20=E9=80=9A=E4=BF=A1=E5=92=8C?= =?UTF-8?q?=E5=88=A4=E9=A2=98=E5=88=9D=E6=AD=A5=E6=B5=8B=E8=AF=95=E9=80=9A?= =?UTF-8?q?=E8=BF=87=EF=BC=9B=E5=88=A4=E9=A2=98=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?UTF-8?q?=E4=B8=8D=E5=86=8D=E4=BE=9D=E8=B5=96=20redis=20=E5=92=8C=20mysq?= =?UTF-8?q?l=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- contest/models.py | 2 +- judge/language.py | 8 ++++---- judge/runner.py | 1 + judge_dispatcher/judge.py | 16 ---------------- judge_dispatcher/models.py | 2 +- judge_dispatcher/settings.py | 9 --------- judge_dispatcher/tasks.py | 34 ++++++++++++++++++++++++++++++++++ monitor/views.py | 6 +++--- oj/__init__.py | 7 ++++++- oj/celery.py | 17 +++++++++++++++++ oj/local_settings.py | 5 +++++ oj/server_settings.py | 5 +++++ oj/settings.py | 12 ++++++++++++ submission/models.py | 2 +- submission/views.py | 12 ++++++------ 15 files changed, 96 insertions(+), 42 deletions(-) delete mode 100644 judge_dispatcher/judge.py delete mode 100644 judge_dispatcher/settings.py create mode 100644 judge_dispatcher/tasks.py create mode 100644 oj/celery.py diff --git a/contest/models.py b/contest/models.py index 2213f43..4e3631b 100644 --- a/contest/models.py +++ b/contest/models.py @@ -7,7 +7,7 @@ from problem.models import AbstractProblem from group.models import Group from utils.models import RichTextField from jsonfield import JSONField -from judge.judger.result import result +from judge.result import result GROUP_CONTEST = 0 diff --git a/judge/language.py b/judge/language.py index c7a14eb..e640656 100644 --- a/judge/language.py +++ b/judge/language.py @@ -7,16 +7,16 @@ languages = { "src_name": "main.c", "code": 1, "syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k", - "compile_command": "gcc -DONLINE_JUDGE -O2 -w -std=c99 {src_path} -lm -o {exe_path}main", - "execute_command": "{exe_path}main" + "compile_command": "gcc -DONLINE_JUDGE -O2 -w -std=c99 {src_path} -lm -o {exe_path}/main", + "execute_command": "{exe_path}/main" }, 2: { "name": "cpp", "src_name": "main.cpp", "code": 2, "syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k", - "compile_command": "g++ -DONLINE_JUDGE -O2 -w -std=c++11 {src_path} -lm -o {exe_path}main", - "execute_command": "{exe_path}main" + "compile_command": "g++ -DONLINE_JUDGE -O2 -w -std=c++11 {src_path} -lm -o {exe_path}/main", + "execute_command": "{exe_path}/main" }, 3: { "name": "java", diff --git a/judge/runner.py b/judge/runner.py index 4b5349e..9eb78b0 100644 --- a/judge/runner.py +++ b/judge/runner.py @@ -21,6 +21,7 @@ class JudgeInstanceRunner(object): try: os.mkdir(judge_base_path) + os.chmod(judge_base_path, 0777) # 将代码写入文件 src_path = os.path.join(judge_base_path, language["src_name"]) diff --git a/judge_dispatcher/judge.py b/judge_dispatcher/judge.py deleted file mode 100644 index 8e4e1dd..0000000 --- a/judge_dispatcher/judge.py +++ /dev/null @@ -1,16 +0,0 @@ -# coding=utf-8 -import socket -import redis - -from .rpc_client import TimeoutServerProxy -from .settings import redis_config -from .models import JudgeServer - - -class JudgeDispatcher(object): - def __init__(self): - self.redis = redis.StrictRedis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) - - def judge(self): - pass - diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 29f5226..67401c2 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -3,7 +3,7 @@ from django.db import models class JudgeServer(models.Model): - ip = models.IPAddressField() + ip = models.GenericIPAddressField() port = models.IntegerField() # 这个服务器最大可能运行的判题实例数量 max_instance_number = models.IntegerField() diff --git a/judge_dispatcher/settings.py b/judge_dispatcher/settings.py deleted file mode 100644 index 332cefe..0000000 --- a/judge_dispatcher/settings.py +++ /dev/null @@ -1,9 +0,0 @@ -# coding=utf-8 -import os - - -redis_config = { - "host": os.environ.get("REDIS_PORT_6379_TCP_ADDR"), - "port": 6379, - "db": 0 -} \ No newline at end of file diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py new file mode 100644 index 0000000..b5bfcd4 --- /dev/null +++ b/judge_dispatcher/tasks.py @@ -0,0 +1,34 @@ +# coding=utf-8 +import json + +from celery import shared_task +from rpc_client import TimeoutServerProxy + +from judge.result import result +from submission.models import Submission + + +@shared_task +def create_judge_task(submission_id, code, language_code, time_limit, memory_limit, test_case_id): + submission = Submission.objects.get(id=submission_id) + try: + s = TimeoutServerProxy('http://121.42.198.156:8080', timeout=20) + data = s.run(submission_id, language_code, code, time_limit, memory_limit, test_case_id) + print data + # 编译错误 + if data["code"] == 1: + submission.result = result["compile_error"] + submission.info = data["data"]["error"] + # system error + elif data["code"] == 2: + submission.result = result["system_error"] + submission.info = data["data"]["error"] + elif data["code"] == 0: + submission.result = data["data"]["result"] + submission.info = json.dumps(data["data"]["info"]) + submission.accepted_answer_time = data["data"]["accepted_answer_time"] + except Exception as e: + submission.result = result["system_error"] + submission.info = str(e) + finally: + submission.save() diff --git a/monitor/views.py b/monitor/views.py index 15c62b4..f502579 100644 --- a/monitor/views.py +++ b/monitor/views.py @@ -2,15 +2,15 @@ import redis import datetime from rest_framework.views import APIView -from judge.judger.result import result -from judge.judger_controller.settings import redis_config +from judge.result import result +from django.conf import settings from utils.shortcuts import success_response from submission.models import Submission class QueueLengthMonitorAPIView(APIView): def get(self, request): - r = redis.Redis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) + r = redis.Redis(host=settings.redis_config["host"], port=settings.redis_config["port"], db=settings.redis_config["db"]) waiting_number = r.get("judge_queue_length") if waiting_number is None: waiting_number = 0 diff --git a/oj/__init__.py b/oj/__init__.py index c19c079..d22a846 100644 --- a/oj/__init__.py +++ b/oj/__init__.py @@ -6,4 +6,9 @@ \___/ |_| |_||_||_||_| |_| \___| \___/ \__,_| \__,_| \__, | \___| |_.__/ \__, | \__, | \__,_| \__,_| |___/ |___/ |_| https://github.com/QingdaoU/OnlineJudge -""" \ No newline at end of file +""" +from __future__ import absolute_import + +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app diff --git a/oj/celery.py b/oj/celery.py new file mode 100644 index 0000000..2f53c0c --- /dev/null +++ b/oj/celery.py @@ -0,0 +1,17 @@ +from __future__ import absolute_import + +import os + +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'oj.settings') + +from django.conf import settings + +app = Celery('oj') + +# Using a string here means the worker will not have to +# pickle the object when using Windows. +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) \ No newline at end of file diff --git a/oj/local_settings.py b/oj/local_settings.py index a08ae7e..965fd6b 100644 --- a/oj/local_settings.py +++ b/oj/local_settings.py @@ -22,6 +22,11 @@ REDIS_CACHE = { "db": 1 } +# for celery +REDIS_HOST = "localhost" +REDIS_PORT = 6379 +REDIS_DB = 0 + DEBUG = True ALLOWED_HOSTS = [] diff --git a/oj/server_settings.py b/oj/server_settings.py index bba27c6..f8942a9 100644 --- a/oj/server_settings.py +++ b/oj/server_settings.py @@ -31,6 +31,11 @@ REDIS_CACHE = { "db": 1 } +# for celery +REDIS_HOST = os.environ.get("REDIS_PORT_6379_TCP_ADDR", "127.0.0.1") +REDIS_PORT = 6379 +REDIS_DB = 0 + DEBUG = False ALLOWED_HOSTS = ['*'] diff --git a/oj/settings.py b/oj/settings.py index 83d49ef..13fb8f9 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -22,6 +22,15 @@ if ENV == "local": elif ENV == "server": from .server_settings import * +import djcelery +djcelery.setup_loader() + +BROKER_BACKEND = "redis" +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' + + BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -51,9 +60,12 @@ INSTALLED_APPS = ( 'mq', 'contest', 'mail', + 'judge', + 'judge_dispatcher', 'django_extensions', 'rest_framework', + 'djcelery', ) if DEBUG: diff --git a/submission/models.py b/submission/models.py index b28bd5a..660b6d8 100644 --- a/submission/models.py +++ b/submission/models.py @@ -1,7 +1,7 @@ # coding=utf-8 from django.db import models from utils.shortcuts import rand_str -from judge.judger.result import result +from judge.result import result class Submission(models.Model): diff --git a/submission/views.py b/submission/views.py index 1f2acb1..94043cf 100644 --- a/submission/views.py +++ b/submission/views.py @@ -7,7 +7,7 @@ from django.shortcuts import render from django.core.paginator import Paginator from rest_framework.views import APIView -from judge.judger_controller.tasks import judge +from judge_dispatcher.tasks import create_judge_task from account.decorators import login_required, super_admin_required from account.models import SUPER_ADMIN, User from problem.models import Problem @@ -23,8 +23,8 @@ from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, logger = logging.getLogger("app_info") -def _judge(submission_id, time_limit, memory_limit, test_case_id): - judge.delay(submission_id, time_limit, memory_limit, test_case_id) +def _judge(submission_id, code, language_code, time_limit, memory_limit, test_case_id): + create_judge_task.delay(submission_id, code, language_code, time_limit, memory_limit, test_case_id) get_cache_redis().incr("judge_queue_length") @@ -49,7 +49,7 @@ class SubmissionAPIView(APIView): problem_id=problem.id) try: - _judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission.id, submission.code, submission.language, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") @@ -94,7 +94,7 @@ class ContestSubmissionAPIView(APIView): code=data["code"], problem_id=problem.id) try: - _judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission.id, submission.code, submission.language, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") @@ -279,7 +279,7 @@ class SubmissionRejudgeAdminAPIView(APIView): except Problem.DoesNotExist: return error_response(u"题目不存在") try: - _judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission.id, submission.code, submission.language, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") From 89ceca862973d69165628c3d12df380658e92ef6 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 29 Nov 2015 22:02:46 +0800 Subject: [PATCH 05/24] =?UTF-8?q?=E5=88=A0=E9=99=A4=E4=BA=86=20mq=20?= =?UTF-8?q?=E7=9A=84=20app=EF=BC=8C=E5=B0=86=E4=BB=A3=E7=A0=81=E7=A7=BB?= =?UTF-8?q?=E5=85=A5=20rpc=20=E9=80=9A=E4=BF=A1=E7=9A=84=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E4=B8=AD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/tasks.py | 78 +++++++++++++++++++++++++++- mq/__init__.py | 1 - mq/models.py | 1 - mq/scripts/__init__.py | 1 - mq/scripts/mq.py | 104 -------------------------------------- oj/settings.py | 1 - 6 files changed, 77 insertions(+), 109 deletions(-) delete mode 100644 mq/__init__.py delete mode 100644 mq/models.py delete mode 100644 mq/scripts/__init__.py delete mode 100644 mq/scripts/mq.py diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index b5bfcd4..23670e8 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -1,11 +1,20 @@ # coding=utf-8 import json +import logging + +from django.db import transaction from celery import shared_task from rpc_client import TimeoutServerProxy from judge.result import result +from contest.models import ContestProblem, ContestRank, Contest, CONTEST_UNDERWAY +from problem.models import Problem from submission.models import Submission +from account.models import User +from utils.cache import get_cache_redis + +logger = logging.getLogger("app_info") @shared_task @@ -14,7 +23,6 @@ def create_judge_task(submission_id, code, language_code, time_limit, memory_lim try: s = TimeoutServerProxy('http://121.42.198.156:8080', timeout=20) data = s.run(submission_id, language_code, code, time_limit, memory_limit, test_case_id) - print data # 编译错误 if data["code"] == 1: submission.result = result["compile_error"] @@ -32,3 +40,71 @@ def create_judge_task(submission_id, code, language_code, time_limit, memory_lim submission.info = str(e) finally: submission.save() + + # 更新该用户的解题状态用 + try: + user = User.objects.get(pk=submission.user_id) + except User.DoesNotExist: + logger.warning("Submission user does not exist, submission_id: " + submission_id) + return + + if not submission.contest_id: + try: + problem = Problem.objects.get(id=submission.problem_id) + except Problem.DoesNotExist: + logger.warning("Submission problem does not exist, submission_id: " + submission_id) + return + + problems_status = user.problems_status + + # 更新普通题目的计数器 + problem.add_submission_number() + if "problems" not in problems_status: + problems_status["problems"] = {} + if submission.result == result["accepted"]: + problem.add_ac_number() + problems_status["problems"][str(problem.id)] = 1 + else: + problems_status["problems"][str(problem.id)] = 2 + user.problems_status = problems_status + user.save() + # 普通题目的话,到这里就结束了 + return + + # 能运行到这里的都是比赛题目 + try: + contest = Contest.objects.get(id=submission.contest_id) + if contest.status != CONTEST_UNDERWAY: + logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + submission_id) + return + contest_problem = ContestProblem.objects.get(contest=contest, id=submission.problem_id) + except Contest.DoesNotExist: + logger.warning("Submission contest does not exist, submission_id: " + submission_id) + return + except ContestProblem.DoesNotExist: + logger.warning("Submission problem does not exist, submission_id: " + submission_id) + return + + # 如果比赛现在不是封榜状态,删除比赛的排名缓存 + if contest.real_time_rank: + get_cache_redis().delete(str(contest.id) + "_rank_cache") + + with transaction.atomic(): + try: + contest_rank = ContestRank.objects.get(contest=contest, user=user) + contest_rank.update_rank(submission) + except ContestRank.DoesNotExist: + ContestRank.objects.create(contest=contest, user=user).update_rank(submission) + + problems_status = user.problems_status + + contest_problem.add_submission_number() + if "contest_problems" not in problems_status: + problems_status["contest_problems"] = {} + if submission.result == result["accepted"]: + contest_problem.add_ac_number() + problems_status["contest_problems"][str(contest_problem.id)] = 1 + else: + problems_status["contest_problems"][str(contest_problem.id)] = 0 + user.problems_status = problems_status + user.save() diff --git a/mq/__init__.py b/mq/__init__.py deleted file mode 100644 index 9bad579..0000000 --- a/mq/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/mq/models.py b/mq/models.py deleted file mode 100644 index 9bad579..0000000 --- a/mq/models.py +++ /dev/null @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/mq/scripts/__init__.py b/mq/scripts/__init__.py deleted file mode 100644 index 9bad579..0000000 --- a/mq/scripts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/mq/scripts/mq.py b/mq/scripts/mq.py deleted file mode 100644 index a3b15e7..0000000 --- a/mq/scripts/mq.py +++ /dev/null @@ -1,104 +0,0 @@ -# coding=utf-8 -import logging - -import redis - -from django.db import transaction - -from judge.judger_controller.settings import redis_config -from judge.judger.result import result -from submission.models import Submission -from problem.models import Problem -from utils.cache import get_cache_redis -from contest.models import ContestProblem, Contest, CONTEST_UNDERWAY, ContestRank -from account.models import User - -logger = logging.getLogger("app_info") - - -class MessageQueue(object): - def __init__(self): - self.conn = redis.StrictRedis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) - self.queue = 'queue' - - def listen_task(self): - while True: - submission_id = self.conn.blpop(self.queue, 0)[1] - logger.debug("receive submission_id: " + submission_id) - - try: - submission = Submission.objects.get(id=submission_id) - except Submission.DoesNotExist: - logger.warning("Submission does not exist, submission_id: " + submission_id) - continue - - # 更新该用户的解题状态用 - try: - user = User.objects.get(pk=submission.user_id) - except User.DoesNotExist: - logger.warning("Submission user does not exist, submission_id: " + submission_id) - continue - - if not submission.contest_id: - try: - problem = Problem.objects.get(id=submission.problem_id) - except Problem.DoesNotExist: - logger.warning("Submission problem does not exist, submission_id: " + submission_id) - continue - - problems_status = user.problems_status - - # 更新普通题目的计数器 - problem.add_submission_number() - if "problems" not in problems_status: - problems_status["problems"] = {} - if submission.result == result["accepted"]: - problem.add_ac_number() - problems_status["problems"][str(problem.id)] = 1 - else: - problems_status["problems"][str(problem.id)] = 2 - user.problems_status = problems_status - user.save() - # 普通题目的话,到这里就结束了 - continue - - # 能运行到这里的都是比赛题目 - try: - contest = Contest.objects.get(id=submission.contest_id) - if contest.status != CONTEST_UNDERWAY: - logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + submission_id) - continue - contest_problem = ContestProblem.objects.get(contest=contest, id=submission.problem_id) - except Contest.DoesNotExist: - logger.warning("Submission contest does not exist, submission_id: " + submission_id) - continue - except ContestProblem.DoesNotExist: - logger.warning("Submission problem does not exist, submission_id: " + submission_id) - continue - - # 如果比赛现在不是封榜状态,删除比赛的排名缓存 - if contest.real_time_rank: - get_cache_redis().delete(str(contest.id) + "_rank_cache") - - with transaction.atomic(): - try: - contest_rank = ContestRank.objects.get(contest=contest, user=user) - contest_rank.update_rank(submission) - except ContestRank.DoesNotExist: - ContestRank.objects.create(contest=contest, user=user).update_rank(submission) - - problems_status = user.problems_status - - contest_problem.add_submission_number() - if "contest_problems" not in problems_status: - problems_status["contest_problems"] = {} - if submission.result == result["accepted"]: - contest_problem.add_ac_number() - problems_status["contest_problems"][str(contest_problem.id)] = 1 - else: - problems_status["contest_problems"][str(contest_problem.id)] = 0 - user.problems_status = problems_status - user.save() - -logger.debug("Start message queue") -MessageQueue().listen_task() diff --git a/oj/settings.py b/oj/settings.py index 13fb8f9..54fefa7 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -57,7 +57,6 @@ INSTALLED_APPS = ( 'problem', 'admin', 'submission', - 'mq', 'contest', 'mail', 'judge', From 02a0a123725e43ba4a4c42d419a43dab8e23ba0d Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 6 Dec 2015 18:44:58 +0800 Subject: [PATCH 06/24] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=A4=E9=A2=98?= =?UTF-8?q?=E5=BC=80=E5=A7=8B=E6=97=B6=E9=97=B4=E5=92=8C=E7=BB=93=E6=9D=9F?= =?UTF-8?q?=E6=97=B6=E9=97=B4=EF=BC=8C=E4=BE=BF=E4=BA=8E=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- submission/models.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/submission/models.py b/submission/models.py index 660b6d8..a0db4b3 100644 --- a/submission/models.py +++ b/submission/models.py @@ -8,6 +8,10 @@ class Submission(models.Model): id = models.CharField(max_length=32, default=rand_str, primary_key=True, db_index=True) user_id = models.IntegerField(db_index=True) create_time = models.DateTimeField(auto_now_add=True) + # 判题开始时间 + judge_start_time = models.IntegerField(blank=True, null=True) + # 判题结束时间 + judge_end_time = models.IntegerField(blank=True, null=True) result = models.IntegerField(default=result["waiting"]) language = models.IntegerField() code = models.TextField() From 7661f99d36c85411124eb348d0556c792830d5ce Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 6 Dec 2015 18:45:21 +0800 Subject: [PATCH 07/24] =?UTF-8?q?=E5=B0=86=E4=BB=A5=E5=89=8D=E7=9A=84=20mq?= =?UTF-8?q?=20=E4=BD=BF=E7=94=A8=E7=B1=BB=E6=94=B9=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/tasks.py | 141 ++++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 68 deletions(-) diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index 23670e8..4ad333d 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -1,6 +1,7 @@ # coding=utf-8 import json import logging +import time from django.db import transaction @@ -17,94 +18,98 @@ from utils.cache import get_cache_redis logger = logging.getLogger("app_info") -@shared_task -def create_judge_task(submission_id, code, language_code, time_limit, memory_limit, test_case_id): - submission = Submission.objects.get(id=submission_id) - try: - s = TimeoutServerProxy('http://121.42.198.156:8080', timeout=20) - data = s.run(submission_id, language_code, code, time_limit, memory_limit, test_case_id) - # 编译错误 - if data["code"] == 1: - submission.result = result["compile_error"] - submission.info = data["data"]["error"] - # system error - elif data["code"] == 2: - submission.result = result["system_error"] - submission.info = data["data"]["error"] - elif data["code"] == 0: - submission.result = data["data"]["result"] - submission.info = json.dumps(data["data"]["info"]) - submission.accepted_answer_time = data["data"]["accepted_answer_time"] - except Exception as e: - submission.result = result["system_error"] - submission.info = str(e) - finally: - submission.save() - - # 更新该用户的解题状态用 - try: - user = User.objects.get(pk=submission.user_id) - except User.DoesNotExist: - logger.warning("Submission user does not exist, submission_id: " + submission_id) - return - - if not submission.contest_id: +class JudgeDispatcher(object): + def __init__(self, submission, time_limit, memory_limit, test_case_id): + self.submission = submission + self.time_limit = time_limit + self.memory_limit = memory_limit + self.test_case_id = test_case_id + self.user = User.objects.get(id=submission.user_id) + + def choose_judge_server(self): + pass + + def judge(self): + self.submission.judge_start_time = int(time.time() * 1000) try: - problem = Problem.objects.get(id=submission.problem_id) - except Problem.DoesNotExist: - logger.warning("Submission problem does not exist, submission_id: " + submission_id) - return + judge_server = self.choose_judge_server() - problems_status = user.problems_status + s = TimeoutServerProxy(judge_server.ip + ":" + judge_server.port, timeout=20) + data = s.run(self.submission.id, self.submission.language_code, + self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) + # 编译错误 + if data["code"] == 1: + self.submission.result = result["compile_error"] + self.submission.info = data["data"]["error"] + # system error + elif data["code"] == 2: + self.submission.result = result["system_error"] + self.submission.info = data["data"]["error"] + elif data["code"] == 0: + self.submission.result = data["data"]["result"] + self.submission.info = json.dumps(data["data"]["info"]) + self.submission.accepted_answer_time = data["data"]["accepted_answer_time"] + except Exception as e: + self.submission.result = result["system_error"] + self.submission.info = str(e) + finally: + self.submission.judge_end_time = int(time.time() * 1000) + self.submission.save() + + if self.submission.contest_id: + self.update_contest_problem_status() + else: + self.update_problem_status() + + def update_problem_status(self): + problem = Problem.objects.get(id=self.submission.problem_id) # 更新普通题目的计数器 problem.add_submission_number() + + # 更新用户做题状态 + problems_status = self.user.problems_status if "problems" not in problems_status: problems_status["problems"] = {} - if submission.result == result["accepted"]: + if self.submission.result == result["accepted"]: problem.add_ac_number() problems_status["problems"][str(problem.id)] = 1 else: problems_status["problems"][str(problem.id)] = 2 - user.problems_status = problems_status - user.save() + self.user.problems_status = problems_status + self.user.save() # 普通题目的话,到这里就结束了 - return - # 能运行到这里的都是比赛题目 - try: - contest = Contest.objects.get(id=submission.contest_id) + def update_contest_problem_status(self): + # 能运行到这里的都是比赛题目 + contest = Contest.objects.get(id=self.submission.contest_id) if contest.status != CONTEST_UNDERWAY: - logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + submission_id) + logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + self.submission.id) return - contest_problem = ContestProblem.objects.get(contest=contest, id=submission.problem_id) - except Contest.DoesNotExist: - logger.warning("Submission contest does not exist, submission_id: " + submission_id) - return - except ContestProblem.DoesNotExist: - logger.warning("Submission problem does not exist, submission_id: " + submission_id) - return - - # 如果比赛现在不是封榜状态,删除比赛的排名缓存 - if contest.real_time_rank: - get_cache_redis().delete(str(contest.id) + "_rank_cache") - - with transaction.atomic(): - try: - contest_rank = ContestRank.objects.get(contest=contest, user=user) - contest_rank.update_rank(submission) - except ContestRank.DoesNotExist: - ContestRank.objects.create(contest=contest, user=user).update_rank(submission) - - problems_status = user.problems_status + contest_problem = ContestProblem.objects.get(contest=contest, id=self.submission.problem_id) contest_problem.add_submission_number() + + problems_status = self.user.problems_status if "contest_problems" not in problems_status: problems_status["contest_problems"] = {} - if submission.result == result["accepted"]: + if self.submission.result == result["accepted"]: contest_problem.add_ac_number() problems_status["contest_problems"][str(contest_problem.id)] = 1 else: problems_status["contest_problems"][str(contest_problem.id)] = 0 - user.problems_status = problems_status - user.save() + self.user.problems_status = problems_status + self.user.save() + + self.update_contest_rank(contest) + + def update_contest_rank(self, contest): + if contest.real_time_rank: + get_cache_redis().delete(str(contest.id) + "_rank_cache") + + with transaction.atomic(): + try: + contest_rank = ContestRank.objects.get(contest=contest, user=self.user) + contest_rank.update_rank(self.submission) + except ContestRank.DoesNotExist: + ContestRank.objects.create(contest=contest, user=self.user).update_rank(self.submission) From b542d7c5c512d6b3daff8ac8762d015626b8e668 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Sun, 6 Dec 2015 18:45:53 +0800 Subject: [PATCH 08/24] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=A4=E9=A2=98?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E8=AE=A4=E8=AF=81=20token=E5=92=8C?= =?UTF-8?q?=E9=94=81=EF=BC=9B=E5=A2=9E=E5=8A=A0=E5=88=A4=E9=A2=98=E7=AD=89?= =?UTF-8?q?=E5=BE=85=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/models.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 67401c2..20fbc1b 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -8,8 +8,19 @@ class JudgeServer(models.Model): # 这个服务器最大可能运行的判题实例数量 max_instance_number = models.IntegerField() left_instance_number = models.IntegerField() + token = models.CharField(max_length=30) + # 进行测试用例同步的时候加锁 + lock = models.BooleanField(default=False) # status 为 false 的时候代表不使用这个服务器 status = models.BooleanField(default=True) class Meta: db_table = "judge_server" + + +class JudgeWaitingQueue(models.Model): + submission_id = models.CharField(max_length=40) + create_time = models.DateTimeField(auto_now_add=True) + + class Meta: + db_table = "judge_waiting_queue" From e85c5b6b4a1c97db5c378378f2d3959ae66520f8 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 16:54:29 +0800 Subject: [PATCH 09/24] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=20migration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/migrations/0001_initial.py | 41 +++++++++++++++++++ judge_dispatcher/migrations/__init__.py | 0 .../migrations/0007_auto_20151207_1645.py | 24 +++++++++++ 3 files changed, 65 insertions(+) create mode 100644 judge_dispatcher/migrations/0001_initial.py create mode 100644 judge_dispatcher/migrations/__init__.py create mode 100644 submission/migrations/0007_auto_20151207_1645.py diff --git a/judge_dispatcher/migrations/0001_initial.py b/judge_dispatcher/migrations/0001_initial.py new file mode 100644 index 0000000..e85330e --- /dev/null +++ b/judge_dispatcher/migrations/0001_initial.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='JudgeServer', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('ip', models.GenericIPAddressField()), + ('port', models.IntegerField()), + ('max_instance_number', models.IntegerField()), + ('left_instance_number', models.IntegerField()), + ('workload', models.IntegerField(default=0)), + ('token', models.CharField(max_length=30)), + ('lock', models.BooleanField(default=False)), + ('status', models.BooleanField(default=True)), + ], + options={ + 'db_table': 'judge_server', + }, + ), + migrations.CreateModel( + name='JudgeWaitingQueue', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('submission_id', models.CharField(max_length=40)), + ('create_time', models.DateTimeField(auto_now_add=True)), + ], + options={ + 'db_table': 'judge_waiting_queue', + }, + ), + ] diff --git a/judge_dispatcher/migrations/__init__.py b/judge_dispatcher/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/submission/migrations/0007_auto_20151207_1645.py b/submission/migrations/0007_auto_20151207_1645.py new file mode 100644 index 0000000..a053d98 --- /dev/null +++ b/submission/migrations/0007_auto_20151207_1645.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('submission', '0006_submission_shared'), + ] + + operations = [ + migrations.AddField( + model_name='submission', + name='judge_end_time', + field=models.IntegerField(null=True, blank=True), + ), + migrations.AddField( + model_name='submission', + name='judge_start_time', + field=models.IntegerField(null=True, blank=True), + ), + ] From df0d69ae31b4c1126cabf69af5b484d8fd569519 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 16:54:49 +0800 Subject: [PATCH 10/24] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E5=88=A4=E9=A2=98?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E8=B0=83=E7=94=A8=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/models.py | 6 ++++++ judge_dispatcher/tasks.py | 10 ++++++++-- submission/views.py | 13 ++++++------- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 20fbc1b..68ef60d 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -8,12 +8,18 @@ class JudgeServer(models.Model): # 这个服务器最大可能运行的判题实例数量 max_instance_number = models.IntegerField() left_instance_number = models.IntegerField() + workload = models.IntegerField(default=0) token = models.CharField(max_length=30) # 进行测试用例同步的时候加锁 lock = models.BooleanField(default=False) # status 为 false 的时候代表不使用这个服务器 status = models.BooleanField(default=True) + def use_judge_instance(self): + self.left_instance_number -= 1 + self.workload = 100 - int(self.left_instance_number / self.max_instance_number) + self.save() + class Meta: db_table = "judge_server" diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index 4ad333d..68e26ce 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -5,7 +5,6 @@ import time from django.db import transaction -from celery import shared_task from rpc_client import TimeoutServerProxy from judge.result import result @@ -14,6 +13,7 @@ from problem.models import Problem from submission.models import Submission from account.models import User from utils.cache import get_cache_redis +from .models import JudgeServer,JudgeWaitingQueue logger = logging.getLogger("app_info") @@ -27,12 +27,18 @@ class JudgeDispatcher(object): self.user = User.objects.get(id=submission.user_id) def choose_judge_server(self): - pass + servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload") + if servers.exists(): + return servers[0] def judge(self): self.submission.judge_start_time = int(time.time() * 1000) try: judge_server = self.choose_judge_server() + # 如果没有合适的判题服务器,就放入等待队列中等待判题 + if not judge_server: + JudgeWaitingQueue.objects.create(submission_id=self.submission.id) + return s = TimeoutServerProxy(judge_server.ip + ":" + judge_server.port, timeout=20) data = s.run(self.submission.id, self.submission.language_code, diff --git a/submission/views.py b/submission/views.py index 94043cf..3a54bc9 100644 --- a/submission/views.py +++ b/submission/views.py @@ -7,7 +7,7 @@ from django.shortcuts import render from django.core.paginator import Paginator from rest_framework.views import APIView -from judge_dispatcher.tasks import create_judge_task +from judge_dispatcher.tasks import JudgeDispatcher from account.decorators import login_required, super_admin_required from account.models import SUPER_ADMIN, User from problem.models import Problem @@ -23,9 +23,8 @@ from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, logger = logging.getLogger("app_info") -def _judge(submission_id, code, language_code, time_limit, memory_limit, test_case_id): - create_judge_task.delay(submission_id, code, language_code, time_limit, memory_limit, test_case_id) - get_cache_redis().incr("judge_queue_length") +def _judge(submission, time_limit, memory_limit, test_case_id): + JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge() class SubmissionAPIView(APIView): @@ -49,7 +48,7 @@ class SubmissionAPIView(APIView): problem_id=problem.id) try: - _judge(submission.id, submission.code, submission.language, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") @@ -94,7 +93,7 @@ class ContestSubmissionAPIView(APIView): code=data["code"], problem_id=problem.id) try: - _judge(submission.id, submission.code, submission.language, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") @@ -279,7 +278,7 @@ class SubmissionRejudgeAdminAPIView(APIView): except Problem.DoesNotExist: return error_response(u"题目不存在") try: - _judge(submission.id, submission.code, submission.language, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") From d61dae9be063e843975d8e9cca713663e93e63ec Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 17:18:55 +0800 Subject: [PATCH 11/24] =?UTF-8?q?fix=20typo=EF=BC=8C=E5=BA=94=E8=AF=A5?= =?UTF-8?q?=E6=98=AF=20http=20=E5=BC=80=E5=A4=B4=E7=9A=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index 68e26ce..9e20cc8 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -40,8 +40,9 @@ class JudgeDispatcher(object): JudgeWaitingQueue.objects.create(submission_id=self.submission.id) return - s = TimeoutServerProxy(judge_server.ip + ":" + judge_server.port, timeout=20) - data = s.run(self.submission.id, self.submission.language_code, + s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20) + + data = s.run(self.submission.id, self.submission.language, self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) # 编译错误 if data["code"] == 1: From efee635173c3cb6e6e22b7140675dc38017c3164 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 17:35:03 +0800 Subject: [PATCH 12/24] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=A4=E9=A2=98?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=20token=20=E9=AA=8C=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge/runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/judge/runner.py b/judge/runner.py index 9eb78b0..5ca5c20 100644 --- a/judge/runner.py +++ b/judge/runner.py @@ -11,14 +11,15 @@ from settings import judger_workspace class JudgeInstanceRunner(object): - def __init__(self): - pass - def run(self, submission_id, language_code, code, time_limit, memory_limit, test_case_id): + def run(self, token, submission_id, language_code, code, time_limit, memory_limit, test_case_id): language = languages[language_code] host_name = socket.gethostname() judge_base_path = os.path.join(judger_workspace, "run", submission_id) + if not token or token != os.environ.get("rpc_token"): + return {"code": 2, "data": {"error": "Invalid token", "server": host_name}} + try: os.mkdir(judge_base_path) os.chmod(judge_base_path, 0777) From f1449962e4c450e87a164274020a234385783664 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 17:35:28 +0800 Subject: [PATCH 13/24] =?UTF-8?q?=E5=90=91=E5=88=A4=E9=A2=98=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=99=A8=E4=BC=A0=E9=80=92=E6=8E=88=E6=9D=83=20token?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index 9e20cc8..ba40707 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -42,7 +42,7 @@ class JudgeDispatcher(object): s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20) - data = s.run(self.submission.id, self.submission.language, + data = s.run(judge_server.token, self.submission.id, self.submission.language, self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) # 编译错误 if data["code"] == 1: From 43e8ec2d90a3f14b3da385bf679c3287f837ab71 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 19:15:28 +0800 Subject: [PATCH 14/24] =?UTF-8?q?=E5=88=A0=E9=99=A4=20celery=20=E4=BE=9D?= =?UTF-8?q?=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dockerfiles/oj_web_server/requirements.txt | 5 +++-- oj/__init__.py | 5 ----- oj/celery.py | 17 ----------------- oj/local_settings.py | 9 +++++---- oj/server_settings.py | 10 ++++++---- oj/settings.py | 6 +----- runJudge.sh | 1 - 7 files changed, 15 insertions(+), 38 deletions(-) delete mode 100644 oj/celery.py delete mode 100755 runJudge.sh diff --git a/dockerfiles/oj_web_server/requirements.txt b/dockerfiles/oj_web_server/requirements.txt index 7ad8718..a0582be 100644 --- a/dockerfiles/oj_web_server/requirements.txt +++ b/dockerfiles/oj_web_server/requirements.txt @@ -4,11 +4,12 @@ redis django-redis-sessions djangorestframework django-rest-swagger -celery gunicorn coverage django-extensions supervisor pillow jsonfield -Envelopes \ No newline at end of file +Envelopes +rq +django-rq \ No newline at end of file diff --git a/oj/__init__.py b/oj/__init__.py index d22a846..f420637 100644 --- a/oj/__init__.py +++ b/oj/__init__.py @@ -7,8 +7,3 @@ |___/ |___/ |_| https://github.com/QingdaoU/OnlineJudge """ -from __future__ import absolute_import - -# This will make sure the app is always imported when -# Django starts so that shared_task will use this app. -from .celery import app as celery_app diff --git a/oj/celery.py b/oj/celery.py deleted file mode 100644 index 2f53c0c..0000000 --- a/oj/celery.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import absolute_import - -import os - -from celery import Celery - -# set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'oj.settings') - -from django.conf import settings - -app = Celery('oj') - -# Using a string here means the worker will not have to -# pickle the object when using Windows. -app.config_from_object('django.conf:settings') -app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) \ No newline at end of file diff --git a/oj/local_settings.py b/oj/local_settings.py index 965fd6b..82bfe2e 100644 --- a/oj/local_settings.py +++ b/oj/local_settings.py @@ -22,10 +22,11 @@ REDIS_CACHE = { "db": 1 } -# for celery -REDIS_HOST = "localhost" -REDIS_PORT = 6379 -REDIS_DB = 0 +REDIS_QUEUE = { + "host": "127.0.0.1", + "port": 6379, + "db": 2 +} DEBUG = True diff --git a/oj/server_settings.py b/oj/server_settings.py index f8942a9..cd86c16 100644 --- a/oj/server_settings.py +++ b/oj/server_settings.py @@ -31,10 +31,12 @@ REDIS_CACHE = { "db": 1 } -# for celery -REDIS_HOST = os.environ.get("REDIS_PORT_6379_TCP_ADDR", "127.0.0.1") -REDIS_PORT = 6379 -REDIS_DB = 0 +REDIS_QUEUE = { + "host": os.environ.get("REDIS_PORT_6379_TCP_ADDR", "127.0.0.1"), + "port": 6379, + "db": 2 +} + DEBUG = False diff --git a/oj/settings.py b/oj/settings.py index 54fefa7..65462e6 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -22,9 +22,6 @@ if ENV == "local": elif ENV == "server": from .server_settings import * -import djcelery -djcelery.setup_loader() - BROKER_BACKEND = "redis" CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' @@ -63,8 +60,7 @@ INSTALLED_APPS = ( 'judge_dispatcher', 'django_extensions', - 'rest_framework', - 'djcelery', + 'rest_framework' ) if DEBUG: diff --git a/runJudge.sh b/runJudge.sh deleted file mode 100755 index ba25da8..0000000 --- a/runJudge.sh +++ /dev/null @@ -1 +0,0 @@ -nohup celery -A judge.judger_controller worker -l DEBUG & From bf6a42b5b115565cb9d9d52376cfe8c1c5b2336a Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 20:07:52 +0800 Subject: [PATCH 15/24] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20rq=20=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- oj/settings.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/oj/settings.py b/oj/settings.py index 65462e6..02b0816 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -60,7 +60,8 @@ INSTALLED_APPS = ( 'judge_dispatcher', 'django_extensions', - 'rest_framework' + 'rest_framework', + 'django_rq', ) if DEBUG: @@ -193,3 +194,18 @@ IMAGE_UPLOAD_DIR = os.path.join(BASE_DIR, 'upload/') WEBSITE_INFO = {"website_name": "qduoj", "website_footer": u"青岛大学信息工程学院 创新实验室", "url": "https://qduoj.com"} + +RQ_QUEUES = { + 'judge': { + 'HOST': REDIS_QUEUE["host"], + 'PORT': REDIS_QUEUE["port"], + 'DB': 2, + 'DEFAULT_TIMEOUT': 60, + }, + 'mail': { + 'HOST': REDIS_QUEUE["host"], + 'PORT': REDIS_QUEUE["port"], + 'DB': 3, + 'DEFAULT_TIMEOUT': 60, + } +} \ No newline at end of file From 2f557994574a352699b5a9e51acace717e009b22 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 21:19:55 +0800 Subject: [PATCH 16/24] =?UTF-8?q?=E4=BD=BF=E7=94=A8=20huey=20=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E9=98=9F=E5=88=97=E3=80=82=20ps=20=E7=9C=9F=E6=83=B3?= =?UTF-8?q?=E8=87=AA=E5=B7=B1=E5=86=99=E4=B8=80=E4=B8=AA=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E9=98=9F=E5=88=97=EF=BC=8Crq=20=E4=B8=8D=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=A4=9A=20worker=EF=BC=8Ccelery=20=E5=A4=AA=E5=A4=8D=E6=9D=82?= =?UTF-8?q?=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dockerfiles/oj_web_server/requirements.txt | 3 +-- oj/settings.py | 22 ++++++++-------------- submission/tasks.py | 9 +++++++++ submission/views.py | 7 +------ 4 files changed, 19 insertions(+), 22 deletions(-) create mode 100644 submission/tasks.py diff --git a/dockerfiles/oj_web_server/requirements.txt b/dockerfiles/oj_web_server/requirements.txt index a0582be..56676a9 100644 --- a/dockerfiles/oj_web_server/requirements.txt +++ b/dockerfiles/oj_web_server/requirements.txt @@ -11,5 +11,4 @@ supervisor pillow jsonfield Envelopes -rq -django-rq \ No newline at end of file +huey \ No newline at end of file diff --git a/oj/settings.py b/oj/settings.py index 02b0816..391c8b0 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -61,7 +61,7 @@ INSTALLED_APPS = ( 'django_extensions', 'rest_framework', - 'django_rq', + 'huey.djhuey', ) if DEBUG: @@ -195,17 +195,11 @@ WEBSITE_INFO = {"website_name": "qduoj", "website_footer": u"青岛大学信息工程学院 创新实验室", "url": "https://qduoj.com"} -RQ_QUEUES = { - 'judge': { - 'HOST': REDIS_QUEUE["host"], - 'PORT': REDIS_QUEUE["port"], - 'DB': 2, - 'DEFAULT_TIMEOUT': 60, - }, - 'mail': { - 'HOST': REDIS_QUEUE["host"], - 'PORT': REDIS_QUEUE["port"], - 'DB': 3, - 'DEFAULT_TIMEOUT': 60, - } +HUEY = { + 'backend': 'huey.backends.redis_backend', + 'name': 'task_queue', + 'connection': {'host': REDIS_QUEUE["host"], 'port': REDIS_QUEUE["port"], 'db': REDIS_QUEUE["db"]}, + 'always_eager': False, # Defaults to False when running via manage.py run_huey + # Options to pass into the consumer when running ``manage.py run_huey`` + 'consumer_options': {'workers': 50}, } \ No newline at end of file diff --git a/submission/tasks.py b/submission/tasks.py new file mode 100644 index 0000000..67719c4 --- /dev/null +++ b/submission/tasks.py @@ -0,0 +1,9 @@ +# coding=utf-8 +from huey.djhuey import task + +from judge_dispatcher.tasks import JudgeDispatcher + + +@task() +def _judge(submission, time_limit, memory_limit, test_case_id): + JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge() \ No newline at end of file diff --git a/submission/views.py b/submission/views.py index 3a54bc9..6a16eaf 100644 --- a/submission/views.py +++ b/submission/views.py @@ -7,14 +7,13 @@ from django.shortcuts import render from django.core.paginator import Paginator from rest_framework.views import APIView -from judge_dispatcher.tasks import JudgeDispatcher from account.decorators import login_required, super_admin_required from account.models import SUPER_ADMIN, User from problem.models import Problem from contest.models import ContestProblem, Contest from contest.decorators import check_user_contest_permission from utils.shortcuts import serializer_invalid_response, error_response, success_response, error_page, paginate -from utils.cache import get_cache_redis +from .task import _judge from .models import Submission from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, SubmissionhareSerializer, SubmissionRejudgeSerializer, @@ -23,10 +22,6 @@ from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, logger = logging.getLogger("app_info") -def _judge(submission, time_limit, memory_limit, test_case_id): - JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge() - - class SubmissionAPIView(APIView): @login_required def post(self, request): From 4ef9f9f01b93446c3c10b4e7529d00fb71c147d6 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 21:39:13 +0800 Subject: [PATCH 17/24] =?UTF-8?q?=E4=B8=BA=E5=BC=82=E6=AD=A5=E9=98=9F?= =?UTF-8?q?=E5=88=97=E4=BF=AE=E6=94=B9=20supervisor=20=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dockerfiles/oj_web_server/supervisord.conf | 2 +- dockerfiles/oj_web_server/{mq.conf => task_queue.conf} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename dockerfiles/oj_web_server/{mq.conf => task_queue.conf} (100%) diff --git a/dockerfiles/oj_web_server/supervisord.conf b/dockerfiles/oj_web_server/supervisord.conf index 420b65f..dec4bf4 100644 --- a/dockerfiles/oj_web_server/supervisord.conf +++ b/dockerfiles/oj_web_server/supervisord.conf @@ -23,4 +23,4 @@ serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets. [include] -files=gunicorn.conf mq.conf \ No newline at end of file +files=gunicorn.conf task_queue.conf \ No newline at end of file diff --git a/dockerfiles/oj_web_server/mq.conf b/dockerfiles/oj_web_server/task_queue.conf similarity index 100% rename from dockerfiles/oj_web_server/mq.conf rename to dockerfiles/oj_web_server/task_queue.conf From 6bce16b8530d4c6dfe5606164aba01f49f38e540 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 21:39:58 +0800 Subject: [PATCH 18/24] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E6=96=87=E4=BB=B6=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dockerfiles/oj_web_server/task_queue.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dockerfiles/oj_web_server/task_queue.conf b/dockerfiles/oj_web_server/task_queue.conf index ae1797c..39f837e 100644 --- a/dockerfiles/oj_web_server/task_queue.conf +++ b/dockerfiles/oj_web_server/task_queue.conf @@ -1,12 +1,12 @@ [program:mq] -command=python manage.py runscript mq +command=python manage.py run_huey directory=/code/ user=root numprocs=1 -stdout_logfile=/code/log/mq.log -stderr_logfile=/code/log/mq.log +stdout_logfile=/code/log/task_queue.log +stderr_logfile=/code/log/task_queue.log autostart=true autorestart=true startsecs=5 From 1337b26d504dea52b2a1c84dad36f9a61b2430ff Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Mon, 7 Dec 2015 23:20:27 +0800 Subject: [PATCH 19/24] =?UTF-8?q?=E6=AF=8F=E6=AC=A1=E6=9C=89=E5=88=A4?= =?UTF-8?q?=E9=A2=98=E4=BB=BB=E5=8A=A1=E5=AE=8C=E6=88=90=E7=9A=84=E6=97=B6?= =?UTF-8?q?=E5=80=99=EF=BC=8C=E9=80=92=E5=BD=92=E8=B0=83=E7=94=A8=E8=87=AA?= =?UTF-8?q?=E5=B7=B1=E5=88=A4=E6=96=AD=E8=BF=98=E6=9C=89=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E7=AD=89=E5=BE=85=E7=9A=84=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../migrations/0002_auto_20151207_2310.py | 32 +++++++++++++++++ judge_dispatcher/models.py | 12 ++++++- judge_dispatcher/tasks.py | 35 ++++++++++++++----- submission/tasks.py | 4 +-- submission/views.py | 2 +- 5 files changed, 73 insertions(+), 12 deletions(-) create mode 100644 judge_dispatcher/migrations/0002_auto_20151207_2310.py diff --git a/judge_dispatcher/migrations/0002_auto_20151207_2310.py b/judge_dispatcher/migrations/0002_auto_20151207_2310.py new file mode 100644 index 0000000..1597919 --- /dev/null +++ b/judge_dispatcher/migrations/0002_auto_20151207_2310.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('judge_dispatcher', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='judgewaitingqueue', + name='memory_limit', + field=models.IntegerField(default=1), + preserve_default=False, + ), + migrations.AddField( + model_name='judgewaitingqueue', + name='test_case_id', + field=models.CharField(default=1, max_length=40), + preserve_default=False, + ), + migrations.AddField( + model_name='judgewaitingqueue', + name='time_limit', + field=models.IntegerField(default=1), + preserve_default=False, + ), + ] diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 68ef60d..4e75bca 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -17,7 +17,14 @@ class JudgeServer(models.Model): def use_judge_instance(self): self.left_instance_number -= 1 - self.workload = 100 - int(self.left_instance_number / self.max_instance_number) + self.workload = 100 - int(float(self.left_instance_number) / self.max_instance_number * 100) + print self.left_instance_number, self.workload + self.save() + + def release_judge_instance(self): + self.left_instance_number += 1 + self.workload = 100 - int(float(self.left_instance_number) / self.max_instance_number * 100) + print self.left_instance_number, self.workload self.save() class Meta: @@ -26,6 +33,9 @@ class JudgeServer(models.Model): class JudgeWaitingQueue(models.Model): submission_id = models.CharField(max_length=40) + time_limit = models.IntegerField() + memory_limit = models.IntegerField() + test_case_id = models.CharField(max_length=40) create_time = models.DateTimeField(auto_now_add=True) class Meta: diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index ba40707..38d9ef1 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -13,7 +13,8 @@ from problem.models import Problem from submission.models import Submission from account.models import User from utils.cache import get_cache_redis -from .models import JudgeServer,JudgeWaitingQueue + +from .models import JudgeServer, JudgeWaitingQueue logger = logging.getLogger("app_info") @@ -25,25 +26,31 @@ class JudgeDispatcher(object): self.memory_limit = memory_limit self.test_case_id = test_case_id self.user = User.objects.get(id=submission.user_id) - + def choose_judge_server(self): servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload") if servers.exists(): - return servers[0] - - def judge(self): + server = servers[0] + server.use_judge_instance() + return server + + def judge(self, is_waiting_task=False): self.submission.judge_start_time = int(time.time() * 1000) try: judge_server = self.choose_judge_server() # 如果没有合适的判题服务器,就放入等待队列中等待判题 if not judge_server: - JudgeWaitingQueue.objects.create(submission_id=self.submission.id) + if not is_waiting_task: + JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, + memory_limit=self.memory_limit, test_case_id=self.test_case_id) return s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20) data = s.run(judge_server.token, self.submission.id, self.submission.language, self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) + + judge_server.release_judge_instance() # 编译错误 if data["code"] == 1: self.submission.result = result["compile_error"] @@ -67,7 +74,19 @@ class JudgeDispatcher(object): self.update_contest_problem_status() else: self.update_problem_status() - + + if is_waiting_task: + JudgeWaitingQueue.objects.filter(submission_id=self.submission.id).delete() + + waiting_submissions = JudgeWaitingQueue.objects.all() + if waiting_submissions.exists(): + submission = waiting_submissions.first() + # 防止循环依赖 + from submission.tasks import _judge + _judge(Submission.objects.get(id=submission.submission_id), time_limit=submission.time_limit, + memory_limit=submission.memory_limit, test_case_id=submission.test_case_id, + is_waiting_task=True) + def update_problem_status(self): problem = Problem.objects.get(id=self.submission.problem_id) @@ -109,7 +128,7 @@ class JudgeDispatcher(object): self.user.save() self.update_contest_rank(contest) - + def update_contest_rank(self, contest): if contest.real_time_rank: get_cache_redis().delete(str(contest.id) + "_rank_cache") diff --git a/submission/tasks.py b/submission/tasks.py index 67719c4..25476e3 100644 --- a/submission/tasks.py +++ b/submission/tasks.py @@ -5,5 +5,5 @@ from judge_dispatcher.tasks import JudgeDispatcher @task() -def _judge(submission, time_limit, memory_limit, test_case_id): - JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge() \ No newline at end of file +def _judge(submission, time_limit, memory_limit, test_case_id, is_waiting_task=False): + JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge(is_waiting_task) \ No newline at end of file diff --git a/submission/views.py b/submission/views.py index 6a16eaf..ccda8ac 100644 --- a/submission/views.py +++ b/submission/views.py @@ -13,7 +13,7 @@ from problem.models import Problem from contest.models import ContestProblem, Contest from contest.decorators import check_user_contest_permission from utils.shortcuts import serializer_invalid_response, error_response, success_response, error_page, paginate -from .task import _judge +from .tasks import _judge from .models import Submission from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, SubmissionhareSerializer, SubmissionRejudgeSerializer, From 124a402ade3fd37e1e3218c1bce6c463c48e5886 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Tue, 8 Dec 2015 14:51:50 +0800 Subject: [PATCH 20/24] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=AB=9E=E6=80=81?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6=E5=AF=BC=E8=87=B4=E7=9A=84=E8=AE=A1=E6=95=B0?= =?UTF-8?q?=E5=99=A8=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/models.py | 20 ++++++++++++-------- judge_dispatcher/tasks.py | 27 +++++++++++++++------------ 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 4e75bca..6b2a4de 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -16,16 +16,20 @@ class JudgeServer(models.Model): status = models.BooleanField(default=True) def use_judge_instance(self): - self.left_instance_number -= 1 - self.workload = 100 - int(float(self.left_instance_number) / self.max_instance_number * 100) - print self.left_instance_number, self.workload - self.save() + # 因为use 和 release 中间是判题时间,可能这个 model 的数据已经被修改了,所以不能直接使用self.xxx,否则取到的是旧数据 + server = JudgeServer.objects.select_for_update().get(id=self.id) + server.left_instance_number -= 1 + server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) + print "\n ---- use", server.left_instance_number, server.workload + server.save() def release_judge_instance(self): - self.left_instance_number += 1 - self.workload = 100 - int(float(self.left_instance_number) / self.max_instance_number * 100) - print self.left_instance_number, self.workload - self.save() + # 使用原子操作 + server = JudgeServer.objects.select_for_update().get(id=self.id) + server.left_instance_number += 1 + server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) + print "\n ---- release", server.left_instance_number, server.workload + server.save() class Meta: db_table = "judge_server" diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index 38d9ef1..c487eef 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -26,31 +26,32 @@ class JudgeDispatcher(object): self.memory_limit = memory_limit self.test_case_id = test_case_id self.user = User.objects.get(id=submission.user_id) + print "init init" def choose_judge_server(self): servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload") if servers.exists(): - server = servers[0] - server.use_judge_instance() - return server + return servers.first() def judge(self, is_waiting_task=False): self.submission.judge_start_time = int(time.time() * 1000) - try: - judge_server = self.choose_judge_server() - # 如果没有合适的判题服务器,就放入等待队列中等待判题 - if not judge_server: - if not is_waiting_task: - JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, - memory_limit=self.memory_limit, test_case_id=self.test_case_id) - return + judge_server = self.choose_judge_server() + # 如果没有合适的判题服务器,就放入等待队列中等待判题 + if not judge_server: + if not is_waiting_task: + JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, + memory_limit=self.memory_limit, test_case_id=self.test_case_id) + return + + judge_server.use_judge_instance() + + try: s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20) data = s.run(judge_server.token, self.submission.id, self.submission.language, self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) - judge_server.release_judge_instance() # 编译错误 if data["code"] == 1: self.submission.result = result["compile_error"] @@ -67,6 +68,8 @@ class JudgeDispatcher(object): self.submission.result = result["system_error"] self.submission.info = str(e) finally: + judge_server.release_judge_instance() + self.submission.judge_end_time = int(time.time() * 1000) self.submission.save() From cb64ece6ee3237129bb7815649cd67b9413c2926 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Tue, 8 Dec 2015 19:04:31 +0800 Subject: [PATCH 21/24] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=83=A8=E5=88=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/tasks.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index c487eef..f9920e6 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -39,9 +39,9 @@ class JudgeDispatcher(object): # 如果没有合适的判题服务器,就放入等待队列中等待判题 if not judge_server: - if not is_waiting_task: - JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, - memory_limit=self.memory_limit, test_case_id=self.test_case_id) + print self.submission.id, "waiting" + JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, + memory_limit=self.memory_limit, test_case_id=self.test_case_id) return judge_server.use_judge_instance() @@ -51,6 +51,7 @@ class JudgeDispatcher(object): data = s.run(judge_server.token, self.submission.id, self.submission.language, self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) + print self.submission.id, "finished" # 编译错误 if data["code"] == 1: @@ -78,18 +79,18 @@ class JudgeDispatcher(object): else: self.update_problem_status() - if is_waiting_task: - JudgeWaitingQueue.objects.filter(submission_id=self.submission.id).delete() - waiting_submissions = JudgeWaitingQueue.objects.all() if waiting_submissions.exists(): - submission = waiting_submissions.first() # 防止循环依赖 from submission.tasks import _judge - _judge(Submission.objects.get(id=submission.submission_id), time_limit=submission.time_limit, - memory_limit=submission.memory_limit, test_case_id=submission.test_case_id, + waiting_submission = waiting_submissions.first() + print self.submission.id, "left queue" + _judge(Submission.objects.get(id=waiting_submission.submission_id), time_limit=waiting_submission.time_limit, + memory_limit=waiting_submission.memory_limit, test_case_id=waiting_submission.test_case_id, is_waiting_task=True) + waiting_submission.delete() + def update_problem_status(self): problem = Problem.objects.get(id=self.submission.problem_id) From bd5caa8f280792c5719f902c6cd9932d5beb19b6 Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Tue, 8 Dec 2015 19:04:56 +0800 Subject: [PATCH 22/24] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=EF=BC=9B=E5=88=A0=E9=99=A4=E4=B8=8D=E7=94=A8?= =?UTF-8?q?=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- account/models.py | 1 - oj/settings.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/account/models.py b/account/models.py index e08ff6c..2988add 100644 --- a/account/models.py +++ b/account/models.py @@ -71,6 +71,5 @@ class UserProfile(models.Model): phone_number = models.CharField(max_length=15, blank=True, null=True) school = models.CharField(max_length=200, blank=True, null=True) - class Meta: db_table = "user_profile" diff --git a/oj/settings.py b/oj/settings.py index 391c8b0..153c1af 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -59,7 +59,6 @@ INSTALLED_APPS = ( 'judge', 'judge_dispatcher', - 'django_extensions', 'rest_framework', 'huey.djhuey', ) @@ -106,7 +105,7 @@ WSGI_APPLICATION = 'oj.wsgi.application' # Internationalization # https://docs.djangoproject.com/en/1.8/topics/i18n/ -LANGUAGE_CODE = 'zh-cn' +LANGUAGE_CODE = 'zh-hans' TIME_ZONE = 'Asia/Shanghai' From 3afdc1a58b98b8a22d2f78f9caff2709d292086d Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Tue, 8 Dec 2015 22:39:26 +0800 Subject: [PATCH 23/24] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20MySQL=20=E4=B8=8A?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E6=88=B3=E5=AD=97=E6=AE=B5=E6=BA=A2=E5=87=BA?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../migrations/0008_auto_20151208_2106.py | 25 +++++++++++++++++++ submission/models.py | 7 ++++-- 2 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 submission/migrations/0008_auto_20151208_2106.py diff --git a/submission/migrations/0008_auto_20151208_2106.py b/submission/migrations/0008_auto_20151208_2106.py new file mode 100644 index 0000000..b3a8576 --- /dev/null +++ b/submission/migrations/0008_auto_20151208_2106.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9 on 2015-12-08 13:06 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('submission', '0007_auto_20151207_1645'), + ] + + operations = [ + migrations.AlterField( + model_name='submission', + name='judge_end_time', + field=models.BigIntegerField(blank=True, null=True), + ), + migrations.AlterField( + model_name='submission', + name='judge_start_time', + field=models.BigIntegerField(blank=True, null=True), + ), + ] diff --git a/submission/models.py b/submission/models.py index a0db4b3..fc1a070 100644 --- a/submission/models.py +++ b/submission/models.py @@ -9,9 +9,9 @@ class Submission(models.Model): user_id = models.IntegerField(db_index=True) create_time = models.DateTimeField(auto_now_add=True) # 判题开始时间 - judge_start_time = models.IntegerField(blank=True, null=True) + judge_start_time = models.BigIntegerField(blank=True, null=True) # 判题结束时间 - judge_end_time = models.IntegerField(blank=True, null=True) + judge_end_time = models.BigIntegerField(blank=True, null=True) result = models.IntegerField(default=result["waiting"]) language = models.IntegerField() code = models.TextField() @@ -28,3 +28,6 @@ class Submission(models.Model): class Meta: db_table = "submission" + + def __unicode__(self): + return self.id From 89cb788d0a8fc6a4810491ffa5656f4da9c9349a Mon Sep 17 00:00:00 2001 From: virusdefender <1670873886@qq.com> Date: Tue, 8 Dec 2015 22:49:05 +0800 Subject: [PATCH 24/24] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E5=A4=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=B2=A1=E6=9C=89=E5=8A=A0=E9=94=81?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E7=9A=84=E9=AB=98=E5=B9=B6=E5=8F=91=E4=B8=8B?= =?UTF-8?q?=E7=9A=84=E7=AB=9E=E6=80=81=E6=9D=A1=E4=BB=B6=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/models.py | 2 -- judge_dispatcher/tasks.py | 54 ++++++++++++++++++++------------------ 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 6b2a4de..7ec5af8 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -20,7 +20,6 @@ class JudgeServer(models.Model): server = JudgeServer.objects.select_for_update().get(id=self.id) server.left_instance_number -= 1 server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) - print "\n ---- use", server.left_instance_number, server.workload server.save() def release_judge_instance(self): @@ -28,7 +27,6 @@ class JudgeServer(models.Model): server = JudgeServer.objects.select_for_update().get(id=self.id) server.left_instance_number += 1 server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) - print "\n ---- release", server.left_instance_number, server.workload server.save() class Meta: diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index f9920e6..244c907 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -26,7 +26,6 @@ class JudgeDispatcher(object): self.memory_limit = memory_limit self.test_case_id = test_case_id self.user = User.objects.get(id=submission.user_id) - print "init init" def choose_judge_server(self): servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload") @@ -35,24 +34,23 @@ class JudgeDispatcher(object): def judge(self, is_waiting_task=False): self.submission.judge_start_time = int(time.time() * 1000) - judge_server = self.choose_judge_server() - # 如果没有合适的判题服务器,就放入等待队列中等待判题 - if not judge_server: - print self.submission.id, "waiting" - JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, - memory_limit=self.memory_limit, test_case_id=self.test_case_id) - return + with transaction.atomic(): + judge_server = self.choose_judge_server() - judge_server.use_judge_instance() + # 如果没有合适的判题服务器,就放入等待队列中等待判题 + if not judge_server: + JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, + memory_limit=self.memory_limit, test_case_id=self.test_case_id) + return + + judge_server.use_judge_instance() try: s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20) data = s.run(judge_server.token, self.submission.id, self.submission.language, self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) - print self.submission.id, "finished" - # 编译错误 if data["code"] == 1: self.submission.result = result["compile_error"] @@ -69,7 +67,8 @@ class JudgeDispatcher(object): self.submission.result = result["system_error"] self.submission.info = str(e) finally: - judge_server.release_judge_instance() + with transaction.atomic(): + judge_server.release_judge_instance() self.submission.judge_end_time = int(time.time() * 1000) self.submission.save() @@ -79,17 +78,20 @@ class JudgeDispatcher(object): else: self.update_problem_status() - waiting_submissions = JudgeWaitingQueue.objects.all() - if waiting_submissions.exists(): - # 防止循环依赖 - from submission.tasks import _judge - waiting_submission = waiting_submissions.first() - print self.submission.id, "left queue" - _judge(Submission.objects.get(id=waiting_submission.submission_id), time_limit=waiting_submission.time_limit, - memory_limit=waiting_submission.memory_limit, test_case_id=waiting_submission.test_case_id, - is_waiting_task=True) + with transaction.atomic(): + waiting_submissions = JudgeWaitingQueue.objects.select_for_update().all() + if waiting_submissions.exists(): + # 防止循环依赖 + from submission.tasks import _judge - waiting_submission.delete() + waiting_submission = waiting_submissions.first() + + submission = Submission.objects.get(id=waiting_submission.submission_id) + waiting_submission.delete() + + _judge(submission, time_limit=waiting_submission.time_limit, + memory_limit=waiting_submission.memory_limit, test_case_id=waiting_submission.test_case_id, + is_waiting_task=True) def update_problem_status(self): problem = Problem.objects.get(id=self.submission.problem_id) @@ -116,10 +118,12 @@ class JudgeDispatcher(object): if contest.status != CONTEST_UNDERWAY: logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + self.submission.id) return - contest_problem = ContestProblem.objects.get(contest=contest, id=self.submission.problem_id) + with transaction.atomic(): + contest_problem = ContestProblem.objects.select_for_update().get(contest=contest, id=self.submission.problem_id) - contest_problem.add_submission_number() + contest_problem.add_submission_number() + # todo 事务 problems_status = self.user.problems_status if "contest_problems" not in problems_status: problems_status["contest_problems"] = {} @@ -139,7 +143,7 @@ class JudgeDispatcher(object): with transaction.atomic(): try: - contest_rank = ContestRank.objects.get(contest=contest, user=self.user) + contest_rank = ContestRank.objects.select_for_update().get(contest=contest, user=self.user) contest_rank.update_rank(self.submission) except ContestRank.DoesNotExist: ContestRank.objects.create(contest=contest, user=self.user).update_rank(self.submission)