我一直在将Azure HTTP触发函数转换为更健壮的函数,这可能需要超过230秒。
我很难将代码划分为函数,不确定如何在我的情况下构建活动、编排器和客户端函数。我真的很感激你能帮我。
google_search模块定义如下:
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging
def calculate_score(link, term):
if term and term in link:
return 100
elif 'xxx' in link and 'yyy' in link:
return 75
elif 'xxx' in link:
return 50
elif link:
return 25
else:
return None
def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
results = []
error_values = {key: 'Error' for key in ['urls', 'score']}
success = True
error_code = 0
for term in tqdm(search_terms):
try:
if term is None:
row = {
'search_item': term,
'urls': [],
'score': []
}
else:
result = service.cse().list(q=term, cx=cse_id, num=num_results, gl=country_code).execute()
items = result.get('items', [])
top_results = [item.get('link') for item in items[:num_results]]
scores = [calculate_score(link, term) for link in top_results]
row = {
'search_item': term,
'urls': top_results,
'score': scores
}
logging.info('Search completed successfully')
except Exception as e:
success = False
error_code = 74
row = {'search_item': term,
**error_values}
logging.error(f'An error occurred during calling the Search function. {e}')
results.append(row)
return success, results, error_code
init.py函数:
import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search_scoring_clean import search, calculate_score
import logging
import os
def main(req: func.HttpRequest) -> func.HttpResponse:
try:
logging.info('Python HTTP trigger function processed a request.')
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
req_body = req.get_json()
search_terms = req_body.get('search_terms')
num_results = int(req_body.get('num_results', 5))
country_code = req_body.get('country_code', 'uk')
params = req_body.get('params', {})
if not search_terms or not api_key or not cse_id:
logging.error('Missing required parameters')
return func.HttpResponse('Missing required parameters', status_code=400)
success, results, error_code = search(search_terms=search_terms,
num_results=num_results,
country_code=country_code,
api_key=api_key,
cse_id=cse_id)
response_data = {
'success': int(success),
'error_code': int(error_code),
**params,
'results': results
}
response_json = json.dumps(response_data)
logging.info('API Call completed successfully')
return func.HttpResponse(response_json, mimetype='application/json')
except Exception as e:
logging.error(f'An error occurred: {str(e)}')
error_code = 66
response_data = {
'success': 0,
'error_code': int(error_code),
**params
}
response_json = json.dumps(response_data)
return func.HttpResponse(response_json, status_code=500, mimetype='application/json')
示例请求:
{
"search_terms": ["term1", "term2", "term3"],
"num_results": 3,
"params": {
"search_id": "123",
"engine_name": "Google Search"}
}
所需输出示例:
{
"success": 1,
"error_code": 0,
"search_id": "123",
"engine_name": "Google Search",
"results": [
{
"search_item": "term1",
"urls": [
"https://sampleresult3.com",
"https://sampleresult2.com",
"https://sampleresult3.com"
],
"score": [
25,
25,
25
]
},
{
"search_item": "term2",
"urls": [
"https://whatever1.com",
"https://whatever.2.com",
"https://whatever3.com"
],
"score": [
25,
25,
75
]
},
{
"search_item": "term3",
"urls": [
"https://www.link1.com",
"https://link2.com",
"https://www.link3.com"
],
"score": [
25,
25,
25
]
}
]
}
编辑
我尝试了下面的活动功能:
from google_search_scoring_clean import search
import os
def main(search_terms, num_results, country_code):
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
if not search_terms or not api_key or not cse_id:
return False, []
success, results = search(search_terms=search_terms,
num_results=num_results,
country_code=country_code,
api_key=api_key,
cse_id=cse_id)
return success, results
但收到错误消息:结果:失败异常:FunctionLoadError:无法加载ActivityFunction函数:下面的参数是在Python中声明的,而不是在function.json中声明的:© 2019 www.qqq.com版权所有并保留所有权利
将function.json编辑为
{
"bindings": [
{
"name": "search_terms",
"type": "string[]",
"direction": "in"
},
{
"name": "num_results",
"type": "int",
"direction": "in"
},
{
"name": "country_code",
"type": "string",
"direction": "in"
}
]
}
但是,我收到:
The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.
编辑2:
下面的也不行:
import os
from googleapiclient import discovery
import logging
def main(searchTerm: str) -> str:
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
try:
if searchTerm is None:
results = {
'search_term': searchTerm,
'urls': [],
'scores': []
}
else:
result = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
items = result.get('items', [])
top_results = [item.get('link') for item in items]
results = {
'search_term': searchTerm,
'urls': top_results,
}
return results
except Exception as e:
error_values = {key: 'Error' for key in ['urls']}
results = {'search_term': searchTerm, **error_values}
logging.error(f'An error occurred during the search: {e}')
return results
我在function.json中将名称从'name'调整为'searchTerm'。输出为:
{
"name": "Orchestrator",
"instanceId": "4de8cc4818554208ad599e8687ca77a7",
"runtimeStatus": "Running",
"input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
"customStatus": null,
"output": null,
"createdTime": "2023-05-31T10:37:24Z",
"lastUpdatedTime": "2023-05-31T10:37:24Z"
}
编辑3:它与以下调整工作
1.在Activity Function的function.json中,我将'name'更改为activityVar -不知何故,它不接受activity_var名称,不知道为什么
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "activityVar",
"type": "activityTrigger",
"direction": "in"
}
]
}
协调器功能:
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
requestBody = context.get_input()
search_terms= (requestBody['search_terms'])
print("Orchestrator " + str(search_terms))
tasks = []
for search_term in search_terms:
activity_var = {}
activity_var['search_term'] = search_term
activity_var['num_results'] = requestBody['num_results']
activity_var['params'] = requestBody['params']
print(activity_var)
tasks.append(context.call_activity("ActivityFunction", activity_var))
results = yield context.task_all(tasks)
return results
main = df.Orchestrator.create(orchestrator_function)
其中我的活动函数文件夹名为“ActivityFunction”。
现在的活动函数,因为我必须美化它:
import os
from googleapiclient import discovery
import logging
def main(activityVar: dict) -> dict:
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
try:
if activityVar['search_term'] is None:
results = {
'search_term': activityVar['search_term'],
'urls': [],
'scores': []
}
else:
result = service.cse().list(q=activityVar['search_term'], cx=cse_id, num=3).execute()
items = result.get('items', [])
top_results = [item.get('link') for item in items]
results = {
'search_term': activityVar['search_term'],
'urls': top_results,
}
return results
except Exception as e:
error_values = {key: 'Error' for key in ['urls']}
results = {'search_term': activityVar['search_term'], **error_values}
logging.error(f'An error occurred during the search: {e}')
return results
天啊,今天真是漫长的一天。我得再想想。
1条答案
按热度按时间5jvtdoz21#
您需要遵循的模式是扇出扇入模式。我不会为你写完整的代码,但你可以按照here的例子。我下面的回复应该可以指导你编写你需要的代码。
其目的是将搜索项列表拆分为单独的变量,以便您可以触发多个活动函数,并且每个函数都可以独立地搜索单个变量。由于这些活动函数不是http触发的函数,它们可以超过230s的限制。
你的http触发函数看起来像这样。它需要将请求主体传递到编排器中,以便您可以在调用活动函数之前将搜索项拆分到那里。
Orchestrator现在将以字典的形式重新创建主体,并将其作为变量传递给活动函数。唯一的区别是,每个活动功能将仅接收1个搜索词。您将在结果中得到一个列表,您可以在返回响应之前将其格式化为您需要的内容。
最后,activity函数将包含执行搜索的主逻辑,并返回单个搜索词的结果。
需要记住的一点是,由于整个过程是异步的,当您调用http启动器函数时,您将立即返回一个链接字典,而实际的过程在后台运行。您需要在“statusQueryGetUri”链接上实现某种轮询,以固定或指数回退间隔来获取执行状态。一旦结果被设置为“Completed”,您将在“output”变量中找到结果。
下面是调用“statusQueryGetUri”链接的示例。