我有一个Dataframe如下
val initialData = Seq(
Row("ABC1",List(Row("Java","XX",120),Row("Scala","XA",300))),
Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250)))
)
val arrayStructSchema = new StructType().add("name",StringType)
.add("SortedDataSet",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)))
val df = spark
.createDataFrame(spark.sparkContext.parallelize(initialData),arrayStructSchema)
df.printSchema()
df.show(5, false)
+-------+-----------------------------------+
|name |SortedDataSet |
+-------+-----------------------------------+
|ABC1 |[[Java, XX, 120], [Scala, XA, 300]]|
|Michael|[[Java, XY, 200], [Scala, XB, 500]]|
|Robert |[[Java, XZ, 400], [Scala, XC, 250]]|
+-------+-----------------------------------+
我现在需要将结构的每个元素提取为一个单独的索引列,我将执行以下操作
val newDf = df.withColumn("Col1", sort_array('SortedDataSet).getItem(0))
.withColumn("Col2", sort_array('SortedDataSet).getItem(1))
.withColumn("name_1",$"Col1.name")
.withColumn("author_1",$"Col1.author")
.withColumn("pages_1",$"Col1.pages")
.withColumn("name_2",$"Col2.name")
.withColumn("author_2",$"Col2.author")
.withColumn("pages_2",$"Col2.pages")
这很简单,因为我只有2个数组和5列。当我有多个数组和列时该怎么办?
如何以编程方式执行此操作?
2条答案
按热度按时间0tdrvxhp1#
如果数组的大小相同,则可以通过动态选择数组和结构元素来避免执行昂贵的分解、分组和透视:
x0fgdtte2#
一种方法是使用
posexplode
,然后是groupBy
以及pivot
在生成的索引上,如下所示:给定示例数据集:
注意,我稍微概括了示例数据,以展示大小不均匀的数组。
修改由创建的列名的步骤
pivot
通缉名单: