import os from datetime import timedelta from importlib import import_module import qrcode from django.conf import settings from django.contrib import auth from django.core.cache import cache from django.db.models import Count, Q from django.template.loader import render_to_string from django.utils import timezone from django.utils.decorators import method_decorator from django.utils.timezone import now from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie from otpauth import TOTP from options.options import SysOptions from problem.models import Problem from submission.models import JudgeStatus, Submission from utils.api import APIView, CSRFExemptAPIView, validate_serializer from utils.captcha import Captcha from utils.constants import CacheKey, ContestRuleType from utils.shortcuts import datetime2str, img2base64, rand_str from ..decorators import login_required from ..models import AdminType, User, UserProfile from ..serializers import ( ApplyResetPasswordSerializer, EditUserProfileSerializer, ImageUploadForm, RankInfoSerializer, ResetPasswordSerializer, SSOSerializer, TwoFactorAuthCodeSerializer, UserChangeEmailSerializer, UserChangePasswordSerializer, UserLoginSerializer, UsernameOrEmailCheckSerializer, UserProfileSerializer, UserRegisterSerializer, ) from ..tasks import send_email_async def _totp(token): return TOTP(token.encode("utf-8")) def _totp_uri(token, label, issuer): return _totp(token).to_uri(label, issuer) def _valid_totp(token, code): try: code = int(code) except (TypeError, ValueError): return False return _totp(token).verify(code) class UserProfileAPI(APIView): @method_decorator(ensure_csrf_cookie) def get(self, request, **kwargs): """ 判断是否登录, 若登录返回用户信息 """ user = request.user if not user.is_authenticated: return self.success() show_real_name = False username = request.GET.get("username") try: if username: user = User.objects.get(username=username, is_disabled=False) else: user = request.user # api返回的是自己的信息,可以返real_name show_real_name = True except User.DoesNotExist: return self.error("User does not exist") return self.success( UserProfileSerializer(user.userprofile, show_real_name=show_real_name).data ) @validate_serializer(EditUserProfileSerializer) @login_required def put(self, request): data = request.data user_profile = request.user.userprofile for k, v in data.items(): setattr(user_profile, k, v) user_profile.save() return self.success( UserProfileSerializer(user_profile, show_real_name=True).data ) class Metrics(APIView): def get(self, request): userid = request.GET.get("userid") submissions = Submission.objects.filter(user_id=userid, contest_id__isnull=True) if submissions.count() == 0: return self.error("暂无提交") else: latest_submission = submissions.first() last_submission = submissions.last() if last_submission and latest_submission: return self.success( { "now": datetime2str(timezone.now()), "latest": datetime2str(latest_submission.create_time), "first": datetime2str(last_submission.create_time), } ) else: return self.error("暂无提交") class AvatarUploadAPI(APIView): request_parsers = () @login_required def post(self, request): form = ImageUploadForm(request.POST, request.FILES) if form.is_valid(): avatar = form.cleaned_data["image"] else: return self.error("Invalid file content") if avatar.size > 2 * 1024 * 1024: return self.error("Picture is too large") suffix = os.path.splitext(avatar.name)[-1].lower() if suffix not in [".gif", ".jpg", ".jpeg", ".bmp", ".png"]: return self.error("Unsupported file format") name = rand_str(10) + suffix with open(os.path.join(settings.AVATAR_UPLOAD_DIR, name), "wb") as img: for chunk in avatar: img.write(chunk) user_profile = request.user.userprofile user_profile.avatar = f"{settings.AVATAR_URI_PREFIX}/{name}" user_profile.save() return self.success("Succeeded") class TwoFactorAuthAPI(APIView): @login_required def get(self, request): """ Get QR code """ user = request.user if user.two_factor_auth: return self.error("2FA is already turned on") token = rand_str() user.tfa_token = token user.save() label = f"{SysOptions.website_name_shortcut}:{user.username}" image = qrcode.make( _totp_uri(token, label, SysOptions.website_name.replace(" ", "")) ) return self.success(img2base64(image)) @login_required @validate_serializer(TwoFactorAuthCodeSerializer) def post(self, request): """ Open 2FA """ code = request.data["code"] user = request.user if _valid_totp(user.tfa_token, code): user.two_factor_auth = True user.save() return self.success("Succeeded") else: return self.error("Invalid code") @login_required @validate_serializer(TwoFactorAuthCodeSerializer) def put(self, request): code = request.data["code"] user = request.user if not user.two_factor_auth: return self.error("2FA is already turned off") if _valid_totp(user.tfa_token, code): user.two_factor_auth = False user.save() return self.success("Succeeded") else: return self.error("Invalid code") class CheckTFARequiredAPI(APIView): @validate_serializer(UsernameOrEmailCheckSerializer) def post(self, request): """ Check TFA is required """ data = request.data result = False if data.get("username"): try: user = User.objects.get(username=data["username"]) result = user.two_factor_auth except User.DoesNotExist: pass return self.success({"result": result}) class UserLoginAPI(APIView): @validate_serializer(UserLoginSerializer) def post(self, request): """ User login api """ data = request.data user = auth.authenticate(username=data["username"], password=data["password"]) # None is returned if username or password is wrong if user: if user.is_disabled: return self.error("Your account has been disabled") if not user.two_factor_auth: prev_login = user.last_login auth.login(request, user) request.session["prev_login"] = ( datetime2str(prev_login) if prev_login else "" ) return self.success("Succeeded") # `tfa_code` not in post data if user.two_factor_auth and "tfa_code" not in data: return self.error("tfa_required") if _valid_totp(user.tfa_token, data["tfa_code"]): prev_login = user.last_login auth.login(request, user) request.session["prev_login"] = ( datetime2str(prev_login) if prev_login else "" ) return self.success("Succeeded") else: return self.error("Invalid two factor verification code") else: return self.error("Invalid username or password") class UserLogoutAPI(APIView): def get(self, request): auth.logout(request) return self.success() class UsernameOrEmailCheck(APIView): @validate_serializer(UsernameOrEmailCheckSerializer) def post(self, request): """ check username or email is duplicate """ data = request.data # True means already exist. result = {"username": False, "email": False} if data.get("username"): result["username"] = User.objects.filter( username=data["username"].lower() ).exists() if data.get("email"): result["email"] = User.objects.filter(email=data["email"].lower()).exists() return self.success(result) class UserRegisterAPI(APIView): @validate_serializer(UserRegisterSerializer) def post(self, request): """ User register api """ if not SysOptions.allow_register: return self.error("Register function has been disabled by admin") data = request.data data["username"] = data["username"].lower() data["email"] = data["email"].lower() captcha = Captcha(request) if not captcha.check(data["captcha"]): return self.error("Invalid captcha") if User.objects.filter(username=data["username"]).exists(): return self.error("Username already exists") if User.objects.filter(email=data["email"]).exists(): return self.error("Email already exists") user = User.objects.create(username=data["username"], email=data["email"]) user.set_password(data["password"]) user.save() UserProfile.objects.create(user=user) return self.success("Succeeded") class UserChangeEmailAPI(APIView): @validate_serializer(UserChangeEmailSerializer) @login_required def post(self, request): data = request.data user = auth.authenticate( username=request.user.username, password=data["password"] ) if user: if user.two_factor_auth: if "tfa_code" not in data: return self.error("tfa_required") if not _valid_totp(user.tfa_token, data["tfa_code"]): return self.error("Invalid two factor verification code") data["new_email"] = data["new_email"].lower() if User.objects.filter(email=data["new_email"]).exists(): return self.error("The email is owned by other account") user.email = data["new_email"] user.save() return self.success("Succeeded") else: return self.error("Wrong password") class UserChangePasswordAPI(APIView): @validate_serializer(UserChangePasswordSerializer) @login_required def post(self, request): """ User change password api """ data = request.data username = request.user.username user = auth.authenticate(username=username, password=data["old_password"]) if user: if user.two_factor_auth: if "tfa_code" not in data: return self.error("tfa_required") if not _valid_totp(user.tfa_token, data["tfa_code"]): return self.error("Invalid two factor verification code") user.set_password(data["new_password"]) user.save() return self.success("Succeeded") else: return self.error("Invalid old password") class ApplyResetPasswordAPI(APIView): @validate_serializer(ApplyResetPasswordSerializer) def post(self, request): if request.user.is_authenticated: return self.error("You have already logged in, are you kidding me? ") data = request.data captcha = Captcha(request) if not captcha.check(data["captcha"]): return self.error("Invalid captcha") try: user = User.objects.get(email__iexact=data["email"]) except User.DoesNotExist: return self.error("User does not exist") if ( user.reset_password_token_expire_time and 0 < int((user.reset_password_token_expire_time - now()).total_seconds()) < 20 * 60 ): return self.error("You can only reset password once per 20 minutes") user.reset_password_token = rand_str() user.reset_password_token_expire_time = now() + timedelta(minutes=20) user.save() render_data = { "username": user.username, "website_name": SysOptions.website_name, "link": f"{SysOptions.website_base_url}/reset-password/{user.reset_password_token}", } email_html = render_to_string("reset_password_email.html", render_data) send_email_async.send( from_name=SysOptions.website_name_shortcut, to_email=user.email, to_name=user.username, subject="Reset your password", content=email_html, ) return self.success("Succeeded") class ResetPasswordAPI(APIView): @validate_serializer(ResetPasswordSerializer) def post(self, request): data = request.data captcha = Captcha(request) if not captcha.check(data["captcha"]): return self.error("Invalid captcha") try: user = User.objects.get(reset_password_token=data["token"]) except User.DoesNotExist: return self.error("Token does not exist") if user.reset_password_token_expire_time < now(): return self.error("Token has expired") user.reset_password_token = None user.two_factor_auth = False user.set_password(data["password"]) user.save() return self.success("Succeeded") class SessionManagementAPI(APIView): @login_required def get(self, request): engine = import_module(settings.SESSION_ENGINE) session_store = engine.SessionStore current_session = request.session.session_key session_keys = request.user.session_keys result = [] modified = False for key in session_keys[:]: session = session_store(key) # session does not exist or is expiry if not session._session: session_keys.remove(key) modified = True continue s = {} if current_session == key: s["current_session"] = True s["ip"] = session["ip"] s["user_agent"] = session["user_agent"] s["last_activity"] = datetime2str(session["last_activity"]) s["session_key"] = key result.append(s) if modified: request.user.save() return self.success(result) @login_required def delete(self, request): session_key = request.GET.get("session_key") if not session_key: return self.error("Parameter Error") request.session.delete(session_key) if session_key in request.user.session_keys: request.user.session_keys.remove(session_key) request.user.save() return self.success("Succeeded") else: return self.error("Invalid session_key") class UserRankAPI(APIView): def get(self, request): rule_type = request.GET.get("rule") username = request.GET.get("username", "") try: n = int(request.GET.get("n", "0")) except ValueError: n = 0 if rule_type not in ContestRuleType.choices(): rule_type = ContestRuleType.ACM profiles = UserProfile.objects.filter( user__admin_type__in=[AdminType.REGULAR_USER, AdminType.ADMIN], user__is_disabled=False, user__username__icontains=username, ).select_related("user") if rule_type == ContestRuleType.ACM: profiles = profiles.filter(accepted_number__gte=0).order_by( "-accepted_number", "submission_number" ) else: profiles = profiles.filter(total_score__gt=0).order_by("-total_score") if n > 0: profiles = profiles[:n] return self.success(self.paginate_data(request, profiles, RankInfoSerializer)) class UserActivityRankAPI(APIView): def get(self, request): start = request.GET.get("start") if not start: return self.error("start time is required") cache_key = f"{CacheKey.user_activity_rank}:{start}" cached = cache.get(cache_key) if cached is not None: return self.success(cached) hidden_names = User.objects.filter( Q(admin_type=AdminType.SUPER_ADMIN) | Q(is_disabled=True) ).values_list("username", flat=True) submissions = Submission.objects.filter( contest_id__isnull=True, create_time__gte=start, result=JudgeStatus.ACCEPTED, ).exclude(username__in=hidden_names) data = list( submissions.values("username") .annotate(count=Count("problem_id", distinct=True)) .order_by("-count")[:10] ) cache.set(cache_key, data, 600) return self.success(data) class UserProblemRankAPI(APIView): def get(self, request): problem_id = request.GET.get("problem_id") user = request.user if not user.is_authenticated: return self.error("User is not authenticated") problem = Problem.objects.get( _id=problem_id, contest_id__isnull=True, visible=True ) submissions = Submission.objects.filter( problem=problem, result=JudgeStatus.ACCEPTED ) all_ac_count = submissions.values("user_id").distinct().count() class_name = user.class_name or "" class_ac_count = 0 if class_name: users = User.objects.filter( class_name=user.class_name, is_disabled=False ).values_list("id", flat=True) user_ids = list(users) submissions = submissions.filter(user_id__in=user_ids) class_ac_count = submissions.values("user_id").distinct().count() my_submissions = submissions.filter(user_id=user.id) if len(my_submissions) == 0: return self.success( { "class_name": class_name, "rank": -1, "class_ac_count": class_ac_count, "all_ac_count": all_ac_count, } ) my_first_submission = my_submissions.order_by("create_time").first() rank = submissions.filter( create_time__lte=my_first_submission.create_time ).count() return self.success( { "class_name": class_name, "rank": rank, "class_ac_count": class_ac_count, "all_ac_count": all_ac_count, } ) class ProfileProblemDisplayIDRefreshAPI(APIView): @login_required def get(self, request): profile = request.user.userprofile acm_problems = profile.acm_problems_status.get("problems", {}) oi_problems = profile.oi_problems_status.get("problems", {}) ids = list(acm_problems.keys()) + list(oi_problems.keys()) if not ids: return self.success() display_ids = Problem.objects.filter(id__in=ids, visible=True).values_list( "_id", flat=True ) id_map = dict(zip(ids, display_ids)) for k, v in acm_problems.items(): v["_id"] = id_map[k] for k, v in oi_problems.items(): v["_id"] = id_map[k] profile.save(update_fields=["acm_problems_status", "oi_problems_status"]) return self.success() class OpenAPIAppkeyAPI(APIView): @login_required def post(self, request): user = request.user if not user.open_api: return self.error("OpenAPI function is truned off for you") api_appkey = rand_str() user.open_api_appkey = api_appkey user.save() return self.success({"appkey": api_appkey}) class SSOAPI(CSRFExemptAPIView): @login_required def get(self, request): token = rand_str() request.user.auth_token = token request.user.save() return self.success({"token": token}) @method_decorator(csrf_exempt) @validate_serializer(SSOSerializer) def post(self, request): try: user = User.objects.get(auth_token=request.data["token"]) except User.DoesNotExist: return self.error("User does not exist") return self.success( { "username": user.username, "avatar": user.userprofile.avatar, "admin_type": user.admin_type, } )