promptflow [BUG] Flow using PFClient.run with an AsyncIterator raises TypeError: cannot pickle '_thread.lock' object

yqkkidmi · posted 2 months ago in Other

Describe the bug

If a flow call yields an AsyncIterator that ends up wrapped by promptflow.tracing.TracedAsyncIterator, the batch run cannot complete successfully. Every run from the data file errors out with TypeError: cannot pickle '_thread.lock' object.

To reproduce

The following code consistently produces the error:

import tempfile

from openai import AzureOpenAI
from promptflow.tracing import trace
from promptflow.core import AzureOpenAIModelConfiguration, Prompty
from promptflow.client import PFClient

class ChatFlow:
    def __init__(
        self, model_config: AzureOpenAIModelConfiguration
    ):
        self.model_config = model_config

    @trace
    async def __call__(
        self,
        topic: str,
    ) -> str:
        """Flow entry function."""

        client = AzureOpenAI(
            azure_endpoint=self.model_config.azure_endpoint,
            api_key=self.model_config.api_key,
            api_version=self.model_config.api_version,
        )
        
        response = client.chat.completions.create(
            model=self.model_config.azure_deployment,
            messages=[
                {"role": "system", "content": "Create a story about the topic provided by the user"},
                {"role": "user", "content": f"Tell me a story about {topic}"},
            ],
            max_tokens=150,
        )

        for chunk in response:
            if len(chunk.choices) > 0 and (message := chunk.choices[0].message):
                content = message.content
                yield content + "\n"

def main():
    f = tempfile.NamedTemporaryFile(suffix=".csv", mode="w+t")
    try:
        f.write("topic\nlittle league\n")
        f.seek(0)
        config = AzureOpenAIModelConfiguration(
            connection="aoai_connection", azure_deployment="gpt-35-turbo"
        )
        chat_flow = ChatFlow(model_config=config)
        result = PFClient().run(chat_flow, data=f.name)
    finally:
        f.close()  # NamedTemporaryFile is removed on close (delete=True by default)

if __name__ == "__main__":
    main()

The error traces back to where run_info is submitted to the queue, at line 24 of promptflow/src/promptflow-core/promptflow/storage/_queue_run_storage.py:

    self.queue.put(run_info)

For the sample code, the result attribute on run_info holds a promptflow.tracing.TracedAsyncIterator instance, which cannot be pickled when it is submitted to the multiprocessing queue, raising the error mentioned above.
The error file from the batch run is attached below:
error.json

Expected behavior

Successful completion of the batch run.

Running information (please complete the following information):

  • promptflow package version via pf -v: 1.12.0
  • Operating system: macOS Sonoma 14.5
  • Python version via python --version: 3.12.2

oyjwcjzk1#

Hi @bwilliams2, it looks like you forgot to set stream=True in the completions API call, and the way you iterate over the result is incorrect. Please try changing your code to the following:

response = client.chat.completions.create(
    model="gpt-35-turbo",
    messages=[
        {"role": "system", "content": "Create a story about the topic provided by the user"},
        {"role": "user", "content": f"Tell me a story about {topic}"},
    ],
    max_tokens=150,
    stream=True,
)

for chunk in response:
    print(f"chunk: {chunk}")
    if len(chunk.choices) > 0 and (message := chunk.choices[0].delta.content):
        yield str(message)

vqlkdk9b2#

I applied these changes but still get the same error. Because of the pickling error described in the original issue, I don't believe the function is ever executed. It seems that promptflow batch runs cannot be used with streaming flows.
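One possible workaround, offered here as an assumption not confirmed anywhere in this thread: drain the stream inside the flow entry function and return a plain string, so the value stored on run_info is picklable. A self-contained sketch in which `fake_stream` is a hypothetical stand-in for the streamed OpenAI response:

```python
import pickle

def fake_stream():
    """Hypothetical stand-in for the streamed chunks of a chat completion."""
    yield from ["Once ", "upon ", "a time."]

def drain(stream) -> str:
    """Accumulate streamed chunks into one string instead of yielding them."""
    return "".join(chunk for chunk in stream)

story = drain(fake_stream())
pickle.dumps(story)  # a plain str pickles fine, unlike a TracedAsyncIterator
print(story)  # Once upon a time.
```

The trade-off is losing streaming behavior for the batch run, but the pickling step in _queue_run_storage.py then only ever sees plain data.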
