azure 使用持久函数序列化类型时的传输大小

qxsslcnc  于 2023-10-22  发布在  其他
关注(0)|答案(1)|浏览(119)

我想检查持久功能中的业务流程和活动功能之间交换的数据大小。
我想创建代码将5MB Dataframe 转换为字典类型并检查其大小,如以下代码所示。然而,当我实际运行它时,它返回一个值216,并确认它是216字节。如果我将100010005部分更改为其他数字,它仍然返回216。
为什么是216?另外,在将其转换为可序列化类型后,如何修改它以检查大小?

import azure.functions as func
import azure.durable_functions as df
import logging
import numpy as np
import pandas as pd

# Client function is irrelevant
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:

    instance_id = await client.start_new("orchestrator", None, {})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)

    status = await client.get_status(instance_id)

    runtime = status.runtime_status
    output = status.output
    return f"runtime: {runtime}\n\noutput:{output}" 

@app.orchestration_trigger(context_name="context")
def  orchestrator(context: df.DurableOrchestrationContext):
    result  =  yield context.call_activity("activity1", "") # Receive size of dictionary type
    return  result

@app.activity_trigger(input_name="blank")
def  activity1(blank: str) -> str:
    data  =  np.random.rand(1024*1024*5) # Create random data
    df  =  pd.DataFrame(data) # Data frame creation
    df_  =  df.to_dict()
    return df_.__sizeof__()  # Check the size of dictionary types
guz6ccqo

guz6ccqo1#

sys.getsizeof() function返回内存中对象的大小,而不是序列化为字节时的大小。这表明它计算Python对象的大小,包括Python内部数据结构和引用的开销。

您可以使用pickle模块,用于序列化np.random.rand(1024 * 1024 * 5)产生的数据。接下来,通过使用len函数测量序列化数据的长度来计算数据的字节大小

我的function_app.py:-

import sys
import azure.functions as func
import azure.durable_functions as df
import logging
import numpy as np
import pandas as pd
import pickle

# Client function is irrelevant
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:

    instance_id = await client.start_new("orchestrator", None, {})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)

    status = await client.get_status(instance_id)

    runtime = status.runtime_status
    output = status.output
    return f"runtime: {runtime}\n\noutput:{output}" 

@app.orchestration_trigger(context_name="context")
def  orchestrator(context: df.DurableOrchestrationContext):
    result  =  yield context.call_activity("activity1", "") # Receive size of dictionary type
    return  result

@app.activity_trigger(input_name="blank")
def activity1(blank: str) -> str:
    data = np.random.rand(1024 * 1024 * 5)  # Create random data
    serialized_data = pickle.dumps(data)  # Serialize the data
    serialized_size = len(serialized_data)  # Measure the size in bytes
    return str(serialized_size)

输出:-

runtime: OrchestrationRuntimeStatus.Completed

output:41943202

相关问题