我尝试在Spark DataFrame上使用flatMapGroupWithState
函数来自定义聚合。
在flatMapGroupState
的lambda中,我试图保存Row: GroupState[List[Row]]
的List。上面提到的代码,是通过抱怨得到Runtime Exception
找不到org.apache.spark.sql.Row的编码器。
但是对于outputEncoder
它是工作的。
问题是:如何创建一个类型的编码器:List[Row]
在Spark?
val outputEncoder = RowEncoder(stateSchema)
val stateEncoder: Encoder[List[Row]] = ExpressionEncoder()
df.flatMapGroupsWithState(OutputMode.Update(), timeoutConf = GroupStateTimeout.EventTimeTimeout())(func = aggregationOperations)(stateEncoder, outputEncoder)
~
~
~
private def aggregationOperations(key: String, values: Iterator[Row], state: GroupState[List[Row]]): Iterator[Row] = {
~
~
~
}
1条答案
按热度按时间jm81lzqq1#
尝试RowEncoder
StructType(List(StructField(“state”,ArrayType(stateSchema)