llama_index [功能请求]:使用两个Supabase VectorStoreIndex的AutoMergingRetriever

aamkag61  于 2个月前  发布在  其他
关注(0)|答案(7)|浏览(29)

功能描述

由于supabase作为docStore不受支持,我尝试使用两个VectorStoreIndex实现一个AutoMergingRetriever。所做的更改如下:
SupabaseVectorStore添加了get_document,使用:

def get_document(self, doc_id: str, **kwargs: Any) -> Any:
        """Get document by doc id.

Args:
doc_id (str): document id
"""
        filters = {"doc_id": {"$eq": doc_id}}

        results = self._collection.query(
            data=None,
            filters=filters,
            include_value=True,
            include_metadata=True,
            **kwargs,
        )

        if len(results) > 0:
            id_, distance, metadata = results[0]
            text = metadata.pop("text", None)

            try:
                node = metadata_dict_to_node(metadata)
            except Exception:
                # NOTE: deprecated legacy logic for backward compatibility
                metadata, node_info, relationships = legacy_metadata_dict_to_node(
                    metadata
                )
                node = TextNode(
                    id_=id_,
                    text=text,
                    metadata=metadata,
                    start_char_idx=node_info.get("start", None),
                    end_char_idx=node_info.get("end", None),
                    relationships=relationships,
                )

            return node

        return None

更改了auto_merging_retriever.py中的两行:
在函数_get_parents_and_merge中:

parent_node = self._storage_context.vector_store.get_document(
                    parent_node_id
                )

以及:
在函数_fill_in_nodes中:

next_node = self._storage_context.vector_store.get_document(
                    cur_node.next_node.node_id
                )

定义了一个自定义检索器:

from typing import Dict, List, Optional, Tuple

from llama_index.core.indices.query.schema import QueryBundle
from llama_index.core.indices.utils import truncate_text
from llama_index.core.indices.vector_store.retrievers.retriever import (
    VectorIndexRetriever,
)
from llama_index.core.schema import BaseNode, IndexNode, NodeWithScore
from typing import Dict, List, Optional, Tuple, cast

from llama_index.core.callbacks.base import CallbackManager
from llama_index.core.indices.query.schema import QueryBundle
from llama_index.core.indices.utils import truncate_text
from llama_index.core.indices.vector_store.retrievers.retriever import (
    VectorIndexRetriever,
)
from llama_index.core.schema import BaseNode, IndexNode, NodeWithScore, QueryBundle
from llama_index.core.storage.storage_context import StorageContext

class MyRetriever(AutoMergingRetriever):
    def __init__(
        self,
        node_vector_retriever: VectorIndexRetriever,
        leaf_node_vector_retriever: VectorIndexRetriever,
        storage_context: StorageContext,
        simple_ratio_thresh: float = 0.5,
        verbose: bool = False,
        callback_manager: Optional[CallbackManager] = None,
        object_map: Optional[dict] = None,
        objects: Optional[List[IndexNode]] = None,
    ) -> None:
        super().__init__(
            vector_retriever=node_vector_retriever,
            storage_context=storage_context,
            simple_ratio_thresh=simple_ratio_thresh,
            verbose=verbose,
            callback_manager=callback_manager,
            object_map=object_map,
            objects=objects,
        )
        self._leaf_node_vector_retriever = leaf_node_vector_retriever

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        initial_nodes = self._vector_retriever.retrieve(query_bundle)
        initial_leaf_nodes = self._leaf_node_vector_retriever.retrieve(query_bundle)

        # Merge the initial nodes and leaf nodes
        initial_nodes.extend(initial_leaf_nodes)

        cur_nodes, is_changed = self._try_merging(initial_nodes)
        while is_changed:
            cur_nodes, is_changed = self._try_merging(cur_nodes)

        # sort by similarity
        cur_nodes.sort(key=lambda x: x.get_score(), reverse=True)

        return cur_nodes

使用方法:

from llama_index.core.retrievers import AutoMergingRetriever
from llama_index.core import VectorStoreIndex 
from llama_index.core.node_parser import get_leaf_nodes, get_root_nodes
 # Some more imports here

parent_vector_store = SupabaseVectorStore(
    postgres_connection_string=DB_CONNECTION,
    collection_name='parent_nodes',
    dimension=768
)
doc_storage_context = StorageContext.from_defaults(vector_store=parent_vector_store)
doc_index = VectorStoreIndex(nodes=nodes, storage_context=doc_storage_context)

leaf_nodes = get_leaf_nodes(nodes)

