SpringCloud+Tornado基于jwt实现请求安全校验功能

时间:2021-05-20

项目背景

在实际项目中,Tornado项目作为一个微服务纳入SpringCloud体系,该过程中涉及到Tornado与Spring体系的安全验证,也就是权限调用校验,在该项目中Tornado是通过SpringCloud中的Feign调用的,经过一系列实验,最后选用jwt来实现这个权限效验的过程。

实现思路

用户进行登陆认证(后台微服务),认证成功后调用Tornado项目的认证接口生成token,该值返回到后台微服务保存在会话中,下一次请求时带上该token值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。

项目结构

common - authServer.py是认证接口common - basicServer.py是示例接口handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出utils - jwtUtils.py是生成与校验token的utils - responseUtils.py是返回结果工具类

具体实现

jwtUtils.py

# -*- coding:utf-8 -*-import jwtimport datetimefrom jwt import exceptionsfrom utils.responseUtils import JsonUtilsJWT_SALT = '1qazxdr5'def create_token(payload, timeout=12): """ 创建token :param payload: 例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息 :param timeout: token的过期时间,默认20分钟 :return: """ headers = { 'typ': 'jwt', 'alg': 'HS256' } payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout) result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8') return resultdef parse_payload(token): """ 对token进行校验并获取payload :param token: :return: """ try: verified_payload = jwt.decode(token, JWT_SALT, True) print(verified_payload) return JsonUtils.success('认证通过') except exceptions.ExpiredSignatureError: return JsonUtils.noAuth('token已失效') except jwt.DecodeError: return JsonUtils.noAuth('token认证失败') except jwt.InvalidTokenError: return JsonUtils.noAuth('非法的token')

baseHandler.py

# -*- coding:utf-8 -*-import functoolsimport jsonimport tornado.webfrom utils import jwtUtilsfrom utils.responseUtils import JsonUtils# 方式一:authenticated 装饰器def authenticated(method): @functools.wraps(method) def wrapper(self, *args, **kwargs): """ 这里调用的是 current_user 的 get 方法(property装饰), """ # 通过token请求头传递token head = self.request.headers token = head.get("token", "") if not token: self.write(JsonUtils.noAuth("未获取到token请求头")) self.set_header('Content-Type', 'application/json') return result = json.loads(jwtUtils.parse_payload(token)) # 将json解码 print(result) token_msg = json.dumps(result) if result['sta'] != '00': self.write(token_msg) self.set_header('Content-Type', 'application/json') return return method(self, *args, **kwargs) return wrapper# 方式二:进行预设 继承tornado的RequestHandlerclass BaseHandler(tornado.web.RequestHandler): def prepare(self): super(BaseHandler, self).prepare() def set_default_headers(self): super().set_default_headers()# 进行token校验,继承上面的BaseHandlerclass TokenHandler(BaseHandler): def prepare(self): # 通过token请求头传递token head = self.request.headers token = head.get("token","") if not token: self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头")) result = json.loads(jwtUtils.parse_payload(token)) # 将json解码 print(result) if result['sta'] != '00': self.isAuth = False else: self.isAuth = True self.authMsg = json.dumps(result)

authServer.py

import tornado.webfrom utils import jwtUtilsfrom utils.responseUtils import JsonUtilsclass authHandler(tornado.web.RequestHandler): def post(self, *args, **kwargs): """ 安全认证接口 :param args: :param kwargs: :return: """ username = self.get_argument("username") print("authHandler:" + username) if not username: self.write(JsonUtils.error("参数异常")) else: token = jwtUtils.create_token({"username": username}) print("token:" + token) self.write(JsonUtils.success(token)) self.set_header('Content-Type', 'application/json')

basicServer.py

import tornado.webimport jsonfrom pandas.core.frame import DataFramefrom handlers import baseHandlerfrom utils.responseUtils import JsonUtilsfrom handlers.baseHandler import authenticatedclass StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler): """ *** TokenHandler验证,对应baseHandler.py中的方式二 *** """ def get(self): username = self.get_argument('username', 'Hello') # 权限认证通过 if self.isAuth: self.write(JsonUtils.success(username)) else: self.write(self.authMsg)) self.set_header('Content-Type', 'application/json')class TestHandler(tornado.web.RequestHandler): """ *** authenticated验证,对应baseHandler.py中的方式一 *** """ @authenticated def post(self): username = self.get_argument('username', 'Hello') self.write(JsonUtils.success(username)) self.set_header('Content-Type', 'application/json')

responseUtils.py

from tornado.escape import json_encode, utf8class JsonUtils(object): @staticmethod def success(response): """ 正确返回 :param response: 返回结果 :return: string, {"message": "ok", "sta": "00", "data": } """ return json_encode({"message": "ok", "sta": "00", "data": response}) @staticmethod def info(message): """ 提示返回 :param message: 提示信息 :return: string, """ return json_encode({"message": str(message), "sta": "99001", "data": None}) @staticmethod def error(message): """ 错误返回 :param message: 错误信息 :return: string, """ return json_encode({"message": str(message), "sta": "9999", "data": None}) @staticmethod def noAuth(message): """ 无权限返回 :param message: 错误信息 :return: string, """ return json_encode({"message": str(message), "sta": "403", "data": None})

下面是一些调用的结果图示:



.end

到此这篇关于SpringCloud+Tornado基于jwt实现请求安全校验的文章就介绍到这了,更多相关SpringCloud+Tornado实现请求安全校验内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。

相关文章