pyspark变量选择与udf

jq6vz3qz  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(283)

我的问题有点复杂。让我解释一下:
我正在开发一种迭代变量选择方法。在每个迭代中,变量选择器从给定给模型的列车数据的变量集合中提出n个不同的变量子集。
这就是为什么我决定通过pyspark上的udf对每个子集进行这些评估,因为它们是非常昂贵的过程,包括从主训练集中训练和测试模型。
我的代码如下(我想 test_data 筛选数据集 X ,在 X(0, 0.7] 并用 X(0.7, 1.0] 公制单位为 metric 列):

def fitness(self, vectors, X):
    df_vectors = spark.createDataFrame(
        pd.DataFrame(
            data=vectors, columns=[f"var_{x}" for x in range(vectors.shape[1])]
        )
    )

    df = VectorAssembler(
        inputCols=[x for x in df_vectors.columns if "var_" in x], outputCol="vars_sel",
    ).transform(df_vectors)

    @udf(returnType=ArrayType(FloatType()))
    def vector_to_array(v):
        # convert column of vectors into column of arrays
        a = v.values.tolist()
        return a

    df = (
        df.withColumn("vars_array", vector_to_array("vars_sel"))
        .drop("vars_sel")
        .withColumn("featuresCol", lit(self.featuresCol))
        .withColumn("labelCol", lit(self.labelCol))
        .withColumn("metric", lit(self.metric))
        .withColumn("fitness", lit(0.0))
    )

    @pandas_udf(df.schema, functionType=PandasUDFType.GROUPED_MAP)
    def test_data(pdf):

        df_to_return = pdf.copy()
        df_to_return["fitness"] = 1.0
        return df_to_return

    returns = df.groupBy("vars_array").apply(test_data)

    return

如您所见,我已经设法将选择向量和post模型所需的其他附加数据传递给了udf函数,但是我想不出如何将主训练集传递给udf,以便在udf中根据选择向量过滤训练集变量,对提出的模型进行了训练和预测。
我曾考虑过将主训练集保存到磁盘上,以便通过udf读取,但除了它对我来说非常慢之外,我认为它在并发运行这个变量选择器对象时可能会导致问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题