如何使用基于 JWT 令牌的授权来保护 fastapi API 端点?

新手上路,请多包涵

我对 python 中的 FastAPI 有点陌生。我正在构建一个需要具有基于 JWT 令牌的授权的 API 后端框架。现在,我知道如何生成 JWT 令牌,但不确定如何将其与 Python 中快速 api 中的 API 方法集成。任何指针将不胜感激。

原文由 Aditya Bhattacharya 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 741
2 个回答

在朋友和同事的帮助下,我解决了这个问题,并想与社区分享这个解决方案。这是现在的样子:

Python代码—-

 import json

import os

import datetime

from fastapi import HTTPException, Header

from urllib.request import urlopen

from jose import jwt

from jose import exceptions as JoseExceptions

from utils import logger

AUTH0_DOMAIN = os.environ.get(
    'AUTH0_DOMAIN', 'https://<domain>/<tenant-id>/')

AUTH0_ISSUER = os.environ.get(
    'AUTO0_ISSUER', 'https://sts.windows.net/<tenant>/')

AUTH0_API_AUDIENCE = os.environ.get(
    'AUTH0_API_AUDIENCE', '<audience url>')

AZURE_OPENID_CONFIG = os.environ.get(
    'AZURE_OPENID_CONFIG', 'https://login.microsoftonline.com/common/.well-known/openid-configuration')

def get_token_auth_header(authorization):
    parts = authorization.split()

    if parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=401,
            detail='Authorization header must start with Bearer')
    elif len(parts) == 1:
        raise HTTPException(
            status_code=401,
            detail='Authorization token not found')
    elif len(parts) > 2:
        raise HTTPException(
            status_code=401,
            detail='Authorization header be Bearer token')

    token = parts[1]
    return token

def get_payload(unverified_header, token, jwks_properties):
    try:
        payload = jwt.decode(
            token,
            key=jwks_properties["jwks"],
            algorithms=jwks_properties["algorithms"],  # ["RS256"] typically
            audience=AUTH0_API_AUDIENCE,
            issuer=AUTH0_ISSUER
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=401,
            detail='Authorization token expired')
    except jwt.JWTClaimsError:
        raise HTTPException(
            status_code=401,
            detail='Incorrect claims, check the audience and issuer.')
    except Exception:
        raise HTTPException(
            status_code=401,
            detail='Unable to parse authentication token')

    return payload

class AzureJWKS:
    def __init__(self, openid_config: str=AZURE_OPENID_CONFIG):
        self.openid_url = openid_config
        self._jwks = None
        self._signing_algorithms = []
        self._last_updated = datetime.datetime(2000, 1, 1, 12, 0, 0)

    def _refresh_cache(self):
        openid_reader = urlopen(self.openid_url)
        azure_config = json.loads(openid_reader.read())
        self._signing_algorithms = azure_config["id_token_signing_alg_values_supported"]
        jwks_url = azure_config["jwks_uri"]

        jwks_reader = urlopen(jwks_url)
        self._jwks = json.loads(jwks_reader.read())

        logger.info(f"Refreshed jwks config from {jwks_url}.")
        logger.info("Supported token signing algorithms: {}".format(str(self._signing_algorithms)))
        self._last_updated = datetime.datetime.now()

    def get_jwks(self, cache_hours: int=24):

            logger.info("jwks config is out of date (last updated at {})".format(str(self._last_updated)))
            self._refresh_cache()
        return {'jwks': self._jwks, 'algorithms': self._signing_algorithms}

jwks_config = AzureJWKS()

async def require_auth(token: str = Header(...)):
    token = get_token_auth_header(token)


    try:
        unverified_header = jwt.get_unverified_header(token)
    except JoseExceptions.JWTError:
        raise HTTPException(
                    status_code=401,
                    detail='Unable to decode authorization token headers')

    payload = get_payload(unverified_header, token, jwks_config.get_jwks())
    if not payload:
        raise HTTPException(
                    status_code=401,
                    detail='Invalid authorization token')

    return payload

我希望社区能从中受益!

原文由 Aditya Bhattacharya 发布,翻译遵循 CC BY-SA 4.0 许可协议

我发现可以对已接受的答案进行某些改进:

  • 如果您选择使用 HTTPBearer security schema ,则 Authorization 标头内容的格式会自动验证,并且不需要像接受的答案中那样的功能 get_token_auth_header 。此外,关于身份验证,生成的文档最终变得非常清晰和解释:

在此处输入图像描述

  • 当您解码令牌时,您可以捕获所有属于类 JOSEError 的后代的异常,并打印它们的消息,避免捕获特定异常并编写自定义消息
  • 奖励:在 jwt 解码方法中,您可以指定要忽略的声明,因为您不想验证它们

示例片段:哪里…

 /endpoints
          - hello.py
          - __init__.p
dependency.py
main.py
 # dependency.py script
from jose import jwt
from jose.exceptions import JOSEError
from fastapi import HTTPException, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()

async def has_access(credentials: HTTPAuthorizationCredentials= Depends(security)):
    """
        Function that is used to validate the token in the case that it requires it
    """
    token = credentials.credentials

    try:
        payload = jwt.decode(token, key='secret', options={"verify_signature": False,
                                                           "verify_aud": False,
                                                           "verify_iss": False})
        print("payload => ", payload)
    except JOSEError as e:  # catches any exception
        raise HTTPException(
            status_code=401,
            detail=str(e))
 # main.py script
from fastapi import FastAPI, Depends
from endpoints import hello
from dependency import has_access

app = FastAPI()

# routes
PROTECTED = [Depends(has_access)]

app.include_router(
    hello.router,
    prefix="/hello",
    dependencies=PROTECTED
)
 # hello.py script
from fastapi import APIRouter

router = APIRouter()

@router.get("")
async def say_hi(name: str):
    return "Hi " + name

通过利用所有提到的功能,您最终可以超快地构建具有安全性的 API :)

原文由 onofricamila 发布,翻译遵循 CC BY-SA 4.0 许可协议

推荐问题