使用udf在sparkDataframe中创建一个新列

qxsslcnc  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(304)

我有一个自定义项如下-

val myUdf = udf((col_abc: String, col_xyz: String) => {
    array(
      struct(
        lit("x").alias("col1"),
        col(col_abc).alias("col2"),
        col(col_xyz).alias("col3")
      )
    )
  }

现在,我想在下面的函数中使用这个-

def myfunc(): Column = {
    val myvariable = myUdf($"col_abc", $"col_xyz")
    myvariable
}

然后使用此函数在我的Dataframe中创建一个新列

val df = df..withColumn("new_col", myfunc())

总之,我希望我的列“new\u col”是一个类型数组,其值为x,x,x
我得到下面的错误。我做错什么了?
原因:java.lang.unsupportedoperationexception:不支持org.apache.spark.sql.column类型的架构

ippsafx7

ippsafx71#

两种方式。
不要使用自定义项,因为您使用的是纯spark函数:

val myUdf = ((col_abc: String, col_xyz: String) => {
    array(
      struct(
        lit("x").alias("col1"),
        col(col_abc).alias("col2"),
        col(col_xyz).alias("col3")
      )
    )
  }
)

def myfunc(): Column = {
    val myvariable = myUdf("col_abc", "col_xyz")
    myvariable
}

df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz|        new_col|
+-------+-------+---------------+
|    abc|    xyz|[[x, abc, xyz]]|
+-------+-------+---------------+

使用接受字符串并返回case class的seq的自定义项:

case class cols (col1: String, col2: String, col3: String)

val myUdf = udf((col_abc: String, col_xyz: String) => Seq(cols("x", col_abc, col_xyz)))

def myfunc(): Column = {
    val myvariable = myUdf($"col_abc", $"col_xyz")
    myvariable
}

df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz|        new_col|
+-------+-------+---------------+
|    abc|    xyz|[[x, abc, xyz]]|
+-------+-------+---------------+

如果要将列传递给函数,下面是一个示例:

val myUdf = ((col_abc: Column, col_xyz: Column) => {
    array(
      struct(
        lit("x").alias("col1"),
        col_abc.alias("col2"),
        col_xyz.alias("col3")
      )
    )
  }
)

相关问题