llama_index [Bug]: OpenSearch Vector Store has many connection issues after #11513

dluptydi posted 2 months ago in Other

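Bug Description

Since #11513, my OpenSearch vector store connection has become unstable. Recent bug reports related to this issue (#13358, #11661, and others) confirm the same symptoms:

  • This event loop is already running.
  • ConnectionError(Timeout context manager should be used inside a task) caused by: RuntimeError(Timeout context manager should be used inside a task)
  • etc.

I have also seen the suggestion to patch asyncio with nest_asyncio, but even with it we can still run into these errors: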

import nest_asyncio
nest_asyncio.apply()

I have also noticed that the errors get worse when LlamaIndex is used inside FastAPI.
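A minimal sketch of why FastAPI may make this worse, using only the standard library. It assumes (this is not confirmed in this thread) that the store's synchronous methods drive coroutines with `run_until_complete` on the current loop; `fake_opensearch_call`, `sync_wrapper`, and `handler` are hypothetical stand-ins, not LlamaIndex or FastAPI APIs.

```python
import asyncio


async def fake_opensearch_call() -> str:
    # Hypothetical stand-in for an async OpenSearch request
    await asyncio.sleep(0)
    return "ok"


def sync_wrapper() -> str:
    # Assumed pattern: a sync method that drives a coroutine on the current loop
    loop = asyncio.get_event_loop()
    coro = fake_opensearch_call()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def handler() -> str:
    # Like an async FastAPI endpoint, this runs inside an already-running loop
    try:
        return sync_wrapper()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


result = asyncio.run(handler())
print(result)
```

Called from plain synchronous code, the same wrapper has no running loop to collide with, which would be consistent with the errors appearing mainly under an async web framework.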

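Version

Latest version

Steps to Reproduce

Just use the OpenSearch vector store for long enough and you will notice it becoming unstable.

Relevant Logs/Tracebacks

No response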

ryhaxcpt1#

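To address the connection issues with the OpenSearch vector store after #11513, especially when using LlamaIndex with FastAPI, you need to make sure the event loop is managed correctly in the FastAPI context. The errors you are seeing, such as "This event loop is already running" and "ConnectionError(Timeout context manager should be used inside a task) caused by: RuntimeError(Timeout context manager should be used inside a task)", typically come from asynchronous operations being driven incorrectly.
Here is a revised approach to handling the event loop:

  1. Use asyncio.run to manage the event loop for asynchronous operations.
  2. Make sure no event loop is already running in the current thread when you execute them.

Here is an example of how the code can be modified along these lines: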
import asyncio
import logging
import pytest
import uuid
from typing import List, Generator

from llama_index.legacy.schema import NodeRelationship, RelatedNodeInfo, TextNode
from llama_index.legacy.vector_stores.opensearch import (
    OpensearchVectorClient,
    OpensearchVectorStore,
)
from llama_index.legacy.vector_stores.types import VectorStoreQuery

logging.basicConfig(level=logging.DEBUG)

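# Probe the local OpenSearch instance once at import time; any failure
# (missing package, connection refused) causes the tests to be skipped.
try:
    from opensearchpy import AsyncOpenSearch

    async def check_opensearch() -> None:
        os_client = AsyncOpenSearch("localhost:9200")
        try:
            await os_client.info()
        finally:
            # always release the aiohttp session, even if info() fails
            await os_client.close()

    asyncio.run(check_opensearch())
    opensearch_not_available = False
except Exception:  # ImportError is already a subclass of Exception
    opensearch_not_available = True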

@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_connection() -> None:
    assert True

@pytest.fixture()
def index_name() -> str:
    """Return the index name."""
    return f"test_{uuid.uuid4().hex}"

@pytest.fixture()
def os_store(index_name: str) -> Generator[OpensearchVectorStore, None, None]:
    client = OpensearchVectorClient(
        endpoint="localhost:9200",
        index=index_name,
        dim=3,
    )

    yield OpensearchVectorStore(client)

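    # teardown: delete the index and close the client's aiohttp session
    # in one event loop, rather than reusing the client across two loops
    async def _teardown() -> None:
        await client._os_client.indices.delete(index=index_name)
        await client._os_client.close()

    asyncio.run(_teardown())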

@pytest.fixture(scope="session")
def node_embeddings() -> List[TextNode]:
    return [
        TextNode(
            text="lorem ipsum",
            id_="c330d77f-90bd-4c51-9ed2-57d8d693b3b0",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-0")},
            metadata={
                "author": "Stephen King",
                "theme": "Friendship",
            },
            embedding=[1.0, 0.0, 0.0],
        ),
        TextNode(
            text="lorem ipsum",
            id_="c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-1")},
            metadata={
                "director": "Francis Ford Coppola",
                "theme": "Mafia",
            },
            embedding=[0.0, 1.0, 0.0],
        ),
        TextNode(
            text="lorem ipsum",
            id_="c3ew11cd-8fb4-4b8f-b7ea-7fa96038d39d",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-2")},
            metadata={
                "director": "Christopher Nolan",
            },
            embedding=[0.0, 0.0, 1.0],
        ),
        TextNode(
            text="I was taught that the way of progress was neither swift nor easy.",
            id_="0b31ae71-b797-4e88-8495-031371a7752e",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-3")},
            metadata={
                "author": "Marie Curie",
            },
            embedding=[0.0, 0.0, 0.9],
        ),
        TextNode(
            text=(
                "The important thing is not to stop questioning."
                + " Curiosity has its own reason for existing."
            ),
            id_="bd2e080b-159a-4030-acc3-d98afd2ba49b",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-4")},
            metadata={
                "author": "Albert Einstein",
            },
            embedding=[0.0, 0.0, 0.5],
        ),
        TextNode(
            text=(
                "I am no bird; and no net ensnares me;"
                + " I am a free human being with an independent will."
            ),
            id_="f658de3b-8cef-4d1c-8bed-9a263c907251",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-5")},
            metadata={
                "author": "Charlotte Bronte",
            },
            embedding=[0.0, 0.0, 0.3],
        ),
    ]

def count_docs_in_index(os_store: OpensearchVectorStore) -> int:
    """Refresh indices and return the count of documents in the index."""
    async def _refresh_and_count() -> int:
        # one event loop for both requests
        await os_store.client._os_client.indices.refresh(index=os_store.client._index)
        count = await os_store.client._os_client.count(index=os_store.client._index)
        return count["count"]

    return asyncio.run(_refresh_and_count())

@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_functionality(
    os_store: OpensearchVectorStore, node_embeddings: List[TextNode]
) -> None:
    # add
    assert len(os_store.add(node_embeddings)) == len(node_embeddings)
    # query
    exp_node = node_embeddings[3]
    query = VectorStoreQuery(query_embedding=exp_node.embedding, similarity_top_k=1)
    query_result = os_store.query(query)
    assert query_result.nodes
    assert query_result.nodes[0].get_content() == exp_node.text
    # delete one node using its associated doc_id
    os_store.delete("test-1")
    assert count_docs_in_index(os_store) == len(node_embeddings) - 1

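This approach is meant to keep the event loop properly managed and to avoid the "This event loop is already running" and "ConnectionError(Timeout context manager should be used inside a task) caused by: RuntimeError(Timeout context manager should be used inside a task)" errors.
If the problem persists, please provide more specific logs or tracebacks so the issue can be diagnosed further.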
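One caveat: asyncio.run raises a RuntimeError when a loop is already running in the current thread, which is the situation inside an async FastAPI endpoint. A minimal sketch of one possible workaround, which nobody in this thread has confirmed, is to push the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+), where it can start its own loop. `fake_opensearch_call`, `blocking_query`, and `handler` are hypothetical stand-ins.

```python
import asyncio


async def fake_opensearch_call() -> str:
    # Hypothetical stand-in for an async OpenSearch request
    await asyncio.sleep(0)
    return "ok"


def blocking_query() -> str:
    # Stand-in for a sync store method that starts its own event loop
    return asyncio.run(fake_opensearch_call())


async def handler() -> str:
    # Calling blocking_query() directly here would raise RuntimeError,
    # because asyncio.run() cannot be called from a running event loop.
    # A worker thread has no running loop, so asyncio.run() works there.
    return await asyncio.to_thread(blocking_query)


result = asyncio.run(handler())
print(result)
```

This only illustrates the loop mechanics. A real client whose aiohttp session was created on a different loop may still fail when reused from another thread, so treat it as a starting point rather than a fix.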

This response is meant to save you time and be useful. It is not a precise solution, but a starting point for your own research.

sbtkgmzw2#

It's nice to have async support in this vector store. If you have ideas for improving it, feel free to submit a PR 😇


6ie5vjzr3#

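I agree with going async, but I think it would be great if these changes were backward compatible, or at least optional, so that they don't break existing implementations.
Unfortunately, I'm not very experienced with OpenSearch connections or Python async handling, so I can't help fix this myself.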
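To make the "optional" idea concrete, here is a minimal sketch of a store that keeps a purely synchronous path next to an opt-in async one. `DualModeStore`, `query`, and `aquery` are hypothetical names for illustration only; this is not the LlamaIndex implementation, and the in-memory dict stands in for a real OpenSearch index.

```python
import asyncio
from typing import Dict, List


class DualModeStore:
    """Hypothetical store: the sync path stays blocking, the async path is opt-in."""

    def __init__(self, docs: Dict[str, str]) -> None:
        self._docs = docs

    def query(self, term: str) -> List[str]:
        # Synchronous path: never touches an event loop,
        # so existing callers keep working unchanged
        return sorted(k for k, v in self._docs.items() if term in v)

    async def aquery(self, term: str) -> List[str]:
        # Asynchronous path: the sleep stands in for non-blocking I/O
        await asyncio.sleep(0)
        return self.query(term)


store = DualModeStore({"a": "lorem ipsum", "b": "dolor sit"})
sync_hits = store.query("lorem")
async_hits = asyncio.run(store.aquery("lorem"))
print(sync_hits, async_hits)
```

With this shape, existing synchronous callers never start or reuse an event loop, and only code that explicitly awaits `aquery` depends on one.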
