将Azure函数中的OpenAI响应保存到Blob存储

rkttyhzu  于 2023-10-22  发布在  其他
关注(0)|答案(1)|浏览(85)

我使用blob触发器模板创建了一个Azure函数,该函数由Azure blob存储中更新的新文件触发。我正在使用Python v2在VScode中创建Azure函数。我的问题是无法将openai结果存储到新的blob文件中。我注意到我的函数没有function.json文件。我不确定这是否是我的问题。我尝试了几次重新部署,仍然没有创建一个function.json。

import azure.functions as func
import logging
import os
import openai
import PyPDF2
import io
openai.api_type = "azure"
openai.api_version = "2023-05-15" 
openai.api_base = os.getenv("OPENAI_API_BASE")  # Your Azure OpenAI resource's endpoint value.
openai.api_key = os.getenv("OPENAI_API_KEY")

app = func.FunctionApp()

@app.blob_trigger(arg_name="myblob", path="mypath/{name}",
                               connection="<My_BLOB_CONNECTION_SETTING>") 
@app.blob_trigger_output(arg_name="outputblob", path="openai-outputs/text.txt",
                                connection="<My_BLOB_CONNECTION_SETTING>")
def blob_trigger(myblob: func.InputStream, outputblob: func.Out[str]) -> str:
    logging.info(f"Python blob trigger function processed blob"
                f"Name: {myblob.name}"
                f"Blob Size: {myblob.length} bytes")
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(myblob.read()))
    text = ""
    for page in pdf_reader.pages:
        text += page.extract_text()

    logging.info(
                f"textyyy: {text}")
    messages= [
       # my prompt
    ]

    response = openai.ChatCompletion.create(
        engine="testing3",
        messages=messages
    )
    
    logging.info(f"response1111: {response}"
                f"response['choices'][0]['message']['content']: {response['choices'][0]['message']['content']}")
    
    outputblob.set(response['choices'][0]['message']['content'])

我按照这个azure documentation写了上面的代码。它开始无法部署到Azure功能门户。

idv4meu8

idv4meu81#

我能够使用此代码将OpenAI的响应保存到blob存储。
我用的是python v2。在python V2中,function.json文件不可用。绑定是使用@app.blob_trigger进行触发器绑定,@app.blob_input进行输入绑定,@app.blob_output进行输出绑定。
PyPDF2软件包用于处理.pdf文件并对文件执行操作。
我使用@app.blob_trigger来触发函数代码,@app.blob_input读取文件内容,@app.blob_output修改文件。

function_app.py

import azure.functions as func
import logging
import os
import openai
openai.api_type = "azure"
openai.api_base = os.getenv("OPENAI_API_BASE")
openai.api_version = "2023-07-01-preview"
openai.api_key = os.getenv("OPENAI_API_KEY")

app = func.FunctionApp()

@app.blob_trigger(arg_name="myblob", path="response",
                               connection="BlobStorageConnectionString")
@app.blob_input(arg_name="inputblob",path="response/hello.txt",connection="BlobStorageConnectionString")
@app.blob_output(arg_name="output",path="chatgpt-conversation/conversation.txt",connection="BlobStorageConnectionString2")
def blob_trigger(myblob: func.InputStream, inputblob: str, output:func.Out[bytes]):
    logging.info(f"Python blob trigger function processed blob"
                f"Name: {myblob.name}"
                f"Blob Size: {myblob.length} bytes")
    
    content= inputblob
    logging.info(f"Contentof the file is : {content}")

    ai_responses=[]
    response = openai.ChatCompletion.create(
        engine="test_chatgpt",
        messages=[
            {"role": "system", "content": "You are an AI assistant that helps people find information."},
            {"role":"user","content": content}
        ])
    ai_responses.append(f"Input: {content}\nResponse: {response['choices'][0]['message']['content']}\n\n")
    logging.info(ai_responses)
    output.set("\n".join(ai_responses))

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "OPENAI_API_BASE":"https://<xxxxxx>.openai.azure.com/",
    "OPENAI_API_KEY": "xxxxxxxxxxxxxxxxxxxx",
    "BlobStorageConnectionString":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "BlobStorageConnectionString2":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  }
}

Input

Output

相关问题