llama_index [Bug]:QueryFusionRetriever与自定义http_client不支持异步工作

lndjwyie  于 6个月前  发布在  其他
关注(0)|答案(3)|浏览(53)

Bug描述

我不知道这是个bug还是我使用不当。我有一个本地自签名证书,所以我必须修改 http_clientllm 中。后来,如果我使用 QueryFusionRetrieveruse_async=True,我会得到错误

TypeError: Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got <class 'httpx.Client'>

。我不明白为什么 http_client 需要 httpx.AsyncClient 的示例。我使用了 Simple Fusion Retriever 中的示例,我的代码示例如下。我尝试在 OpenAI 对象中调整 http_clientasync_http_client,但都没有用,只有设置 use_async=False 才有效。

版本

0.10.37

重现步骤

from llama_index.core import SimpleDirectoryReader
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
import httpx
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.llms.openai import OpenAI

documents_1 = SimpleDirectoryReader(
    input_files=["<path/to/file1>"]
).load_data()
documents_2 = SimpleDirectoryReader(
    input_files=["<path/to/file2>""]
).load_data()

http_client = httpx.Client(verify="<my/certificate>")
async_http_client = httpx.AsyncClient(verify="<my/certificate>")
llm = OpenAI(
    http_client=http_client,
    # async_http_client=async_http_client,
)
embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=512,
    http_client=http_client,
)

index_1 = VectorStoreIndex.from_documents(
    documents_1, embed_model=embed_model, show_progress=True
)
index_2 = VectorStoreIndex.from_documents(
    documents_2, embed_model=embed_model, show_progress=True
)

retriever = QueryFusionRetriever(
    [index_1.as_retriever(), index_2.as_retriever()],
    similarity_top_k=2,
    num_queries=4,  # set this to 1 to disable query generation
    use_async=True,
    verbose=True,
    llm=llm,
    # query_gen_prompt="...",  # we could override the query generation prompt here
)

nodes_with_scores = retriever.retrieve("How do I setup a chroma vector store?")

for node in nodes_with_scores:
    print(f"Score: {node.score:.2f} - {node.text[:100]}...")

错误信息如下:

nodes_with_scores = retriever.retrieve("How do I setup a chroma vector store?")
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 274, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\base_retriever.py", line 244, in retrieve
    nodes = self._retrieve(query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\retrievers\fusion_retriever.py", line 261, in _retrieve       
    results = self._run_nested_async_queries(queries)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\retrievers\fusion_retriever.py", line 220, in _run_nested_async_queries
    task_results = run_async_tasks(tasks)
                   ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\async_utils.py", line 66, in run_async_tasks
    outputs: List[Any] = asyncio_run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\async_utils.py", line 30, in asyncio_run
    return loop.run_until_complete(coro)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\async_utils.py", line 64, in _gather
    return await asyncio.gather(*tasks_to_execute)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 307, in async_wrapper    
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\base_retriever.py", line 276, in aretrieve
    nodes = await self._aretrieve(query_bundle=query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 307, in async_wrapper    
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 109, in _aretrieve
    embedding = await embed_model.aget_agg_embedding_from_queries(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\embeddings\base.py", line 197, in aget_agg_embedding_from_queries
    query_embeddings = [await self.aget_query_embedding(query) for query in queries]
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\embeddings\base.py", line 197, in <listcomp>
    query_embeddings = [await self.aget_query_embedding(query) for query in queries]
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 307, in async_wrapper    
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\core\base\embeddings\base.py", line 165, in aget_query_embedding   
    query_embedding = await self._aget_query_embedding(query)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\embeddings\openai\base.py", line 393, in _aget_query_embedding     
    aclient = self._get_aclient()
              ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\llama_index\embeddings\openai\base.py", line 364, in _get_aclient
    self._aclient = AsyncOpenAI(**self._get_credential_kwargs())
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\openai\_client.py", line 334, in __init__
    super().__init__(
  File "C:\Users\me\.miniforge\envs\llamaindex\Lib\site-packages\openai\_base_client.py", line 1383, in __init__
    raise TypeError(
TypeError: Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got <class 'httpx.Client'>

相关日志/回溯

  • 无响应*
dgtucam1

dgtucam11#

QueryFusionRetriever 在使用 use_async=True 时需要一个 httpx.AsyncClient 的示例,因为异步操作需要一个异步 HTTP 客户端来处理非阻塞请求。这确保了操作的异步性,允许并发执行而不会阻塞主线程。
要解决在使用自定义 http_client 和自签名证书时出现的 TypeError 问题,您需要确保自定义的 http_clienthttpx.AsyncClient 的示例(如果 use_async=True 为真)。此外,您还应该配置 httpx.AsyncClient 以信任自签名证书。
以下是如何使用自签名证书创建自定义 httpx.AsyncClient 的示例:

import httpx
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

# Create a custom AsyncClient with a self-signed certificate
custom_async_http_client = httpx.AsyncClient(verify="<my/certificate>")

# Pass the custom AsyncClient to OpenAI and OpenAIEmbedding
llm = OpenAI(
    async_http_client=custom_async_http_client,
)

embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=512,
    http_client=custom_async_http_client,
)

# Use the rest of your code as is
documents_1 = SimpleDirectoryReader(
    input_files=["<path/to/file1>"]
).load_data()
documents_2 = SimpleDirectoryReader(
    input_files=["<path/to/file2>"]
).load_data()

index_1 = VectorStoreIndex.from_documents(
    documents_1, embed_model=embed_model, show_progress=True
)
index_2 = VectorStoreIndex.from_documents(
    documents_2, embed_model=embed_model, show_progress=True
)

retriever = QueryFusionRetriever(
    [index_1.as_retriever(), index_2.as_retriever()],
    similarity_top_k=2,
    num_queries=4,  # set this to 1 to disable query generation
    use_async=True,
    verbose=True,
    llm=llm,
    # query_gen_prompt="...",  # we could override the query generation prompt here
)

nodes_with_scores = await retriever.retrieve("How do I setup a chroma vector store?")

for node in nodes_with_scores:
    print(f"Score: {node.score:.2f} - {node.text[:100]}...")

在此示例中,verify="<my/certificate>" 用于指定您的自签名证书的路径。请确保仅在安全且受控的环境中使用此方法,因为禁用 SSL 验证可能会使您面临安全风险 1(https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/tests/embeddings/test_azure_openai.py)。

  • 要继续与 Dosu 交谈,请提及 @dosu。*
kmbjn2e3

kmbjn2e32#

你需要将同步和异步自定义客户端传递给llm和嵌入。

你会在你的回溯中看到它指向嵌入。(尽管现在看起来,OpenAIEmbedding类还没有接受异步http客户端,所以必须添加)

nfg76nw0

nfg76nw03#

感谢您的反馈,我已经尝试按照您的建议将同步和异步自定义客户端包含到llm和嵌入中,但我仍然得到相同的错误...
如果您不使用自签名证书,您可以尝试在客户端使用verify=False吗?您是否能得到没有类似错误的结果?
谢谢。

相关问题