langchain: tool calls with large arguments block between on_chat_model_stream and on_chat_model_end

rjjhvcjd · asked 6 months ago in Other

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangGraph/LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar issue and didn't find one.
  • I am sure that this is a bug in LangGraph/LangChain rather than my code.
  • I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.

Example Code

import asyncio
import getpass

api_endpoint = getpass.getpass("API Endpoint")
api_key = getpass.getpass("API Key")

from datetime import datetime

from langchain_core.messages import HumanMessage
from langchain_openai import AzureChatOpenAI
from langgraph.graph import END, MessageGraph
from langchain.tools import tool

@tool
def file_saver(text: str) -> str:
    """Persist the given string to disk
"""
    pass

model = AzureChatOpenAI(
    deployment_name="cogdep-gpt-4o",
    model_name="gpt-4o",
    azure_endpoint=api_endpoint,
    openai_api_key=api_key,
    openai_api_type="azure",
    openai_api_version="2024-05-01-preview",
    streaming=True,
    temperature=0.1
)

tools = [file_saver]
model = model.bind_tools(tools)

def get_agent_executor():
    def should_continue(messages):
        print(f"{datetime.now()}: Starting should_continue")
        return "end"

    async def call_model(messages):
        response = await model.ainvoke(messages)
        return response

    workflow = MessageGraph()

    workflow.add_node("agent", call_model)

    workflow.set_entry_point("agent")

    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "end": END,
        },
    )
    return workflow.compile()

agent_executor = get_agent_executor()

messages = [HumanMessage(content="Think of a poem with 100 verses and save it to a file. Do not print it to me first.")]

async def run():
    async for event in agent_executor.astream_events(messages, version="v1"):
        kind = event["event"]
        print(f"{datetime.now()}: Received event: {kind}")

asyncio.run(run())

Error Message and Stack Trace (if applicable)

This is part of the output (in this case, there is a 23s gap between `on_chat_model_stream` and `on_chat_model_end`)

(...)
2024-07-09 05:29:35.705573: Received event: on_chat_model_stream
2024-07-09 05:29:35.713679: Received event: on_chat_model_stream
2024-07-09 05:29:35.724480: Received event: on_chat_model_stream
2024-07-09 05:29:35.753143: Received event: on_chat_model_stream
2024-07-09 05:29:58.571740: Received event: on_chat_model_end
2024-07-09 05:29:58.574671: Received event: on_chain_start
2024-07-09 05:29:58.576026: Received event: on_chain_end
2024-07-09 05:29:58.577963: Received event: on_chain_start
2024-07-09 05:29:58.578214: Starting should_continue

Description

Hi!
When we receive an LLM answer containing a tool call with a large amount of data in its arguments, we notice that our program blocks even though we are using the async APIs. My guess is that the final message is built by merging all the streamed chunks after the last one arrives, and that this takes significant CPU time? Also, is there a different approach we could use?
Thanks a lot!
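
To illustrate our guess, here is a minimal stdlib-only timing sketch (hypothetical, not LangChain code): if the accumulated tool-call arguments are re-parsed once per streamed chunk, the total work grows quadratically with the number of chunks, while a single parse at the end is cheap.

import json
import time

# Hypothetical illustration: a tool call whose "text" argument arrives in
# ~4000 streamed fragments, re-parsed from scratch on every fragment.
fragments = ['{"text": "'] + ['poem line\\n'] * 4000 + ['"}']

buf = ""
start = time.perf_counter()
for frag in fragments:
    buf += frag
    try:
        json.loads(buf)  # stand-in for a partial-JSON parse per chunk
    except json.JSONDecodeError:
        pass  # the JSON stays incomplete until the final fragment
print(f"per-chunk parsing: {time.perf_counter() - start:.2f}s")

# Parsing once at the end is near-instant by comparison.
start = time.perf_counter()
json.loads("".join(fragments))
print(f"single parse: {time.perf_counter() - start:.4f}s")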

System Information

System Information
------------------
> OS:  Linux
> OS Version:  #1 SMP PREEMPT Thu Nov 16 10:49:20 UTC 2023
> Python Version:  3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 11:57:02) [GCC 12.3.0]

Package Information
-------------------
> langchain_core: 0.2.11
> langchain: 0.2.6
> langsmith: 0.1.84
> langchain_openai: 0.1.14
> langchain_text_splitters: 0.2.2
> langgraph: 0.1.5

Packages not installed (Not Necessarily a Problem)
--------------------------------------------------
The following packages were not found:

> langserve

vsmadaxz · #1

Thanks for flagging this. I was able to reproduce the delay.
It looks like we are spending an excessive amount of time parsing JSON values, which is something we can improve (I believe by avoiding repeated work, though I haven't dug in deeply yet). Thanks for bringing it to our attention.
Summary stats from that run below; full cProfile output attached.

         7    0.001    0.000   39.557    5.651 python3.11/site-packages/langchain_core/language_models/chat_models.py:771(_agenerate_with_cache)
         1    0.011    0.011   39.537   39.537 python3.11/site-packages/langchain_core/language_models/chat_models.py:79(generate_from_stream)
      4013    0.014    0.000   39.525    0.010 langchain_core/outputs/chat_generation.py:83(__add__)
      4013    0.023    0.000   39.413    0.010 langchain_core/messages/ai.py:232(__add__)
12050/8035    0.022    0.000   39.381    0.005 pydantic/v1/main.py:332(__init__)
12050/8035    0.139    0.000   39.360    0.005 pydantic/v1/main.py:1030(validate_model)
      4016    0.008    0.000   39.300    0.010 langchain_core/messages/ai.py:78(__init__)
      4016    0.007    0.000   39.292    0.010 langchain_core/messages/base.py:57(__init__)
      4015    0.014    0.000   38.904    0.010 langchain_core/messages/ai.py:178(init_tool_calls)
      4014   11.357    0.003   38.890    0.010 langchain_core/utils/json.py:44(parse_partial_json)
   2158964    2.544    0.000   27.532    0.000 python3.11/json/__init__.py:299(loads)
   2158964    1.206    0.000   21.431    0.000 python3.11/json/decoder.py:332(decode)
   2158964   16.069    0.000   19.615    0.000 /U

profiles.tgz
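
Reading the profile: roughly 4,000 chunk merges (AIMessageChunk.__add__) end up triggering about 2.16 million json.loads calls via parse_partial_json, i.e. hundreds of parse attempts per merge over an ever-growing argument string. A sketch of the "avoid repeated work" direction (illustration only, not the actual patch): accumulate the raw argument fragments as plain strings while streaming and parse the JSON once at the end.

import json
from typing import Dict

class ToolCallAccumulator:
    """Collects streamed tool-call argument fragments without re-parsing."""

    def __init__(self) -> None:
        self.raw_args: Dict[int, str] = {}  # tool-call index -> raw JSON so far

    def add(self, delta: dict) -> None:
        # OpenAI-style delta, as seen in additional_kwargs["tool_calls"]:
        # {"index": 0, "function": {"name": ..., "arguments": "<fragment>"}}
        idx = delta.get("index", 0)
        fragment = delta.get("function", {}).get("arguments", "")
        self.raw_args[idx] = self.raw_args.get(idx, "") + fragment

    def finalize(self) -> Dict[int, dict]:
        # One json.loads per tool call, instead of one per streamed chunk.
        return {idx: json.loads(raw) for idx, raw in self.raw_args.items()}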
Reproduction script:

import asyncio
import cProfile
import io
import pstats
from datetime import datetime

from langchain.tools import tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

from langgraph.graph import END, MessageGraph
from pyinstrument import Profiler

@tool
def file_saver(text: str) -> str:
    """Persist the given string to disk"""
    pass

model = ChatOpenAI(
    model="gpt-4o",
    streaming=True,
    temperature=0.1,
)

tools = [file_saver]
model = model.bind_tools(tools)

def get_agent_executor():
    def should_continue(messages):
        print(f"{datetime.now()}: Starting should_continue")
        return "end"

    async def call_model(messages):
        response = await model.ainvoke(messages)
        return response

    workflow = MessageGraph()

    workflow.add_node("agent", call_model)

    workflow.set_entry_point("agent")

    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "end": END,
        },
    )
    return workflow.compile()

agent_executor = get_agent_executor()

messages = [
    HumanMessage(
        content="Think of a ballad with 300 verses and save it"
        " to a file. Do not print it to me first. At the end of the poem, sign it as 's.b. Anonymous'."
        " You must write every line, do not skip."
    )
]

async def run():
    saw_cme = False
    py_profiler = None
    cprofiler = None
    evs = None
    async for event in agent_executor.astream_events(messages, version="v1"):
        kind = event["event"]
        if saw_cme:
            continue
        if kind == "on_chat_model_stream":
            if tc := event["data"]["chunk"].additional_kwargs.get("tool_calls"):
                if "Anonymous" in str(tc[0]["function"]):
                    if py_profiler:
                        py_profiler.stop()
                    py_profiler = Profiler(interval=0.0001, async_mode="disabled")
                    py_profiler.start()
                    if cprofiler:
                        cprofiler.disable()
                    cprofiler = cProfile.Profile()
                    cprofiler.enable()

        if kind == "on_chat_model_end":
            saw_cme = True
            if py_profiler:
                py_profiler.stop()
                py_profiler.write_html("profile.html", show_all=True)
                cprofiler.disable()
                s = io.StringIO()
                ps = pstats.Stats(cprofiler, stream=s).sort_stats(
                    pstats.SortKey.CUMULATIVE
                )
                ps.print_stats()
                with open("profile.txt", "w") as f:
                    f.write(s.getvalue())
            else:
                print("No profiling data")
            print(evs)  # the event seen just before on_chat_model_end
        # Remember the current event line; it is printed when the next event
        # turns out to be on_chat_model_end.
        evs = f"{datetime.now()}: Received event: {kind}"
        if kind == "on_chat_model_end":
            print(evs)

asyncio.run(run())
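
Until a fix lands, a possible user-side mitigation (a sketch that assumes the OpenAI-style delta format shown above; the chunk aggregation inside the model's own streaming path may still apply, so this only avoids the consumer-side re-parsing) is to consume model.astream directly and join the fragments yourself:

import json

async def collect_tool_args(model, messages) -> dict:
    """Join tool-call argument fragments from the raw stream, parsing once."""
    buffers: dict[int, str] = {}
    async for chunk in model.astream(messages):
        for tc in chunk.additional_kwargs.get("tool_calls", []):
            idx = tc.get("index", 0)
            fragment = tc.get("function", {}).get("arguments", "") or ""
            buffers[idx] = buffers.get(idx, "") + fragment
    return {idx: json.loads(raw) for idx, raw in buffers.items()}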

jchrr9hc · #2

Going to transfer this to the langchain repo, since the underlying issue lives there. Will flag it to the team.
