用户登录成功后,服务端通过jwt生成一个带签名的token返回给用户(服务端无需保存token),以后用户再来访问时需携带该token,服务端接收到token之后,通过jwt校验token是否合法、是否超时。
2. jwt创建token
登录时
import jwt
import datetime
from jwt import exceptions
# Secret key used to sign tokens with HS256 (passed as `key` to jwt.encode
# in create_token).
# NOTE(review): the secret is hard-coded in source, presumably for
# demonstration only — load it from settings or the environment in real code.
SALT = 'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv='
def create_token(user_id=1, username='admin', minutes=5):
    """Build a signed HS256 JWT carrying the given user's identity.

    Args:
        user_id: Value stored in the ``user_id`` claim.
        username: Value stored in the ``username`` claim.
        minutes: Token lifetime in minutes, counted from now.

    Returns:
        The encoded token exactly as returned by ``jwt.encode``.
    """
    # JOSE header; the 'typ' value is kept as-is so emitted tokens do not change.
    headers = {
        'typ': 'jwt',
        'alg': 'HS256',
    }
    # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misinterpret.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=minutes)
    payload = {
        'user_id': user_id,
        'username': username,
        'exp': expires_at,  # expiry claim, checked by jwt.decode
    }
    return jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers)
if __name__ == '__main__':
    # Demo entry point: mint one token and write it to stdout.
    print(create_token())
再次访问(校验token)
import jwt
import datetime
from jwt import exceptions
# Must be the same secret that was used to sign the token.
SALT = 'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv='
# Sample token produced earlier by create_token().
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6Imp3dCJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNzEzNjkwNDY2fQ.OaBE0zSXfrqNufazcIhEbOXveZdBNnlVQP6aCaetd0U"
try:
    # `algorithms` is the allow-list of accepted signing algorithms; pass a
    # list so membership is an exact match rather than a substring test.
    verified_payload = jwt.decode(token, SALT, algorithms=["HS256"])
    print(verified_payload)
except jwt.ExpiredSignatureError:
    # An expired `exp` claim is NOT a DecodeError, so it needs its own
    # handler; otherwise an expired token ends the script with a traceback.
    print('token已失效')
except jwt.InvalidTokenError:
    # Base class for the remaining validation failures (malformed token,
    # bad signature, disallowed algorithm, ...); DecodeError is a subclass.
    print('token认证失败')
jwt-drf的案例
views.py
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from app01 import models
import datetime
import jwt
from django.conf import settings
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.authentication import BaseAuthentication
class AuthView(APIView):
    """Login endpoint that exchanges a username/password for a short-lived JWT."""

    def post(self, request):
        """Authenticate the posted credentials and return a signed token.

        Expected body: {"user": "admin", "password": "admin"}

        Returns:
            ``{"code": 0, "data": <token>}`` on success, or
            ``{"code": 1000, "msg": ...}`` when the credentials are missing
            or do not match a user.
        """
        credentials = request.data
        # Read only the two expected fields. Expanding the whole body into
        # filter(**request.data) lets the client pick the ORM lookups: a body
        # with just {"user": "admin"} would match without any password, and
        # an unknown key would raise FieldError (HTTP 500).
        if not isinstance(credentials, dict):
            return Response({"code": 1000, 'msg': "用户名或密码错误"})
        username = credentials.get("user")
        password = credentials.get("password")
        if username is None or password is None:
            return Response({"code": 1000, 'msg': "用户名或密码错误"})

        # NOTE(review): assumes the UserInfo model has `user` and `password`
        # fields, as implied by the sample body above — confirm against the model.
        # NOTE(review): the password is compared directly against the stored
        # column, which suggests plaintext storage — consider hashing.
        user_object = models.UserInfo.objects.filter(user=username, password=password).first()
        if not user_object:
            return Response({"code": 1000, 'msg': "用户名或密码错误"})

        # JOSE header for the issued token.
        headers = {
            'typ': 'jwt',
            'alg': 'HS256',
        }
        # Timezone-aware UTC expiry; datetime.utcnow() is deprecated since 3.12.
        expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5)
        payload = {
            'user_id': user_object.id,
            'username': user_object.user,
            'exp': expires_at,
        }
        token = jwt.encode(payload=payload, key=settings.SECRET_KEY, algorithm="HS256", headers=headers)
        return Response({"code": 0, 'data': token})
