Skip to content

Sanic with sanic_jwt

依赖环境

yml
# Compose stack with two services: a Redis server and the Python API container.
version: '3'
services:
  # Redis server used by the API.
  redis:
    container_name: web_redis
    image: redis:alpine
    restart: always
    ports:
      # Published on the host loopback address only (127.0.0.1:6379).
      - "127.0.0.1:6379:6379"
    # REDIS_PASSWORD is a literal placeholder here; replace it with the real
    # password (presumably the same value the API configuration uses — confirm).
    command: redis-server --save 20 1 --loglevel warning --requirepass REDIS_PASSWORD
    volumes:
      # Host directories under ./__env_cache__/redis mounted into the container.
      - ./__env_cache__/redis/datadir:/data
      - ./__env_cache__/redis/conf:/usr/local/etc/redis
      - ./__env_cache__/redis/logs:/logs
    logging:
      # json-file log driver, limited to 20 files of 1000k each.
      driver: "json-file"
      options:
        max-size: "1000k"
        max-file: "20"
    shm_size: 512mb
    network_mode: bridge
  # API container: built from the local Dockerfile, runs app.py through Poetry.
  api:
    container_name: web_api
    build: .
    image: u20p39poetry
    restart: always
    volumes:
      # Project sources and the shared cache directory are bind-mounted.
      - ./project:/root/project
      - ./__env_cache__:/root/.cache/
    working_dir: /root/project
    command: poetry run python app.py
    logging:
      # json-file log driver, limited to 10 files of 1000k each.
      driver: "json-file"
      options:
        max-size: "1000k"
        max-file: "10"
    # Host networking; presumably so the API can reach Redis through the
    # loopback port published above — confirm.
    network_mode: host
    depends_on:
      - redis
toml
# Runtime dependencies of the API project (Poetry caret constraints).
[tool.poetry.dependencies]
python = "^3.9"
sanic = "^23.6.0"
sanic-jwt = "^1.8.0"
redis = "^5.0.1"
passlib = "^1.7.4"
loguru = "^0.7.2"
sanic-cors = "^2.2.0"

# Package index mirror used to resolve the dependencies above.
[[tool.poetry.source]]
name = "aliyun"
# HTTPS instead of plain HTTP, so packages are not downloaded over an
# unauthenticated connection that could be tampered with in transit.
url = "https://mirrors.aliyun.com/pypi/simple"
priority = "default"

项目代码

python
import os

from sanic import Sanic
from sanic_cors import CORS
from sanic_jwt import initialize

from config import WEB_HOST, WEB_HTTP_PORT
from handlers.api import ApiData
from handlers.auth import (
    authenticate,
    retrieve_refresh_token,
    store_refresh_token,
    retrieve_user,
    auth_views,
    MyResponses
)

# Application bootstrap: create the Sanic app, register the API blueprint,
# enable CORS and wire sanic_jwt to the project's auth handlers.
app = Sanic(__name__)

# JWT signing key. Prefer supplying it through the JWT_SECRET environment
# variable so the real key does not live in the source tree; the literal is
# only the fallback that keeps existing deployments working unchanged.
SECRET = os.environ.get("JWT_SECRET") or "KET_SECEPEPSAFEEP_IT_KE__I"

app.blueprint(ApiData)
CORS(app)  # Enable CORS for all routes

initialize(
    app,
    secret                = SECRET,
    authenticate          = authenticate,

    # All auth endpoints are served below this prefix,
    # e.g. /apiv2/auth/login and /apiv2/auth/refresh.
    url_prefix            = '/apiv2/auth',
    path_to_authenticate  = '/login',
    path_to_refresh       = '/refresh',
    path_to_verify        = '/test',
    path_to_retrieve_user = "/user",

    # Refresh-token support with the project's store/retrieve callbacks.
    refresh_token_enabled  = True,
    store_refresh_token    = store_refresh_token,
    retrieve_refresh_token = retrieve_refresh_token,
    retrieve_user          = retrieve_user,

    # Custom response payload and the extra endpoint views (e.g. /sign).
    responses_class        = MyResponses,
    class_views            = auth_views,
)

if __name__ == '__main__':
    app.run(host=WEB_HOST, port=WEB_HTTP_PORT, access_log=True, auto_reload=True)
python
from dataclasses import dataclass

@dataclass
class UserItem:
    """Password-free view of a user record.

    Holds only the fields that are handed back to callers: the numeric id,
    the login name and the account status.
    """
    user_id: int
    username: str
    # Annotated as int: user records in this project store the status as the
    # integer 0, so the previous ``str`` annotation was misleading.
    status: int

    def to_dict(self):
        """Return the three fields as a plain ``dict``."""
        return {
            "user_id": self.user_id,
            "username": self.username,
            "status": self.status,
        }

