llama_index [Bug]:在后处理器中未正确设置回调管理器,

jhdbpxl9  于 2个月前  发布在  其他
关注(0)|答案(3)|浏览(66)

Bug 描述

看起来大多数节点后处理器在运行时,不会在配置的回调管理器的范围内运行,因此导致跟踪丢失重新排名步骤。.

版本

llama-index==0.10.34

重现步骤

尝试这个简单的示例:

from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core import PromptTemplate
from llama_index.core.postprocessor import LLMRerank
import os
from llama_index.core import (
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.response_synthesizers import TreeSummarize

if not os.path.exists("storage"):
    index = VectorStoreIndex.from_documents(documents)
    # save index to disk
    index.set_index_id("vector_index")
    index.storage_context.persist("./storage")
else:
    # rebuild storage context
    storage_context = StorageContext.from_defaults(persist_dir="storage")
    # load index
    index = load_index_from_storage(storage_context, index_id="vector_index")


# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = LLMRerank()
summarizer = TreeSummarize(llm=llm)
# define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")

response = p.run(topic="YC")
print(response)

预期结果是在 langfuse(我正在使用的可观察性平台)中记录所有这些元素的跟踪。
但是,收到的跟踪如下:

在查看代码后,我们在组件中初始化后处理器,但只有在Sbert重新排名中我们才使用它,如下所示:

with self.callback_manager.event(
            CBEventType.RERANKING,
            payload={
                EventPayload.NODES: nodes,
                EventPayload.MODEL_NAME: self.model,
                EventPayload.QUERY_STR: query_bundle.query_str,
                EventPayload.TOP_K: self.top_n,
            },
        ) as event:
             # Rest of re-ranking logic

更新本地分支后,我意识到,我们在像CohereReranker这样的新集成中添加了新的重新排名器,但在原生核心重新排名器中除了sbert之外都没有做这个。这显然是一个bug。.
这应该在基本级别处理吗?在 PostprocessorComponent._run_component() 内部?
如果我们在基本组件中处理,那么我想不是每个人都需要在他们的集成中复制相同的代码。.

预期的跟踪应该是:

当使用Cohere Reranker时会出现这种情况。

相关日志/回溯

  • 无响应*
insrf1ej

insrf1ej1#

为了解决后处理器中回调管理器设置不正确导致重新排序步骤缺失跟踪的问题,你应该在所有后处理器中实现一致的模式,类似于SentenceTransformerRerankFlagEmbeddingRerankerCohereRerank类中所做的那样。这涉及到将重新排序逻辑 Package 在一个处理回调事件的上下文管理器中,并利用调度器进行事件处理。
下面是一个简洁的例子,指导你如何调整你的后处理器:

from llama_index.core.callbacks import CBEventType, EventPayload
from llama_index.core.instrumentation import get_dispatcher
from llama_index.core.instrumentation.events.rerank import ReRankEndEvent, ReRankStartEvent
from llama_index.core.postprocessor.types import BaseNodePostprocessor
from llama_index.core.schema import NodeWithScore, QueryBundle

dispatcher = get_dispatcher(__name__)

class YourCustomReranker(BaseNodePostprocessor):
    # Initialization and class setup

    def _postprocess_nodes(self, nodes, query_bundle=None):
        dispatch_event = dispatcher.get_dispatch_event()
        dispatch_event(ReRankStartEvent(query=query_bundle, nodes=nodes, top_n=self.top_n, model_name=self.model))

        with self.callback_manager.event(CBEventType.RERANKING, payload={
            EventPayload.NODES: nodes,
            EventPayload.MODEL_NAME: self.model,
            EventPayload.QUERY_STR: query_bundle.query_str,
            EventPayload.TOP_K: self.top_n,
        }) as event:
            # Re-ranking logic here

            event.on_end(payload={EventPayload.NODES: new_nodes})
        dispatch_event(ReRankEndEvent(nodes=new_nodes))

        return new_nodes

为了确保项目中后处理步骤的全面可见性和可追溯性:

  1. 实现回调管理器和调度器:确保每个后处理器都能访问到一个回调管理器示例和用于事件处理的调度器。
  2. 用事件 Package 核心逻辑:将核心逻辑封装在with self.callback_manager.event(...)上下文管理器和调度器事件(ReRankStartEventReRankEndEvent)中。
  3. 配置事件有效载荷:为调度器事件的方法和参数定制payload,以包含相关信息。
  4. 处理事件完成:使用event.on_end(payload={...})和调度器来指示重新排序过程的完成。
    这种方法将确保包括那些最初未设计用于SBERT重新排序的所有后处理器都在配置的回调管理器和调度器的范围内执行,从而允许在跟踪中包含重新排序步骤。
详细信息

来源

这个响应旨在帮助你节省时间并提高效率。它不是精确的解决方案,而是你自己研究的一个起点。
请帮助我变得更有用!如果这个有帮助,请留下一个👍;如果无关紧要,请留下一个👎。

  • 要继续对话,请提及@dosu。*
zte4gxcn

zte4gxcn2#

我的建议基本上是将

def _run_component(self, **kwargs: Any) -> Any:
        """Run component."""
        output = self.postprocessor.postprocess_nodes(
            kwargs["nodes"], query_str=kwargs.get("query_str", None)
        )
        return {"nodes": output}

更改为

def _run_component(self, **kwargs: Any) -> Any:
        """Run component."""
        from llama_index.core.callbacks import CBEventType, EventPayload
        with self.postprocessor.callback_manager.event(
            CBEventType.RERANKING,
            payload={
                EventPayload.NODES: kwargs["nodes"],
                EventPayload.MODEL_NAME: kwargs.get("model", None),
                EventPayload.QUERY_STR: kwargs.get("query_str", None),
                EventPayload.TOP_K: kwargs.get("top_n", kwargs.get("top_k", None)),
            },
        ) as event:
            output = self.postprocessor.postprocess_nodes(
                kwargs["nodes"], query_str=kwargs.get("query_str", None)
            )
            event.on_end(payload={EventPayload.NODES: output})
        return {"nodes": output}

,但在尝试时我意识到 model 参数和 top_k 可能对每个积分非常具体,即不同的名称或其他东西?我提供的方法是一种“hacky”的方式。希望听到您对此是否有更好的方法的想法。

jecbmhm3

jecbmhm33#

要有效地确保所有节点后处理器在配置的回调管理器的范围内运行,同时容纳集成特定的参数,如 modeltop_k ,你可以在 PostprocessorComponent 中细化 _run_component 方法。以下是一个保持灵活性以处理其他集成特定参数的方法:

def _run_component(self, **kwargs: Any) -> Any:
    """Run component with callback manager scope and support for integration-specific arguments."""
    with self.postprocessor.callback_manager:
        # Pass all kwargs to postprocess_nodes, including 'model' and 'top_k'
        output = self.postprocessor.postprocess_nodes(
            kwargs["nodes"], query_str=kwargs.get("query_str", None), **kwargs
        )
    return {"nodes": output}

此修改确保 postprocess_nodes 在回调管理器内执行,对于在重新排序步骤中有效捕获跟踪至关重要。通过将 **kwargs 传递给 postprocess_nodes ,该方法仍然具有处理其他参数的灵活性,而无需对其接口进行直接修改。请确保你的 postprocess_nodes 方法在 BaseNodePostprocessor 实现中已准备好处理这些额外的参数。这种方法提供了一种结构化的方式,将所有必要的操作包含在回调管理器的范围内,解决了重新排序步骤中缺失跟踪的问题。

详细信息

来源

此响应旨在帮助你节省时间并提高效率。它不是精确的解决方案,而是你自己研究的一个起点。
帮助我变得更有用!如果这有帮助,请留下一个👍,如果不相关,请留下一个👎。

  • 要继续对话,请提及 @dosu。*

相关问题