azure 如何使用持久功能输出到Blob服务

q1qsirdb  于 2023-10-22  发布在  其他
关注(0)|答案(1)|浏览(116)

我是Azure函数和持久函数的新手。我想写一段代码,由orchestrator函数调用的activity函数将计算一些值并连接到Blob服务。
如何修改下面的代码,将activity函数生成的值输出到Blob服务?
我使用的环境是,

  • Azure函数,持久函数
  • Visual Studio Code
  • Python编程模型v2

以下是我正在考虑使用的简化代码。请修改这个。顺便说一句,客户端功能和编排功能可能彼此没有太多关系

import  azure.functions  as  func
import  azure.durable_functions  as  df
import  logging  

app  =  df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)  

### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async  def  client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id  =  await  client.start_new("orchestrator", None, {})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    await  client.wait_for_completion_or_create_check_status_response(req, instance_id)
    status  =  await  client.get_status(instance_id)
    return  f"output: {status.output}"  

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def  orchestrator(context: df.DurableOrchestrationContext) -> dict:
    test  =  yield  context.call_activity("main", "")
    return {"Test": test} 

### activity function ###
@app.blob_output(arg_name="outputblob",
                path="newblob/test.txt",
                connection="BlobStorageConnection")

@app.activity_trigger(input_name="blank")
def  main(blank: str, outputblob: func.Out[str]):
    string  =  "Data is successfully Inserted"
    logging.info(f'Python Queue trigger function processed {len(string)} bytes')
    outputblob.set(string)
    return  "Completed"

local_setting.json

{
  "IsEncrypted": false,
  "Values": {
  "AzureWebJobsStorage": "UseDevelopmentStorage=true",
  "FUNCTIONS_WORKER_RUNTIME": "python",
  "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
  "BlobStorageConnection": "DefaultEndpointsProtocol=https;AccountNa***"
  }
}

错误消息enter image description here

wnavrhmk

wnavrhmk1#

  • 我使用下面的代码将activity函数生成的值写入blob存储。*
    function_app.py
import  azure.functions  as  func
import  azure.durable_functions  as  df
import  logging  

app  =  df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)  

### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async  def  client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
instance_id  =  await  client.start_new("orchestrator", None, {})
logging.info(f"Started orchestration with ID = '{instance_id}'.")
await  client.wait_for_completion_or_create_check_status_response(req, instance_id)
status  =  await  client.get_status(instance_id)
return  f"output: {status.output}"  

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def  orchestrator(context: df.DurableOrchestrationContext) -> dict:
test  =  yield  context.call_activity("main", "")
return {"Test": test} 

### activity function ###
# I want to calculate the value and output it to the bound service.
@app.blob_output(arg_name="outputblob",
path="newblob/test.txt",
connection="BlobStorageConnection")

@app.activity_trigger(input_name="blank")
def  main(blank: str, outputblob: func.Out[str]):
string  =  "Data is successfully Inserted"
logging.info(f'Python Queue trigger function processed {len(string)} bytes')
outputblob.set(string)
return  "Completed"

local.settings.json

{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"BlobStorageConnection": "DefaultEndpointsProtocol=**********"
}
}

获取如下所示的连接字符串的值并将其添加到local.settings中-

Output:

相关问题