pyspark变量选择与udf

jq6vz3qz  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(313)

我的问题有点复杂。让我解释一下:
我正在开发一种迭代变量选择方法。在每个迭代中,变量选择器从给定给模型的列车数据的变量集合中提出n个不同的变量子集。
这就是为什么我决定通过pyspark上的udf对每个子集进行这些评估,因为它们是非常昂贵的过程,包括从主训练集中训练和测试模型。
我的代码如下(我想 test_data 筛选数据集 X ,在 X(0, 0.7] 并用 X(0.7, 1.0] 公制单位为 metric 列):

  1. def fitness(self, vectors, X):
  2. df_vectors = spark.createDataFrame(
  3. pd.DataFrame(
  4. data=vectors, columns=[f"var_{x}" for x in range(vectors.shape[1])]
  5. )
  6. )
  7. df = VectorAssembler(
  8. inputCols=[x for x in df_vectors.columns if "var_" in x], outputCol="vars_sel",
  9. ).transform(df_vectors)
  10. @udf(returnType=ArrayType(FloatType()))
  11. def vector_to_array(v):
  12. # convert column of vectors into column of arrays
  13. a = v.values.tolist()
  14. return a
  15. df = (
  16. df.withColumn("vars_array", vector_to_array("vars_sel"))
  17. .drop("vars_sel")
  18. .withColumn("featuresCol", lit(self.featuresCol))
  19. .withColumn("labelCol", lit(self.labelCol))
  20. .withColumn("metric", lit(self.metric))
  21. .withColumn("fitness", lit(0.0))
  22. )
  23. @pandas_udf(df.schema, functionType=PandasUDFType.GROUPED_MAP)
  24. def test_data(pdf):
  25. df_to_return = pdf.copy()
  26. df_to_return["fitness"] = 1.0
  27. return df_to_return
  28. returns = df.groupBy("vars_array").apply(test_data)
  29. return

如您所见,我已经设法将选择向量和post模型所需的其他附加数据传递给了udf函数,但是我想不出如何将主训练集传递给udf,以便在udf中根据选择向量过滤训练集变量,对提出的模型进行了训练和预测。
我曾考虑过将主训练集保存到磁盘上,以便通过udf读取,但除了它对我来说非常慢之外,我认为它在并发运行这个变量选择器对象时可能会导致问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题