leaf_vector_store = SupabaseVectorStore(
    postgres_connection_string=DB_CONNECTION,
    collection_name='leaf_nodes',
    dimension=768
)
leaf_storage_context = StorageContext.from_defaults(vector_store=leaf_vector_store)
leaf_index = VectorStoreIndex(nodes=leaf_nodes, storage_context=leaf_storage_context)
leaf_retriever = leaf_index.as_retriever(similarity_top_k=6)
base_retriever = doc_index.as_retriever(similarity_top_k=6)
retriever = AutoMergingRetriever(leaf_retriever, storage_context=doc_storage_context, verbose=True)

my_retriever = MyRetriever(node_vector_retriever=base_retriever, leaf_node_vector_retriever=leaf_retriever, storage_context=doc_storage_context, verbose=True)

query_str = "What did the author do growing up?"
my_retrived_nodes = my_retriever.retrieve(query_str)

原因

使用Supabase的AutoMergingRetriever管道

功能价值

完整的Supabase RAG管道,可作为任何准备好供消费的前端框架的后端使用。

kupeojn6

kupeojn61#

  • 🤖*

Hey @rahulkrprajapati,很高兴看到你回来!希望你过得很好。让我们一起探讨你遇到的这个有趣的问题。
根据你提供的信息,似乎递归调用是由于 _retrieve 方法在 AutoMergingRetriever 类中发生的。这个方法获取初始节点并尝试合并它们。如果在合并过程中发生任何变化,代码将进入一个 while 循环,在该循环中继续尝试合并当前节点,直到没有更多变化发生。
递归发生是因为在 while 循环内部调用的 _try_merging 方法可能会将 is_changed 变量再次更改为 True,导致循环无限期地继续。这可能发生在 _get_parents_and_merge_fill_in_nodes 方法的条件不断满足的情况下。
要解决这个问题,你可以在一定迭代次数后或当节点没有发生显著变化时添加一个条件来跳出循环。以下是如何实现这一点的示例:

MAX_ITERATIONS = 10  # Set a maximum number of iterations

def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
    """Retrieve nodes given query.

Implemented by the user.
"""
    initial_nodes = self._vector_retriever.retrieve(query_bundle)

    cur_nodes, is_changed = self._try_merging(initial_nodes)
    iterations = 0
    while is_changed and iterations < MAX_ITERATIONS:
        prev_nodes = cur_nodes.copy()
        cur_nodes, is_changed = self._try_merging(cur_nodes)
        iterations += 1
        # Break the loop if no significant changes are made to the nodes
        if cur_nodes == prev_nodes:
            break

    # sort by similarity
    cur_nodes.sort(key=lambda x: x.get_score(), reverse=True)

    return cur_nodes

在此代码中,while 循环将在 MAX_ITERATIONS 次迭代后或当前节点与前一个节点相同时中断,表示没有对节点进行重大更改。这应该防止无限递归问题。
至于 SupabaseVectorStore 类中的 get_document 方法,我在 LlamaIndex 仓库中找不到关于其实现的具体信息。有可能这个方法是使用文档ID从向量存储中检索文档,但没有更多的信息或上下文,很难确定。如果你能提供有关此方法及其在你代码中的用法的更多详细信息,我可能会给出更准确的答案。
希望这有所帮助!如果你还有其他问题或需要进一步澄清,请随时提问。

来源

关于 Dosu

此响应旨在节省你的时间并对你有所帮助。它不是精确解决方案,而是你自己研究的起点。
帮助我变得更有用!如果这有帮助请留下一个👍,如果无关请留下一个👎。
如果你想继续对话,请以 @dosu-bot 开头回复。

w6mmgewl

w6mmgewl2#

有一个小修正:

while is_changed and iterations < self.MAX_ITERATIONS:
        prev_nodes = cur_nodes.copy()
        cur_nodes, is_changed = self._try_merging(cur_nodes)
        iterations += 1
        # Break the loop if no significant changes are made to the nodes
        if cur_nodes == prev_nodes:
            break
8iwquhpp

8iwquhpp3#

嗯。尝试想出一个方法,使这种更改在代码库中普遍适用。
也许如果 get_document() 被实现在基本向量存储类中,那么可以更新类型以满足 mypy,当这个函数被使用时

2guxujil

2guxujil4#

关于这个问题的悲伤之处在于,我们需要投入大量精力来实现这种方法(对于大多数矢量数据库)。

0mkxixxg

0mkxixxg5#

我很想帮忙。
我认为对于文档存储只有三个选项真是令人沮丧😔。我希望能有更多的选项可供选择。鉴于我的使用场景,Supabase似乎是一个不错的替代方案。如果我能让它适用于更多的矢量数据库,那就太棒了。

mbyulnm0

mbyulnm06#

@rahulkrprajapati 完全同意,希望有更多的docstore选项(目前我们有简单、Redis、MongoDB、Postgres、Firestore和Dynamodb)。

mtb9vblg

mtb9vblg7#

愿意提供帮助。与qdrant合作并卡在这个点上。

相关问题