Fastapi OAuth2令牌处理,缺少授权标头

qgelzfjb  于 12个月前  发布在  其他
关注(0)|答案(2)|浏览(133)

我正在使用FastAPI和OAuth2 PasswordBearer以及RequestForm来实现用户登录。登录和检索令牌可以工作,但使用令牌对我来说不起作用。
我得到了这个OAuth2 PasswordBearer设置和/token函数:

authmanager = OAuth2PasswordBearer(tokenUrl='dauPP/token')

@router.post("/token", response_model=Token)
async def login_for_access_token(db: AsyncIOMotorDatabase = Depends(get_database),
                                 form_data: OAuth2PasswordRequestForm = Depends()):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

字符串
这个函数根据我的MongoDB检查表单中提供的给定用户名和密码,这很好用。
但如果我尝试访问托管路由器f.e. /home

@router.get("/home", response_class=HTMLResponse)
async def get_home(request: Request, current_user: User = Depends(get_current_active_user)):
    return templates.TemplateResponse("home.html", {
        "request": request, "title": "[D]ocument [A]dvanced [U]tility", "subtitle": current_user.username
    })


我收到
“未经认证”
如果我没有事先拿到代币的话,那就太好了。
我检查了PasswordBearer正在处理的内容:

async def __call__(self, request: Request) -> Optional[str]:
        print(request.headers)
        authorization: str = request.headers.get("Authorization")
        scheme, param = get_authorization_scheme_param(authorization)
        if not authorization or scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        return param


所以我看了一下request. header的内容,这是我发现的(“Cheking for.”是我添加的打印):

INFO:     127.0.0.1:59213 - "POST /dauAPP/token HTTP/1.1" 200 OK
Headers({'host': '127.0.0.1:8000', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'accept-language': 'de,en-US;q=0.7,en;q=0.3', 'accept-encoding': 'gzip, deflate', 'connection': 'keep-alive', 'upgrade-insecure-requests': '1', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'none', 'sec-fetch-user': '?1'})
Checking for authorization in header...result:None
INFO:     127.0.0.1:59213 - "GET /dauAPP/home HTTP/1.1" 401 Unauthorized


我对web开发很陌生,所以我的想法可能有根本性的缺陷,但我认为,返回访问令牌会导致客户端/浏览器存储它并自动将其添加到“授权”中的请求头中。我必须在客户端对令牌做些什么吗?可能是配置有问题,所以Auth-Header被切断了吗?
我认为它是如何工作的:

get_home is called 
async def get_home(request: Request, current_user: User = Depends(get_current_active_user)):
--> dependency calling get_current_active_user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
--> dependency calling get_current_user

async def get_current_user(db: AsyncIOMotorDatabase = Depends(get_database), token: str = Depends(authmanager)):
--> dependency calls authmanager (OAuth2PasswordBearer)

which calls the def __call__ function mentioned earlier and here the header is missing.

wj8zmpe1

wj8zmpe11#

但我认为,返回访问令牌将导致客户端/浏览器存储它,并自动将其添加到“授权”中的请求头中。
这种假设是错误的。您的应用负责存储令牌并为针对端点的每个请求传输令牌。如果您计划使用常规HTML并作为用户单击链接而不是作为API端点在其中导航,您可能需要考虑使用cookie而不是HTTP头(由浏览器自动发送)。
swagger-ui可以从API签名中确定是否需要Authorization头(这是OAuth2 PasswordBearer所做的),它知道它可以请求并期望提供该头。由于swagger-ui不是HTTP的一部分,因此这不是浏览器应该或能够做的事情。
但是,Cookie确实可以实现这一目的-因此,如果您想这样做,您可以使用Cookie。但是API请求不包括Cookie,并且更常见的是为这些请求使用Authorization标头。

lndjwyie

lndjwyie2#

