kubernetes · How to log raw HTTP request/response in Python FastAPI

3lxsmp7m · asked 2022-11-28 · in Kubernetes

We are writing a web service in Python FastAPI that will be hosted in Kubernetes. For auditing purposes, we need to save the raw JSON body of the request/response for specific routes. The body size of both request and response JSON is about 1 MB, and preferably this should not impact the response time. How can we do this?

hxzsmxv2 · #1

You can try a custom APIRouter, as shown in the official FastAPI documentation:

import time
from typing import Callable

from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute

class TimedRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            response: Response = await original_route_handler(request)
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            print(f"route duration: {duration}")
            print(f"route response: {response}")
            print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler

app = FastAPI()
router = APIRouter(route_class=TimedRoute)

@app.get("/")
async def not_timed():
    return {"message": "Not timed"}

@router.get("/timed")
async def timed():
    return {"message": "It's the time of my life"}

app.include_router(router)
lqfhib0f · #2

Option 1 - Using middleware

You could use a middleware. A middleware takes every request that comes to your application, and thus allows you to handle the request before it is processed by any specific endpoint, as well as the response, before it is returned to the client. To create a middleware, you use the decorator @app.middleware("http") on top of a function, as shown below. As you need to consume the request body from inside the middleware (using request.body() or request.stream(), as shown in this answer; behind the scenes, the former method actually calls the latter, see here), it won't be available when you later pass the request to the corresponding endpoint. You can follow the approach described in this post to make the request body available afterwards (i.e., using the set_body function below). As for the response body, you can use the same approach described in this answer to consume the body and then return the response to the client. Either option described in that answer would work; the code below, however, uses Option 2, which stores the body in a bytes object and returns a custom Response directly (along with the original response's status_code, headers and media_type).
To log the data, you can use a BackgroundTask, as described in this answer and this answer. A BackgroundTask will run only once the response has been sent (see the Starlette documentation); thus, the client won't have to wait for the logging to complete before receiving the response (and hence, the response time won't be noticeably affected).

    • Note: If you have a streaming request or response with a body that wouldn't fit into your server's RAM (imagine, for example, a body of 100 GB on a machine running on 8 GB of RAM), this approach would become problematic, as you are storing the data in RAM, which wouldn't have enough space available to accommodate the accumulated data. You mentioned that "the body size of both request and response JSON is about 1 MB"; hence, that should normally be fine (however, it is always good practice to consider in advance matters such as how many requests your API is expected to serve concurrently and what other applications might be competing for the RAM, in order to judge whether this is a problem). If needed, you can limit the number of requests to your API endpoints using, for example, SlowAPI (as shown in this answer). To limit the use of the middleware to specific endpoints only, you can either check request.url.path against a predefined list of routes (as described in this answer; see the "Update" section), or use a custom APIRoute class, as demonstrated in Option 2 below.
Working example
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging

app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)
    
def log_info(req_body, res_body):
    logging.info(req_body.decode())
    logging.info(res_body.decode())

async def set_body(request: Request, body: bytes):
    async def receive() -> Message:
        return {'type': 'http.request', 'body': body}
    request._receive = receive
    
@app.middleware('http')
async def some_middleware(request: Request, call_next):
    req_body = await request.body()
    await set_body(request, req_body)
    response = await call_next(request)
    
    res_body = b''
    async for chunk in response.body_iterator:
        res_body += chunk
    
    task = BackgroundTask(log_info, req_body, res_body)
    return Response(content=res_body, status_code=response.status_code, 
        headers=dict(response.headers), media_type=response.media_type, background=task)

@app.post('/')
def main(payload: Dict[Any, Any]):
    return payload

If you would like to perform some validation on the request body (for example, ensuring that the request body size does not exceed a certain value), instead of using request.body(), you can process the body one chunk at a time using the .stream() method (similar to this answer), as shown below:

@app.middleware('http')
async def some_middleware(request: Request, call_next):    
    req_body = b''
    async for chunk in request.stream():
        req_body += chunk
    ...

Option 2 - Using APIRoute

You could also use a custom APIRoute class, similar to here and here.

Working example
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging

def log_info(req_body, res_body):
    logging.info(req_body.decode())
    logging.info(res_body.decode())
       
class LoggingRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            req_body = await request.body()
            response = await original_route_handler(request)
            res_body = response.body
            response.background = BackgroundTask(log_info, req_body, res_body)
            return response

        return custom_route_handler

app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)

@router.post('/')
def main(payload: Dict[Any, Any]):
    return payload
    
app.include_router(router)
egmofgnx · #3

Since the other answers didn't work for me, and I have searched quite extensively on Stack Overflow to fix this problem, I will show my solution below.
The main issue is that many of the approaches/solutions available online simply do not work, because the request/response body is consumed when it is read from the stream.
To solve this, I took an approach that essentially reconstructs the request and response after reading them. This is heavily based on the comment by the user 'kovalevvlad' at https://github.com/encode/starlette/issues/495.
Create a custom middleware, which you then add to your app, to log all requests and responses.

from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message

# Set up your custom logger here
logger = logging.getLogger(__name__)

class RequestWithBody(Request):
    """Creation of new request with body"""
    def __init__(self, scope: Scope, body: bytes) -> None:
        super().__init__(scope, self._receive)
        self._body = body
        self._body_returned = False

    async def _receive(self) -> Message:
        if self._body_returned:
            return {"type": "http.disconnect"}
        else:
            self._body_returned = True
            return {"type": "http.request", "body": self._body, "more_body": False}

class CustomLoggingMiddleware(BaseHTTPMiddleware):
    """
    Use of custom middleware since reading the request body and the response consumes the bytestream.
    Hence this approach to basically generate a new request/response when we read the attributes for logging.
    """
    async def dispatch(  # type: ignore
        self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
    ) -> Response:
        # Store request body in a variable and generate new request as it is consumed.
        request_body_bytes = await request.body()

        request_with_body = RequestWithBody(request.scope, request_body_bytes)

        # Store response body in a variable and generate new response as it is consumed.
        response = await call_next(request_with_body)
        response_content_bytes, response_headers, response_status = await self._get_response_params(response)

        # Logging

        # If there is no request body handle exception, otherwise convert bytes to JSON.
        try:
            req_body = json.loads(request_body_bytes)
        except JSONDecodeError:
            req_body = ""
        # Logging of relevant variables.
        logger.info(
            f"{request.method} request to {request.url} metadata\n"
            f"\tStatus_code: {response.status_code}\n"
            f"\tRequest_Body: {req_body}\n"
        )
        # Finally, return the newly instantiated response values
        return Response(response_content_bytes, response_status, response_headers)

    async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
        """Consume a streaming response and return its body, headers and status code."""
        response_byte_chunks: List[bytes] = []
        response_status: List[int] = []
        response_headers: List[Dict[str, str]] = []

        async def send(message: Message) -> None:
            if message["type"] == "http.response.start":
                response_status.append(message["status"])
                response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
            else:
                response_byte_chunks.append(message["body"])

        await response.stream_response(send)
        content = b"".join(response_byte_chunks)
        return content, response_headers[0], response_status[0]
