认证组件
utils.auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class MyAuthentication(BaseAuthentication):
def authenticate(self, request):
raise AuthenticationFailed("认证失败了")
#settings.py
"""
进行drf的配置
"""
REST_FRAMEWORK = {
#认证代码的路径
"DEFAULT_AUTHENTICATION_CLASSES": ["utils.auth.MyAuthentication", ],
}
#views.py
from utils.auth import MyAuthentication
class User(APIView):
#引用一下
authentication_classes = [MyAuthentication]
def get(self, request):
# 得校验一下
token = request.query_params.get("token")
if not token:
return Response({"status": False, "data": "请提供token"})
user_object = models.UserInfo.objects.filter(token=token).first()
if not user_object:
return Response({"status": False, "data": "非法token"})
return Response({"status": True, "data": [1, 2, 3]})
效果:
正儿八经的用法
#views.py
class User(APIView):
def get(self, request):
print(request.user)
print(request.auth)
return Response({"status": True, "data": [1, 2, 3]})
#auth.py
from app01 import models
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class MyAuthentication(BaseAuthentication):
def authenticate(self, request):
"""
Authenticate the request and return a two-tuple of (user, token).
"""
token = request.query_params.get("token")
if not token:
raise AuthenticationFailed({"status": 0, "msg": "认证失败"})
user_object = models.UserInfo.objects.filter(token=token).first()
if not user_object:
raise AuthenticationFailed({"status": 0, "msg": "token无效"})
return user_object, token # request.user request.auth
这里因为在settings.py中写了认证代码的路径,所以当不需要认证时:
authentication_classes = []
比如你的登录,注册功能。
多个认证类
authentication_classes = [class_a, class_b, class_c]
#如果a成功了就会终止,失败了就继续执行b。依次执行,
#如果都失败了,可以自定义
REST_FRAMEWORK = {
#认证代码的路径
"DEFAULT_AUTHENTICATION_CLASSES": ["utils.auth.MyAuthentication", ],
"UNAUTHENTICATED_USER": None,
"UNAUTHENTICATED_TOKEN": None,
}
#也可以是函数:
REST_FRAMEWORK = {
#认证代码的路径
"DEFAULT_AUTHENTICATION_CLASSES": ["utils.auth.MyAuthentication", ],
"UNAUTHENTICATED_USER": lambda:"123",
"UNAUTHENTICATED_TOKEN": lambda:"qwe",
}
理论依据:
简单的流程分析
class APISettings:
def __getattr__(self, attr):
if attr not in self.defaults:
raise AttributeError("Invalid API setting: '%s'" % attr)
if attr in self.import_strings:
#读取配置文件
val = perform_import(val, attr)
self._cached_attrs.add(attr)
setattr(self, attr, val)
return val
class APIView(View):
#找到api_settings,其实相当于从settings.py中找
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
def initialize_request(self, request, *args, **kwargs):
parser_context = self.get_parser_context(request)
return Request(
#就是认证类的列表
authenticators=self.get_authenticators(),
)
def initial(self, request, *args, **kwargs):
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request)
def dispatch(self, request, *args, **kwargs):
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
self.initial(request, *args, **kwargs)
# 找到对应的方法
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed
response = handler(request, *args, **kwargs)
except Exception as exc:
response = self.handle_exception(exc)
self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response
class User(APIView):
authentication_classes = [MyAuthentication]
def get(self, request):
return Response({"status": True, "data": [1, 2, 3]})
class Request:
def __init__(self, request, parsers=None, authenticators=None,
negotiator=None, parser_context=None):
assert isinstance(request, HttpRequest), (
'The `request` argument must be an instance of '
'`django.http.HttpRequest`, not `{}.{}`.'
.format(request.__class__.__module__, request.__class__.__name__)
)
self._request = request
self.parsers = parsers or ()
self.authenticators = authenticators or ()
self.negotiator = negotiator or self._default_negotiator()
self.parser_context = parser_context
self._data = Empty
self._files = Empty
self._full_data = Empty
self._content_type = Empty
self._stream = Empty
@property
def user(self):
if not hasattr(self, '_user'):
with wrap_attributeerrors():
self._authenticate()
return self._user
@user.setter
def user(self, value):
self._user = value
self._request.user = value
@property
def auth(self):
if not hasattr(self, '_auth'):
with wrap_attributeerrors():
self._authenticate()
return self._auth
@auth.setter
def auth(self, value):
self._auth = value
self._request.auth = value
#user调用
def _authenticate(self):
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
def initial(self, request, *args, **kwargs):
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request)
def dispatch(self, request, *args, **kwargs):
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
self.initial(request, *args, **kwargs)
# Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed
response = handler(request, *args, **kwargs)
except Exception as exc:
response = self.handle_exception(exc)
self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response
权限组件
from rest_framework.permissions import BasePermission
#在视图函数之前应用
class MyPerMission(BasePermission):
def has_permission(self, request, view):
return True
#添加配置,或者导包
REST_FRAMEWORK = {
# 认证代码的路径
"DEFAULT_AUTHENTICATION_CLASSES": ["utils.auth.MyAuthentication", ],
"UNAUTHENTICATED_USER": None,
"UNAUTHENTICATED_TOKEN": None,
"DEFAULT_PERMISSION_CLASSES": ["utils.permission.MyPermission", ],
}
重新写表
from django.db import models
class UserInfo(models.Model):
username = models.CharField(verbose_name="用户名", max_length=32)
password = models.CharField(verbose_name="密码", max_length=32)
level = models.IntegerField(verbose_name="级别", choices=[("1", "管理员"), ("2", "用户")], default=1)
token = models.CharField(verbose_name="token", max_length=32, null=True, blank=True)
手动添加普通用户
这里我为了方便,把配置注释了。手动添加
效果:
源码
#initial方法调用
#同时意味着我们可以自定义message,code的具体值
def permission_denied(self, request, message=None, code=None):
if request.authenticators and not request.successful_authenticator:
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message, code=code)
def get_permissions(self):
return [permission() for permission in self.permission_classes]
def check_permissions(self, request):
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request,
message=getattr(permission, 'message', None),
code=getattr(permission, 'code', None)
)
应用场景
常见2:多个权限组件时,必须保证都具备权限才行。 3个权限组件 【必须管理员】【必须是北京用户】【年龄必须对于18】
这里是且的关系
class User1(APIVIew):
xx = [【必须管理员】]
class User2(APIVIew):
xx = [【必须管理员】, 【必须是北京用户】]
class User3(APIVIew):
xx = [【必须管理员】【必须是北京用户】【年龄必须对于18】]
总结:多个组件时且的关系
常见3:扩展源码 (多个组件的或关系)
class User(APIView):
permission_classes = [【必须管理员】, 【必须是北京用户】]
def check_permissions(self, request):
for permission in self.get_permissions():
if permission.has_permission(request, self):
return
self.permission_denied(request, message="无权访问")
限流组件
文件位置:
from rest_framework.throttling import BaseThrottle
class MyThrottle(BaseThrottle):
def allow_request(self, request, view):
return False
可选择的基类
认证限流权限
限流配置的例子:
throttle.py
from rest_framework.throttling import SimpleRateThrottle
class MyThrottle(SimpleRateThrottle):
scope = "user"
def get_cache_key(self, request, view):
return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
}
"DEFAULT_THROTTLE_CLASSES": ["utils.throttle.MyThrottle", ],
'DEFAULT_THROTTLE_RATES': {
'user': "5/hour", # 3600s内访问5次
'anon': None,
},
版本
参数
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.QueryParameterVersioning",
效果:
以路由的形式:
path('<str:version>/user/', views.User.as_view()),
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
class User(APIView):
def get(self, request, *args, **kwargs):
请求头
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.AcceptHeaderVersioning",
path('user/', views.User.as_view()),
class User(APIView):
def get(self, request):
print("请求正常执行")
print(request.version)
源码
def determine_version(self, request, *args, **kwargs):
if self.versioning_class is None:
return (None, None)
scheme = self.versioning_class()
return (scheme.determine_version(request, *args, **kwargs), scheme)
进行判断: