llama_index [Bug]: Unable to create chromadb asynchronously

wecizke3  posted 2 months ago  in Other

#### Bug Description

I want to create a status_checker API endpoint in FastAPI to track the creation of chromadb embeddings. I would also like to create these embeddings in async mode. Below is the code, but it throws an error. Please make the necessary modifications.

#### Version

llama-index 0.10.12

#### Steps to Reproduce

```python
import os
import asyncio

import chromadb
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.core import SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine

import global_variable  # reporter's shared-state module

app = FastAPI()

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    try:
        # Ensure 'docs' directory exists
        if not os.path.exists("docs"):
            os.makedirs("docs")

        # Write the file to the server with its original filename
        file_path = os.path.join("docs", file.filename)
        with open(file_path, "wb") as f:
            f.write(await file.read())

        from rag_define import define_rag
        asyncio.create_task(define_rag())

        return JSONResponse(content={"message": "File uploaded successfully"})
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)

@app.post("/status")
async def status_checker():
    return global_variable.upload_in_progress

async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        # index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
        # all_nodes / all_nodes_dict are built elsewhere in the reporter's code
        vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model,
                                                    storage_context=storage_context)  # , use_async=True, show_progress=True

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict=all_nodes_dict,
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)
```

#### Relevant Logs/Tracebacks

```
vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 74, in __init__
    super().__init__(
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/base.py", line 91, in __init__
    index_struct = self.build_index_from_nodes(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 307, in build_index_from_nodes
    return self._build_index_from_nodes(nodes, **insert_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 277, in _build_index_from_nodes
    run_async_tasks(tasks)
  File "/homelib/python3.11/site-packages/llama_index/core/async_utils.py", line 49, in run_async_tasks
    outputs: List[Any] = asyncio.run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "/homelib/python3.11/asyncio/runners.py", line 186, in run
    raise RuntimeError(
RuntimeError: asyncio.run() cannot be called from a running event loop
/home/miniconda3/envs/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'run_async_tasks.<locals>._gather' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'VectorStoreIndex._async_add_nodes_to_index' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
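The core failure in this traceback can be reproduced without LlamaIndex at all: `run_async_tasks()` calls `asyncio.run()` internally, which is not allowed while uvicorn's event loop is already running. A minimal, self-contained illustration of that mechanism (all names here are hypothetical, not LlamaIndex APIs):

```python
import asyncio

async def inner():
    return 42

def sync_helper():
    # Synchronous library code that calls asyncio.run() internally,
    # like LlamaIndex's run_async_tasks() does.
    return asyncio.run(inner())

async def main():
    # main() is already running inside an event loop, so the nested
    # asyncio.run() call in sync_helper() raises RuntimeError.
    try:
        sync_helper()
    except RuntimeError as e:
        print(e)

asyncio.run(main())
```

This also explains the trailing `RuntimeWarning`s in the log: the coroutine handed to the failed `asyncio.run()` call is never awaited.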


mw3dktmi1#

To resolve the problem of `asyncio.run()` being called from a running event loop, and to correctly create async ChromaDB embeddings in your FastAPI application, you should avoid using `asyncio.run()` inside an already-running event loop. Instead, you can `await` the coroutine directly or schedule it with `asyncio.create_task()`.
Here is an example of how to modify the code to handle async embedding correctly:
In this example, the `define_rag` function is properly defined as an async function, and the `VectorStoreIndex` is created using the `await` keyword to handle asynchronous operations. This approach ensures that embeddings are created asynchronously and efficiently without causing runtime errors related to the event loop 1(https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/embeddings/huggingface.py)2(https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/embeddings/llama-index-embeddings-huggingface-api/llama_index/embeddings/huggingface_api/base.py).
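A minimal, self-contained sketch of the pattern this answer describes (`build_index_blocking` and the other names are placeholders, not LlamaIndex APIs): schedule the coroutine with `asyncio.create_task()`, and offload any blocking work that calls `asyncio.run()` internally to a worker thread, where no event loop is running:

```python
import asyncio

def build_index_blocking():
    # Stands in for the blocking VectorStoreIndex construction. Run in a
    # worker thread, there is no running event loop here, so an internal
    # asyncio.run() call inside a library would succeed.
    return "index"

async def define_rag():
    # await keeps the event loop free while the thread does the heavy work
    return await asyncio.to_thread(build_index_blocking)

async def handler():
    # Schedule the build as in the /upload endpoint; keep a reference so
    # the task is not garbage-collected before it finishes.
    task = asyncio.create_task(define_rag())
    return await task

print(asyncio.run(handler()))
```

`asyncio.to_thread()` requires Python 3.9+; the reporter's traceback shows Python 3.11, so it is available here.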

  • To continue the conversation with Dosu, mention @dosu.

7kqas0il2#

I ran the same code that @dosu provided, but the same error still occurs.
Please take the necessary action.


jutyujz03#

The error persists because `asyncio.run()` is being called inside an already-running event loop. To resolve this, make sure all asynchronous operations are handled correctly within the running loop.
Here is the corrected `define_rag` function:

```python
async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        # from_documents() is synchronous and must not be awaited directly;
        # run it in a worker thread so the library's internal asyncio.run()
        # call does not collide with FastAPI's running event loop.
        vector_index_chunk = await asyncio.to_thread(
            VectorStoreIndex.from_documents,
            documents,
            embed_model=global_variable.embed_model,
            storage_context=storage_context,
            show_progress=True,
        )

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict={node.node_id: node for node in documents},
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)
```

This modification ensures the asynchronous work is handled correctly with respect to the running event loop, avoiding the RuntimeError you encountered.
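To tie this back to the `/upload` and `/status` endpoints from the report, the flag-based progress check can be sketched as follows (the `State` class stands in for the reporter's `global_variable` module; all names here are illustrative, and the flag convention `1` = done mirrors the original code):

```python
import asyncio

class State:
    upload_in_progress = 0  # mirrors global_variable.upload_in_progress

state = State()

async def define_rag():
    state.upload_in_progress = 0   # build started
    await asyncio.sleep(0)         # stands in for the real index build
    state.upload_in_progress = 1   # mark completion, as in the answer above

async def upload():
    # Schedule the build without blocking the request handler; a real
    # endpoint would return immediately while the task runs in background.
    task = asyncio.create_task(define_rag())
    await task

async def status_checker():
    # What the /status endpoint returns
    return state.upload_in_progress

async def main():
    await upload()
    return await status_checker()

print(asyncio.run(main()))  # 1 once the build completes
```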

  • To continue the conversation with Dosu, mention @dosu.
