Sanic with sanic_jwt
依赖环境
yml
# Docker Compose stack: a Redis instance plus the API container.
version: '3'
services:
  # Redis instance; the project code keeps refresh tokens in it.
  redis:
    container_name: web_redis
    image: redis:alpine
    restart: always
    ports:
      # Published on the loopback interface only.
      - "127.0.0.1:6379:6379"
    # REDIS_PASSWORD is a placeholder here; it presumably has to match the
    # REDIS_PASSWORD value in the project's config module (verify).
    command: redis-server --save 20 1 --loglevel warning --requirepass REDIS_PASSWORD
    volumes:
      - ./__env_cache__/redis/datadir:/data
      - ./__env_cache__/redis/conf:/usr/local/etc/redis
      - ./__env_cache__/redis/logs:/logs
    logging:
      driver: "json-file"
      options:
        max-size: "1000k"
        max-file: "20"
    shm_size: 512mb
    network_mode: bridge

  # API container: runs app.py through Poetry from the mounted project dir.
  api:
    container_name: web_api
    build: .
    image: u20p39poetry
    restart: always
    volumes:
      - ./project:/root/project
      - ./__env_cache__:/root/.cache/
    working_dir: /root/project
    command: poetry run python app.py
    logging:
      driver: "json-file"
      options:
        max-size: "1000k"
        max-file: "10"
    # Host networking, so the app can reach Redis at 127.0.0.1:6379.
    network_mode: host
    depends_on:
      - redis
toml
# Runtime dependencies managed by Poetry.
[tool.poetry.dependencies]
python = "^3.9"
sanic = "^23.6.0"
sanic-jwt = "^1.8.0"
redis = "^5.0.1"
passlib = "^1.7.4"
loguru = "^0.7.2"
sanic-cors = "^2.2.0"

# Package index mirror. Use HTTPS so packages are not fetched over an
# unencrypted connection.
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple"
priority = "default"
项目代码
python
from sanic import Sanic
from sanic_jwt import initialize
from handlers.auth import (
authenticate,
retrieve_refresh_token,
store_refresh_token,
retrieve_user,
auth_views,
MyResponses
)
from handlers.api import ApiData
from config import WEB_HOST, WEB_HTTP_PORT
from sanic_cors import CORS
import os

# Sanic application instance for this module.
app = Sanic(__name__)

# JWT signing secret. Read it from the JWT_SECRET environment variable so the
# real key does not have to live in the source; the literal is only the
# fallback used when the variable is not set.
SECRET = os.environ.get("JWT_SECRET", "KET_SECEPEPSAFEEP_IT_KE__I")

app.blueprint(ApiData)
CORS(app)  # Enable CORS for all routes

# Wire sanic_jwt into the app: auth routes live under /apiv2/auth, refresh
# tokens are enabled, and the store/retrieve hooks come from handlers.auth.
initialize(
    app,
    secret=SECRET,
    authenticate=authenticate,
    url_prefix='/apiv2/auth',
    path_to_authenticate='/login',
    path_to_refresh='/refresh',
    path_to_verify='/test',
    path_to_retrieve_user="/user",
    refresh_token_enabled=True,
    store_refresh_token=store_refresh_token,
    retrieve_refresh_token=retrieve_refresh_token,
    retrieve_user=retrieve_user,
    responses_class=MyResponses,
    class_views=auth_views,
)

if __name__ == '__main__':
    # Start the server only when this file is executed directly.
    app.run(host=WEB_HOST, port=WEB_HTTP_PORT, access_log=True, auto_reload=True)
python
from dataclasses import dataclass
@dataclass
class UserItem:
    """User record as handed to callers: id, name and status, no password."""

    user_id: int
    username: str
    # Stored as an integer (0) in the user records of this module, so the
    # annotation is int rather than str.
    status: int

    def to_dict(self):
        """Return the three fields as a plain dict."""
        return dict(
            user_id=self.user_id,
            username=self.username,
            status=self.status
        )
