Checked other resources
- I added a very descriptive title to this issue.
- I searched the LangChain documentation with the integrated search.
- I used the GitHub search to find a similar question and didn't find it.
- I am sure that this is a bug in LangChain rather than my code.
- The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).
Example Code
import os
from typing import *
from uuid import uuid4

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.memory import ConversationBufferWindowMemory
from langchain.tools import Tool, tool
from langchain_anthropic import ChatAnthropic
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_mongodb.chat_message_histories import MongoDBChatMessageHistory
chat_id = "b894e0c7-acb1-4907-9bsbc-bb98f5a970dc"
def google_search_tool(iso: str = "us"):
    google_search = GoogleSerperAPIWrapper(gl=iso)
    google_image_search = GoogleSerperAPIWrapper(gl=iso, type="images")
    google_news_search = GoogleSerperAPIWrapper(gl=iso, type="news")
    google_places_search = GoogleSerperAPIWrapper(gl=iso, type="places")
    return [
        Tool(
            name="google_search",
            func=google_search.run,
            description="Search Google for information."
        ),
        Tool(
            name="google_image_search",
            func=google_image_search.run,
            description="Search Google for images."
        ),
        Tool(
            name="google_news_search",
            func=google_news_search.run,
            description="Search Google for news."
        ),
        Tool(
            name="google_places_search",
            func=google_places_search.run,
            description="Search Google for places."
        )
    ]
workspace_id = "test"
request_id = str(uuid4())
system_template = "You are a helpful AI agent. Always use the tools at your disposal"
prompt = ""
tools = google_search_tool("in")
llm_kwargs = {}
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20240620",
    streaming=True,
    api_key="yurrrrrrrrrrrrr",
)
base_template = ChatPromptTemplate.from_messages([
    ("system", system_template),
    MessagesPlaceholder(variable_name="chat_history") if chat_id else None,
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])
agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=base_template)
chat_message_history = MongoDBChatMessageHistory(
    session_id=chat_id,
    connection_string=os.getenv('MONGO_URI'),
    database_name=os.getenv('MONGO_DBNAME'),  # "api"
    collection_name="chat_histories",
)
conversational_memory = ConversationBufferWindowMemory(
    chat_memory=chat_message_history,
    memory_key='chat_history',
    return_messages=True,
    output_key="output",
    input_key="input",
)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=conversational_memory,
    return_intermediate_steps=True,
    handle_parsing_errors=True
).with_config({"run_name": "Agent"})
response = []
run = agent_executor.astream_events(input={"input": "what is glg stock"}, version="v2")
async for event in run:
    response.append(event)
    kind = event["event"]
    if kind == "on_chain_start":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind == "on_chain_end":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")
from langchain_core.messages import FunctionMessage
import json

# Write each tool result back into the persisted history as a FunctionMessage,
# inserted just before the final AI answer.
messages = chat_message_history.messages
for resp in response:
    if resp['event'] == "on_tool_end":
        tool_msg = FunctionMessage(content=json.dumps(resp['data']), id=resp['run_id'], name=resp['name'])
        messages.insert(-1, tool_msg)
chat_message_history.clear()
chat_message_history.add_messages(messages)
chat_message_history.messages
Error Message and Stack Trace (if applicable)
{
"name": "KeyError",
"message": "'function'",
"stack": "---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[14], line 3
1 response = []
2 run = agent_executor.astream_events(input = {\"input\": \"what is glg stock\"}, version=\"v2\")
----> 3 async for event in run:
4 response.append(event)
5 kind = event[\"event\"]
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:4788, in RunnableBindingBase.astream_events(self, input, config, **kwargs)
4782 async def astream_events(
4783 self,
4784 input: Input,
4785 config: Optional[RunnableConfig] = None,
4786 **kwargs: Optional[Any],
4787 ) -> AsyncIterator[StreamEvent]:
-> 4788 async for item in self.bound.astream_events(
4789 input, self._merge_configs(config), **{**self.kwargs, **kwargs}
4790 ):
4791 yield item
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1146, in Runnable.astream_events(self, input, config, version, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
1141 raise NotImplementedError(
1142 'Only versions \"v1\" and \"v2\" of the schema is currently supported.'
1143 )
1145 async with aclosing(event_stream):
-> 1146 async for event in event_stream:
1147 yield event
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:947, in _astream_events_implementation_v2(runnable, input, config, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
945 # Await it anyway, to run any cleanup code, and propagate any exceptions
946 try:
--> 947 await task
948 except asyncio.CancelledError:
949 pass
File /usr/lib/python3.10/asyncio/futures.py:288, in Future.__await__(self)
286 if not self.done():
287 raise RuntimeError(\"await wasn't used with future\")
--> 288 return self.result()
File /usr/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
199 self.__log_traceback = False
200 if self._exception is not None:
--> 201 raise self._exception.with_traceback(self._exception_tb)
202 return self._result
File /usr/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the `send` method directly, because coroutines
231 # don't have `__iter__` and `__next__` methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:907, in _astream_events_implementation_v2.<locals>.consume_astream()
904 try:
905 # if astream also calls tap_output_aiter this will be a no-op
906 async with aclosing(runnable.astream(input, config, **kwargs)) as stream:
--> 907 async for _ in event_streamer.tap_output_aiter(run_id, stream):
908 # All the content will be picked up
909 pass
910 finally:
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:153, in _AstreamEventsCallbackHandler.tap_output_aiter(self, run_id, output)
151 tap = self.is_tapped.setdefault(run_id, sentinel)
152 # wait for first chunk
--> 153 first = await py_anext(output, default=sentinel)
154 if first is sentinel:
155 return
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/utils/aiter.py:65, in py_anext.<locals>.anext_impl()
58 async def anext_impl() -> Union[T, Any]:
59 try:
60 # The C code is way more low-level than this, as it implements
61 # all methods of the iterator protocol. In this implementation
62 # we're relying on higher-level coroutine concepts, but that's
63 # exactly what we want -- crosstest pure-Python high-level
64 # implementation and low-level C anext() iterators.
---> 65 return await __anext__(iterator)
66 except StopAsyncIteration:
67 return default
File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:1595, in AgentExecutor.astream(self, input, config, **kwargs)
1583 config = ensure_config(config)
1584 iterator = AgentExecutorIterator(
1585 self,
1586 input,
(...)
1593 **kwargs,
1594 )
-> 1595 async for step in iterator:
1596 yield step
File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent_iterator.py:246, in AgentExecutorIterator.__aiter__(self)
240 while self.agent_executor._should_continue(
241 self.iterations, self.time_elapsed
242 ):
243 # take the next step: this plans next action, executes it,
244 # yielding action and observation as they are generated
245 next_step_seq: NextStepOutput = []
--> 246 async for chunk in self.agent_executor._aiter_next_step(
247 self.name_to_tool_map,
248 self.color_mapping,
249 self.inputs,
250 self.intermediate_steps,
251 run_manager,
252 ):
253 next_step_seq.append(chunk)
254 # if we're yielding actions, yield them as they come
255 # do not yield AgentFinish, which will be handled below
File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:1304, in AgentExecutor._aiter_next_step(self, name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager)
1301 intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
1303 # Call the LLM to see what to do.
-> 1304 output = await self.agent.aplan(
1305 intermediate_steps,
1306 callbacks=run_manager.get_child() if run_manager else None,
1307 **inputs,
1308 )
1309 except OutputParserException as e:
1310 if isinstance(self.handle_parsing_errors, bool):
File ~/v3/.dev/lib/python3.10/site-packages/langchain/agents/agent.py:554, in RunnableMultiActionAgent.aplan(self, intermediate_steps, callbacks, **kwargs)
546 final_output: Any = None
547 if self.stream_runnable:
548 # Use streaming to make sure that the underlying LLM is invoked in a
549 # streaming
(...)
552 # Because the response from the plan is not a generator, we need to
553 # accumulate the output into final output and return that.
--> 554 async for chunk in self.runnable.astream(
555 inputs, config={\"callbacks\": callbacks}
556 ):
557 if final_output is None:
558 final_output = chunk
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2910, in RunnableSequence.astream(self, input, config, **kwargs)
2907 async def input_aiter() -> AsyncIterator[Input]:
2908 yield input
-> 2910 async for chunk in self.atransform(input_aiter(), config, **kwargs):
2911 yield chunk
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2893, in RunnableSequence.atransform(self, input, config, **kwargs)
2887 async def atransform(
2888 self,
2889 input: AsyncIterator[Input],
2890 config: Optional[RunnableConfig] = None,
2891 **kwargs: Optional[Any],
2892 ) -> AsyncIterator[Output]:
-> 2893 async for chunk in self._atransform_stream_with_config(
2894 input,
2895 self._atransform,
2896 patch_config(config, run_name=(config or {}).get(\"run_name\") or self.name),
2897 **kwargs,
2898 ):
2899 yield chunk
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1981, in Runnable._atransform_stream_with_config(self, input, transformer, config, run_type, **kwargs)
1976 chunk: Output = await asyncio.create_task( # type: ignore[call-arg]
1977 py_anext(iterator), # type: ignore[arg-type]
1978 context=context,
1979 )
1980 else:
-> 1981 chunk = cast(Output, await py_anext(iterator))
1982 yield chunk
1983 if final_output_supported:
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/tracers/event_stream.py:153, in _AstreamEventsCallbackHandler.tap_output_aiter(self, run_id, output)
151 tap = self.is_tapped.setdefault(run_id, sentinel)
152 # wait for first chunk
--> 153 first = await py_anext(output, default=sentinel)
154 if first is sentinel:
155 return
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/utils/aiter.py:65, in py_anext.<locals>.anext_impl()
58 async def anext_impl() -> Union[T, Any]:
59 try:
60 # The C code is way more low-level than this, as it implements
61 # all methods of the iterator protocol. In this implementation
62 # we're relying on higher-level coroutine concepts, but that's
63 # exactly what we want -- crosstest pure-Python high-level
64 # implementation and low-level C anext() iterators.
---> 65 return await __anext__(iterator)
66 except StopAsyncIteration:
67 return default
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:2863, in RunnableSequence._atransform(self, input, run_manager, config, **kwargs)
2861 else:
2862 final_pipeline = step.atransform(final_pipeline, config)
-> 2863 async for output in final_pipeline:
2864 yield output
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1197, in Runnable.atransform(self, input, config, **kwargs)
1194 final: Input
1195 got_first_val = False
-> 1197 async for ichunk in input:
1198 # The default implementation of transform is to buffer input and
1199 # then call stream.
1200 # It'll attempt to gather all input into a single chunk using
1201 # the `+` operator.
1202 # If the input is not addable, then we'll assume that we can
1203 # only operate on the last chunk,
1204 # and we'll iterate until we get to the last chunk.
1205 if not got_first_val:
1206 final = ichunk
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:4811, in RunnableBindingBase.atransform(self, input, config, **kwargs)
4805 async def atransform(
4806 self,
4807 input: AsyncIterator[Input],
4808 config: Optional[RunnableConfig] = None,
4809 **kwargs: Any,
4810 ) -> AsyncIterator[Output]:
-> 4811 async for item in self.bound.atransform(
4812 input,
4813 self._merge_configs(config),
4814 **{**self.kwargs, **kwargs},
4815 ):
4816 yield item
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/runnables/base.py:1215, in Runnable.atransform(self, input, config, **kwargs)
1212 final = ichunk
1214 if got_first_val:
-> 1215 async for output in self.astream(final, config, **kwargs):
1216 yield output
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:417, in BaseChatModel.astream(self, input, config, stop, **kwargs)
412 except BaseException as e:
413 await run_manager.on_llm_error(
414 e,
415 response=LLMResult(generations=[[generation]] if generation else []),
416 )
--> 417 raise e
418 else:
419 await run_manager.on_llm_end(
420 LLMResult(generations=[[generation]]),
421 )
File ~/v3/.dev/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:395, in BaseChatModel.astream(self, input, config, stop, **kwargs)
393 generation: Optional[ChatGenerationChunk] = None
394 try:
--> 395 async for chunk in self._astream(
396 messages,
397 stop=stop,
398 **kwargs,
399 ):
400 if chunk.message.id is None:
401 chunk.message.id = f\"run-{run_manager.run_id}\"
File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:701, in ChatAnthropic._astream(self, messages, stop, run_manager, stream_usage, **kwargs)
699 stream_usage = self.stream_usage
700 kwargs[\"stream\"] = True
--> 701 payload = self._get_request_payload(messages, stop=stop, **kwargs)
702 stream = await self._async_client.messages.create(**payload)
703 coerce_content_to_string = not _tools_in_params(payload)
File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:647, in ChatAnthropic._get_request_payload(self, input_, stop, **kwargs)
639 def _get_request_payload(
640 self,
641 input_: LanguageModelInput,
(...)
644 **kwargs: Dict,
645 ) -> Dict:
646 messages = self._convert_input(input_).to_messages()
--> 647 system, formatted_messages = _format_messages(messages)
648 payload = {
649 \"model\": self.model,
650 \"max_tokens\": self.max_tokens,
(...)
658 **kwargs,
659 }
660 return {k: v for k, v in payload.items() if v is not None}
File ~/v3/.dev/lib/python3.10/site-packages/langchain_anthropic/chat_models.py:170, in _format_messages(messages)
167 system = message.content
168 continue
--> 170 role = _message_type_lookups[message.type]
171 content: Union[str, List]
173 if not isinstance(message.content, str):
174 # parse as dict
KeyError: 'function'"
}
Description
When I add a FunctionMessage to the chat history and run the agent again, the error above occurs.
For example:
- Run 1) input: Apple stock - runs fine
- Run 2) input: Google stock - throws the error above
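For reference, the last frame in the trace is role = _message_type_lookups[message.type] inside langchain_anthropic, and FunctionMessage.type is "function", so that lookup table apparently has no entry for function-style messages. Below is a minimal sketch of a possible workaround, assuming the Anthropic formatter does accept ToolMessage (whose type is "tool"); the content and ids are illustrative, not taken from the real run:

from langchain_core.messages import FunctionMessage, ToolMessage

# FunctionMessage.type is "function", which is exactly the key that
# _message_type_lookups raises KeyError on in the trace above.
fn_msg = FunctionMessage(content='{"output": "..."}', name="google_search")
print(fn_msg.type)    # "function"

# Possible workaround (assumption, untested here): persist tool results
# as ToolMessage instead, reusing the run_id as tool_call_id the same
# way the repro reused it as id.
tool_msg = ToolMessage(
    content='{"output": "..."}',
    tool_call_id="run-id-placeholder",  # hypothetical; the repro used resp['run_id']
    name="google_search",
)
print(tool_msg.type)  # "tool"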
Answers
Eugene Yurtsev: From the stack trace it looks like this isn't coming from the astream events API, but from somewhere in the ChatAnthropic implementation.
Would you mind checking what happens if you build a list of BaseMessages corresponding to the chat history and invoke the model with
async for chunk in model.astream(messages)
and list(model.stream(messages))?
I suspect it will trigger the error as well, and it would help produce a minimal reproducible example.
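If it helps, here is a sketch of that minimal repro, reusing the model name from the report; the history below is fabricated to mirror what the notebook wrote back to Mongo, with the FunctionMessage as the suspect. An ANTHROPIC_API_KEY must be set in the environment, though the KeyError should fire in _format_messages before any request is made:

import asyncio
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, FunctionMessage, HumanMessage

model = ChatAnthropic(model="claude-3-5-sonnet-20240620")

# History shaped like the persisted one: a FunctionMessage sits in the
# middle, and its .type ("function") is what _format_messages chokes on.
messages = [
    HumanMessage(content="what is glg stock"),
    FunctionMessage(content='{"output": "..."}', name="google_search"),
    AIMessage(content="GLG is trading at ..."),
    HumanMessage(content="what is aapl stock"),
]

# Sync path: list(model.stream(messages))

async def main():
    async for chunk in model.astream(messages):
        print(chunk.content, end="|")

asyncio.run(main())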