haystack Pipeline循环在BranchJoiner接收多个输入时失败

z18hc3ub  于 3个月前  发布在  其他
关注(0)|答案(2)|浏览(56)

Haystack中的Pipeline循环在prompt_concatenator_after_observation组件(参见附带的管道图)回传到ChatMessage列表时,会失败。BranchJoiner会因为以下错误信息而失败:

File "/Users/vblagoje/workspace/haystack/haystack/core/pipeline/pipeline.py", line 76, in _run_component
    res: Dict[str, Any] = instance.run(**inputs)
  File "/Users/vblagoje/workspace/haystack/haystack/components/joiners/branch.py", line 140, in run
    raise ValueError(f"BranchJoiner expects only one input, but {inputs_count} were received.")
ValueError: BranchJoiner expects only one input, but 2 were received.

这个问题似乎源于BranchJoiner同时接收初始输入和回传的输入,违反了其单一输入的前提条件。
重现步骤:

import os
from typing import List, Optional, Dict, Any
import re
from haystack.dataclasses import ChatMessage

from haystack import Document, component
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.converters import OutputAdapter
from haystack.components.routers import ConditionalRouter
from haystack.components.joiners import BranchJoiner
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.websearch import SerperDevWebSearch
from haystack import Pipeline
from haystack.utils import Secret

os.environ["OPENAI_API_KEY"] = "some-fake-key-replace-with-real-if-you-need-to-use-it"

def find_last_action(chat_messages: List[ChatMessage]):
    prompt: str = chat_messages[-1].content
    lines = prompt.strip().split('\n')
    for line in reversed(lines):
        pattern = r'Action:\s*(\w+)\[(.*?)\]'

        match = re.search(pattern, line)
        if match:
            action_name = match.group(1)
            parameter = match.group(2)
            return [action_name, parameter]
    return [None, None]

def concat_prompt(last_message: ChatMessage, current_prompt: List[ChatMessage], append: str):
    return [ChatMessage.from_user(current_prompt[-1].content + last_message.content + append)]

search_message_template = """
Given these web search results:

{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

Be as brief as possible, max one sentence. 
Answer the question: {{search_query}}
"""

react_message_template = """
Solve a question answering task with interleaving Thought, Action, Observation steps.

Thought reasons about the current situation

Action can be:
google_search - Searches Google for the exact concept/entity (given in square brackets) and returns the results for you to use
finish - Returns the final answer (given in square brackets) and finishes the task

Observation sumarizes the Action outcome and helps in formulating the next
Thought in Thought, Action, Observation interleaving triplet of steps.

After each Observation, provide the next Thought and next Action.
Don't execute multiple steps even though you know the answer.
Only generate Thought and Action, never Observation, you'll get Observation from Action.
Follow the pattern in the example below.

Example:
###########################
Question: Which magazine was started first Arthur’s Magazine or First for Women?
Thought: I need to search Arthur’s Magazine and First for Women, and find which was started
first.
Action: google_search[When was 'Arthur’s Magazine' started?]
Observation: Arthur’s Magazine was an American literary periodical ˘
published in Philadelphia and founded in 1844. Edited by Timothy Shay Arthur, it featured work by
Edgar A. Poe, J.H. Ingraham, Sarah Josepha Hale, Thomas G. Spear, and others. In May 1846
it was merged into Godey’s Lady’s Book.
Thought: Arthur’s Magazine was started in 1844. I need to search First for Women founding date next
Action: google_search[When was 'First for Women' magazine started?]
Observation: First for Women is a woman’s magazine published by Bauer Media Group in the
USA. The magazine was started in 1989. It is based in Englewood Cliffs, New Jersey. In 2011
the circulation of the magazine was 1,310,696 copies.
Thought: First for Women was started in 1989. 1844 (Arthur’s Magazine) ¡ 1989 (First for
Women), so Arthur’s Magazine was started first.
Action: finish[Arthur’s Magazine]
############################

Let's start, the question is: {{query}}

Thought:
"""

routes = [
    {
        "condition": "{{'search' in tool_id_and_param[0]}}",
        "output": "{{tool_id_and_param[1]}}",
        "output_name": "search",
        "output_type": str,
    },
    {
        "condition": "{{'finish' in tool_id_and_param[0]}}",
        "output": "{{tool_id_and_param[1]}}",
        "output_name": "finish",
        "output_type": str,
    }
]

@component
class NoOp:
    @component.output_types(output=str)
    def run(self, query: str):
        return {"output": query}

class FakeThoughtActionOpenAIChatGenerator(OpenAIChatGenerator):

    @component.output_types(replies=List[ChatMessage])
    def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
        return {"replies": [ChatMessage.from_assistant("Thought: thinking\n Action: google_search[not important]\n")]}

class FakeConclusionOpenAIChatGenerator(OpenAIChatGenerator):

    @component.output_types(replies=List[ChatMessage])
    def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
        return {"replies": [ChatMessage.from_assistant("Tower of Pisa is 55 meters tall\n")]}

class FakeSerperDevWebSearch(SerperDevWebSearch):

    @component.output_types(documents=List[Document])
    def run(self, query: str):
        return {"documents": [Document(content="Eiffel Tower is 300 meters tall"),
                              Document(content="Tower of Pisa is 55 meters tall")]}

# main part
pipeline = Pipeline()
pipeline.add_component("main_input", BranchJoiner(List[ChatMessage]))
pipeline.add_component("prompt_builder", DynamicChatPromptBuilder(runtime_variables=["query"]))
pipeline.add_component("llm", FakeThoughtActionOpenAIChatGenerator(generation_kwargs={"stop": "Observation:"}))
pipeline.add_component("noop", NoOp())

# tools
pipeline.add_component("tool_extractor", OutputAdapter("{{messages | find_action}}",
                                                       output_type=List[str],
                                                       custom_filters={"find_action": find_last_action}))

pipeline.add_component("prompt_concatenator_after_action",
                       OutputAdapter("{{replies[-1] | concat_prompt(current_prompt,'')}}",
                                     output_type=List[ChatMessage],
                                     custom_filters={"concat_prompt": concat_prompt}))

pipeline.add_component("router", ConditionalRouter(routes))
pipeline.add_component("router_search",
                       FakeSerperDevWebSearch(api_key=Secret.from_token("some_fake_api_key")))
pipeline.add_component("search_prompt_builder",
                       DynamicChatPromptBuilder(runtime_variables=["documents", "search_query"]))
pipeline.add_component("search_llm", FakeConclusionOpenAIChatGenerator())
pipeline.add_component("router_finish", OutputAdapter("{{final_answer | format_final_answer}}",
                                                      output_type=str,
                                                      custom_filters={"format_final_answer": lambda x: x}))

pipeline.add_component("search_output_adapter", OutputAdapter("{{search_replies | format_observation}}",
                                                              output_type=List[ChatMessage],
                                                              custom_filters={"format_observation": lambda x: [
                                                                  ChatMessage.from_assistant(
                                                                      "Observation: " + x[-1].content + "\n")]}))

pipeline.add_component("prompt_concatenator_after_observation",
                       OutputAdapter("{{replies[-1] | concat_prompt(current_prompt, '\nThought:')}}",
                                     output_type=List[ChatMessage],
                                     custom_filters={"concat_prompt": concat_prompt}))

# main
pipeline.connect("main_input", "prompt_builder.prompt_source")
pipeline.connect("noop", "prompt_builder.query")
pipeline.connect("prompt_builder.prompt", "llm.messages")
pipeline.connect("llm.replies", "prompt_concatenator_after_action.replies")

# tools
pipeline.connect("prompt_builder.prompt", "prompt_concatenator_after_action.current_prompt")
pipeline.connect("prompt_concatenator_after_action", "tool_extractor.messages")

pipeline.connect("tool_extractor", "router")
pipeline.connect("router.search", "router_search.query")
pipeline.connect("router_search.documents", "search_prompt_builder.documents")
pipeline.connect("router.search", "search_prompt_builder.search_query")
pipeline.connect("search_prompt_builder.prompt", "search_llm.messages")
pipeline.connect("router.finish", "router_finish")

pipeline.connect("search_llm.replies", "search_output_adapter.search_replies")
pipeline.connect("search_output_adapter", "prompt_concatenator_after_observation.replies")
pipeline.connect("prompt_concatenator_after_action", "prompt_concatenator_after_observation.current_prompt")
pipeline.connect("prompt_concatenator_after_observation", "main_input")

search_message = [ChatMessage.from_user(search_message_template)]
messages = [ChatMessage.from_user(react_message_template)]
question = "which tower is taller: eiffel tower or tower of pisa?"
res = pipeline.run(data={"main_input": {"value": messages},
                         "noop": {"query": question},
                         "search_prompt_builder": {"prompt_source": search_message}})

print(res)

预期行为:
管道应该正确处理循环,允许BranchJoiner顺序处理循环输入,而不是同时处理。
实际行为:
当循环反馈到BranchJoiner时,管道会失败,导致它一次接收多个输入,从而引发上述异常。

ijnw1ujt

ijnw1ujt1#

你好,@vblagoje,这个错误是否与这个问题有关:#7740?

fhity93d

fhity93d2#

@mrm1001 是的 - 我们不能进行任何React剂环路,直到这个问题得到解决。

相关问题