class AuthManager:
    """In-memory user store used by the auth handlers.

    NOTE(review): passwords are kept and compared in plain text. That is
    acceptable for a demo only; a real deployment should store password
    hashes instead.
    """

    def __init__(self):
        # Single seeded demo account.
        self.users = [{'user_id': 0, 'username': 'user', 'password': 'password', 'source': 'api', 'status': 0}]

    @staticmethod
    def _to_user_item(record):
        """Build the password-free ``UserItem`` for a stored record."""
        return UserItem(record['user_id'], record['username'], record['status'])

    @staticmethod
    def _passwords_equal(stored, supplied):
        """Compare two passwords, in constant time when both are strings."""
        import hmac

        if isinstance(stored, str) and isinstance(supplied, str):
            # compare_digest on str only accepts ASCII, so compare the UTF-8
            # bytes; 'surrogatepass' keeps odd JSON input from raising.
            return hmac.compare_digest(
                stored.encode('utf-8', 'surrogatepass'),
                supplied.encode('utf-8', 'surrogatepass'),
            )
        # Non-string values keep the plain equality semantics.
        return stored == supplied

    def find_user_by_uid(self, user_id):
        """Return the ``UserItem`` whose ``user_id`` matches, or ``None``."""
        for record in self.users:
            if record['user_id'] == user_id:
                return self._to_user_item(record)
        return None

    def user_exists(self, username):
        """Return ``(True, UserItem)`` if ``username`` is known, else ``(False, None)``."""
        for record in self.users:
            if record['username'] == username:
                # Build the item from the record already found instead of
                # scanning the list a second time by id.
                return True, self._to_user_item(record)
        return False, None

    def insert_user(self, username, password, source, status):
        """Accept a registration request.

        Currently a no-op that always reports success; nothing is stored.
        """
        # self.users.append({'id': 0, 'username': username, 'password': password, 'source': source, 'status': status})
        return True

    def is_auth_match(self, username, password):
        """Return ``(True, UserItem)`` when the credentials match a record.

        Returns ``(False, None)`` for an unknown user or a wrong password.
        """
        # One pass over the records is enough; no separate existence check.
        for record in self.users:
            if record['username'] == username and self._passwords_equal(record['password'], password):
                return True, self._to_user_item(record)
        return False, None
python
from sanic_jwt import BaseEndpoint, Responses, exceptions
from sanic.response import json as resp_json
from lib.redis_app import AppRedis
import traceback
from handlers.auth_manager import AuthManager

# Module-level singletons shared by every handler in this module.
app_redis = AppRedis()  # Redis-backed storage for refresh tokens

am = AuthManager()  # user lookup and credential checks

async def authenticate(request, *args, **kwargs):
    """Validate the posted credentials and return the matching user.

    Reads ``username`` and ``password`` from the JSON body.

    Returns:
        The user object produced by ``am.is_auth_match``.

    Raises:
        exceptions.AuthenticationFailed: when the body is missing or is not a
            JSON object, when a field is missing or empty, or when the
            credentials do not match.
    """
    body = request.json
    # The parsed body may be None (e.g. nothing was sent) or a non-object
    # JSON value; treat both as missing credentials instead of letting
    # ``.get`` raise AttributeError and surface as a server error.
    if not isinstance(body, dict):
        raise exceptions.AuthenticationFailed("Missing username or password.")

    username = body.get("username", None)
    password = body.get("password", None)

    if not username or not password:
        raise exceptions.AuthenticationFailed("Missing username or password.")

    is_match, user = am.is_auth_match(username, password)
    if not is_match:
        raise exceptions.AuthenticationFailed("Auth fail.")

    return user


async def store_refresh_token(user_id, refresh_token, *args, **kwargs):
    """Persist ``refresh_token`` for ``user_id`` through ``app_redis``.

    Returns ``None``; the result of the storage call is not used.
    """
    app_redis.set_item(user_id, refresh_token)

async def retrieve_refresh_token(request, user_id, *args, **kwargs):
    """Return whatever ``app_redis`` has stored as refresh token for ``user_id``."""
    return app_redis.get_item(user_id)

async def retrieve_user(request, payload, *args, **kwargs):
    """Resolve the user referenced by a token payload.

    Returns ``None`` for an empty/missing payload; otherwise the result of
    looking up the payload's ``user_id`` in the auth manager.
    """
    if not payload:
        return None
    return am.find_user_by_uid(payload.get('user_id', None))

class Register(BaseEndpoint):
    """Endpoint that accepts a sign-up request (``username`` + ``password``)."""

    async def post(self, request, *args, **kwargs):
        """Handle a registration POST.

        Responds with:
            * 200, ``code=0`` - request accepted, or the username already exists.
            * 400, ``code=-2`` - ``username`` or ``password`` missing/empty.
            * 404, ``code=-2`` - any unexpected error while processing
              (status kept as-is for existing clients).
        """
        try:
            payload_data = request.json
            username = payload_data.get('username')
            password = payload_data.get('password')

            # Previously an empty username/password was reported as a
            # successfully submitted request; reject it explicitly.
            if not username or not password:
                resp_data = dict(code=-2, message="Missing username or password.", data=None)
                return resp_json(resp_data, status=400)

            is_exists, _ = am.user_exists(username)
            if is_exists:
                resp_data = dict(code=0, message="请勿频繁提交", data=None)
                return resp_json(resp_data, status=200)

            am.insert_user(username, password, 'from api', 0)
            resp_data = dict(code=0, message="请求已提交,请等待管理员验证", data=None)
            return resp_json(resp_data, status=200)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so task cancellation
            # and interpreter-exit signals are not swallowed.
            traceback.print_exc()
            resp_data = dict(code=-2, message="failed", data=None)
            return resp_json(resp_data, status=404)

