langchain:FunctionMessage 在 astream_events API 中无法使用

pkbketx9  于 6个月前  发布在  其他
关注(0)|答案(2)|浏览(89)

检查其他资源

  • 我为这个问题添加了一个非常描述性的标题。
  • 我使用集成搜索在LangChain文档中进行了搜索。
  • 我使用GitHub搜索查找了一个类似的问题,但没有找到。
  • 我确信这是LangChain中的一个错误,而不是我的代码。
  • 通过更新到LangChain的最新稳定版本(或特定集成包)无法解决此错误。

示例代码

import os
from typing import *

from langchain_anthropic import ChatAnthropic
from langchain_mongodb.chat_message_histories import MongoDBChatMessageHistory
from langchain.agents import create_tool_calling_agent
from langchain_core.prompts import MessagesPlaceholder
from langchain.memory import ConversationBufferWindowMemory
from langchain.agents import AgentExecutor
from langchain.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain_community.utilities import GoogleSerperAPIWrapper
from uuid import uuid4
# Session id of the stored conversation: passed as session_id to
# MongoDBChatMessageHistory and, when truthy, enables the chat_history
# placeholder in the prompt.
# NOTE(review): the fourth group ("9bsbc") has five characters and a non-hex
# "s", so this is not a well-formed UUID string -- confirm whether that matters.
chat_id = "b894e0c7-acb1-4907-9bsbc-bb98f5a970dc"

def google_search_tool(iso: str="us"):
    """Return the Google Serper tools used by the agent.

    ``iso`` is forwarded as the ``gl`` argument of every
    ``GoogleSerperAPIWrapper`` (presumably a country code -- confirm against
    the wrapper's documentation). The result is a list of four ``Tool``
    objects, in this order: web search, image search, news search and
    places search.
    """
    # Tool name -> (wrapper, description). The web-search wrapper is built
    # without a ``type``; the other three select their Serper search type.
    catalogue = {
        "google_search": (
            GoogleSerperAPIWrapper(gl=iso),
            "Search Google for information.",
        ),
        "google_image_search": (
            GoogleSerperAPIWrapper(gl=iso, type="images"),
            "Search Google for images.",
        ),
        "google_news_search": (
            GoogleSerperAPIWrapper(gl=iso, type="news"),
            "Search Google for news.",
        ),
        "google_places_search": (
            GoogleSerperAPIWrapper(gl=iso, type="places"),
            "Search Google for places.",
        ),
    }

    # Each tool exposes its wrapper's ``run`` method as the callable.
    return [
        Tool(name=tool_name, func=wrapper.run, description=summary)
        for tool_name, (wrapper, summary) in catalogue.items()
    ]

# Run-level settings for the reproduction. workspace_id, request_id, prompt
# and llm_kwargs are not referenced again in the code shown here.
workspace_id = "test"
request_id = str(uuid4())
# NOTE: "dispoal" is a typo for "disposal"; the text is sent to the model
# verbatim as the system message.
system_template = "You are a helpful AI agent. Always use the tools at your dispoal"
prompt = ""
# Build the four search tools with gl="in".
tools = google_search_tool("in")
llm_kwargs = {}
# Streaming Anthropic chat model used by the agent.
# NOTE(review): the api_key value is presumably a redacted placeholder --
# supply a real key from configuration rather than hard-coding one.
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20240620",
    streaming=True,
    api_key="yurrrrrrrrrrrrr",
)

