pyspark 在Azure数据块Spark上使用Pandas UDF并行运行MLflow项目

dgenwo3n  于 2023-01-25  发布在  Spark
关注(0)|答案(1)|浏览(163)

我正在尝试使用Spark on Azure Databricks并行化多个时间序列的训练
除了培训,我还想使用MLflow记录指标和模型
代码结构非常简单(基本上采用了this example)。
1.数据块笔记本触发MLflow项目

mlflow.run(
    uri="/dbfs/mlflow-project",
    parameters={"data_path": "dbfs:/data/", "experiment_name": "test"}, 
    experiment_id=575501044793272,
    use_conda=False,
    backend="databricks",
    backend_config={
        "new_cluster": {
            "spark_version": "9.1.x-cpu-ml-scala2.12",
            "num_workers": 8,
            "node_type_id": "Standard_DS4_v2",
        },
        "libraries": [{"pypi": {"package": "pyarrow"}}]
    },
    synchronous=False
)

1.* * 主函数叫做**,它主要执行三个步骤:
1.读取由提供的 * data_path * 指示的增量表
1.定义触发MLflow项目 "列车进入" 的功能
1.将此函数作为Pandas UDF应用于Spark数据框
下面是代码:

sc = sparkContext('local')
spark = SparkSession(sc)

@click.argument("data_path")
@click.argument("experiment_name")
def run(data_path: str, experiment_name: str):
            
    df = spark.read.format("delta").load(f"{data_path}")
    result_schema = StructType([StructField("key", StringType())])

    def forecast(data: pd.DataFrame) -> pd.DataFrame:
        child_run = client.create_run(
            experiment_id=experiment,
            tags={MLFLOW_PARENT_RUN_ID: parent_run_id},
        )
        p = mlflow.projects.run(
            run_id=child_run.info.run_id, 
            uri=".",
            entry_points="train",
            parameters={"data": data.to_json(), "run_id": child_run.info.run_id}, 
            experiment_id=experiment,
            backend="local",
            usa_conda=False,
            synchronous=False,
        )

        # Just a placeholder to use pandas UDF
        out = pd.DataFrame(data={"key": ["1"]})
        return out

    client = MLflowClient()
    experiment_path = f"/mlflow/experiments/{experiment_name}"
    experiment = client.create_experiment(experiment_path)

    parent_run = client.create_run(experiment_id=experiment)
    parent_run_id = parent_run.run_id

    # Apply pandas UDF (count() used just to avoid lazy evaluation)
    df.groupBy("key").applyInPandas(forecast, result_schema).count()

1.* * train函数在每个键上调用**。
这基本上为每个时间序列(即,为每个键)训练Prophet模型,其记录参数和模型两者。

    • 从cluster stderr和stdout中,我可以看到PandasUDF被正确应用了**,因为它基于"key"列正确地划分了整个数据,即一次处理一个时间序列。

问题是监视群集使用情况时只使用了一个节点,即驱动程序节点:工作没有分配给可用的工作者,尽管panda UDF看起来应用正确。
问题出在哪里?我能否提供更多细节?
先谢谢你,马特奥

6jygbczu

6jygbczu1#

看起来你需要重新划分输入 Dataframe 。否则spark将看到一个单独的分区 Dataframe ,并将相应地进行处理。

相关问题