Adapt for oauth by traefik

This commit is contained in:
matthias@matsewe.de
2024-06-25 11:46:31 +02:00
parent 1420dd7d23
commit 6cd1064f1d

View File

@@ -1,6 +1,6 @@
from typing import Annotated
from fastapi import HTTPException, Cookie, status
from fastapi import HTTPException, Cookie, status, Request
from fastapi.security import SecurityScopes
from jose import JWTError, jwt
from pydantic import ValidationError
@@ -10,9 +10,6 @@ import os
# to get a string like this run:
# openssl rand -hex 32
ALGORITHM = "HS512"
SECRET_KEY = os.environ['SECRET_KEY']
scopes_db = {
os.environ['ADMIN_EMAIL'] : ["admin"]
}
@@ -23,18 +20,17 @@ credentials_exception = HTTPException(
)
async def get_current_user(
security_scopes: SecurityScopes, access_token: Annotated[str, Cookie()] = ""
):
security_scopes: SecurityScopes, request: Request
):
try:
payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub") # type: ignore
username: str = request.headers.get("x-auth-request-user") # type: ignore
if username is None:
raise credentials_exception
email: str = payload.get("email") # type: ignore
email: str = request.headers.get("x-auth-request-email") # type: ignore
except (JWTError, ValidationError):
raise credentials_exception
scopes = scopes_db.get(email)
for scope in security_scopes.scopes:
if scope not in scopes:
raise credentials_exception
return payload | {"internal_scopes" : scopes}
return {"sub" : username, "email" : email, "internal_scopes" : scopes}