修改账户系统以及部分用户权限写法

增加部分测试和注释,完善国际化
This commit is contained in:
virusdefender
2016-06-23 12:19:16 +08:00
parent 61fe5675e0
commit eb02a00859
13 changed files with 555 additions and 204 deletions

View File

@@ -3,6 +3,7 @@ import os
import codecs
import qrcode
import StringIO
from django import http
from django.contrib import auth
from django.shortcuts import render
@@ -12,62 +13,209 @@ from django.conf import settings
from django.http import HttpResponse
from django.core.exceptions import MultipleObjectsReturned
from django.utils.timezone import now
from django.utils.translation import ugettext as _
from rest_framework.views import APIView
from rest_framework.response import Response
from utils.shortcuts import (serializer_invalid_response, error_response,
success_response, error_page, paginate, rand_str)
from utils.captcha import Captcha
from utils.otp_auth import OtpAuth
from .tasks import _send_email
from .decorators import login_required
from .models import User, UserProfile
from .models import User, UserProfile, AdminExtraPermission, AdminType
from .serializers import (UserLoginSerializer, UserRegisterSerializer,
UserChangePasswordSerializer,
UserSerializer, EditUserSerializer,
ApplyResetPasswordSerializer, ResetPasswordSerializer,
SSOSerializer, EditUserProfileSerializer,
UserProfileSerializer, TwoFactorAuthCodeSerializer)
TwoFactorAuthCodeSerializer)
from .decorators import super_admin_required
class UserLoginAPIView(APIView):
def post(self, request):
"""
用户登录json api接口
---
request_serializer: UserLoginSerializer
User login api
"""
serializer = UserLoginSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
user = auth.authenticate(username=data["username"], password=data["password"])
# 用户名或密码错误的话 返回None
# None is returned if username or password is wrong
if user:
if not user.two_factor_auth:
auth.login(request, user)
return success_response(u"登录成功")
return success_response(_("Succeeded"))
# 没有输入两步验证的验证码
# `tfa_code` not in post data
if user.two_factor_auth and "tfa_code" not in data:
return success_response("tfa_required")
if OtpAuth(user.tfa_token).valid_totp(data["tfa_code"]):
auth.login(request, user)
return success_response(u"登录成功")
return success_response(_("Succeeded"))
else:
return error_response(u"验证码错误")
return error_response(_("Invalid two factor verification code"))
else:
return error_response(u"用户名或密码错误")
return error_response(_("Invalid username or password"))
else:
return serializer_invalid_response(serializer)
#@login_required
class UserRegisterAPIView(APIView):
def post(self, request):
"""
User register api
"""
serializer = UserRegisterSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
captcha = Captcha(request)
if not captcha.check(data["captcha"]):
return error_response(_("Invalid captcha"))
try:
User.objects.get(username=data["username"])
return error_response(_("Username already exists"))
except User.DoesNotExist:
pass
try:
User.objects.get(email=data["email"])
return error_response(_("Email already exists"))
# Some old data has duplicate email
except MultipleObjectsReturned:
return error_response(_("Email already exists"))
except User.DoesNotExist:
user = User.objects.create(username=data["username"], real_name=data["real_name"],
email=data["email"])
user.set_password(data["password"])
user.save()
UserProfile.objects.create(user=user, school=data["school"], student_id=data["student_id"])
return success_response(_("Succeeded"))
else:
return serializer_invalid_response(serializer)
class UserChangePasswordAPIView(APIView):
@login_required
def post(self, request):
"""
User change password api
"""
serializer = UserChangePasswordSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
captcha = Captcha(request)
if not captcha.check(data["captcha"]):
return error_response(_("Invalid captcha"))
username = request.user.username
user = auth.authenticate(username=username, password=data["old_password"])
if user:
user.set_password(data["new_password"])
user.save()
return success_response(_("Succeeded"))
else:
return error_response(_("Invalid old password"))
else:
return serializer_invalid_response(serializer)
class UserAdminAPIView(APIView):
@super_admin_required
def put(self, request):
"""
Edit user api
"""
serializer = EditUserSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
try:
user = User.objects.get(id=data["id"])
except User.DoesNotExist:
return error_response(_("User does not exist"))
try:
user = User.objects.get(username=data["username"])
if user.id != data["id"]:
return error_response(_("Username already exists"))
except User.DoesNotExist:
pass
try:
user = User.objects.get(email=data["email"])
if user.id != data["id"]:
return error_response(_("Email already exists"))
# Some old data has duplicate email
except MultipleObjectsReturned:
return error_response(_("Email already exists"))
except User.DoesNotExist:
pass
user.username = data["username"]
user.real_name = data["real_name"]
user.email = data["email"]
user.admin_type = data["admin_type"]
user.is_disabled = data["is_disabled"]
if data["password"]:
user.set_password(data["password"])
if data["open_api"]:
# Avoid reset user appkey after saving changes
if not user.open_api:
user.open_api_appkey = rand_str()
else:
user.open_api_appkey = None
user.open_api = data["open_api"]
if data["two_factor_auth"]:
# Avoid reset user tfa_token after saving changes
if not user.two_factor_auth:
user.tfa_token = rand_str()
else:
user.tfa_token = None
user.two_factor_auth = data["two_factor_auth"]
if data["admin_type"] == AdminType.ADMIN:
user.admin_extra_permission = list(set(data["admin_extra_permission"]))
else:
user.admin_extra_permission = []
user.save()
return success_response(UserSerializer(user).data)
else:
return serializer_invalid_response(serializer)
@super_admin_required
def get(self, request):
"""
User list api / Get user by id
"""
user_id = request.GET.get("user_id")
if user_id:
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
return error_response(_("User does not exist"))
return success_response(UserSerializer(user).data)
user = User.objects.all().order_by("-create_time")
admin_type = request.GET.get("admin_type", None)
if admin_type:
try:
user = user.filter(admin_type__gte=int(admin_type))
except ValueError:
return error_response(_("Invalid parameter"))
keyword = request.GET.get("keyword", None)
if keyword:
user = user.filter(Q(username__contains=keyword) |
Q(real_name__contains=keyword) |
Q(email__contains=keyword))
return paginate(request, user, UserSerializer)
def logout(request):
auth.logout(request)
return http.HttpResponseRedirect("/")
@@ -83,67 +231,6 @@ def index_page(request):
return http.HttpResponseRedirect('/problems/')
class UserRegisterAPIView(APIView):
def post(self, request):
"""
用户注册json api接口
---
request_serializer: UserRegisterSerializer
"""
serializer = UserRegisterSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
captcha = Captcha(request)
if not captcha.check(data["captcha"]):
return error_response(u"验证码错误")
try:
User.objects.get(username=data["username"])
return error_response(u"用户名已存在")
except User.DoesNotExist:
pass
try:
User.objects.get(email=data["email"])
return error_response(u"该邮箱已被注册,请换其他邮箱进行注册")
# 兼容部分老数据,有邮箱重复的
except MultipleObjectsReturned:
return error_response(u"该邮箱已被注册,请换其他邮箱进行注册")
except User.DoesNotExist:
user = User.objects.create(username=data["username"], real_name=data["real_name"],
email=data["email"])
user.set_password(data["password"])
user.save()
UserProfile.objects.create(user=user, school=data["school"], student_id=data["student_id"])
return success_response(u"注册成功!")
else:
return serializer_invalid_response(serializer)
class UserChangePasswordAPIView(APIView):
@login_required
def post(self, request):
"""
用户修改密码json api接口
---
request_serializer: UserChangePasswordSerializer
"""
serializer = UserChangePasswordSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
captcha = Captcha(request)
if not captcha.check(data["captcha"]):
return error_response(u"验证码错误")
username = request.user.username
user = auth.authenticate(username=username, password=data["old_password"])
if user:
user.set_password(data["new_password"])
user.save()
return success_response(u"用户密码修改成功!")
else:
return error_response(u"密码不正确,请重新修改!")
else:
return serializer_invalid_response(serializer)
class UsernameCheckAPIView(APIView):
def get(self, request):
"""
@@ -186,80 +273,6 @@ class EmailCheckAPIView(APIView):
return Response(status=does_not_existed)
class UserAdminAPIView(APIView):
@super_admin_required
def put(self, request):
"""
用户编辑json api接口
---
request_serializer: EditUserSerializer
response_serializer: UserSerializer
"""
serializer = EditUserSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
try:
user = User.objects.get(id=data["id"])
except User.DoesNotExist:
return error_response(u"该用户不存在!")
try:
user = User.objects.get(username=data["username"])
if user.id != data["id"]:
return error_response(u"昵称已经存在")
except User.DoesNotExist:
pass
user.username = data["username"]
user.real_name = data["real_name"]
user.email = data["email"]
user.admin_type = data["admin_type"]
if data["password"]:
user.set_password(data["password"])
# 后台控制用户是否可以使用openapi
if data["openapi"] is False:
user.openapi_appkey = None
elif data["openapi"] and user.openapi_appkey is None:
user.openapi_appkey = rand_str()
# 后台控制用户是否使用两步验证
# 注意:用户没开启,后台开启的话,用户没有绑定过两步验证token,会造成无法登陆的!
if data["tfa_auth"] is False:
user.two_factor_auth = False
elif data["tfa_auth"] and user.two_factor_auth is False:
user.two_factor_auth = True
user.tfa_token = rand_str()
# 后台控制用户是否被禁用
user.is_forbidden = data["is_forbidden"]
user.save()
return success_response(UserSerializer(user).data)
else:
return serializer_invalid_response(serializer)
@super_admin_required
def get(self, request):
"""
用户分页json api接口
---
response_serializer: UserSerializer
"""
user = User.objects.all().order_by("-create_time")
admin_type = request.GET.get("admin_type", None)
if admin_type:
try:
user = user.filter(admin_type__gte=int(admin_type))
except ValueError:
return error_response(u"参数错误")
keyword = request.GET.get("keyword", None)
if keyword:
user = user.filter(Q(username__contains=keyword) |
Q(real_name__contains=keyword) |
Q(email__contains=keyword))
return paginate(request, user, UserSerializer)
class UserInfoAPIView(APIView):
@login_required
def get(self, request):