我的问题有点复杂。让我解释一下:
我正在开发一种迭代变量选择方法。在每个迭代中,变量选择器从给定给模型的列车数据的变量集合中提出n个不同的变量子集。
这就是为什么我决定通过pyspark上的udf对每个子集进行这些评估,因为它们是非常昂贵的过程,包括从主训练集中训练和测试模型。
我的代码如下(我想 test_data
筛选数据集 X
,在 X(0, 0.7]
并用 X(0.7, 1.0]
公制单位为 metric
列):
def fitness(self, vectors, X):
df_vectors = spark.createDataFrame(
pd.DataFrame(
data=vectors, columns=[f"var_{x}" for x in range(vectors.shape[1])]
)
)
df = VectorAssembler(
inputCols=[x for x in df_vectors.columns if "var_" in x], outputCol="vars_sel",
).transform(df_vectors)
@udf(returnType=ArrayType(FloatType()))
def vector_to_array(v):
# convert column of vectors into column of arrays
a = v.values.tolist()
return a
df = (
df.withColumn("vars_array", vector_to_array("vars_sel"))
.drop("vars_sel")
.withColumn("featuresCol", lit(self.featuresCol))
.withColumn("labelCol", lit(self.labelCol))
.withColumn("metric", lit(self.metric))
.withColumn("fitness", lit(0.0))
)
@pandas_udf(df.schema, functionType=PandasUDFType.GROUPED_MAP)
def test_data(pdf):
df_to_return = pdf.copy()
df_to_return["fitness"] = 1.0
return df_to_return
returns = df.groupBy("vars_array").apply(test_data)
return
如您所见,我已经设法将选择向量和post模型所需的其他附加数据传递给了udf函数,但是我想不出如何将主训练集传递给udf,以便在udf中根据选择向量过滤训练集变量,对提出的模型进行了训练和预测。
我曾考虑过将主训练集保存到磁盘上,以便通过udf读取,但除了它对我来说非常慢之外,我认为它在并发运行这个变量选择器对象时可能会导致问题。
暂无答案!
目前还没有任何答案,快来回答吧!