This commit is contained in:
2026-06-02 23:13:06 -06:00
parent 385d04505b
commit d1fdbcf52b
6 changed files with 144 additions and 150 deletions

View File

@@ -20,27 +20,32 @@ class BasePermissionDecorator(object):
return functools.partial(self._async_call, obj)
return functools.partial(self.__call__, obj)
def error(self, data):
return JSONResponse.response({"error": "permission-denied", "data": data})
def error(self, data, err="permission-denied"):
return JSONResponse.response({"error": err, "data": data})
def _permission_error(self, request):
if not request.user.is_authenticated:
return self.error("请先登录", err="login-required")
return self.error("权限不足", err="permission-denied")
def __call__(self, *args, **kwargs):
request = args[1]
if self.check_permission(request):
if request.user.is_disabled:
return self.error("Your account is disabled")
return self.error("账号已禁用")
return self.func(*args, **kwargs)
else:
return self.error("Please login first")
return self._permission_error(request)
async def _async_call(self, *args, **kwargs):
request = args[1]
if self.check_permission(request):
if request.user.is_disabled:
return self.error("Your account is disabled")
return self.error("账号已禁用")
return await self.func(*args, **kwargs)
return self.error("Please login first")
return self._permission_error(request)
def check_permission(self, request):
raise NotImplementedError()
@@ -110,43 +115,42 @@ def check_contest_permission(check_type="details"):
若通过验证在view中可通过self.contest获得该contest
"""
def _get_contest_id(request):
return request.data.get("contest_id") or request.GET.get("contest_id")
def _check_access(self, request, user):
if not user.is_authenticated:
return self.error("请先登录", err="login-required")
if user.is_contest_admin(self.contest):
return None
if self.contest.contest_type == ContestType.PASSWORD_PROTECTED_CONTEST:
if not check_contest_password(request.session.get(CONTEST_PASSWORD_SESSION_KEY, {}).get(self.contest.id), self.contest.password):
return self.error("Wrong password or password expired")
if self.contest.status == ContestStatus.CONTEST_NOT_START and check_type != "details":
return self.error("Contest has not started yet.")
return None
def decorator(func):
def _check_permission(*args, **kwargs):
@functools.wraps(func)
async def _wrapper(*args, **kwargs):
self = args[0]
request = args[1]
user = request.user
if request.data.get("contest_id"):
contest_id = request.data["contest_id"]
else:
contest_id = request.GET.get("contest_id")
contest_id = _get_contest_id(request)
if not contest_id:
return self.error("Parameter error, contest_id is required")
try:
# use self.contest to avoid query contest again in view.
self.contest = Contest.objects.select_related("created_by").get(id=contest_id, visible=True)
self.contest = await Contest.objects.select_related("created_by").aget(id=contest_id, visible=True)
except Contest.DoesNotExist:
return self.error("Contest %s doesn't exist" % contest_id)
# Anonymous
if not user.is_authenticated:
return self.error("Please login first.")
# creator or owner
if user.is_contest_admin(self.contest):
return func(*args, **kwargs)
if self.contest.contest_type == ContestType.PASSWORD_PROTECTED_CONTEST:
# password error
if not check_contest_password(request.session.get(CONTEST_PASSWORD_SESSION_KEY, {}).get(self.contest.id), self.contest.password):
return self.error("Wrong password or password expired")
# regular user get contest problems, ranks etc. before contest started
if self.contest.status == ContestStatus.CONTEST_NOT_START and check_type != "details":
return self.error("Contest has not started yet.")
return func(*args, **kwargs)
return _check_permission
error = _check_access(self, request, request.user)
if error:
return error
return await func(*args, **kwargs)
return _wrapper
return decorator

View File

@@ -37,8 +37,10 @@ class AdminRoleRequiredMiddleware(MiddlewareMixin):
def process_request(self, request):
path = request.path_info
if path.startswith("/admin/") or path.startswith("/api/admin/"):
if not (request.user.is_authenticated and request.user.is_admin_role()):
return JSONResponse.response({"error": "login-required", "data": "Please login in first"})
if not request.user.is_authenticated:
return JSONResponse.response({"error": "login-required", "data": "请先登录"})
if not request.user.is_admin_role():
return JSONResponse.response({"error": "permission-denied", "data": "权限不足"})
class LogSqlMiddleware(MiddlewareMixin):