llama_index [Bug]:当与FastAPI一起使用时,流式响应不起作用(即使是使用websockets)

ny6fqffe  于 2个月前  发布在  其他
关注(0)|答案(7)|浏览(38)

问题描述

我无法使FastAPI中的StreamingResponse与StreamingResponse正常工作。
这个问题很奇怪,因为当我在下面的query方法中打印文本块时,我可以在终端中看到它依次输出文本 - 所以这不是response_gen / iterator的问题,而是FastAPI的StreamingResponse在使用llama-index response_gen迭代器时似乎无法正常工作。
我正在使用ChromaDB作为矢量存储库。我还尝试使用不使用websockets的简单POST REST API示例。Print_response_stream按预期工作。我尝试通过curl将文本块输出到命令行,通过使用python脚本(httpx/asyncio)在命令行上打印,以及一个简单的index.html,其中包含一个脚本将文本块输出到段落标签,以防我的命令行配置有问题。这些都没有起作用。
文本总是一次性打印出来(即使我在新行上打印)。

版本

0.9.45.post1

重现步骤

生成流式输出的函数

在此print_response_stream()之前的所有内容都设置/初始化完毕,而且在迭代器内我也打印了每个块的输出,每个块都正确地依次打印。因此问题可能出在FastAPI路由上。
查询方法

def query(self, query_text):
        # Query the index
        streaming_response = self.query_engine.query(query_text)
        return streaming_response.response_gen

FastAPI路由

@self.app.websocket("/ws/query")
        async def websocket_query(websocket: WebSocket):
            await self.connection_manager.connect(websocket)
            try:
                while True:
                    query_text = await websocket.receive_text()
                    streaming_response_gen = self.drive.query(query_text) # Your query engine here
                    for response in streaming_response_gen: # Assuming this is iterable
                        await self.connection_manager.send_personal_message(response, websocket)
            except WebSocketDisconnect:
                self.connection_manager.disconnect(websocket)
            except Exception as e:
                # Handle exceptions
                self.connection_manager.disconnect(websocket)
                print(f"Error: {str(e)}")

相关日志/回溯

Note that the 200 OK response is only received at the end of the text generation.

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     ('127.0.0.1', 56750) - "WebSocket /ws/query" [accepted]
INFO:     connection open
2024-02-08 16:35:36 - HTTP Request: POST http://localhost:11434/api/chat "HTTP/1.1 200 OK"
HTTP Request: POST http://localhost:11434/api/chat "HTTP/1.1 200 OK"
uz75evzq

uz75evzq1#

  • 🤖*

Hey @surya-ven, 很高兴再次见到你!看来你在寻找新的冒险。
根据你提供的信息,问题可能与FastAPI的StreamingResponse如何与你的服务器或客户端交互有关,而不是LlamaIndex response_gen迭代器本身的问题。
FastAPI的StreamingResponse被设计为在数据可用时立即发送数据。然而,你观察到的行为,即文本一次性在末尾打印出来,可能是由于服务器或客户端缓冲了响应。
如果你使用的是Uvicorn作为服务器,它可能会缓冲响应并一次性发送所有内容。你可以尝试使用Uvicorn的--no-send选项来禁用此行为:

uvicorn main:app --no-send

如果你使用的是不同的服务器,你应该查看其文档以查找类似的选项。
在客户端方面,如果你使用curl将块输出到命令行,你可以使用-N--no-buffer选项来禁用缓冲:

curl -N http://localhost:8000/stream

如果你使用的是网络浏览器作为客户端,它可能需要等待接收到整个响应后才显示。不幸的是,你可能无法在网络浏览器中禁用此行为。在这种情况下,你可以尝试使用允许禁用缓冲的不同客户端,或者尝试使用不同的方法来消费流,如Server-Sent Events或WebSockets。
关于response_gen对象的结构和类型,根据LlamaIndex代码库,它是TokenGen类型。然而,没有更多关于TokenGen类型的信息,很难提供更多关于其结构的信息。
请注意,这些是一般建议,可能无法解决你的具体问题。如果你仍然遇到困难,提供更多关于你的设置和所看到的具体行为的信息将更有帮助。

来源

这个响应旨在节省你的时间并对你有所帮助。它不是精确的解决方案,而是一个你自己研究的起点。
请帮助我变得更有用!如果这有帮助请点赞👍,如果不相关请点踩👎。
如果你想继续对话,请以@dosu-bot开头回复。

bfrts1fy

bfrts1fy2#

@dosu-bot 看起来在使用uvicorn.run从代码本身运行时,--no-send不是选项。

au9on6nz

au9on6nz3#

我可以确认这一点,当我使用异步StreamingResponse时遇到了这个问题。将其切换为同步可以使流媒体正常工作。

m2xkgtsf

m2xkgtsf4#

我确认了这个问题,当我使用异步StreamingResponse时遇到了这个问题。切换到同步模式使流媒体工作如预期般顺畅。
太好了,感谢指出这个问题,至少有一个临时的解决方法。然而,仍然无法使用websockets解决这个问题。

hs1ihplo

hs1ihplo5#

在没有FastAPI依赖的情况下,它就是无法正常工作的。所以response_gen有时会返回一个空响应,但是阻塞响应却可以正常工作。

wbgh16ku

wbgh16ku6#

请检查是否使用了GzipMiddleware。如果是,则streamingResponse将无法正常工作。

qxsslcnc

qxsslcnc7#

我在使用FastAPI websockets + LlamaIndex时遇到了一些问题,无法正常进行流式响应。通过模拟StreamResponse的一些功能并发现我的异步函数存在一些问题,最终解决了这个问题。

响应生成器应该是一个生成器,这样你可以通过以下方式模拟StreamingResponse的功能。这样一来,你就可以在不调用LlamaIndex的其他部分的情况下测试生成器部分,并且可以隔离任何问题:

TokenGen = Generator[str, None, None]

@dataclass
class StreamingResponse:
    response_gen: TokenGen
    source_nodes: List = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None
    response_txt: Optional[str] = None

def mock_stream() -> Generator:
    response = "Hello there! How can I assist you today?" * int(100)
    for word in response.split():
        yield word + " "

def rag_response_mock_generator(user_message:str) -> StreamingResponse:
    """
This function is a mock of the llamaindex chat function
It will return a mock stream of responses from the vdb.
"""
    stream_resp = StreamingResponse(
        response_gen=mock_stream(),
        source_nodes=[],
        metadata = {},
        response_txt="streaming response mock"
    )

    return stream_resp

为了进一步隔离任何问题,你可以使用类似基本的OpenAI API端点来拆分查询-

from openai import OpenAI
client = OpenAI()

def mock_open_ai_response_generator(user_message:str)-> Generator:

    stream_resp = StreamingResponse(
        response_gen=client.chat.completions.create(
    model='gpt-3.5-turbo',
    messages=[
        {'role': 'user', 'content': user_message}
    ],
    temperature=0,
    stream=True
    ),
        source_nodes=[],
        metadata = {},
        response_txt="streaming response mock"
    )

    return stream_resp

此外,如果在index.as_query_engine函数中设置了use_async选项为True,它将返回一个AsyncGenerator,因此你需要模拟这个异步生成器。

相关问题