我也是一个新手,在尝试直接使用FastAPI作为后端和前端实现网站时遇到了类似的问题(这不是最优的,但我还没有学会react.js)。
几天后我设法解决了这个问题。以下是我能从我有限的经验中告诉你的。
请注意以下事项:
1.您已经显示了OAuth2PasswordBearer的源代码。该类要求您将标记存储在一个头中,该头的键为"Authorization",值为"Bearer <token>"
1.您的"/token"路由正在创建并检索一个令牌,但该令牌既没有存储为头,也没有存储为cookie。
1.您可能希望将路由拆分为两个路由:"/login""/token"。使用"/login"处理表单数据并存储授权令牌,使用"/token"对用户进行身份验证并生成授权令牌。
1.一旦用户登录网站,您通常会希望将用户重定向到受保护的页面。但是,在重定向过程中,标题会丢失。Cookie是一种特殊的标题,在重定向过程中不会丢失。
1.正因为如此,您的代码可以在Swagger UI中工作,但当您通过在网站的登录表单中点击“提交”按钮来实际测试它时,就不行了。
1.如果你想使用redirect,那么你可以使用fastapi-login package。或者,你可以创建一个继承自OAuth2PasswordBearer的自定义类,并修改它的__call__方法,以便从cookie中检索授权令牌。我使用的是后一种方法。
1.使用安全cookie来存储令牌的一个好处是,您可以防止黑客使用JavaScript. Check this video.进行攻击。
下面的代码展示了如何在一个使用fastapi作为后端和前端的网站中使用tokenbearer进行身份验证。
让我们从文件结构开始:

.
├── app
│   ├── __init__.py
│   ├── models
│   │   ├── individual.py
│   │   ├── __init__.py
│   │   └── user.py
│   ├── routes
│   │   ├── individual.py
│   │   ├── __init__.py
│   │   ├── login.py
│   │   ├── logout.py
│   │   ├── readme.py
│   │   ├── register.py
│   │   └── test_db.py
│   ├── static
│   │   ├── favicon2.ico
│   │   ├── logo.jpeg
│   │   └── styles.css
│   └── templates
│       ├── index.html
│       ├── individual.html
│       ├── layout.html
│       ├── login.html
│       ├── register.html
├── core
│   ├── hashing.py
│   ├── helpers.py
│   ├── __init__.py
│   ├── jwt.py
│   ├── oauth_extension.py
├── data
│   ├── create_db.sql
│   ├── my_database.db
│   ├── db_setup.py
│   └── __init__.py
├── main.py
└── README.md

字符串
现在,假设您在"login.html"中有一个登录表单。当用户提交此表单时,会向"/login"路由发送一个发布请求。
下面的代码展示了如何检索一个令牌并将其存储在cookie和头文件中。之后,用户将被重定向到"/individual"。重定向会擦除头文件,但保留cookie,因此在这种情况下,将令牌存储在头文件中实际上是无用的。

# project/app/routes/login.py

from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from core.jwt import create_access_token, authenticate_user

templates = Jinja2Templates(directory="app/templates")
router = APIRouter(prefix="/auth", tags=["auth"])
ACCESS_TOKEN_EXPIRE_MINUTES = 30

@router.get("/login", response_class=HTMLResponse)
async def render_login_page(request: Request):
    context = {
        "request": request,
        "display_version": "GET"
    }
    return templates.TemplateResponse("login.html", context)

@router.post("/login")
async def validate_login_form(
     request: Request, user_auth_form: Annotated[OAuth2PasswordRequestForm, Depends()]
     ):
    print("reached /login route.")

    # Verify user credentials and create auth token
    token = await get_token(request, user_auth_form)
    access_token = token.get("access_token")

    print("Valid user. Store token and redirect")

    redirect_url = "/individual"

    # This is how to store the token in headers, but it actually is lost in this case during redirect
    headers = {"Authorization": f"Bearer {access_token}"}
    response = RedirectResponse(
         redirect_url, status_code = status.HTTP_303_SEE_OTHER, headers=headers
         )

    # Since headers are removed during redirection, but cookies are not, store token in cookie
    response.set_cookie(
         key='access-token', value=access_token, httponly=True, secure=True)
    return response

