scala 如何在spark中创建List[Row]类型的编码器?

lsmepo6l  于 2023-10-18  发布在  Scala
关注(0)|答案(1)|浏览(168)

我尝试在Spark DataFrame上使用flatMapGroupWithState函数来自定义聚合。
flatMapGroupState的lambda中,我试图保存Row: GroupState[List[Row]]的List。上面提到的代码,是通过抱怨得到Runtime Exception
找不到org.apache.spark.sql.Row的编码器。
但是对于outputEncoder它是工作的。
问题是:如何创建一个类型的编码器:List[Row]在Spark?

  1. val outputEncoder = RowEncoder(stateSchema)
  2. val stateEncoder: Encoder[List[Row]] = ExpressionEncoder()
  3. df.flatMapGroupsWithState(OutputMode.Update(), timeoutConf = GroupStateTimeout.EventTimeTimeout())(func = aggregationOperations)(stateEncoder, outputEncoder)
  4. ~
  5. ~
  6. ~
  7. private def aggregationOperations(key: String, values: Iterator[Row], state: GroupState[List[Row]]): Iterator[Row] = {
  8. ~
  9. ~
  10. ~
  11. }
jm81lzqq

jm81lzqq1#

尝试RowEncoder
StructType(List(StructField(“state”,ArrayType(stateSchema)

相关问题