langflow error: error when trying to branch a conversation with the Conditional Router

ldioqlga  posted 2 months ago  in Other

I am trying to implement a supervisor pattern in which a language model classifies the current message based on the conversation so far. I can do this perfectly well in the UI with the Conditional Router component, but when I try to run the flow through the API, it throws an error.
I believe this is because the flow needs two separate Chat Output components to support branching the conversation. Below is an example flow that demonstrates the problem. I am currently testing against the latest changes on main.
dont-talk-about-bruno.json

❯ curl -X POST \
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json' \
    -d '{"input_value": "message",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {
      "ChatInput-Ma4Jp": {},
      "OpenAIModel-MdB0V": {},
      "OpenAIModel-NJ8p6": {},
      "Prompt-DfMtR": {},
      "ConditionalRouter-j94T7": {},
      "Pass-N8MXP": {},
      "ChatOutput-TIge3": {},
      "ChatOutput-Z6DoM": {},
      "OpenAIModel-Naaf3": {},
      "Pass-hbQT6": {},
      "Prompt-PE0iF": {},
      "Prompt-uCIqY": {},
      "Memory-oi31T": {}
    }}'

{"detail":"The message must be an iterator or an async iterator."}
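For anyone reproducing this outside curl, the same call can be made from Python. This is a sketch using only the standard library; the host, flow id, and tweak component ids are taken from the curl command above and will differ for other flows:

```python
# Sketch: the curl request above, rebuilt with Python's stdlib urllib.
# BASE_URL and FLOW_ID come from the original post; substitute your own.
import json
import urllib.request

BASE_URL = "http://localhost:3000"
FLOW_ID = "dea95449-11e1-4f8f-b22d-7ef6f8728c34"

payload = {
    "input_value": "message",
    "output_type": "chat",
    "input_type": "chat",
    # An empty tweak dict means "use the component exactly as saved in the flow".
    "tweaks": {component_id: {} for component_id in [
        "ChatInput-Ma4Jp", "OpenAIModel-MdB0V", "OpenAIModel-NJ8p6",
        "Prompt-DfMtR", "ConditionalRouter-j94T7", "Pass-N8MXP",
        "ChatOutput-TIge3", "ChatOutput-Z6DoM", "OpenAIModel-Naaf3",
        "Pass-hbQT6", "Prompt-PE0iF", "Prompt-uCIqY", "Memory-oi31T",
    ]},
}

def run_flow() -> dict:
    # POST the payload to the non-streaming run endpoint and decode the JSON reply.
    req = urllib.request.Request(
        f"{BASE_URL}/api/v1/run/{FLOW_ID}?stream=false",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        return json.loads(resp.read())
```

With a running server this returns the same `{"detail": "The message must be an iterator or an async iterator."}` error body described above.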

tvokkenx1#

Hey @Nitewriter! I'm here to help you with any bug, question, or contribution. Let's get this solved together!
To resolve the error "The message must be an iterator or an async iterator" when running a flow with the ConditionalRouter component through the API, you need to make sure the INPUT_FIELD_NAME parameter is set to an iterator or an async iterator. This is required for the stream to process messages correctly.
Here is how to address the issue:

  1. Make sure INPUT_FIELD_NAME is an iterator or an async iterator:
    Ensure that the INPUT_FIELD_NAME parameter in your flow is set to an iterator or an async iterator. Here is an example of setting the parameter correctly:
from typing import AsyncIterator

async def example_async_iterator() -> AsyncIterator[str]:
    for i in range(5):
        yield f"Message {i}"

# Assuming `vertex` is an instance of the class containing the `stream` method
vertex.params[INPUT_FIELD_NAME] = example_async_iterator()
await vertex.stream()
  2. Check the implementation of the stream method:
    Make sure the stream method in the vertex class is implemented to handle iterators or async iterators correctly. Here is the relevant snippet that performs this check:
async def stream(self):
    iterator = self.params.get(INPUT_FIELD_NAME, None)
    if not isinstance(iterator, (AsyncIterator, Iterator)):
        raise ValueError("The message must be an iterator or an async iterator.")
    ...
  3. Example of stream handling in Langflow:
    In Langflow, the build_vertex_stream function in src/backend/base/langflow/api/v1/chat.py demonstrates how streaming is handled for a vertex. Here is an excerpt:
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
    flow_id: uuid.UUID,
    vertex_id: str,
    session_id: Optional[str] = None,
    chat_service: "ChatService" = Depends(get_chat_service),
    session_service: "SessionService" = Depends(get_session_service),
):
    try:
        flow_id_str = str(flow_id)

        async def stream_vertex():
            try:
                cache = await chat_service.get_cache(flow_id_str)
                if not cache:
                    raise ValueError(f"No cache found for {flow_id_str}.")
                else:
                    graph = cache.get("result")

                vertex: "InterfaceVertex" = graph.get_vertex(vertex_id)
                if not hasattr(vertex, "stream"):
                    raise ValueError(f"Vertex {vertex_id} does not support streaming")
                if isinstance(vertex._built_result, str) and vertex._built_result:
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                elif not vertex.frozen or not vertex._built:
                    logger.debug(f"Streaming vertex {vertex_id}")
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    async for chunk in vertex.stream():
                        stream_data = StreamData(
                            event="message",
                            data={"chunk": chunk},
                        )
                        yield str(stream_data)
                elif vertex.result is not None:
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                else:
                    raise ValueError(f"No result found for vertex {vertex_id}")

            except Exception as exc:
                logger.exception(f"Error building Component: {exc}")
                exc_message = parse_exception(exc)
                if exc_message == "The message must be an iterator or an async iterator.":
                    exc_message = "This stream has already been closed."
                yield str(StreamData(event="error", data={"error": exc_message}))
            finally:
                logger.debug("Closing stream")
                await chat_service.set_cache(flow_id_str, graph)
                yield str(StreamData(event="close", data={"message": "Stream closed"}))

        return StreamingResponse(stream_vertex(), media_type="text/event-stream")
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Error building Component") from exc

By making sure the INPUT_FIELD_NAME parameter is correctly set to an iterator or an async iterator, and that the stream method is implemented correctly, you should be able to resolve the error and run the flow successfully through the API. Sources:
  1. https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/graph/vertex/types.py
  2. https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/graph/vertex/types.py
  3. https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/api/v1/chat.py
  4. https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/api/v1/chat.py
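The type check the bot quotes can be seen in isolation. The key point is that a live generator (sync or async) passes the `isinstance` test that `stream()` performs, while an already-built result such as a plain string fails it, because `str` is iterable but is not itself an iterator. A minimal, self-contained illustration (this is a toy, not Langflow code):

```python
from collections.abc import AsyncIterator, Iterator

async def chunks() -> AsyncIterator[str]:
    # Calling this returns an async generator object, which IS an AsyncIterator.
    for i in range(3):
        yield f"chunk {i}"

def is_streamable(value) -> bool:
    # The same test InterfaceVertex.stream() applies to its input:
    # accept synchronous iterators and asynchronous iterators alike.
    return isinstance(value, (AsyncIterator, Iterator))

# Note: a string is Iterable (it has __iter__) but not an Iterator
# (it has no __next__), so a finished text result fails this check,
# which is exactly how the "must be an iterator" error is triggered.
```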

  • To continue the conversation with @dosu, mention @dosu.

v2g6jxz62#

Hey @Nitewriter, the error The message must be an iterator or an async iterator. usually means that something went wrong inside a component. I believe you will find an ERROR line in the logs pointing at the actual problem.


yacmzcpb3#

@nicoloboschi I'll see what I can find and post it here. Thanks.


qq24tv8q4#

@nicoloboschi Here is the traceback I get when invoking the flow with curl:

ValueError: The message must be an iterator or an async iterator. (<class 'langflow.graph.vertex.types.ComponentVertex'>)                    
                                                                                                                                                                          
                             ╭─────────────────────────────────────────── Traceback (most recent call last) ────────────────────────────────────────────╮                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/api/v1/endpoints.py:203 in simplified_run_flow       │                 
                             │                                                                                                                          │                 
                             │   200 │   """                                                                                                            │                 
                             │   201 │   start_time = time.perf_counter()                                                                               │                 
                             │   202 │   try:                                                                                                           │                 
                             │ ❱ 203 │   │   result = await simple_run_flow(                                                                            │                 
                             │   204 │   │   │   flow=flow,                                                                                             │                 
                             │   205 │   │   │   input_request=input_request,                                                                           │                 
                             │   206 │   │   │   stream=stream,                                                                                         │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/api/v1/endpoints.py:107 in simple_run_flow           │                 
                             │                                                                                                                          │                 
                             │   104 │   │   │   │   │   and (input_request.output_type == "any" or input_request.output_type                           │                 
                             │       in vertex.id.lower())  # type: ignore                                                                              │                 
                             │   105 │   │   │   │   )                                                                                                  │                 
                             │   106 │   │   │   ]                                                                                                      │                 
                             │ ❱ 107 │   │   task_result, session_id = await run_graph_internal(                                                        │                 
                             │   108 │   │   │   graph=graph,                                                                                           │                 
                             │   109 │   │   │   flow_id=flow_id_str,                                                                                   │                 
                             │   110 │   │   │   session_id=input_request.session_id,                                                                   │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/processing/process.py:48 in run_graph_internal       │                 
                             │                                                                                                                          │                 
                             │    45 │                                                                                                                  │                 
                             │    46 │   fallback_to_env_vars = get_settings_service().settings.fallback_to_env_var                                     │                 
                             │    47 │                                                                                                                  │                 
                             │ ❱  48 │   run_outputs = await graph.arun(                                                                                │                 
                             │    49 │   │   inputs=inputs_list,                                                                                        │                 
                             │    50 │   │   inputs_components=components,                                                                              │                 
                             │    51 │   │   types=types,                                                                                               │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/graph/base.py:455 in arun                      │                 
                             │                                                                                                                          │                 
                             │    452 │   │   for _ in range(len(inputs) - len(types)):                                                                 │                 
                             │    453 │   │   │   types.append("chat")  # default to chat                                                               │                 
                             │    454 │   │   for run_inputs, components, input_type in zip(inputs, inputs_components, types):                          │                 
                             │ ❱  455 │   │   │   run_outputs = await self._run(                                                                        │                 
                             │    456 │   │   │   │   inputs=run_inputs,                                                                                │                 
                             │    457 │   │   │   │   input_components=components,                                                                      │                 
                             │    458 │   │   │   │   input_type=input_type,                                                                            │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/graph/base.py:357 in _run                      │                 
                             │                                                                                                                          │                 
                             │    354 │   │   │   │   raise ValueError(f"Vertex {vertex_id} not found")                                                 │                 
                             │    355 │   │   │                                                                                                         │                 
                             │    356 │   │   │   if not vertex.result and not stream and hasattr(vertex,                                               │                 
                             │        "consume_async_generator"):                                                                                       │                 
                             │ ❱  357 │   │   │   │   await vertex.consume_async_generator()                                                            │                 
                             │    358 │   │   │   if (not outputs and vertex.is_output) or (vertex.display_name in outputs or                           │                 
                             │        vertex.id in outputs):                                                                                            │                 
                             │    359 │   │   │   │   vertex_outputs.append(vertex.result)                                                              │                 
                             │    360                                                                                                                   │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/vertex/types.py:436 in consume_async_generator │                 
                             │                                                                                                                          │                 
                             │   433 │   │   self._built = True                                                                                         │                 
                             │   434 │                                                                                                                  │                 
                             │   435 │   async def consume_async_generator(self):                                                                       │                 
                             │ ❱ 436 │   │   async for _ in self.stream():                                                                              │                 
                             │   437 │   │   │   pass                                                                                                   │                 
                             │   438 │                                                                                                                  │                 
                             │   439 │   def _is_chat_input(self):                                                                                      │                 
                             │                                                                                                                          │                 
                             │ /Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/vertex/types.py:363 in stream                  │                 
                             │                                                                                                                          │                 
                             │   360 │   async def stream(self):                                                                                        │                 
                             │   361 │   │   iterator = self.params.get(INPUT_FIELD_NAME, None)                                                         │                 
                             │   362 │   │   if not isinstance(iterator, (AsyncIterator, Iterator)):                                                    │                 
                             │ ❱ 363 │   │   │   raise ValueError(                                                                                      │                 
                             │   364 │   │   │   │   f"The message must be an iterator or an async iterator.                                            │                 
                             │       ({type(iterator)})"                                                                                                │                 
                             │   365 │   │   │   )                                                                                                      │                 
                             │   366 │   │   is_async = isinstance(iterator, AsyncIterator)                                                             │                 
                             ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯                 
                             ValueError: The message must be an iterator or an async iterator. (<class 'langflow.graph.vertex.types.ComponentVertex'>)

I added the iterator's type to the ValueError message so we can see what it actually received.


krcsximq5#

@nicoloboschi OK, I think I may have found the problem. It appears the Conditional Router component calls stop on the inactive branch of the graph. The runner still wants to resolve the Chat Output component on that stopped branch, and errors out because the OpenAI component there is inactive (never built). Is conversation branching even possible with a conditional router?
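The hypothesis above can be modeled without Langflow at all. In the toy runner below (an illustration of the suspected behavior, not Langflow's actual graph code), a router activates exactly one branch and feeds it a message iterator; a naive runner that consumes every output vertex hits the same kind of "must be an iterator" error on the stopped branch, while a runner that skips inactive outputs does not:

```python
from collections.abc import Iterator

class Output:
    """Toy stand-in for a ChatOutput vertex."""
    def __init__(self, name: str):
        self.name = name
        self.message = None   # filled in only on the active branch
        self.active = False   # flipped by the router

    def consume(self) -> str:
        # Mirrors the stream() precondition: the input must be an iterator.
        if not isinstance(self.message, Iterator):
            raise ValueError("The message must be an iterator or an async iterator.")
        return "".join(self.message)

def route(condition: bool, on_true: Output, on_false: Output) -> Output:
    # The router activates one branch and hands it the message chunks;
    # the other branch is left untouched (stopped).
    chosen = on_true if condition else on_false
    chosen.active = True
    chosen.message = iter(["hello ", "world"])
    return chosen

def run_all(outputs: list) -> list:
    # Naive runner: consumes every output, including the stopped branch.
    return [out.consume() for out in outputs]

def run_active(outputs: list) -> list:
    # Branch-aware runner: skips outputs whose branch was deactivated.
    return [out.consume() for out in outputs if out.active]
```

Under this toy model, the fix would live in the runner (skip deactivated outputs) rather than in the output component itself.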


8wtpewkr6#

The error occurs on the input field of the "About Bruno" Chat Output component:

❯ curl -X POST \
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json' \
    -d '{"input_value": "What is it like being an AI?",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {
      "ChatInput-Ma4Jp": {},
      "Prompt-DfMtR": {},
      "ConditionalRouter-j94T7": {},
      "Pass-N8MXP": {},
      "ChatOutput-TIge3": {},
      "ChatOutput-Z6DoM": {},
      "Pass-hbQT6": {},
      "Prompt-PE0iF": {},
      "Prompt-uCIqY": {},
      "Memory-oi31T": {},
      "OpenAIModel-pHvfu": {},
      "OpenAIModel-w2hnb": {},
      "OpenAIModel-dUp0N": {}
    }}'

The error occurs on the input field of the "Not About Bruno" Chat Output component:

❯ curl -X POST \
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json' \
    -d '{"input_value": "Have you heard about Bruno?",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {
      "ChatInput-Ma4Jp": {},
      "Prompt-DfMtR": {},
      "ConditionalRouter-j94T7": {},
      "Pass-N8MXP": {},
      "ChatOutput-TIge3": {},
      "ChatOutput-Z6DoM": {},
      "Pass-hbQT6": {},
      "Prompt-PE0iF": {},
      "Prompt-uCIqY": {},
      "Memory-oi31T": {},
      "OpenAIModel-pHvfu": {},
      "OpenAIModel-w2hnb": {},
      "OpenAIModel-dUp0N": {}
    }}'

fae0ux8s7#

Bailing out instead of raising an error does allow the flow to work; however, it is probably not the solution you would want.

src/backend/base/langflow/graph/vertex/types.py:InterfaceVertex:stream

async def stream(self):
    iterator = self.params.get(INPUT_FIELD_NAME, None)
    if not isinstance(iterator, (AsyncIterator, Iterator)):
        return  # bail out silently instead of raising
    ...
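A slightly narrower variant of that bail-out is sketched below as a standalone async generator (a sketch, not a vetted fix: `INPUT_FIELD_NAME` is a placeholder and `params` a plain dict standing in for the vertex's params). It returns early only when the input is missing entirely, but still raises when a present value has the wrong type, so genuine misconfiguration surfaces. Note that the traceback earlier in the thread shows the received value was a ComponentVertex, not None, which suggests the real fix belongs in the graph runner (skipping stopped branches) rather than inside stream():

```python
from collections.abc import AsyncIterator, Iterator

INPUT_FIELD_NAME = "input_value"  # placeholder for Langflow's constant

async def stream(params: dict):
    iterator = params.get(INPUT_FIELD_NAME, None)
    if iterator is None:
        # No input at all: treat as a stopped branch and stream nothing.
        return
    if not isinstance(iterator, (AsyncIterator, Iterator)):
        # A present-but-wrong value is still a real error worth raising.
        raise ValueError(
            f"The message must be an iterator or an async iterator. ({type(iterator)})"
        )
    if isinstance(iterator, AsyncIterator):
        async for chunk in iterator:
            yield chunk
    else:
        for chunk in iterator:
            yield chunk
```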

ftf50wuq8#

@ogabrielluiz This looks like a graph builder issue. Could you take a look?