# Extra (path, view class) pairs exported for the sanic_jwt setup.
auth_views = (('/sign', Register),)

class MyResponses(Responses):
    """Response customisation that exposes basic user fields on login."""

    @staticmethod
    def extend_authenticate(request, user=None, access_token=None, refresh_token=None):
        """Return the user's id, name and status as a dict."""
        exposed_fields = ("user_id", "username", "status")
        return {field: getattr(user, field) for field in exposed_fields}

def test_refresh_token(user_id, token):
    """Check ``token`` against the refresh token stored for ``user_id``.

    Returns ``True`` only when a token is stored and equals ``token``.
    """
    stored_token = app_redis.get_item(user_id)
    # Without this guard a missing stored token and a missing supplied token
    # (None == None) would be reported as a successful match.
    if stored_token is None:
        return False
    return token == stored_token
python
from sanic import Blueprint
from sanic_jwt import protected, inject_user
from sanic.response import json as resp_json


# Blueprint for the data API; every route below is served under /api.
ApiData = Blueprint('ApiData', url_prefix='/api')


@ApiData.get("/data")
@inject_user()
@protected()
async def data(request, user):
    """GET /api/data: return a fixed success message with status 200."""
    body = {"message": "success"}
    return resp_json(body, status=200)
python
from config import REDIS_HOST, REDIS_PORT, REDIS_TOKEN_DB, REDIS_PASSWORD
from redis import StrictRedis

class AppRedis:
    """Small wrapper around a Redis client for per-user refresh tokens."""

    def __init__(self):
        # Connection settings are taken from the project's config module;
        # decode_responses=True makes the client return str instead of bytes.
        connection_options = dict(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=REDIS_TOKEN_DB,
            decode_responses=True,
        )
        self.app_redis = StrictRedis(**connection_options)

    @staticmethod
    def _token_key(user_id):
        """Return the Redis key under which a user's refresh token lives."""
        return f'refresh_token_{user_id}'

    def set_item(self, user_id, refresh_token):
        """Store ``refresh_token`` for ``user_id``; returns the client's result."""
        redis_key = self._token_key(user_id)
        outcome = self.app_redis.set(redis_key, refresh_token)
        return outcome

    def get_item(self, user_id):
        """Return the value stored for ``user_id`` as reported by the client."""
        redis_key = self._token_key(user_id)
        stored = self.app_redis.get(redis_key)
        return stored

项目代码 config

python
# web api
WEB_DOMAIN    = "<DOMAIN>"
WEB_HOST      = "127.0.0.1"
WEB_HTTP_PORT = 80

# redis
REDIS_HOST     = "127.0.0.1"
REDIS_PORT     = 6379
# The two values below were swapped: the password was 0 and the database
# index was the password placeholder. The password is the secret string,
# the token database is a numeric index.
REDIS_PASSWORD = "<REDIS_PASSWORD>"
REDIS_TOKEN_DB = 0

测试

shell
# Login
# The auth endpoints are registered under the /apiv2/auth prefix.
curl --location --request POST '{{host}}/apiv2/auth/login' \
--header 'Content-Type: application/json' \
--data-raw '{
    "username": "{{username}}",
    "password": "{{password}}"
}'

# Response
# Success
# {
#     "access_token": "{{access_token}}",
#     "refresh_token": "{{refresh_token}}"
# }
# Failure (wrong credentials)
# {
#     "reasons": [
#         "Auth fail."
#     ],
#     "exception": "AuthenticationFailed"
# }

# Refresh Access Token
# The auth endpoints are registered under the /apiv2/auth prefix.
curl --location --request POST '{{host}}/apiv2/auth/refresh' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer {{access_token}}' \
--data-raw '{
    "refresh_token": "{{refresh_token}}"
}'

# Response
# Success
# {
#     "access_token": "{{access_token}}"
# }
# Failure
# {
#     "reasons": [
#         "Auth required."
#     ],
#     "exception": "Unauthorized"
# }

# Get Data (protected route; send the access token as a Bearer header)
curl -L -X GET '{{host}}/api/data' \
    -H 'Authorization: Bearer {{access_token}}'

# Expected responses
# On success:
# {
#     "message": "success"
# }
# On failure:
# {
#     "reasons": [
#         "Auth required."
#     ],
#     "exception": "Unauthorized"
# }

Released under the MIT License.