# Prompt layout: system text, the stored chat history (only when chat_id is
# truthy), the user's input, then the agent scratchpad.
base_template = ChatPromptTemplate.from_messages([
    ("system", system_template),
    # NOTE(review): when chat_id is falsy this entry is a bare None inside the
    # message list -- confirm from_messages tolerates that. chat_id is always
    # set in this script, so the placeholder is always present here.
    MessagesPlaceholder(variable_name="chat_history") if chat_id else None,
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

# Tool-calling agent built from the model, the search tools and the prompt.
agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=base_template)

# MongoDB-backed message store for this chat session. The connection string
# and database name are read from the MONGO_URI and MONGO_DBNAME environment
# variables; os.getenv returns None when a variable is unset.
chat_message_history = MongoDBChatMessageHistory(
    session_id=chat_id,
    connection_string=os.getenv('MONGO_URI'),
    database_name=os.getenv('MONGO_DBNAME'), # "api"
    collection_name="chat_histories",
)

# Memory wrapper around the store. memory_key matches the "chat_history"
# placeholder name in the prompt; input_key/output_key name the "input" and
# "output" fields of a run.
conversational_memory = ConversationBufferWindowMemory(
    chat_memory=chat_message_history,
    memory_key='chat_history',
    return_messages=True,
    output_key="output",
    input_key="input",
)

# Executor that runs the agent with the tools and the Mongo-backed memory.
# The run_name "Agent" set via with_config is the name the event loop matches
# against event["name"] to detect the start and end of the whole agent run.
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=conversational_memory,
    return_intermediate_steps=True,
    handle_parsing_errors=True
).with_config({"run_name": "Agent"})

response = []  # every streamed event is kept; the tool-end events are replayed into the history afterwards
run = agent_executor.astream_events(input = {"input": "what is glg stock"}, version="v2")
async for event in run:
    # NOTE: a top-level `async for` only runs where top-level await is
    # available; the traceback's "Cell In[14]" indicates a notebook cell.
    response.append(event)
    kind = event["event"]
    if kind == "on_chain_start":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind == "on_chain_end":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            # NOTE(review): .get('output') yields None when the key is absent,
            # and the ['output'] subscript would then raise TypeError.
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    # NOTE: this `if` starts a second, independent chain instead of continuing
    # the `elif` chain above; `kind` holds a single value, so at most one
    # branch across both chains can match.
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            # NOTE(review): the remark above mentions OpenAI, but this script
            # uses ChatAnthropic -- confirm the same holds for that model.
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

from langchain_core.messages import FunctionMessage
import json

# Re-write the stored history so that it also contains the tool results
# captured from the event stream.
messages = chat_message_history.messages

for resp in response:
    if resp['event'] == "on_tool_end":
        # Wrap the event's data (serialised as JSON) in a FunctionMessage named
        # after the tool, using the event's run_id as the message id.
        # NOTE(review): json.dumps raises TypeError if resp['data'] holds a
        # value that is not JSON-serialisable -- confirm the event payload.
        tool_msg = FunctionMessage(content=json.dumps(resp['data']), id=resp['run_id'], name=resp['name'])   
        # insert(-1, ...) places the message just before the current last element.
        messages.insert(-1, tool_msg)
        
# Replace the stored history with the augmented list.
# NOTE: per the traceback in this report, the next agent run then fails inside
# langchain_anthropic's _format_messages at
# `_message_type_lookups[message.type]` with KeyError: 'function' --
# presumably the type of the FunctionMessage objects stored here.
chat_message_history.clear()
chat_message_history.add_messages(messages)
# Bare expression; presumably there to display the stored messages as the
# last line of a notebook cell.
chat_message_history.messages

错误信息和堆栈跟踪(如果适用)

