#### Bug Description
I want to create a status_checker API endpoint in FastAPI to track the creation of ChromaDB embeddings. I would also like these embeddings to be created asynchronously. Below is the code, but it raises an error. Please make the necessary modifications.
#### Version
llama-index 0.10.12
#### Steps to Reproduce
```python
import asyncio
import os

import chromadb
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.vector_stores.chroma import ChromaVectorStore

import global_variable

app = FastAPI()


@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    try:
        # Ensure 'docs' directory exists
        if not os.path.exists("docs"):
            os.makedirs("docs")
        # Write the file to server with its original filename
        file_path = os.path.join("docs", file.filename)
        with open(file_path, "wb") as f:
            f.write(await file.read())
        from rag_define import define_rag
        asyncio.create_task(define_rag())
        return JSONResponse(content={"message": "File uploaded successfully"})
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)


@app.post("/status")
async def status_checker():
    return global_variable.upload_in_progress


async def define_rag():
    documents = SimpleDirectoryReader(
        input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]
    ).load_data()
    if os.path.exists("./chroma_db"):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(
            vector_store, embed_model=global_variable.embed_model
        )
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        # index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
        # NOTE: all_nodes / all_nodes_dict are defined elsewhere in the user's code
        vector_index_chunk = await VectorStoreIndex(
            all_nodes,
            embed_model=global_variable.embed_model,
            storage_context=storage_context,
        )  # , use_async=True, show_progress=True
    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict=all_nodes_dict,
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(
        global_variable.retriever_chunk,
        llm=global_variable.llm,
        text_qa_template=global_variable.text_qa_template,
    )
```
#### Relevant Logs/Tracebacks

```
  vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 74, in __init__
    super().__init__(
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/base.py", line 91, in __init__
    index_struct = self.build_index_from_nodes(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 307, in build_index_from_nodes
    return self._build_index_from_nodes(nodes, **insert_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 277, in _build_index_from_nodes
    run_async_tasks(tasks)
  File "/home/lib/python3.11/site-packages/llama_index/core/async_utils.py", line 49, in run_async_tasks
    outputs: List[Any] = asyncio.run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/asyncio/runners.py", line 186, in run
    raise RuntimeError(
RuntimeError: asyncio.run() cannot be called from a running event loop
/home/miniconda3/envs/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'run_async_tasks.<locals>._gather' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'VectorStoreIndex._async_add_nodes_to_index' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
#### Answers (3)
**Answer 1:**

To resolve the problem of `asyncio.run()` being called from a running event loop, and to create the ChromaDB embeddings asynchronously in your FastAPI application, avoid calling `asyncio.run()` inside an event loop that is already running. Instead, `await` the coroutine directly, or schedule it with `asyncio.create_task()`. In the answer's example, the `define_rag` function is correctly defined as an async function, and the `VectorStoreIndex` is created with the `await` keyword to handle the asynchronous operations. This approach ensures that the embeddings are created asynchronously and efficiently, without triggering the event-loop-related runtime error. Sources: 1 (https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/embeddings/huggingface.py), 2 (https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/embeddings/llama-index-embeddings-huggingface-api/llama_index/embeddings/huggingface_api/base.py)
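The distinction this answer draws can be illustrated with the standard library alone: `asyncio.run()` fails when a loop is already running, while `await` works. In this sketch, `build_embeddings` is a hypothetical stand-in for the async indexing coroutine:

```python
import asyncio

async def build_embeddings() -> str:
    # Hypothetical stand-in for the async indexing work.
    return "embeddings built"

async def broken() -> str:
    # Re-creates the reported failure: asyncio.run() called while a loop
    # is already running (here, the loop driving this very coroutine).
    coro = build_embeddings()
    try:
        return asyncio.run(coro)  # raises RuntimeError
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # silence the "coroutine was never awaited" warning

async def fixed() -> str:
    # The fix: inside a running loop, await the coroutine directly.
    return await build_embeddings()
```

Running `asyncio.run(broken())` from synchronous code reproduces the "cannot be called from a running event loop" message, while `asyncio.run(fixed())` completes normally.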
**Answer 2:**

I ran the same code that @dosu provided, but the same error still occurs. Please take the necessary steps to fix it.
**Answer 3:**

The error persists because `asyncio.run()` is being called from within an already-running event loop. To resolve this, make sure all asynchronous operations are properly awaited on the running loop. A corrected `define_rag` function awaits the async tasks correctly inside the running event loop, which avoids the `RuntimeError` you encountered.
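When the library call is synchronous and calls `asyncio.run()` internally (as `run_async_tasks` does in the traceback), a different technique than awaiting also works: run the blocking build in a worker thread with `asyncio.to_thread`, where there is no running loop, so the internal `asyncio.run()` gets a fresh one. This is a sketch under stated assumptions, not the answer's original code; `build_index_sync` and `_embed_all` are hypothetical stand-ins for the llama-index calls:

```python
import asyncio

async def _embed_all() -> str:
    # Stand-in for the async embedding passes.
    await asyncio.sleep(0)
    return "index ready"

def build_index_sync() -> str:
    # Hypothetical stand-in for the synchronous VectorStoreIndex build.
    # Sync llama-index code paths may call asyncio.run() internally; that
    # is legal here because this function runs in a worker thread that
    # has no event loop of its own.
    return asyncio.run(_embed_all())

async def upload_handler() -> str:
    # Off-load the blocking build so the server's event loop (and the
    # /status endpoint) stays responsive while embeddings are created.
    return await asyncio.to_thread(build_index_sync)
```

The trade-off is that the build occupies a thread for its whole duration, but it needs no changes to the library's sync code path.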