推断复杂结构字段spark的数据类型的问题

fruv7luv  于 2021-07-13  发布在  Spark
关注(0)|答案(4)|浏览(342)

我有一个SparkDataframe如下。它在zipped\u feature列中有array struct数组。

+--------------------+
|zipped_feature      |
+--------------------+
|[[A, 1], [ABC, 33]] |
|[[A, 1], [ABS, 24]] |
|[[B, 2], [ABE, 17]] |
|[[C, 3], [ABC, 33]] |
+--------------------+

我尝试使用index在数组struct的这个数组上获取一个项(也是一个数组)。我试着在自定义项下获取基于索引的值。如果第一行的索引为0,则应检索“[a,1]”作为数组。

val getValueUdf = udf { (zippedFeature: Seq[Seq[String]], index: Int) => zippedFeature(index) }

但我的错误率越来越低

data type mismatch: argument 1 requires array<array<string>> type, however, '`zipped_feature`' is of array<struct<_1:string,_2:string>> type.

当我打印模式时,它显示如下

|-- zipped_feature: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

有人能帮我找出我做错了什么吗。我想得到基于索引的值(同样是数组)。

roejwanj

roejwanj1#

zipped\u feature是array类型的列。如果要将每个嵌套列值作为一个数组获取,则需要修改udf,如下所示。

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("OFF")
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// constructing sample dataframe
val rows=
    List(Row(Array(Row("A","1"),Row("ABC","33"))),
    Row(Array(Row("A","1"),Row("ABS","24"))),
    Row(Array(Row("B","2"),Row("ABE","17"))),
    Row(Array(Row("C","3"),Row("ABC","33"))))
val rdd=spark.sparkContext.parallelize(rows)

val schema=new StructType().add("zipped_feature",ArrayType(new StructType().add("_1",StringType).add("_2",StringType)))
val df=spark.createDataFrame(rdd,schema)
df.show()
/*
+-------------------+
|     zipped_feature|
+-------------------+
|[[A, 1], [ABC, 33]]|
|[[A, 1], [ABS, 24]]|
|[[B, 2], [ABE, 17]]|
|[[C, 3], [ABC, 33]]|
+-------------------+

* /

df.printSchema()
/*
root
|-- zipped_feature: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: string (nullable = true)
|    |    |-- _2: string (nullable = true)

* /

// udf
 val getValueUdf = udf { (zippedFeature: Seq[Row],index:Int) =>zippedFeature(index).toSeq.map(_.toString)}

 df.withColumn("first_column",getValueUdf('zipped_feature,lit(0)))
  .withColumn("second_column",getValueUdf('zipped_feature,lit(1)))
  .show(false)

 /* output
 +-------------------+------------+-------------+
 |zipped_feature     |first_column|second_column|
 +-------------------+------------+-------------+
 |[[A, 1], [ABC, 33]]|[A, 1]      |[ABC, 33]    |
 |[[A, 1], [ABS, 24]]|[A, 1]      |[ABS, 24]    |
 |[[B, 2], [ABE, 17]]|[B, 2]      |[ABE, 17]    |
 |[[C, 3], [ABC, 33]]|[C, 3]      |[ABC, 33]    |
 +-------------------+------------+-------------+

* /
ugmeyewa

ugmeyewa2#

根据我的说法,这个用例不需要用户定义的函数。你可以很容易地使用 withColumn 以及 select 完成任务的声明。

//Source data
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val df = Seq((Seq(Array("A","1"),Array("ABC","33"))),(Seq(Array("A","1"),Array("ABS","24")))).toDF("zipped_feature")
// 1) getting the value using select statements
val df1 = df.select($"zipped_feature"(0).as("ArrayZero"),$"zipped_feature"(1).as("ArrayOne"))
// 2) getting the values using withColumn
val df2 = df.withColumn("Array_Zero",$"zipped_feature"(0)).withColumn("Array_One",$"zipped_feature"(1))
// 3) Getting the value of the Inner array
val df3 = df1.select($"ArrayZero"(0).as("InnerArrayZero"))
// 4) Getting the value of the first element
val value = df1.select($"ArrayZero"(0)).first.getString(0)

输出1:

输出2:

输出3:

输出4:

tp5buhyn

tp5buhyn3#

从错误消息中,列 zipped_feature 是结构数组类型,而不是数组类型。不需要自定义项按索引访问数组元素,可以使用以下选项之一:

col("zipped_feature")(idx) // opt1
col("zipped_feature").getItem(idx)  // opt2
element_at(col("zipped_feature"), idx) // opt3

要将结构数组转换为数组数组,可以使用 transform 功能:

val df1 = df.withColumn(
    "zipped_feature",
    expr("transform(zipped_feature, x -> array(x._1, x._2))")
  ).select(
    col("zipped_feature")(0).as("idx0"),
    col("zipped_feature")(1).as("idx1")
  )

df1.show
//+------+---------+
//|  idx0|     idx1|
//+------+---------+
//|[A, 1]|[ABC, 33]|
//|[A, 1]|[ABS, 24]|
//|[B, 2]|[ABE, 17]|
//|[C, 3]|[ABC, 33]|
//+------+---------+

df1.printSchema
//root
// |-- idx0: array (nullable = true)
// |    |-- element: string (containsNull = true)
// |-- idx1: array (nullable = true)
// |    |-- element: string (containsNull = true)

或者直接不变换数组:

val df1 = df.select(
  expr("array(zipped_feature[0]._1, zipped_feature[0]._2)").as("idx0"),
  expr("array(zipped_feature[1]._1, zipped_feature[1]._2)").as("idx1")
)
hmtdttj4

hmtdttj44#

您可以尝试使用dataset api map 方法:

def getValue(zippedFeature: Seq[(String, String)], index: Int): Seq[String] = {
    zippedFeature(index).productIterator.toList.toSeq.map(_.toString)
}

df.as[Seq[(String, String)]].map(x => (x, getValue(x, 0))).show
+-------------------+------+
|                 _1|    _2|
+-------------------+------+
|[[A, 1], [ABC, 33]]|[A, 1]|
|[[A, 1], [ABS, 24]]|[A, 1]|
|[[B, 2], [ABE, 17]]|[B, 2]|
|[[C, 3], [ABC, 33]]|[C, 3]|
+-------------------+------+

相关问题