我有一个spark.DataFrame
,有多个时间序列。我想在groupby apply上为每个时间序列应用一个sklearn
模型。对于每个时间序列,该模型需要大约0.05s,但是当我尝试在pandas_udf
中求解这个问题时,它比顺序应用要花费更长的时间。下面是一个示例
def forecaster_spark(data_group: pd.DataFrame):
# index for reports
item_id = data_group["item_id"].iloc[0]
# Indexing by time index
data_group = data_group.set_index(pd.DatetimeIndex(data_group['ds'])).sort_index()
# Here we extract the time series
y = data_group["y"].astype(float)
# transform is a transformation to build regressor features (example extracting lags)
X = transform(y)
model = xg.XGBRegressor(max_depth = 50)
# For each item, the model takes app 0.05s
model.fit(X[:-1],y[-2])
return pd.DataFrame({"item_id": item_id, "pred": model.predict(X.iloc[-1], y[-1])})
这里,transform
是应用于y的转换,用于构建回归矩阵,例如,可以将其视为构建滞后的转换。
然后,我在包含所有数据的spark.DataFrame
上应用此函数
predictions = data.applyInPandas(forecaster_spark, schema="item_id string, y_pred double")
在predictions
上执行操作时(例如toPandas
或show(1000)
),这会花费很多时间。当我在forecaster_spark
上注解model.fit
时(改变pred
返回任何值)它完成得非常快,所以问题是在UDF内部拟合模型。我想可能是使用groupby时 Dataframe 的分区有问题,但是我尝试了许多简单的UDF函数(例如取平均值),结果都很正常。
def test_function_for_udf(data_group: pd.DataFrame):
"""Same function as above, but commenting fit method"""
# index for reports
item_id = data_group["item_id"].iloc[0]
# Indexing by time index
data_group = data_group.set_index(pd.DatetimeIndex(data_group['ds'])).sort_index()
# Here we extract the time series
y = data_group["y"].astype(float)
# transform is a transformation to build regressor features (example extracting lags)
X = transform(y)
model = xg.XGBRegressor(max_depth = 50)
# Note that I commented the fit
# model.fit(X[:-1],y[-2])
return pd.DataFrame({"item_id": item_id, "mean": y.mean()})
2条答案
按热度按时间lokaqttq1#
1.如果你想让它运行得更快,试着翻译成spark pandas而不是panda(当然,你还不清楚是否可以使用spark panda)。
.mapPartitions
。这将允许你将重对象保存在内存中(model = xg.XGBRegressor(max_depth = 50)
),而不是创建它们后丢弃它们--〉UDF中发生的事情。1.如果您将代码改为使用
yield
而不是return
(在.mapParitions中),您也可能会获得更好的性能。这是因为spark使用了惰性迭代器,这有助于缓解JVM内存压力。(这也允许从内存中拆分数据。)这不是一个灵丹妙药,而是一个您可能会从中受益的工具,这可能会根据您的组的大小而受益。nom7f22z2#
您可以使用以下任一工具执行XGBoost模型的分布式 * 训练 *:
xgboost.spark
-可能是最简单的选项,但仍处于试验阶段,需要版本〉=1.7.0sparkdl.xgboost
Databricks有一些关于这方面的有用教程:
如果您需要其他算法,
MLlib
提供了许多最常用算法的分布式版本