{
	"name": "KeyError",
	"message": "'function'",
	"stack": "---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[14], line 3
      1 response = []
      2 run = agent_executor.astream_events(input = {\"input\": \"what is glg stock\"}, version=\"v2\")
----> 3 async for event in run:
      4     response.append(event)
      5     kind = event[\"event\"]

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:4788, in RunnableBindingBase.astream_events(self, input, config, **kwargs)
   4782 async def astream_events(
   4783     self,
   4784     input: Input,
   4785     config: Optional[RunnableConfig] = None,
   4786     **kwargs: Optional[Any],
   4787 ) -> AsyncIterator[StreamEvent]:
-> 4788     async for item in self.bound.astream_events(
   4789         input, self._merge_configs(config), **{**self.kwargs, **kwargs}
   4790     ):
   4791         yield item

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1146, in Runnable.astream_events(self, input, config, version, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
   1141     raise NotImplementedError(
   1142         'Only versions \"v1\" and \"v2\" of the schema is currently supported.'
   1143     )
   1145 async with aclosing(event_stream):
-> 1146     async for event in event_stream:
   1147         yield event

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:947, in _astream_events_implementation_v2(runnable, input, config, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
    945 # Await it anyway, to run any cleanup code, and propagate any exceptions
    946 try:
--> 947     await task
    948 except asyncio.CancelledError:
    949     pass

File /usr/lib/python3.10/asyncio/futures.py:288, in Future.__await__(self)
    286 if not self.done():
    287     raise RuntimeError(\"await wasn't used with future\")
--> 288 return self.result()

File /usr/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception.with_traceback(self._exception_tb)
    202 return self._result

File /usr/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the `send` method directly, because coroutines
    231         # don't have `__iter__` and `__next__` methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:907, in _astream_events_implementation_v2.<locals>.consume_astream()
    904 try:
    905     # if astream also calls tap_output_aiter this will be a no-op
    906     async with aclosing(runnable.astream(input, config, **kwargs)) as stream:
--> 907         async for _ in event_streamer.tap_output_aiter(run_id, stream):
    908             # All the content will be picked up
    909             pass
    910 finally:

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:153, in _AstreamEventsCallbackHandler.tap_output_aiter(self, run_id, output)
    151 tap = self.is_tapped.setdefault(run_id, sentinel)
    152 # wait for first chunk
--> 153 first = await py_anext(output, default=sentinel)
    154 if first is sentinel:
    155     return

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/utils/aiter.py:65, in py_anext.<locals>.anext_impl()
     58 async def anext_impl() -> Union[T, Any]:
     59     try:
     60         # The C code is way more low-level than this, as it implements
     61         # all methods of the iterator protocol. In this implementation
     62         # we're relying on higher-level coroutine concepts, but that's
     63         # exactly what we want -- crosstest pure-Python high-level
     64         # implementation and low-level C anext() iterators.
---> 65         return await __anext__(iterator)
     66     except StopAsyncIteration:
     67         return default

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:1595, in AgentExecutor.astream(self, input, config, **kwargs)
   1583 config = ensure_config(config)
   1584 iterator = AgentExecutorIterator(
   1585     self,
   1586     input,
   (...)
   1593     **kwargs,
   1594 )
-> 1595 async for step in iterator:
   1596     yield step

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent_iterator.py:246, in AgentExecutorIterator.__aiter__(self)
    240 while self.agent_executor._should_continue(
    241     self.iterations, self.time_elapsed
    242 ):
    243     # take the next step: this plans next action, executes it,
    244     # yielding action and observation as they are generated
    245     next_step_seq: NextStepOutput = []
--> 246     async for chunk in self.agent_executor._aiter_next_step(
    247         self.name_to_tool_map,
    248         self.color_mapping,
    249         self.inputs,
    250         self.intermediate_steps,
    251         run_manager,
    252     ):
    253         next_step_seq.append(chunk)
    254         # if we're yielding actions, yield them as they come
    255         # do not yield AgentFinish, which will be handled below

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:1304, in AgentExecutor._aiter_next_step(self, name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager)
   1301     intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
   1303     # Call the LLM to see what to do.
-> 1304     output = await self.agent.aplan(
   1305         intermediate_steps,
   1306         callbacks=run_manager.get_child() if run_manager else None,
   1307         **inputs,
   1308     )
   1309 except OutputParserException as e:
   1310     if isinstance(self.handle_parsing_errors, bool):

File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:554, in RunnableMultiActionAgent.aplan(self, intermediate_steps, callbacks, **kwargs)
    546 final_output: Any = None
    547 if self.stream_runnable:
    548     # Use streaming to make sure that the underlying LLM is invoked in a
    549     # streaming
   (...)
    552     # Because the response from the plan is not a generator, we need to
    553     # accumulate the output into final output and return that.
--> 554     async for chunk in self.runnable.astream(
    555         inputs, config={\"callbacks\": callbacks}
    556     ):
    557         if final_output is None:
    558             final_output = chunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2910, in RunnableSequence.astream(self, input, config, **kwargs)
   2907 async def input_aiter() -> AsyncIterator[Input]:
   2908     yield input
-> 2910 async for chunk in self.atransform(input_aiter(), config, **kwargs):
   2911     yield chunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2893, in RunnableSequence.atransform(self, input, config, **kwargs)
   2887 async def atransform(
   2888     self,
   2889     input: AsyncIterator[Input],
   2890     config: Optional[RunnableConfig] = None,
   2891     **kwargs: Optional[Any],
   2892 ) -> AsyncIterator[Output]:
-> 2893     async for chunk in self._atransform_stream_with_config(
   2894         input,
   2895         self._atransform,
   2896         patch_config(config, run_name=(config or {}).get(\"run_name\") or self.name),
   2897         **kwargs,
   2898     ):
   2899         yield chunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1981, in Runnable._atransform_stream_with_config(self, input, transformer, config, run_type, **kwargs)
   1976     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
   1977         py_anext(iterator),  # type: ignore[arg-type]
   1978         context=context,
   1979     )
   1980 else:
-> 1981     chunk = cast(Output, await py_anext(iterator))
   1982 yield chunk
   1983 if final_output_supported:

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:153, in _AstreamEventsCallbackHandler.tap_output_aiter(self, run_id, output)
    151 tap = self.is_tapped.setdefault(run_id, sentinel)
    152 # wait for first chunk
--> 153 first = await py_anext(output, default=sentinel)
    154 if first is sentinel:
    155     return

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/utils/aiter.py:65, in py_anext.<locals>.anext_impl()
     58 async def anext_impl() -> Union[T, Any]:
     59     try:
     60         # The C code is way more low-level than this, as it implements
     61         # all methods of the iterator protocol. In this implementation
     62         # we're relying on higher-level coroutine concepts, but that's
     63         # exactly what we want -- crosstest pure-Python high-level
     64         # implementation and low-level C anext() iterators.
---> 65         return await __anext__(iterator)
     66     except StopAsyncIteration:
     67         return default

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2863, in RunnableSequence._atransform(self, input, run_manager, config, **kwargs)
   2861     else:
   2862         final_pipeline = step.atransform(final_pipeline, config)
-> 2863 async for output in final_pipeline:
   2864     yield output

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1197, in Runnable.atransform(self, input, config, **kwargs)
   1194 final: Input
   1195 got_first_val = False
-> 1197 async for ichunk in input:
   1198     # The default implementation of transform is to buffer input and
   1199     # then call stream.
   1200     # It'll attempt to gather all input into a single chunk using
   1201     # the `+` operator.
   1202     # If the input is not addable, then we'll assume that we can
   1203     # only operate on the last chunk,
   1204     # and we'll iterate until we get to the last chunk.
   1205     if not got_first_val:
   1206         final = ichunk

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:4811, in RunnableBindingBase.atransform(self, input, config, **kwargs)
   4805 async def atransform(
   4806     self,
   4807     input: AsyncIterator[Input],
   4808     config: Optional[RunnableConfig] = None,
   4809     **kwargs: Any,
   4810 ) -> AsyncIterator[Output]:
-> 4811     async for item in self.bound.atransform(
   4812         input,
   4813         self._merge_configs(config),
   4814         **{**self.kwargs, **kwargs},
   4815     ):
   4816         yield item

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1215, in Runnable.atransform(self, input, config, **kwargs)
   1212             final = ichunk
   1214 if got_first_val:
-> 1215     async for output in self.astream(final, config, **kwargs):
   1216         yield output

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:417, in BaseChatModel.astream(self, input, config, stop, **kwargs)
    412 except BaseException as e:
    413     await run_manager.on_llm_error(
    414         e,
    415         response=LLMResult(generations=[[generation]] if generation else []),
    416     )
--> 417     raise e
    418 else:
    419     await run_manager.on_llm_end(
    420         LLMResult(generations=[[generation]]),
    421     )

File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:395, in BaseChatModel.astream(self, input, config, stop, **kwargs)
    393 generation: Optional[ChatGenerationChunk] = None
    394 try:
--> 395     async for chunk in self._astream(
    396         messages,
    397         stop=stop,
    398         **kwargs,
    399     ):
    400         if chunk.message.id is None:
    401             chunk.message.id = f\"run-{run_manager.run_id}\"

File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:701, in ChatAnthropic._astream(self, messages, stop, run_manager, stream_usage, **kwargs)
    699     stream_usage = self.stream_usage
    700 kwargs[\"stream\"] = True
--> 701 payload = self._get_request_payload(messages, stop=stop, **kwargs)
    702 stream = await self._async_client.messages.create(**payload)
    703 coerce_content_to_string = not _tools_in_params(payload)

File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:647, in ChatAnthropic._get_request_payload(self, input_, stop, **kwargs)
    639 def _get_request_payload(
    640     self,
    641     input_: LanguageModelInput,
   (...)
    644     **kwargs: Dict,
    645 ) -> Dict:
    646     messages = self._convert_input(input_).to_messages()
--> 647     system, formatted_messages = _format_messages(messages)
    648     payload = {
    649         \"model\": self.model,
    650         \"max_tokens\": self.max_tokens,
   (...)
    658         **kwargs,
    659     }
    660     return {k: v for k, v in payload.items() if v is not None}

File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:170, in _format_messages(messages)
    167     system = message.content
    168     continue
--> 170 role = _message_type_lookups[message.type]
    171 content: Union[str, List]
    173 if not isinstance(message.content, str):
    174     # parse as dict

KeyError: 'function'"
}

描述

当我将 FunctionMessage 添加到聊天历史(chat history)并再次运行代理时,会出现上述错误。
例如:

  1. 第一次运行)输入:苹果股票rn - 运行正常
  2. 第二次运行)输入:谷歌股票rn - 出现上述错误
bmvo0sr5

bmvo0sr51#

从堆栈跟踪来看,问题似乎并非出在 astream_events API,而是出在 ChatAnthropic(聊天模型)实现中的某处。
你是否介意检查一下:如果你生成一个与聊天历史记录相对应的 BaseMessage 列表,然后分别用 async for chunk in model.astream(messages) 和 list(model.stream(messages)) 调用模型,会发生什么?
我怀疑它也会触发错误,并帮助产生一个最小可复现的例子

ecfdbz9o

ecfdbz9o2#

在周三,2024年7月10日凌晨3点,Eugene Yurtsev ***@***.***>写道:从堆栈跟踪来看,似乎这不是来自astream事件API的错误,而是聊天anthropic实现中的问题。你是否介意检查一下,如果你生成一个与聊天历史记录相对应的BaseMessage列表,然后使用async for chunk in model.astream(messages)和list(model.stream(messages))调用它们,会发生什么。我怀疑这也会触发错误,并帮助产生一个最小可复现的例子——直接回复此邮件,查看GitHub上的<#24007 (comment)>,或取消订阅< https://github.com/notifications/unsubscribe-auth/AUY3AOKCW4GTA54VGNIJLJDZLRJARAVCNFSM6AAAAABKSGSBKSVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDEMJYG43DINBXGI >。你收到这个消息是因为你创建了这条线程。

消息ID:***@***.***>

相关问题