class MineAuthentication(BaseAuthentication):
    """Authentication class that validates a JWT passed as the ``token`` query parameter."""

    def authenticate(self, request):
        """Validate the request's token and return its decoded payload.

        Returns:
            A ``(payload, token)`` tuple; the views in this file read these
            back as ``request.user`` and ``request.auth``.

        Raises:
            AuthenticationFailed: if the token is missing or fails validation
                (malformed, bad signature, expired, ...).
        """
        import logging

        token = request.query_params.get("token")
        # 1. A token must be present.
        if not token:
            raise AuthenticationFailed("认证失败")
        # 2. Signature and expiry are both checked by jwt.decode. `algorithms`
        #    is passed as a list so the allow-list check is an exact match
        #    rather than a substring test against the string "HS256".
        try:
            verified_payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        except jwt.PyJWTError as exc:
            # Catch only PyJWT's own errors so unrelated bugs are not masked.
            logging.getLogger(__name__).warning("JWT validation failed: %s", exc)
            raise AuthenticationFailed("认证失败") from exc
        return (verified_payload, token)

    def authenticate_header(self, request):
        """Return the scheme string "API".

        NOTE(review): per the DRF docs this value is used for the
        WWW-Authenticate header on 401 responses — confirm it is intended.
        """
        return "API"
class MineView(APIView):
    """Example endpoint protected by MineAuthentication."""

    authentication_classes = [MineAuthentication]

    def get(self, request):
        """Print the authenticated payload and raw token, then reply "OK"."""
        current_user, raw_token = request.user, request.auth
        print(current_user, raw_token)
        # ...
        return Response("OK")
urls.py
from django.contrib import admin
from django.urls import path
from app01 import views
# Route table: admin site, login, a protected sample view, and token refresh.
urlpatterns = [
    path("admin/", admin.site.urls),
    path("auth/", views.AuthView.as_view()),
    path("mine/", views.MineView.as_view()),
    path("update/token/", views.UpdateTokenView.as_view()),
]
jwt有效期与更新
有效期
payload = {
    'user_id': user_object.id,  # custom claim: user ID
    'username': user_object.user,  # custom claim: username
    # Expiry claim: 5 minutes from now, as a timezone-aware UTC timestamp
    # (datetime.utcnow() is deprecated since Python 3.12).
    'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5),
}
更新
更新01
5分钟过期,在中间件的process_response中生成新的jwt token,并通过响应头返回
...
更新02
5分钟过期,在响应中多返回一个字段,提示token即将过期
客户端读取该返回值
再次发起请求,去更新token
class UpdateTokenView(APIView):
    """Token refresh endpoint: issues a new JWT to an already-authenticated caller."""

    authentication_classes = [MineAuthentication]

    def get(self, request):
        """Issue a fresh 5-minute token carrying the caller's current claims.

        Returns:
            ``{"code": 0, "data": <token>}``, the same shape the login view
            in this file uses.
        """
        # request.user is the decoded payload returned by MineAuthentication.
        claims = request.user
        headers = {
            'typ': 'jwt',
            'alg': 'HS256',
        }
        # Timezone-aware UTC expiry; datetime.utcnow() is deprecated since 3.12.
        expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5)
        payload = {
            'user_id': claims['user_id'],
            'username': claims['username'],
            'exp': expires_at,
        }
        token = jwt.encode(payload=payload, key=settings.SECRET_KEY, algorithm="HS256", headers=headers)
        # Return the new token; the original built it and then replied "OK",
        # so the client could never receive the refreshed token.
        return Response({"code": 0, 'data': token})