@router.post("/token")
async def get_token(
     request: Request, user_auth_form: Annotated[OAuth2PasswordRequestForm, Depends()]
     ):
        print("Reached '/auth/token' endpoint.\nValidating username and password")
        valid_user = await authenticate_user(user_auth_form)
        if not valid_user:
            raise HTTPException(
                 status_code = 400,
                 detail = "Incorrect username or password",
                 headers = {"WWW-Authenticate": "Bearer"},
                )
        # Valid user, generate temporary token
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
                        data = {"sub": user_auth_form.username},
                        expires_delta = access_token_expires
                        )
        return {"access_token": access_token, "token_type": "bearer"}


当用户到达"/individual"路由时,这是一个受保护的路由,它需要在请求中包含一个有效的身份验证令牌。我们通过使用如下所示的依赖关系注入来声明它:Depends(verify_token)。下面是该individual.py的代码:

# app/routes/individual.py
from fastapi import APIRouter, Request, Depends
from typing import Annotated
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from typing import Annotated
from app.models.user import TokenData
from core.jwt import verify_token

router = APIRouter()
templates = Jinja2Templates(directory="app/templates")

@router.get("/individual", response_class=HTMLResponse)
async def index(request: Request, user_token: Annotated[TokenData, Depends(verify_token)]):
    context = {
        "request": request,
        "display_version": "GET",
        "username": user_token.username
    }
    print("Welcome to individual: ", user_token)
    return templates.TemplateResponse("individual.html", context)


当到达"/individual"路由时,应该从request中检索令牌,以便授权用户访问此路由。如果授权令牌存储在request.headers中,则我们将使用OAuth2PasswordBearer来检索它。但在本例中,我们通过重定向到达了此路由。它从请求中删除此头。request. headers中没有"Authorization"键。因此,OAuth2PasswordBearer中的__call__方法将无法检索此行中的授权令牌,它将返回None

authorization: str = request.headers.get("Authorization")


为了解决这个问题,我得到了here的启发,创建了一个类OAuth2PasswordBearerWithCookie来处理cookie和头文件。
下面是OAuth2PasswordBearerWithCookie类的实现。

# project/core/oauth_extension.py

# Inspired by: https://www.fastapitutorial.com/blog/fastapi-jwt-httponly-cookie/

from typing import Optional, Tuple
from fastapi.security import OAuth2PasswordBearer
from fastapi.exceptions import HTTPException
from fastapi.security.utils import get_authorization_scheme_param
from starlette.requests import Request
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN

from starlette.datastructures import MutableHeaders

class OAuth2PasswordBearerWithCookie(OAuth2PasswordBearer):
    """ Extends the OAuth2PasswordBearer class to retrieve password either from cookies or headers"""
    async def __call__(self, request: Request) -> Optional[str]:

        # Try to get auth field from headers (will be None during a redirect)
        authorization = request.headers.get("Authorization")

        if authorization is not None:
            scheme, param = get_authorization_scheme_param(authorization)
            if not authorization or scheme.lower() != "bearer":
                if self.auto_error:
                    raise HTTPException(
                        status_code=HTTP_401_UNAUTHORIZED,
                        detail="Not authenticated",
                        headers={"WWW-Authenticate": "Bearer"},
                    )
                else:
                    return None
            return param

        # No token in header. Try using cookie:

        # Get token from cookies
        token = request.cookies.get('access-token')
        if token:
            param = token
            return param
        else:
            raise HTTPException(
                status_code=HTTP_401_UNAUTHORIZED,
                detail="Not authenticated",
                headers={"WWW-Authenticate": "Bearer"},
            )


为了完整起见,下面是授权的其他函数。注意oauth2_scheme如何使用上面定义的自定义类OAuth2PasswordBearerWithCookie

# project/core/jwt.py

from fastapi import Depends, HTTPException, status, Request
from jose import jwt, JWTError
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from app.models.user import TokenData
from data.db_setup import database
from core.hashing import Hasher
from datetime import timedelta, datetime, timezone
from typing import Annotated
from core.oauth_extension import OAuth2PasswordBearerWithCookie

