scala spark array_sort with complex schema

dwbf0jvd · posted 2023-10-18 · in Scala

I have a dataframe with the following schema:

root
|-- clip_id: string (nullable = true)
|-- frames: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- frame_id: string (nullable = true)
|    |    |-- data_source_info: array (nullable = true)
|    |    |    |-- element: struct (containsNull = false)
|    |    |    |    |-- data_source_path: string (nullable = true)
|    |    |    |    |-- sub_rules: array (nullable = true)
|    |    |    |    |    |-- element: string (containsNull = true)
|    |    |    |    |-- device: string (nullable = true)
|    |    |    |    |-- file_type: string (nullable = true)
|    |    |    |    |-- md5: string (nullable = true)

Below is my code; my Spark version is 3.0.2:

data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
.withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"),col("device"),col("file_type"), col("md5")))
.drop("data_source_path", "sub_rules", "device", "file_type", "md5")
.groupBy("clip_id", "frame_id")
.agg(collect_list("data_source_info").as("data_source_info"))
.withColumn("frames", struct(col("frame_id"),col("data_source_info")))
.sort(col("clip_id").asc,col("frame_id").asc).groupBy(col("clip_id")
.agg(collect_list("frames").asc_null_first.as("frames"))

What I want is to sort frames by frame_id, but instead I get this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 2447, 10.134.64.140, executor 39): java.lang.UnsupportedOperationException: Cannot evaluate expression: input[1, array<struct<frame_id:string,data_source_info:array<struct<data_source_path:string,sub_rules:array<string>,device:string,file_type:string,md5:string>>>>, true] ASC NULLS FIRST
        at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:301)
        at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:300)
        at org.apache.spark.sql.catalyst.expressions.SortOrder.eval(SortOrder.scala:62)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
        at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:76)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:32)
        at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:658)
        at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
        at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1492)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

An alternative approach using a UDF:

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

def frameIdSort(frames: WrappedArray[GenericRowWithSchema]): WrappedArray[GenericRowWithSchema] =
  frames.map(x => (x.getAs[String]("frame_id"), x)).sortBy(_._1).map(_._2)

But that fails with a different error:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not supported
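
As far as I can tell, this happens because Spark cannot derive an encoder for GenericRowWithSchema as a UDF return type. A sketch of a variant that rebuilds the structs as case classes (names made up here to mirror the schema above), which Spark can encode:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical case classes mirroring the frame struct; Spark can derive a
// schema for case-class return types (struct inputs still arrive as Row)
case class DataSourceInfo(data_source_path: String, sub_rules: Seq[String],
                          device: String, file_type: String, md5: String)
case class Frame(frame_id: String, data_source_info: Seq[DataSourceInfo])

val sortFrames = udf { frames: Seq[Row] =>
  frames.map { r =>
    Frame(
      r.getAs[String]("frame_id"),
      r.getAs[Seq[Row]]("data_source_info").map { d =>
        DataSourceInfo(
          d.getAs[String]("data_source_path"),
          d.getAs[Seq[String]]("sub_rules"),
          d.getAs[String]("device"),
          d.getAs[String]("file_type"),
          d.getAs[String]("md5"))
      })
  }.sortBy(_.frame_id)
}

// usage: .withColumn("frames", sortFrames(col("frames")))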

So what can I do to sort the frames column by frame_id?


au9on6nz1#

The error message Cannot evaluate expression: input[1, array... means that you cannot use asc_nulls_first inside agg (or select). It is an expression that describes a sort order, and it can only be consumed by the orderBy/sort functions.
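
A minimal sketch of the distinction (df standing for any dataframe with these columns):

df.orderBy(col("clip_id").asc_nulls_first)  // fine: orderBy can evaluate a SortOrder

df.groupBy("clip_id")
  .agg(collect_list("frames").asc_nulls_first.as("frames"))  // compiles, but the SortOrder is unevaluable at runtime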
However, what you seem to want is not to sort the dataframe itself but to sort the array column inside it. For that you can use array_sort, and since you want to sort by frame_id, which is the first field of the struct, you do not have to change anything else in your code:

data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
    .withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"),col("device"),col("file_type"), col("md5")))
    .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
    .groupBy("clip_id", "frame_id")
    .agg(collect_list("data_source_info").as("data_source_info"))
    .withColumn("frames", struct(col("frame_id"),col("data_source_info")))
    // .sort(col("clip_id").asc,col("frame_id").asc)
    .groupBy(col("clip_id")
    .agg(collect_list("frames") as "frames")
    .withColumn("frames", array_sort(col("frames")))

NB: I commented out the sort, because a groupBy does not preserve ordering (see "does groupBy after orderBy maintain that order?"). If you want it, put the sort back after the aggregation.
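
As an aside, array_sort on an array of structs compares the struct fields in order, which is why frame_id being the first field is enough here. If the sort key were not the first field, the SQL form of array_sort takes a comparator lambda (added on the SQL side in Spark 3.0, so this is a sketch to check against your version; grouped stands for the aggregated dataframe built above):

import org.apache.spark.sql.functions.expr

// Sort the frames array explicitly by frame_id; the comparator returns -1/0/1
val sortedFrames = grouped.withColumn("frames", expr(
  """array_sort(frames, (a, b) ->
    |  CASE WHEN a.frame_id < b.frame_id THEN -1
    |       WHEN a.frame_id > b.frame_id THEN 1
    |       ELSE 0 END)""".stripMargin))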


gorkyyrv2#

This usually works for me (PySpark):

from pyspark.sql import functions as F

# Flatten the frames array so each frame becomes its own row
df_exploded = df.select("clip_id", F.explode("frames").alias("frame_data"))

# Sort by clip_id and by the nested frame_id
df_sorted = df_exploded.orderBy("clip_id", "frame_data.frame_id")

# Re-assemble the frames array per clip
df_final = df_sorted.groupBy("clip_id").agg(F.collect_list("frame_data").alias("frames"))

df_final.show(truncate=False)
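
Since the question is in Scala, here is the same explode/sort/re-collect idea sketched with the Scala API (df standing for the original dataframe; note that the ordering caveat from the first answer applies to collect_list here as well):

import org.apache.spark.sql.functions.{col, collect_list, explode}

// Flatten, order, then rebuild the frames array per clip_id
val exploded = df.select(col("clip_id"), explode(col("frames")).as("frame_data"))
val sorted = exploded.orderBy(col("clip_id"), col("frame_data.frame_id"))
val rebuilt = sorted.groupBy("clip_id").agg(collect_list("frame_data").as("frames"))

rebuilt.show(truncate = false)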
