llama_index [Bug]: Cohere astream_chat() 与 await 一起使用时无法正常工作,

gzjq41n4  于 2个月前  发布在  其他
关注(0)|答案(3)|浏览(31)

Bug描述

Cohere的astream_chat()存在问题,它不能正常工作。
acompletion_with_retry中的代码似乎也无法正常工作。
以下链接中的异步示例也无法正常工作。它给出了与我的应用程序相同的错误。
https://docs.llamaindex.ai/en/stable/examples/llm/cohere/

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
[<ipython-input-19-26c09826c742>](https://v6zl6wjyioo-496ff2e9c6d22116-0-colab.googleusercontent.com/outputframe.html?vrz=colab_20240411-060128_RC00_623759654#) in <cell line: 1>()
----> 1 resp = await llm.astream_complete("Paul Graham is ")

9 frames
[/usr/local/lib/python3.10/dist-packages/llama_index/llms/cohere/utils.py](https://v6zl6wjyioo-496ff2e9c6d22116-0-colab.googleusercontent.com/outputframe.html?vrz=colab_20240411-060128_RC00_623759654#) in _completion_with_retry(**kwargs)
     98         else:
     99             if is_stream:
--> 100                 return await aclient.generate_stream(**kwargs)
    101             else:
    102                 return await aclient.generate(**kwargs)

TypeError: object async_generator can't be used in 'await' expression

版本

最新版本

重现步骤

在以下链接的Async部分运行示例:
https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/docs/examples/llm/cohere.ipynb

相关日志/回溯

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
[<ipython-input-19-26c09826c742>](https://v6zl6wjyioo-496ff2e9c6d22116-0-colab.googleusercontent.com/outputframe.html?vrz=colab_20240411-060128_RC00_623759654#) in <cell line: 1>()
----> 1 resp = await llm.astream_complete("Paul Graham is ")

9 frames
[/usr/local/lib/python3.10/dist-packages/llama_index/llms/cohere/utils.py](https://v6zl6wjyioo-496ff2e9c6d22116-0-colab.googleusercontent.com/outputframe.html?vrz=colab_20240411-060128_RC00_623759654#) in _completion_with_retry(**kwargs)
     98         else:
     99             if is_stream:
--> 100                 return await aclient.generate_stream(**kwargs)
    101             else:
    102                 return await aclient.generate(**kwargs)

TypeError: object async_generator can't be used in 'await' expression
13z8s7eq

13z8s7eq1#

你遇到的错误是由于尝试在返回异步生成器的函数上执行 await 操作,而不是在协程上执行。在Python中,异步生成器被设计为异步地产生一系列结果,应该使用 async for 循环进行迭代,而不是直接等待。

正确从异步生成器中获取值的方法是:

async for message in astream_chat():
    # process each message

如果 astream_chat() 打算直接等待,那么它的实现需要调整为返回一个协程。这种调整包括修改函数以将结果收集到一个列表或其他可以直接等待的结构中。
对于LlamaIndex仓库中的 astream_chat() 特定情况,它被设计为异步生成器。因此,你应该使用 async for 循环遍历其结果。如果需要直接等待此函数,审查其实现以进行调整是必要的。

来源

关于 Dosu

这个响应旨在帮助你节省时间并提高效率。它不是精确的解决方案,而是你自己研究的一个起点。
请帮我变得更有用!如果你觉得这有帮助,请留下一个👍;如果不相关,请留下一个👎。
如果你想继续讨论,请用 @dosu-bot 开始你的回复。

62o28rlo

62o28rlo2#

今天我在尝试使用Cohere运行异步流媒体聊天时遇到了相同的问题。这似乎与上面描述的问题相似。
关于这个问题有什么更新吗?你们@co-antwan(CC @harry-cohere)是否知道这个bug?
谢谢!

rqcrx0a6

rqcrx0a63#

我想要更新这个帖子,因为我已经成功解决了这个问题。
我对 astream_chat()acompletion_with_retry() 函数进行了一些细微的修改,以确保正确处理异步生成器和可等待响应,这似乎解决了问题。以下是更改的简要概述:

llms/cohere/base.py:
...
    @llm_chat_callback()
    async def astream_chat(
            self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponseAsyncGen:
        history = messages_to_cohere_history(messages[:-1])
        prompt = messages[-1].content
        documents = kwargs.pop("documents", None)

        all_kwargs = self._get_all_kwargs(**kwargs)
        all_kwargs["stream"] = True
        if all_kwargs["model"] not in CHAT_MODELS:
            raise ValueError(f"{all_kwargs['model']} not supported for chat")

        async def gen() -> ChatResponseAsyncGen:
            content = ""
            role = MessageRole.ASSISTANT
            async for r in await acompletion_with_retry(
                    aclient=self._get_aclient(),
                    max_retries=self.max_retries,
                    chat=True,
                    message=prompt,
                    chat_history=history,
                    documents=documents,
                    **all_kwargs,
            ):
                if "text" in r.__dict__:
                    content_delta = r.text
                else:
                    content_delta = ""
                content += content_delta
                yield ChatResponse(
                    message=ChatMessage(role=role, content=content),
                    delta=content_delta,
                    raw=r.__dict__,
                )

        return gen()

还对 completion_with_retry() 中的重试逻辑进行了重构,以清晰地分离处理异步生成器(用于流式响应)和常规异步函数(用于即时响应):

async def acompletion_with_retry(
        aclient: Any,
        max_retries: int,
        chat: bool = False,
        **kwargs: Any,
) -> Any:
    """Use tenacity to retry both stream and non-stream API calls, properly handling async generators."""
    retry_decorator = _create_retry_decorator(max_retries=max_retries)
    is_stream = kwargs.pop("stream", False)

    if is_stream:
        @retry_decorator
        async def _stream_with_retry(**kwargs: Any) -> Any:
            # If it's a stream, return the async generator directly
            async for s in (aclient.chat_stream(**kwargs) if chat else aclient.generate_stream(**kwargs)):
                yield s
        return _stream_with_retry(**kwargs)
    else:
        @retry_decorator
        async def _completion_with_retry(**kwargs: Any) -> Any:
            if chat:
                return await aclient.chat(**kwargs)
            else:
                return await aclient.generate(**kwargs)
        return await _completion_with_retry(**kwargs)

相关问题