# References:
#  https://www.jetbrains.com/guide/python/tutorials/fastapi-aws-kubernetes/auth_jwt/
# https://datatracker.ietf.org/doc/html/rfc6750#page-5

# To get secret key like this in bash, run:
# openssl rand -hex 32
# Secret key is used to sign the token by the server. Store is in your .venv, not here
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
# ACCESS_TOKEN_EXPIRE_MINUTES = 30

# https://github.com/tiangolo/fastapi/blob/master/fastapi/security/oauth2.py
# oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
# Notice: OAuth2PasswordBearer will not read cookies. Using my custom calss instead:
oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="/auth/token")

async def authenticate_user(user_auth_form: OAuth2PasswordRequestForm) -> bool:
    """Returns True if username and password are registered in database"""
    # Find user in database
    user_row = await get_user_row_in_db(user_auth_form.username)

    if not user_row:
        print("username not found")
        # username not found in database
        return False

    if not Hasher.verify_password(
        user_auth_form.password, user_row.get("hashed_password")
        ):
        # password mismatch"
        print("invalid password")
        return False

    # User found and password does match.
    print("Valid user credentials!")
    return True

async def get_user_row_in_db(username: str) -> dict | None:
    """Queries database to retrieve username and hashed password from username"""

    desired_columns = ("id", "username", "hashed_password")

    # Get column names as 'col1, col2 ,...'
    concatenated_columns = ", ".join(
        [f"{column_name} " for column_name in desired_columns]
        )

    query = f"""
        SELECT {concatenated_columns}
        FROM users
        WHERE username = :username;
        """
    values = {"username": username}
    # Check if user is in database table 'users':
    row = await database.fetch_one(query=query, values=values)
    if not row:
        return None

    # Convert results to a dicionary with column names and value for each column
    return dict(zip(desired_columns, row))

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """ Returns a JWT access token that expires within a time delta """
    to_encode = data.copy()

    # Set token creation timestamp
    current_time = datetime.now(tz=timezone.utc)
    to_encode.update({"iat": current_time})

    # Set token expire timestamp
    if expires_delta:
        expire = current_time + expires_delta
    else:
        expire = current_time + timedelta(minutes=15)
    to_encode.update({"exp": expire})

    # Encode token
    encoded_jwt = jwt.encode(claims = to_encode,
                             key = SECRET_KEY,
                             algorithm = ALGORITHM
                             )
    return encoded_jwt

async def verify_token(token: Annotated[str, Depends(oauth2_scheme)]) -> TokenData:
    """
    Decodes a JWT token and retrieves user
    We want to check the following:
    -1. Token Integrity:
        (a) Verify that the access token is valid,
        (b) hasn't expired,
        (c) and has been issued by your authentication server.
    -2. Token Claims: Ensure that the necessary claims are present in the payload.
    -3. User Existence: Check if the user identified by the claims (e.g., username) exists in your database.
    -4. Token Revocation: Depending on your security requirements, you might want to check whether the token has been revoked.
                       This can be important if you need to invalidate tokens before their natural expiration.
    """
    # Default exception if token validations fails
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Print (for debugging)
    print("*"*30,"\nToken verification in process")

    # Verify token claims and integrity
    try:
        # 1a, 1c: Check token is valid (1a), token has not expired(1b) and signed (1c)
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

        # 2: Check payload contains username
        username: str = payload.get("sub")

        print("-->Token payload")
        print(" username: ", username)

        if username is None:
            # Could not find username in token
            raise credentials_exception

    except JWTError:
        print("Token verification failed")
        raise credentials_exception

    token_data = TokenData(username=username)

    # 3: Verify user exists in database
    user = await get_user_row_in_db(username=token_data.username)
    if user is None:
        print("Token verification in process")
        raise credentials_exception

    # TODO: 4: Token revocation

    print("Token verification succeded")
    print(token_data)
    return token_data


我留下了一些打印语句,你可以用它们来调试。我希望这对你有帮助!

相关问题