oauth2.0 将Keycloak与FastAPI集成

3phpmpom  于 2023-10-15  发布在  其他
关注(0)|答案(3)|浏览(247)

我有一个用FastAPI和SvelteKit编写的应用程序,这两个应用程序都在单独的容器中运行,目前的解决方案是Svelte将用户名和密码发送到FastAPI服务器,FastAPI服务器然后返回一个签名的JWT给Svelte,用于使用FastAPI进行身份验证。
登录流程Svelte ->用户名+密码-> FastAPI
FastAPI -> JWT -> Svelte(存储在cookie中)
以下是当请求Svelte(授权:Bearer)-> FastAPI
我想摆脱我自己的用户名和密码,并使用KeyCloak进行身份验证。我是OAuth的超级新手,所以我不知道我应该做什么,甚至不知道要搜索什么术语。
我知道我想要的是Svelte(后藤“/login”)->重定向到keycloak ->登录并获取令牌(以某种方式将令牌获取到FastAPI,以便我可以签署自己的令牌并将其发送给Svelte)
当我发出请求时Svelte -> FastAPI Token -> FastAPI

p1iqtdky

p1iqtdky1#

将Keycloak身份验证添加到应用程序的最简单方法是oauth2-proxy。oauth2-proxy就像一个反向代理一样站在你的应用程序前面,它将为你处理所有的OAuth2位。oauth2-proxy将生成Keycloak的登录URL(/realms/{realm-name}/protocol/openid-connect/auth)使用适当的参数启动OAuth2登录过程并将其重定向回应用程序页面,Keycloak将显示登录页面,然后在用户完成登录后,它将重定向回应用程序,此时oauth2-proxy将检查JWT令牌是否有效,设置cookie,然后将用户名、访问令牌和ID令牌作为HTTP头沿着传递到服务器。
当使用oauth2-proxy类型的方法时,您的Svelte前端应用程序或FastAPI应用程序都不需要了解OIDC/OAuth2的任何信息,它们甚至不需要访问客户端机密,这就降低了您的应用程序泄露这些机密的可能性。应用程序可能需要能够解码JWT令牌以读取访问令牌和ID令牌(其中包含auth声明),但如果您只需要一个Keycloak验证的用户名/电子邮件,那么您的FastAPI应用程序只需使用Header()读取HTTP标头即可获得用户名。无需签名验证即可解码JWT(因为我们假设oauth2-proxy已经验证了签名),只需使用标准库即可实现,基本上只需几个字符串操作,base64和json解析。
或者,您可以直接在应用程序中实现OAuth2流。如果应用程序需要做比JWT令牌的基本身份验证和授权更复杂的事情,或者如果您想要更好地控制身份验证过程,那么直接集成可能是必要的。Keycloak通过OIDC支持OAuth2,因此您可以使用任何OIDC库(如pyoidc)或Keycloak特定集成(如fastapi-keycloak-middleware)。在没有OIDC的情况下实现OAuth2也是可能的,但它会更加复杂。Keycloak、OIDC和OAuth2都有大量的配置,以支持各种不同的用例和安全需求,您需要更详细地研究这些配置,以确定哪种配置适合您的需求。
不使用OIDC和/或OAuth2库实现所有这些是可能的,但我并不真正推荐它们。您最终只会阅读大量无聊的OAuth2和OIDC规范。

guykilcj

guykilcj2#

我为fastAPI制作了一些Python code与keycloak集成,分享一下可能会有帮助。

Decomponentauth函数

#/auth.py
from fastapi.security import OAuth2AuthorizationCodeBearer
from keycloak import KeycloakOpenID # pip require python-keycloak
from config import settings
from fastapi import Security, HTTPException, status,Depends
from pydantic import Json
from models import User

# This is used for fastapi docs authentification
oauth2_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl=settings.authorization_url, # https://sso.example.com/auth/
    tokenUrl=settings.token_url, # https://sso.example.com/auth/realms/example-realm/protocol/openid-connect/token
)

# This actually does the auth checks
# client_secret_key is not mandatory if the client is public on keycloak
keycloak_openid = KeycloakOpenID(
    server_url=settings.server_url, # https://sso.example.com/auth/
    client_id=settings.client_id, # backend-client-id
    realm_name=settings.realm, # example-realm
    client_secret_key=settings.client_secret, # your backend client secret
    verify=True
)

async def get_idp_public_key():
    return (
        "-----BEGIN PUBLIC KEY-----\n"
        f"{keycloak_openid.public_key()}"
        "\n-----END PUBLIC KEY-----"
    )

# Get the payload/token from keycloak
async def get_payload(token: str = Security(oauth2_scheme)) -> dict:
    try:
        return keycloak_openid.decode_token(
            token,
            key= await get_idp_public_key(),
            options={
                "verify_signature": True,
                "verify_aud": False,
                "exp": True
            }
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e), # "Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
# Get user infos from the payload
async def get_user_info(payload: dict = Depends(get_payload)) -> User:
    try:
        return User(
            id=payload.get("sub"),
            username=payload.get("preferred_username"),
            email=payload.get("email"),
            first_name=payload.get("given_name"),
            last_name=payload.get("family_name"),
            realm_roles=payload.get("realm_access", {}).get("roles", []),
            client_roles=payload.get("realm_access", {}).get("roles", [])
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e), # "Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

定义您的模型,您可以使用任何您需要的模型:

#/models.py
from pydantic import BaseModel, EmailStr

class User(BaseModel):
    id: str
    username: str
    email: str
    first_name: str
    last_name: str
    realm_roles: list
    client_roles: list

class authConfiguration(BaseModel):
        server_url: str
        realm: str
        client_id: str
        client_secret: str
        authorization_url: str
        token_url: str

自定义配置:

#/config.py
from models import authConfiguration

settings = authConfiguration(
    server_url="http://localhost:8080/",
    realm="roc",
    client_id="rns:roc:portal",
    client_secret="",
    authorization_url="http://localhost:8080/realms/roc/protocol/openid-connect/auth",
    token_url="http://localhost:8080/realms/roc/protocol/openid-connect/token",
)

最后定义路由并保护它们:

#/main.py
import uvicorn
from fastapi import FastAPI,Depends
from models import User
from auth import get_user_info

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/secure")
async def root(user: User = Depends(get_user_info)):
    return {"message": f"Hello {user.username} you have the following service: {user.realm_roles}"}

if __name__ == '__main__':
    uvicorn.run('main:app', host="127.0.0.1", port=8081)

在keycloak方面创建一个领域,并在此真实创建一个客户端。创建一个测试用户来使用它进行身份验证,就是这样!

参考文献:

uklbhaso

uklbhaso3#

又是我,这里有一个结论,供将来遇到这个问题的人参考。我决定使用我的应用程序的官方keycloak.js连接器和keycloak python lib来验证后端的令牌。

相关问题