llama_index [Bug]:在管道UPSERTS摄取过程中,不必要的昂贵操作

vu8f3i0k  于 2个月前  发布在  其他
关注(0)|答案(1)|浏览(42)

Bug描述

self.docstore.get_all_document_hashes() 方法在 _handle_upserts 方法中非常耗时。
但它的返回值仅在 UPSERTS_AND_DELETE 情况下使用。
llama_index/llama-index-core/llama_index/core/ingestion/pipeline.py
第429行:| existing_doc_ids_before=set(self.docstore.get_all_document_hashes().values()) |

版本

0.10.50.post1

重现步骤

抱歉,我不能在这里分享代码。
但基本上遵循以下步骤:

  1. 使用 doc_store、cache 和 vector_store 创建一个 Ingestion Pipeline,并使用 UPSERTS 策略
  2. 准备大量文档,在我的情况下为 1.5k+
  3. 使用 pipeline run 加载所有文档
  4. 获取一个新文档,并仅使用 pipeline run 加载新文档。
    由于 doc_store 中已经有 1.5k+ 个文档,因此 get_all_document_hashes 方法将根据 doc store 后端变得非常慢或非常昂贵。
    在我的情况下,我使用了 DynamoDB doc_store 和 S3KVStore 支持的 KVDocStore:
    在 DynamoDB 的情况下,它需要进行全表扫描
    在 S3 后端的情况下,它需要7分钟才能获取所有哈希值。
    但这是完全不必要的,因为在 UPSERT 操作期间,我们不需要获取所有文档。
    为了解决这个问题,我实现了自己的 doc_store 类,该类从 get_all_document_hashes 方法返回空字典,整个管道可以正常工作且性能更好。
ggazkfy8

ggazkfy81#

嘿@headyin!我在这里帮助你解决任何bug、问题或贡献。让我们一起解决这个问题!
为了解决_handle_upserts方法中UPSERTS策略期间昂贵的self.docstore.get_all_document_hashes()调用问题,你可以优化函数,只在每个节点的基础上获取必要的文档哈希值。这可以避免预先获取所有文档哈希值,后端如DynamoDB和S3KVStore可能会特别慢。
以下是_handle_upserts函数的优化版本:

async def _ahandle_upserts(
    self,
    nodes: List[BaseNode],
    store_doc_text: bool = True,
) -> List[BaseNode]:
    """Handle docstore upserts by checking hashes and ids."""
    assert self.docstore is not None

    doc_ids_from_nodes = set()
    deduped_nodes_to_run = {}
    for node in nodes:
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        doc_ids_from_nodes.add(ref_doc_id)
        existing_hash = await self.docstore.aget_document_hash(ref_doc_id)
        if not existing_hash:
            # document doesn't exist, so add it
            await self.docstore.aset_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        elif existing_hash and existing_hash != node.hash:
            await self.docstore.adelete_ref_doc(ref_doc_id, raise_error=False)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

            await self.docstore.aset_document_hash(ref_doc_id, node.hash)

            deduped_nodes_to_run[ref_doc_id] = node
        else:
            continue  # document exists and is unchanged, so skip it

    if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
        # Identify missing docs and delete them from docstore and vector store
        existing_doc_ids_before = set((await self.docstore.aget_all_document_hashes()).values())
        doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
        for ref_doc_id in doc_ids_to_delete:
            await self.docstore.adelete_document(ref_doc_id)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

    nodes_to_run = list(deduped_nodes_to_run.values())
    await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)

    return nodes_to_run

这种方法确保只为每个节点获取必要的文档哈希值,从而避免了昂贵的self.docstore.get_all_document_hashes()1(https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/ingestion/pipeline.py)2(https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/ingestion/pipeline.py)调用。

  • 要继续与Dosu交流,请提及@dosu。*

相关问题