class AuthManager:
    """In-memory user list with lookup and credential-check helpers."""

    def __init__(self):
        # Single hard-coded account; the password is kept in plain text.
        self.users = [{'user_id': 0, 'username': 'user', 'password': 'password', 'source': 'api', 'status': 0}]

    def find_user_by_uid(self, user_id):
        """Return a UserItem for the record whose user_id matches, else None."""
        for item in self.users:
            if item['user_id'] == user_id:
                return UserItem(item['user_id'], item['username'], item['status'])
        return None

    def user_exists(self, username):
        """Return (True, UserItem) for the first record with this username,
        otherwise (False, None)."""
        for item in self.users:
            if item['username'] == username:
                return True, self.find_user_by_uid(item['user_id'])
        return False, None

    def insert_user(self, username, password, source, status):
        """Placeholder: nothing is stored; always returns True.

        NOTE(review): the previously commented-out append used the key 'id'
        instead of 'user_id'; fix that if persistence is ever enabled.
        """
        return True

    def is_auth_match(self, username, password):
        """Check a username/password pair against the stored records.

        Returns (True, UserItem) on a match, otherwise (False, None).
        """
        import hmac  # local import: only this method needs it

        # Anything that is not a string can never equal a stored password.
        if not isinstance(password, str):
            return False, None
        supplied = password.encode('utf-8')
        for item in self.users:
            if item['username'] != username:
                continue
            stored = item['password']
            # Compare as bytes: compare_digest rejects non-ASCII str input,
            # and its running time does not depend on where the values differ.
            if isinstance(stored, str) and hmac.compare_digest(stored.encode('utf-8'), supplied):
                return True, self.find_user_by_uid(item['user_id'])
        return False, None
python
from sanic_jwt import BaseEndpoint, Responses, exceptions
from sanic.response import json as resp_json
from lib.redis_app import AppRedis
import traceback
from handlers.auth_manager import AuthManager
# Module-level AuthManager instance shared by the handlers below.
am = AuthManager()
# Module-level AppRedis instance used to store and read refresh tokens.
app_redis = AppRedis()
async def authenticate(request, *args, **kwargs):
    """Validate the credentials in the JSON body and return the matching user.

    Raises:
        exceptions.AuthenticationFailed: when username or password is missing,
            or when the pair does not match a known user.
    """
    # request.json may be None (e.g. no body) or a non-object JSON value;
    # treat both as "no credentials" instead of failing on .get().
    body = request.json
    if not isinstance(body, dict):
        body = {}
    username = body.get("username", None)
    password = body.get("password", None)
    if not username or not password:
        raise exceptions.AuthenticationFailed("Missing username or password.")
    is_match, user = am.is_auth_match(username, password)
    if not is_match:
        raise exceptions.AuthenticationFailed("Auth fail.")
    return user
async def store_refresh_token(user_id, refresh_token, *args, **kwargs):
    """Save refresh_token for user_id through the shared AppRedis instance.

    Returns None; the result of the underlying set call is not used.
    """
    app_redis.set_item(user_id, refresh_token)
async def retrieve_refresh_token(request, user_id, *args, **kwargs):
    """Return whatever AppRedis has stored as the refresh token of user_id."""
    return app_redis.get_item(user_id)
async def retrieve_user(request, payload, *args, **kwargs):
    """Look up the user named by payload['user_id'].

    Returns None when payload is empty/None or when no user has that id.
    """
    if not payload:
        return None
    uid = payload.get('user_id', None)
    return am.find_user_by_uid(uid)
class Register(BaseEndpoint):
    """Sign-up endpoint: accepts a username/password pair as JSON."""

    async def post(self, request, *args, **kwargs):
        """Handle a registration request.

        Responses:
            400, code -1: username or password missing from the body.
            200, code 0:  request accepted, or the username already exists.
            404, code -2: any unexpected error (traceback is printed).
        """
        try:
            payload_data = request.json
            # A missing body or a non-object JSON value carries no credentials.
            if not isinstance(payload_data, dict):
                payload_data = {}
            username = payload_data.get('username')
            password = payload_data.get('password')
            if not username or not password:
                # Do not report an empty submission as accepted.
                resp_data = dict(code=-1, message="Missing username or password.", data=None)
                return resp_json(resp_data, status=400)
            is_exists, _ = am.user_exists(username)
            if is_exists:
                resp_data = dict(code=0, message="请勿频繁提交", data=None)
                return resp_json(resp_data, status=200)
            am.insert_user(username, password, 'from api', 0)
            resp_data = dict(code=0, message="请求已提交,请等待管理员验证", data=None)
            return resp_json(resp_data, status=200)
        except Exception:
            # Catch Exception rather than everything, so task cancellation
            # and interpreter-exit signals still propagate.
            traceback.print_exc()
            resp_data = dict(code=-2, message="failed", data=None)
            return resp_json(resp_data, status=404)
