How to convert an HTTP-triggered Azure Function into an Azure Durable Function in Python?

Posted by cygmwpex on 2023-06-07 in Python

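I have been converting an Azure HTTP-triggered function into a more robust one, because it can run for more than 230 seconds.
I'm struggling to split the code into functions and I'm not sure how to structure the activity, orchestrator, and client functions in my case. I would really appreciate your help.
The google_search module (imported in __init__.py as google_search_scoring_clean) is defined as follows: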

from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging

def calculate_score(link, term):
    if term and term in link:
        return 100
    elif 'xxx' in link and 'yyy' in link:
        return 75
    elif 'xxx' in link:
        return 50
    elif link:
        return 25
    else:
        return None

def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    results = []
    error_values = {key: 'Error' for key in ['urls', 'score']}
    success = True
    error_code = 0
    for term in tqdm(search_terms):
        try:
            if term is None:
                row = {
                    'search_item': term,
                    'urls': [],
                    'score': []
                }
            else:
                result = service.cse().list(q=term, cx=cse_id, num=num_results, gl=country_code).execute()
                items = result.get('items', [])
                top_results = [item.get('link') for item in items[:num_results]]
                scores = [calculate_score(link, term) for link in top_results]

                row = {
                    'search_item': term,
                    'urls': top_results,
                    'score': scores
                }
                logging.info('Search completed successfully')
        except Exception as e:
            success = False
            error_code = 74
            row = {'search_item': term, 
                   **error_values}
            logging.error(f'An error occurred during calling the Search function. {e}')

        results.append(row)

    return success, results, error_code

__init__.py function:

import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search_scoring_clean import search, calculate_score
import logging
import os

def main(req: func.HttpRequest) -> func.HttpResponse:
    params = {}  # defined before the try so the except block can always use it
    try:
        logging.info('Python HTTP trigger function processed a request.')
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')
        req_body = req.get_json()
        search_terms = req_body.get('search_terms')
        num_results = int(req_body.get('num_results', 5))
        country_code = req_body.get('country_code', 'uk')
        params = req_body.get('params', {})

        if not search_terms or not api_key or not cse_id:
            logging.error('Missing required parameters')
            return func.HttpResponse('Missing required parameters', status_code=400)

        success, results, error_code = search(search_terms=search_terms,
                                              num_results=num_results,
                                              country_code=country_code,
                                              api_key=api_key,
                                              cse_id=cse_id)

        response_data = {
            'success': int(success),
            'error_code': int(error_code),
            **params,
            'results': results
        }
        response_json = json.dumps(response_data)

        logging.info('API Call completed successfully')
        return func.HttpResponse(response_json, mimetype='application/json')
    
    except Exception as e:
        logging.error(f'An error occurred: {str(e)}')
        error_code = 66
        response_data = {
            'success': 0,
            'error_code': int(error_code),
            **params
        }
        response_json = json.dumps(response_data)
        return func.HttpResponse(response_json, status_code=500, mimetype='application/json')

Example request:

{
  "search_terms": ["term1", "term2", "term3"],
  "num_results": 3,
  "params": {
    "search_id": "123",
    "engine_name": "Google Search"}   
}

Example of the desired output:

{
    "success": 1,
    "error_code": 0,
    "search_id": "123",
    "engine_name": "Google Search",
    "results": [
        {
            "search_item": "term1",
            "urls": [
                "https://sampleresult3.com",
                "https://sampleresult2.com",
                "https://sampleresult3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        },
        {
            "search_item": "term2",
            "urls": [
                "https://whatever1.com",
                "https://whatever.2.com",
                "https://whatever3.com"
            ],
            "score": [
                25,
                25,
                75
            ]
        },
        {
            "search_item": "term3",
            "urls": [
                "https://www.link1.com",
                "https://link2.com",
                "https://www.link3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        }
    ]
}

EDIT
I tried the following activity function:

from google_search_scoring_clean import search
import os

def main(search_terms, num_results, country_code):
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    if not search_terms or not api_key or not cse_id:
        return False, []

    # search() returns three values: (success, results, error_code)
    success, results, _error_code = search(
        search_terms=search_terms,
        num_results=num_results,
        country_code=country_code,
        api_key=api_key,
        cse_id=cse_id,
    )

    return success, results

But I got this error: Result: Failure Exception: FunctionLoadError: cannot load the ActivityFunction function: the following parameters are declared in Python but not in function.json: …
So I edited function.json to:

{
  "bindings": [
    {
      "name": "search_terms",
      "type": "string[]",
      "direction": "in"
    },
    {
      "name": "num_results",
      "type": "int",
      "direction": "in"
    },
    {
      "name": "country_code",
      "type": "string",
      "direction": "in"
    }
  ]
}

However, I then got:

The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.

EDIT 2:
The following does not work either:

import os
from googleapiclient import discovery
import logging

def main(searchTerm: str) -> str:
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)

    try:
        if searchTerm is None:
            results = {
                'search_term': searchTerm,
                'urls': [],
                'scores': []
            }
        else:
            result = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
            items = result.get('items', [])
            top_results = [item.get('link') for item in items]

            results = {
                'search_term': searchTerm,
                'urls': top_results,
            }

        return results

    except Exception as e:
        error_values = {key: 'Error' for key in ['urls']}
        results = {'search_term': searchTerm, **error_values}
        logging.error(f'An error occurred during the search: {e}')
        return results

In function.json I changed the binding name from 'name' to 'searchTerm'. The output was:

{
    "name": "Orchestrator",
    "instanceId": "4de8cc4818554208ad599e8687ca77a7",
    "runtimeStatus": "Running",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": null,
    "createdTime": "2023-05-31T10:37:24Z",
    "lastUpdatedTime": "2023-05-31T10:37:24Z"
}

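EDIT 3: It works with the following adjustments:
1. In the activity function's function.json, I changed "name" to activityVar. For some reason it did not accept the name activity_var, and I'm not sure why.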

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "activityVar",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}
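Every name the host rejected in this thread (country_code earlier, activity_var here) contains an underscore. As far as I can tell, the Functions host only accepts binding names made of a letter followed by letters or digits (up to 128 characters), plus the special name $return; that would explain both errors, but treat the exact rule below as an assumption rather than documented behaviour. The Python parameter name must also match the binding name, which is why the parameter became activityVar. The hypothetical helper below checks a function.json locally against that assumed rule.

```python
import json
import re
from typing import List

# Assumed approximation of the host's binding-name rule: a letter followed by
# up to 127 letters/digits (so no underscores), or the special name "$return".
BINDING_NAME_RULE = re.compile(r"(?:[a-zA-Z][a-zA-Z0-9]{0,127}|\$return)")


def invalid_binding_names(function_json_text: str) -> List[str]:
    """Return the binding names in a function.json document that break the assumed rule."""
    config = json.loads(function_json_text)
    names = [binding.get("name", "") for binding in config.get("bindings", [])]
    return [name for name in names if not BINDING_NAME_RULE.fullmatch(name)]


print(invalid_binding_names('{"bindings": [{"name": "activity_var"}, {"name": "activityVar"}]}'))
# prints ['activity_var']
```

Applied to the first function.json in the question, this helper would flag all three names (search_terms, num_results, country_code), not only the one named in the error message.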

Orchestrator function:

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    print("Orchestrator " + str(search_terms))
    tasks = []
    for search_term in search_terms:
        activity_var = {}
        activity_var['search_term'] = search_term
        activity_var['num_results'] = requestBody['num_results']
        activity_var['params'] = requestBody['params']
        print(activity_var)
        tasks.append(context.call_activity("ActivityFunction", activity_var))

    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)

My activity function folder is named "ActivityFunction".
The current activity function (I still need to polish it):

import os
from googleapiclient import discovery
import logging

def main(activityVar: dict) -> dict:
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)

    try:
        if activityVar['search_term'] is None:
            results = {
                'search_term': activityVar['search_term'],
                'urls': [],
                'scores': []
            }
        else:
            result = service.cse().list(q=activityVar['search_term'], cx=cse_id, num=3).execute()
            items = result.get('items', [])
            top_results = [item.get('link') for item in items]

            results = {
                'search_term': activityVar['search_term'],
                'urls': top_results,
            }

        return results

    except Exception as e:
        error_values = {key: 'Error' for key in ['urls']}
        results = {'search_term': activityVar['search_term'], **error_values}
        logging.error(f'An error occurred during the search: {e}')
        return results
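One way to polish the activity is to move the per-term work into a plain function that reuses the scoring rules from calculate_score, honours num_results, and returns rows with the same keys as the desired output (search_item, urls, score). This is a sketch, not the definitive activity: it assumes a hypothetical fetch_links(term, num_results) callable that wraps the service.cse().list(...).execute() call, so the logic can be tested without the Google API. The names search_one_term and fetch_links are illustrative.

```python
import logging
from typing import Any, Callable, Dict, List, Optional

# Hypothetical signature: fetch_links(term, num_results) -> list of result URLs.
FetchLinks = Callable[[str, int], List[Optional[str]]]


def calculate_score(link: Optional[str], term: Optional[str]) -> Optional[int]:
    # Same rules as calculate_score in the question, plus a guard for a missing
    # link (item.get('link') may be None, and `term in None` raises TypeError).
    if not link:
        return None
    if term and term in link:
        return 100
    if 'xxx' in link and 'yyy' in link:
        return 75
    if 'xxx' in link:
        return 50
    return 25


def search_one_term(activity_input: Dict[str, Any], fetch_links: FetchLinks) -> Dict[str, Any]:
    """Process a single search term and return one row of the desired output."""
    term = activity_input.get('search_term')
    num_results = int(activity_input.get('num_results', 5))
    if term is None:
        return {'search_item': term, 'urls': [], 'score': []}
    try:
        links = fetch_links(term, num_results)[:num_results]
    except Exception as exc:  # mirror the original search(): mark this term as failed
        logging.error('Search failed for %r: %s', term, exc)
        return {'search_item': term, 'urls': 'Error', 'score': 'Error'}
    return {
        'search_item': term,
        'urls': links,
        'score': [calculate_score(link, term) for link in links],
    }

# Hypothetical wiring inside ActivityFunction/__init__.py (needs googleapiclient and os):
# def main(activityVar: dict) -> dict:
#     service = discovery.build('customsearch', 'v1', developerKey=os.getenv('apikey'),
#                               cache_discovery=False)
#     def fetch_links(term, num):
#         result = service.cse().list(q=term, cx=os.getenv('cseid'), num=num,
#                                     gl=activityVar.get('country_code', 'uk')).execute()
#         return [item.get('link') for item in result.get('items', [])]
#     return search_one_term(activityVar, fetch_links)
```

Keeping the Google client out of search_one_term means the scoring and error handling can be unit-tested with a fake fetch_links, and the activity itself stays a thin wrapper.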

Phew, it's been a long day. I need to think this through some more.

Answer by 5jvtdoz2:


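The pattern you need is fan-out/fan-in. I won't write the full code for you, but you can follow the example here. My reply below should guide you in writing the code you need.
The idea is to split the list of search terms into individual items so that you can trigger multiple activity functions, each of which searches for a single term independently. Because these activity functions are not HTTP-triggered, they are not bound by the 230-second HTTP response limit (each execution is still subject to the function app's functionTimeout setting).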
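Purely as a local analogy (threads instead of Durable Functions, no Azure dependencies), the sketch below shows the fan-out/fan-in shape described above: the list of terms is fanned out into one payload per term, and the results are fanned back in, in input order. fan_out_fan_in and the payload keys are illustrative; in Durable Functions the orchestrator's call_activity and task_all play these roles, and the work runs as separate activity executions rather than threads.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def fan_out_fan_in(search_terms: List[str], num_results: int,
                   work: Callable[[Dict[str, Any]], Any]) -> List[Any]:
    # Fan-out: build one independent payload per term, like one call_activity per term.
    payloads = [{"search_term": term, "num_results": num_results} for term in search_terms]
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Fan-in: map() returns results in payload order, much as task_all
        # returns results in the order the tasks were scheduled.
        return list(pool.map(work, payloads))


print(fan_out_fan_in(["term1", "term2", "term3"], 3, lambda p: p["search_term"].upper()))
# prints ['TERM1', 'TERM2', 'TERM3']
```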
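Your HTTP-triggered (starter) function would look like this. It needs to pass the request body into the orchestrator so that the search terms can be split there before the activity functions are called. The starter parameter corresponds to a durableClient input binding named starter in its function.json, and req.route_params["functionName"] relies on the orchestrators/{functionName} route used by the default template.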

import json
import logging

import azure.durable_functions as df
import azure.functions as func

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    requestBody = json.loads(req.get_body().decode())
    instance_id = await client.start_new(req.route_params["functionName"], client_input=requestBody)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)
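The starter above forwards the body unchecked, so a request without search_terms (or without num_results, if the orchestrator indexes requestBody['num_results'] directly) only fails later inside the orchestration. A minimal sketch, assuming you want to keep the original function's 400 response and defaults (num_results=5, country_code='uk', params={}), is to validate in the starter before starting the orchestration. normalize_request is a hypothetical helper name; note that req.get_json() itself raises ValueError on a body that is not valid JSON.

```python
from typing import Any, Dict, Optional, Tuple


def normalize_request(body: Any) -> Tuple[Optional[str], Dict[str, Any]]:
    """Validate the POSTed JSON and fill in the defaults the original HTTP function used.

    Returns (error_message, normalized_body); error_message is None when the body is valid.
    """
    if not isinstance(body, dict) or not body.get("search_terms"):
        return "Missing required parameters", {}
    if not isinstance(body["search_terms"], list):
        return "search_terms must be a list", {}
    try:
        num_results = int(body.get("num_results", 5))
    except (TypeError, ValueError):
        return "num_results must be an integer", {}
    return None, {
        "search_terms": body["search_terms"],
        "num_results": num_results,
        "country_code": body.get("country_code", "uk"),
        "params": body.get("params", {}),
    }

# Hypothetical use in the starter, before client.start_new(...):
#     error, request_body = normalize_request(req.get_json())
#     if error:
#         return func.HttpResponse(error, status_code=400)
#     instance_id = await client.start_new(req.route_params["functionName"],
#                                          client_input=request_body)
```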

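The orchestrator gets the body back as a dictionary and passes the relevant parts to each activity function as a variable; the only difference from the original request is that each activity receives a single search term. task_all gives you a list of results, which you can format however you need before returning it.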

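import logging

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    if not context.is_replaying:
        # The orchestrator is replayed from its history, so log only on the first pass
        logging.info(f"Orchestrator {search_terms}")
    tasks = []
    for search_term in search_terms:
        # Fan-out: one activity call per search term (defaults match the original HTTP function)
        activity_var = {
            'search_term': search_term,
            'num_results': requestBody.get('num_results', 5),
            'country_code': requestBody.get('country_code', 'uk'),
            'params': requestBody.get('params', {}),
        }
        tasks.append(context.call_activity("Activity", activity_var))

    # Fan-in: wait for all activities; results come back in the order of tasks
    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)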
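To produce the desired output from the question rather than a raw list, the orchestrator can format the fan-in results before returning them. This sketch assumes each activity returns a row shaped like {'search_item', 'urls', 'score'} (the asker's current activity uses 'search_term' and omits 'score'), and reuses the original search() convention that a failed term has 'Error' in place of its URLs, with error code 74. assemble_response is a hypothetical name; because it is a pure function with no I/O, it is safe to call inside the orchestrator, which must stay deterministic across replays.

```python
from typing import Any, Dict, List

SEARCH_ERROR_CODE = 74  # error code the original search() used when a term failed


def assemble_response(request_body: Dict[str, Any], rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Turn the fan-in list from task_all into the response shape shown in the question."""
    failed = any(row.get("urls") == "Error" for row in rows)
    return {
        "success": 0 if failed else 1,
        "error_code": SEARCH_ERROR_CODE if failed else 0,
        **request_body.get("params", {}),
        "results": rows,
    }

# Hypothetical use at the end of orchestrator_function:
#     results = yield context.task_all(tasks)
#     return assemble_response(requestBody, results)
```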

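Finally, the activity function contains the main search logic and returns the result for a single search term.
One thing to keep in mind: because the whole process is asynchronous, calling the HTTP starter function immediately returns a dictionary of management links while the actual work runs in the background. You need to poll the "statusQueryGetUri" link at a fixed or exponentially increasing interval to get the execution status. Once "runtimeStatus" is "Completed", the results are in the "output" field.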
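A minimal polling sketch using only the standard library, assuming status_url is the statusQueryGetUri value from the starter's response. It backs off exponentially (1 s, 2 s, 4 s, and so on, capped at 30 s) and stops once runtimeStatus is terminal. The endpoint answers 202 while the instance is still running and 200 once it has completed, and urlopen does not raise for either; it does raise urllib.error.HTTPError for non-2xx responses, so depending on how the endpoint reports a failed or terminated instance you may see an exception instead of a "Failed" status. wait_for_orchestration, its defaults, and the overall timeout are illustrative choices, not part of the Durable Functions API.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

# runtimeStatus values after which the instance will not change any more
TERMINAL_STATUSES = {"Completed", "Failed", "Terminated", "Canceled"}


def fetch_status(status_url: str) -> Dict[str, Any]:
    """GET the statusQueryGetUri link and decode its JSON body."""
    with urllib.request.urlopen(status_url) as response:
        return json.load(response)


def wait_for_orchestration(status_url: str,
                           get_status: Callable[[str], Dict[str, Any]] = fetch_status,
                           sleep: Callable[[float], None] = time.sleep,
                           initial_delay: float = 1.0,
                           factor: float = 2.0,
                           max_delay: float = 30.0,
                           timeout: float = 900.0) -> Dict[str, Any]:
    """Poll until runtimeStatus is terminal, backing off exponentially between polls."""
    delay, waited = initial_delay, 0.0
    while True:
        status = get_status(status_url)
        if status.get("runtimeStatus") in TERMINAL_STATUSES:
            return status
        if waited >= timeout:
            raise TimeoutError(
                f"orchestration still {status.get('runtimeStatus')} after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * factor, max_delay)

# Usage (check_status is the JSON returned by the starter; hypothetical variable name):
#     final = wait_for_orchestration(check_status["statusQueryGetUri"])
#     if final["runtimeStatus"] == "Completed":
#         print(final["output"])
```

Injecting get_status and sleep keeps the backoff logic testable without a running function app.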
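Below is an example response from the "statusQueryGetUri" link. The output entries come from a placeholder activity that just returns "Hello {input}!", so yours will contain the search results instead.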

{
    "name": "Orchestrator1",
    "instanceId": "1a98f11135494cf88fa1d3241b8cc4f3",
    "runtimeStatus": "Completed",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": [
        "Hello {'search_term': 'term1', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
        "Hello {'search_term': 'term2', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
        "Hello {'search_term': 'term3', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!"
    ],
    "createdTime": "2023-05-30T12:35:22Z",
    "lastUpdatedTime": "2023-05-30T12:35:24Z"
}
