sparkjava:如何在sparkdataframe中添加数组列

x6h2sr28  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(406)

我正在尝试向spark数据框添加一个新列。添加的新列的大小将基于变量(例如 salt )我将使用该列分解数据集以用于salted join的post。
目前,我正在使用连续 lit 在一个 array 函数,但它有一个问题,即它不能参数化,作为一个编码实践看起来最糟糕。我当前的实现如下所示。

int salt =3;

Dataset<Row> Reference_with_Salt_Col = Reference.withColumn("salt_array", array(lit(0), lit(1), lit(2)));

我已经参考并研究了各种方法,但它们似乎都不能解决java中的问题。 functions.typedlit 这种方法虽然在python/scala中有效,但在java中似乎不起作用。进一步传递一个数组或列表也无助于spark给出相同的错误。
我使用的是spark 2.2.0和Java1.8版本

fjnneemd

fjnneemd1#

你可以用 array 函数,但首先将列表中的每个元素转换为 lit . 使用示例 map 流函数:

import org.apache.spark.sql.*;

import java.util.Arrays;
import java.util.List;

// example of input dataframe
Reference.show();

//+-----+
//|label|
//+-----+
//|    a|
//|    b|
//|    c|
//+-----+

List<Integer> salt_array = Arrays.asList(0, 1, 2);

Reference.withColumn(
        "salt_array",
        functions.array(salt_array.stream().map(functions::lit).toArray(Column[]::new))
).show();

//+-----+----------+
//|label|salt_array|
//+-----+----------+
//|    a| [0, 1, 2]|
//|    b| [0, 1, 2]|
//|    c| [0, 1, 2]|
//+-----+----------+

生成包含从0到 salt - 1 ,您可以使用 IntStream.rangeClosed 这样地:

import java.util.stream.IntStream;

int salt = 3;

Dataset<Row> Reference_with_Salt_Col = Reference.withColumn(
        "salt_array",
        functions.array(IntStream.rangeClosed(0, salt - 1).mapToObj(functions::lit).toArray(Column[]::new))
);

相关问题