# Extra class-based views as (path, view class) pairs; the application module
# passes this tuple to sanic_jwt's initialize() as class_views.
auth_views = (
    ('/sign', Register),
)
class MyResponses(Responses):
    """Responses subclass that adds user fields to the authenticate output."""

    @staticmethod
    def extend_authenticate(request, user=None, access_token=None, refresh_token=None):
        """Return the user's id, name and status as a dict.

        The attributes are read from `user` in the order listed below.
        """
        exposed_fields = ("user_id", "username", "status")
        return {field: getattr(user, field) for field in exposed_fields}
def test_refresh_token(user_id, token):
    """Return True only if `token` equals the refresh token stored for user_id.

    A missing stored token or a missing `token` never counts as a match;
    without this guard, None == None would report success.
    """
    stored_token = app_redis.get_item(user_id)
    if stored_token is None or token is None:
        return False
    return token == stored_token
python
from sanic import Blueprint
from sanic_jwt import protected, inject_user
from sanic.response import json as resp_json
# Blueprint for the data routes; every route on it is served under /api.
ApiData = Blueprint('ApiData', url_prefix='/api')
@ApiData.get("/data")
@inject_user()
@protected()
async def data(request, user):
    """GET /api/data: reply with a fixed success message.

    `user` is supplied by the inject_user decorator and is not used here.
    """
    body = {"message": "success"}
    return resp_json(body, status=200)
python
from config import REDIS_HOST, REDIS_PORT, REDIS_TOKEN_DB, REDIS_PASSWORD
from redis import StrictRedis
class AppRedis:
    """Small wrapper that keeps one refresh token per user id in Redis."""

    def __init__(self):
        """Create the client from the connection settings in config."""
        connection = dict(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=REDIS_TOKEN_DB,
            decode_responses=True,
        )
        self.app_redis = StrictRedis(**connection)

    @staticmethod
    def _token_key(user_id):
        """Build the key under which the token of user_id is stored."""
        return f'refresh_token_{user_id}'

    def set_item(self, user_id, refresh_token):
        """Store refresh_token for user_id; returns the client's set() result."""
        return self.app_redis.set(self._token_key(user_id), refresh_token)

    def get_item(self, user_id):
        """Return the client's get() result for the token key of user_id."""
        return self.app_redis.get(self._token_key(user_id))
项目代码 config
python
# web api
WEB_DOMAIN = "<DOMAIN>"
WEB_HOST = "127.0.0.1"
WEB_HTTP_PORT = 80

# redis
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
# Placeholder: replace with the password the Redis server is started with.
REDIS_PASSWORD = "<REDIS_PASSWORD>"
# Index of the Redis database that holds the refresh tokens.
REDIS_TOKEN_DB = 0
测试
shell
# Login
# The auth routes are registered under the /apiv2/auth prefix.
curl --location --request POST '{{host}}/apiv2/auth/login' \
    --header 'Content-Type: application/json' \
    --data-raw '{
  "username": "{{username}}",
  "password": "{{password}}"
}'
# Response
# Success (user_id, username and status are added by MyResponses)
# {
#   "access_token": "{{access_token}}",
#   "refresh_token": "{{refresh_token}}",
#   "user_id": {{user_id}},
#   "username": "{{username}}",
#   "status": {{status}}
# }
# Failure
# {
#   "reasons": [
#     "Auth fail."
#   ],
#   "exception": "AuthenticationFailed"
# }

# Refresh Access Token
curl --location --request POST '{{host}}/apiv2/auth/refresh' \
    --header 'Content-Type: application/json' \
    --header 'Authorization: Bearer {{access_token}}' \
    --data-raw '{
  "refresh_token": "{{refresh_token}}"
}'
# Response
# Success
# {
#   "access_token": "{{access_token}}"
# }
# Failure
# {
#   "reasons": [
#     "Auth required."
#   ],
#   "exception": "Unauthorized"
# }

# Get Data
# The data blueprint is served under /api and needs a valid access token.
curl --location --request GET '{{host}}/api/data' \
    --header 'Authorization: Bearer {{access_token}}'
# Response
# Success
# {
#   "message": "success"
# }
# Failure
# {
#   "reasons": [
#     "Auth required."
#   ],
#   "exception": "Unauthorized"
# }