在PandasUDF内拟合sklearn模型花费的时间太长-PYSPARK-

dw1jzc5e  于 2023-01-25  发布在  Spark
关注(0)|答案(2)|浏览(173)

我有一个spark.DataFrame,有多个时间序列。我想在groupby apply上为每个时间序列应用一个sklearn模型。对于每个时间序列,该模型需要大约0.05s,但是当我尝试在pandas_udf中求解这个问题时,它比顺序应用要花费更长的时间。下面是一个示例

def forecaster_spark(data_group: pd.DataFrame):
    # index for reports
    item_id = data_group["item_id"].iloc[0]
    # Indexing by time index
    data_group = data_group.set_index(pd.DatetimeIndex(data_group['ds'])).sort_index()
    # Here we extract the time series
    y = data_group["y"].astype(float)
    # transform is a transformation to build regressor features (example extracting lags)
    X = transform(y)
    model = xg.XGBRegressor(max_depth = 50)
    # For each item, the model takes app 0.05s
    model.fit(X[:-1],y[-2])
    return pd.DataFrame({"item_id": item_id, "pred": model.predict(X.iloc[-1], y[-1])})

这里,transform是应用于y的转换,用于构建回归矩阵,例如,可以将其视为构建滞后的转换。
然后,我在包含所有数据的spark.DataFrame上应用此函数

predictions = data.applyInPandas(forecaster_spark, schema="item_id string, y_pred double")

predictions上执行操作时(例如toPandasshow(1000)),这会花费很多时间。当我在forecaster_spark上注解model.fit时(改变pred返回任何值)它完成得非常快,所以问题是在UDF内部拟合模型。我想可能是使用groupby时 Dataframe 的分区有问题,但是我尝试了许多简单的UDF函数(例如取平均值),结果都很正常。

def test_function_for_udf(data_group: pd.DataFrame):
    """Same function as above, but commenting fit method"""
    # index for reports
    item_id = data_group["item_id"].iloc[0]
    # Indexing by time index
    data_group = data_group.set_index(pd.DatetimeIndex(data_group['ds'])).sort_index()
    # Here we extract the time series
    y = data_group["y"].astype(float)
    # transform is a transformation to build regressor features (example extracting lags)
    X = transform(y)
    model = xg.XGBRegressor(max_depth = 50)
    # Note that I commented the fit
    # model.fit(X[:-1],y[-2])
    return pd.DataFrame({"item_id": item_id, "mean": y.mean()})
lokaqttq

lokaqttq1#

1.如果你想让它运行得更快,试着翻译成spark pandas而不是panda(当然,你还不清楚是否可以使用spark panda)。

  1. UDF通常执行得不好。(它们以前不被矢量化,但最近有了一些工作。)如果你想要更好的性能,考虑将其重写为.mapPartitions。这将允许你将重对象保存在内存中(model = xg.XGBRegressor(max_depth = 50)),而不是创建它们后丢弃它们--〉UDF中发生的事情。
    1.如果您将代码改为使用yield而不是return(在.mapParitions中),您也可能会获得更好的性能。这是因为spark使用了惰性迭代器,这有助于缓解JVM内存压力。(这也允许从内存中拆分数据。)这不是一个灵丹妙药,而是一个您可能会从中受益的工具,这可能会根据您的组的大小而受益。
nom7f22z

nom7f22z2#

您可以使用以下任一工具执行XGBoost模型的分布式 * 训练 *:

  • xgboost.spark-可能是最简单的选项,但仍处于试验阶段,需要版本〉=1.7.0
  • sparkdl.xgboost

Databricks有一些关于这方面的有用教程:

如果您需要其他算法,MLlib提供了许多最常用算法的分布式版本

相关问题