I have a dataframe with the following schema:
root
|-- clip_id: string (nullable = true)
|-- frames: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- frame_id: string (nullable = true)
| | |-- data_source_info: array (nullable = true)
| | | |-- element: struct (containsNull = false)
| | | | |-- data_source_path: string (nullable = true)
| | | | |-- sub_rules: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- device: string (nullable = true)
| | | | |-- file_type: string (nullable = true)
| | | | |-- md5: string (nullable = true)
Here is my code (my Spark version is 3.0.2):
import org.apache.spark.sql.functions.{col, struct, collect_list}

data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
  .withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"), col("device"), col("file_type"), col("md5")))
  .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
  .groupBy("clip_id", "frame_id")
  .agg(collect_list("data_source_info").as("data_source_info"))
  .withColumn("frames", struct(col("frame_id"), col("data_source_info")))
  .sort(col("clip_id").asc, col("frame_id").asc)
  .groupBy(col("clip_id"))
  .agg(collect_list("frames").asc_nulls_first.as("frames"))
What I want is to sort the frames by frame_id, but I get this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 2447, 10.134.64.140, executor 39): java.lang.UnsupportedOperationException: Cannot evaluate expression: input[1, array<struct<frame_id:string,data_source_info:array<struct<data_source_path:string,sub_rules:array<string>,device:string,file_type:string,md5:string>>>>, true] ASC NULLS FIRST
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:301)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:300)
at org.apache.spark.sql.catalyst.expressions.SortOrder.eval(SortOrder.scala:62)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:76)
at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:32)
at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:658)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1492)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I also tried another approach using a UDF:
def frameIdSort(frames: WrappedArray[GenericRowWithSchema]): WrappedArray[GenericRowWithSchema] =
  frames.map(x => (x.getAs[String]("frame_id"), x)).sortBy(_._1).map(_._2)
but that fails with a different error:
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not supported
So what can I do to sort the frames column by frame_id?
2 Answers
au9on6nz1#
The error message

Cannot evaluate expression: input[1, array...

means that you cannot use asc_nulls_first inside agg (or select). It is an expression that describes how something should be sorted, and it can only be used in the orderBy / sort functions. However, what you seem to want is not to sort the rows of the dataframe but to sort the array column nested inside it. For that you can use array_sort. Since you want to sort by frame_id, which is the first field of the struct, you don't have to change anything else in your code.

Note: I commented out the sort, because a groupBy does not maintain order (see does groupBy after orderBy maintain that order?). Put it back in if you want.
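A minimal sketch of that approach, reusing the question's pipeline (this is a reconstruction rather than the answer's original snippet, and it assumes the same flat input dataframe data as in the question):

import org.apache.spark.sql.functions.{array_sort, col, collect_list, struct}

val result = data
  .select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
  .withColumn("data_source_info",
    struct(col("data_source_path"), col("sub_rules"), col("device"), col("file_type"), col("md5")))
  .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
  .groupBy("clip_id", "frame_id")
  .agg(collect_list("data_source_info").as("data_source_info"))
  .withColumn("frames", struct(col("frame_id"), col("data_source_info")))
  // .sort(col("clip_id").asc, col("frame_id").asc)  // commented out: groupBy would not preserve this ordering anyway
  .groupBy(col("clip_id"))
  // array_sort compares the collected structs field by field, so frame_id (the first field) drives the order
  .agg(array_sort(collect_list("frames")).as("frames"))

Because Spark orders structs by their fields left to right, keeping frame_id as the first field of the frames struct is what lets a plain array_sort call sort by frame_id without any explicit key.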
gorkyyrv2#
This usually works for me: