langchain astream_events does not work with AzureMLChatOnlineEndpoint

oymdgrw7 · posted 5 months ago · in: Other
Follow (0) | Answers (2) | Views (45)

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Example Code

llm = build_llm(load_model_from="azure")
type(llm)  # Outputs: langchain_community.chat_models.azureml_endpoint.AzureMLChatOnlineEndpoint
llm.invoke("Hallo")  # Outputs: BaseMessage(content='Hallo! Wie kann ich Ihnen helfen?', type='assistant', id='run-f606d912-b21f-4c0c-861d-9338fa001724-0')

from backend_functions.langgraph_rag_workflow import create_workflow_app
from backend_functions.rag_functions import serialize_documents
from langchain_core.messages import HumanMessage
import json

question = "Hello, who are you?"
thread_id = "id_1"
model_type_for_astream_event = "chat_model"
    
chain = create_workflow_app(retriever=retriever, model=llm)
input_message = HumanMessage(content=question)
config = {
    "configurable": {"thread_id": thread_id},  # a different thread_id should be selected for every user
}
# print(f"Updated State from previous question: {chain.get_state(config).values}")

async for event in chain.astream_events(
    # {"messages": [input_message]},
    {"messages": question},  # test for Azure
    version="v1",
    config=config
):
    print(event)
    if event["event"] == f"on_{model_type_for_astream_event}_start" and event.get("metadata", {}).get("langgraph_node") == "generate":
        print("Stream started...")
        if model_type_for_astream_event == "llm":
            prompt_length = len(event["data"]["input"]["prompts"][0])
        else:
            prompt_length = len(event["data"]["input"]["messages"][0][0].content)
        print(f'data: {json.dumps({"type": "prompt_length_characters", "content": prompt_length})}\n\n')
        print(f'data: {json.dumps({"type": "prompt_length_tokens", "content": prompt_length / 4})}\n\n')
    if event["event"] == f"on_{model_type_for_astream_event}_stream" and event.get("metadata", {}).get("langgraph_node") == "generate":
        if model_type_for_astream_event == "llm":
            chunks = event["data"]['chunk']
        else:
            chunks = event["data"]['chunk'].content
        print(f'data: {json.dumps({"type": "chunk", "content": chunks})}\n\n')
    elif event["event"] == "on_chain_end" and event.get("metadata", {}).get("langgraph_node") == "format_docs" and event["name"] == "format_docs":
        retrieved_docs = event["data"]["input"]["raw_docs"]
        serialized_docs = serialize_documents(retrieved_docs)
        print(f'data: {{"type": "docs", "content": {serialized_docs}}}\n\n')


Error Message and Stack Trace (if applicable)

APIStatusError                            Traceback (most recent call last)
/Users/mweissenba001/Documents/GitHub/fastapi_rag_demo/test.ipynb Line 49:
     12 config = {
     13     "configurable": {"thread_id": thread_id},  # a different thread_id should be selected for every user
     14 }
---> 16 async for event in chain.astream_events(
     17     # {"messages": [input_message]},
     18     {"questions": question},  # test for Azure
     19     version="v1",
     20     config=config
     21 ):
     22     print(event)
     23     if event["event"] == f"on_{model_type_for_astream_event}_start" and event.get("metadata", {}).get("langgraph_node") == "generate":
File ~/anaconda3/lib/python3.11/site-packages/langchain_core/runnables/base.py:1246, in Runnable.astream_events(self, input, config, version, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
-> 1246 async for event in event_stream:

File ~/anaconda3/lib/python3.11/site-packages/langchain_core/tracers/event_stream.py:778, in _astream_log_implementation(runnable, input, config, stream, diff, with_streamed_output_list, **kwargs)
->  778 async for log in _astream_log_implementation(  # type: ignore[misc]
    779     runnable,
    800     input,
    801     config=config,
    802     stream=stream,
    803     diff=True,
    804     with_streamed_output_list=True,
    805     **kwargs,
    806 ):
    807     run_log = run_log + log
    809 if not encountered_start_event:

File ~/anaconda3/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py:670, in _astream_log_implementation..consume_astream()
->  824 async for chunk in runnable.astream(input, config, **kwargs):

File ~/anaconda3/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1336, in Pregel.astream(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug)
   1333     del fut, task
   1335     # panic on failure or timeout
-> 1336     _panic_or_proceed(done, inflight, step)
   1337     # don't keep futures around in memory longer than needed
   1338     del done, inflight, futures

File ~/anaconda3/lib/python3.11/site-packages/langgraph/pregel/retry.py:117, in arun_with_retry(task, retry_policy, stream)
    115 # run the task
->  116 async for _ in task.proc.astream(task.input, task.config):

File ~/anaconda3/lib/python3.11/site-packages/langchain_core/runnables/base.py, in RunnableSequence.astream(self, input, config, **kwargs)
   3275 async def input_aiter() -> AsyncIterator[Input]:
-> 3278 async for chunk in self.atransform(input_aiter(), config, **kwargs):
   3279     yield chunk

    260 if run_id != self.root_id:
    261     # if we can't find this run, silently ignore it,
    262     # e.g. because this run wasn't included in the log
    263     if key := self._key_map_by_run_id.get(run_id):
...
->  264 final_pipeline = step.atransform(final_pipeline, config)
    292 else:
->  293     final_pipeline = step.atransform(final_pipeline, config)
    329 else:
->  330     final_pipeline = step.atransform(final_pipeline, config)
    331 async for output in final_pipeline:
->  332     yield output

    856 async def astream(self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> AsyncIterator[Output]:
    857     """
    858     Default implementation of astream, which calls ainvoke.
    859     Subclasses should override this method if they support streaming output. (...)
    872     The output of the Runnable.
    873     """
->  874     yield await self.ainvoke(input, config, **kwargs)

    115 kwargs["config"] = config
    116 if sys.version_info >= (3, 11):
->  117     ret = await asyncio.create_task(
    118         self.afunc(input, **kwargs), context=context
    119     )
    120 else:
    121     ret = await self.afunc(input, **kwargs)

    143 got_first_val = False
->  144 async for ichunk in input:
    145     # The default implementation of transform is to buffer input and then call stream.
    146     # It'll attempt to gather all input into a single chunk using the `+` operator.
    147     # If the input is not addable, then we'll assume that we can only operate on the last chunk,
    148     # and we'll iterate until we get to the last chunk.
    149     if not got_first_val:
    150         final = ichunk
    ...
    153 async for output in self.astream(final, config, **kwargs):
->  154     yield output

    155 async def astream(self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) -> AsyncIterator[Output]:
    156     """
    157     Default implementation of astream, which calls ainvoke.
    158     Subclasses should override this method if they support streaming output. (...)
    160     """
->  161     yield await self.ainvoke(input, config, **kwargs)

From your description, you are running into a problem while using a model hosted on Azure ML. Based on the error message you provided, the issue may lie in how the on_chat_model_end event is handled. You can try adding some debug output around on_chat_model_end to see what happens when the model run finishes.

First, override the on_chat_model_end method in the AsyncStream[ChatCompletionChunk] class so that something is executed when the model finishes. For example:

class AsyncStream[T]:
    # ... other methods ...

    async def on_chat_model_end(self):
        print("on_chat_model_end called")
        # Add some debug information here, for example:
        print("on_chat_model_end received data:", self.data)
        # ... further debug output ...
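As a simpler alternative (not part of the original answer), the same kind of debug output can be collected with a standard LangChain async callback handler. The AsyncCallbackHandler class and the hook names below come from langchain_core; attaching the handler through the config dict used for astream_events is just one possible wiring and is an assumption here.

from langchain_core.callbacks import AsyncCallbackHandler

class DebugCallbackHandler(AsyncCallbackHandler):
    # Called when a chat model run starts; prints the messages sent to the model.
    async def on_chat_model_start(self, serialized, messages, **kwargs):
        print("on_chat_model_start:", messages)

    # Called when the model run ends; prints the final LLMResult.
    async def on_llm_end(self, response, **kwargs):
        print("on_llm_end:", response)

# Example wiring (assumption): pass the handler via the same config used for astream_events.
config = {"configurable": {"thread_id": thread_id}, "callbacks": [DebugCallbackHandler()]}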

Then make sure that the correct parameters are passed when await self._post() is called. For example:

body = await async_maybe_transform(
    {
        "messages": messages,
        "model": model,
        "frequency_penalty": frequency_penalty,
        "function_call": function_call,
        "functions": functions,
        "logit_bias": logit_bias,
        "logprobs": logprobs,
        "max_tokens": max_tokens,
        "n": n,
        "presence_penalty": presence_penalty,
        "response_format": response_format,
        "seed": seed,
        "stop": stop,
        "stream": stream,
        "temperature": temperature,
        "tool_choice": tool_choice,
        "tools": tools,
        "top_logprobs": top_logprobs,
        "top_p": top_p,
        "user": user,
    },
    completion_create_params.CompletionCreateParams,
)
options = make_request_options(
    extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
)
return await self.request(cast_to=ChatCompletion, opts=options, stream=stream, stream_cls=stream_cls)

This way, when the on_chat_model_end event is triggered you should see some debug output, which will help you pinpoint where the problem is. If that still doesn't resolve the issue, feel free to ask again.

mi7gmzs6  1#

Can you check what happens when you call the model directly in an async stream?

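For example, something along the following lines (a minimal sketch; it assumes the llm object from the question and bypasses the LangGraph workflow entirely):

# Stream the AzureMLChatOnlineEndpoint chat model directly to check whether
# the endpoint itself can produce async streaming output.
async for chunk in llm.astream("Hello, who are you?"):
    print(chunk.content, end="", flush=True)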

7d7tgy0s  2#

This leads to the same error. Worth noting: I modified "azureml_endpoint.py" to make it work with Azure dedicated endpoints; without that change, dedicated endpoints do not work at all. See here: #23899 (comment)
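For reference, a typical dedicated-endpoint setup with the stock langchain_community classes looks roughly like this (a sketch with placeholder URL and key; it does not reflect the local changes made to azureml_endpoint.py):

from langchain_community.chat_models.azureml_endpoint import (
    AzureMLChatOnlineEndpoint,
    AzureMLEndpointApiType,
    CustomOpenAIChatContentFormatter,
)

# Build the chat model against a dedicated (real-time) Azure ML endpoint.
llm = AzureMLChatOnlineEndpoint(
    endpoint_url="https://<endpoint>.<region>.inference.ml.azure.com/score",  # placeholder
    endpoint_api_type=AzureMLEndpointApiType.dedicated,
    endpoint_api_key="<api-key>",  # placeholder
    content_formatter=CustomOpenAIChatContentFormatter(),
)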
