Converting a nested object to a Dataset struct using the default Spark encoder

Posted by wdebmtf2 on 2021-05-27 in Spark

I am trying to persist a Lombok-based nested class in Spark. It is defined as follows:

@Data
@NoArgsConstructor
public class FetchInfo implements Comparable<FetchInfo>, Serializable {
    private int fetchTime = -1;
    private String status = FetchStatus.DISCOVERED.toString();
    private String exceptionClass = "";

    @Override
    public int compareTo(@NotNull FetchInfo o) {
        return Integer.compare(fetchTime, o.fetchTime);
    }
}

But I get the following exception:

Caused by: java.lang.IllegalArgumentException: The value (FetchInfo(fetchTime=1593890877, status=SUCCESS, exceptionClass=null)) of the type (foo.bar.FetchInfo) cannot be converted to struct<exceptionClass:string,fetchTime:int,status:string>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:268)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:174)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:158)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:425)
    at org.apache.spark.sql.SQLContext$.$anonfun$beansToRows$3(SQLContext.scala:1070)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.sql.SQLContext$.$anonfun$beansToRows$2(SQLContext.scala:1069)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

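Can anyone tell me what is wrong? The schema in the error message seems to be correct. I also tried removing Lombok, but with no luck. Please also note that this class is nested inside another class.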
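For reference, here is a minimal sketch of what the class might look like with Lombok removed, together with a small check of the JavaBean properties it exposes. It is only an illustration and is not claimed to resolve the exception. Assumptions: `FetchStatus.DISCOVERED.toString()` is taken to yield `"DISCOVERED"` (the enum is not shown above), the `@NotNull` annotation is dropped, and `BeanCheck` is a hypothetical helper name. The check uses the standard `java.beans.Introspector`; the three properties it reports are the same three fields listed in the `struct<...>` of the error message.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.Set;
import java.util.TreeSet;

// Hand-written stand-in for the Lombok @Data / @NoArgsConstructor class.
// equals, hashCode and toString (also generated by @Data) are omitted for brevity.
class FetchInfo implements Comparable<FetchInfo>, Serializable {
    private int fetchTime = -1;
    // Assumption: stands in for FetchStatus.DISCOVERED.toString().
    private String status = "DISCOVERED";
    private String exceptionClass = "";

    public FetchInfo() {}

    public int getFetchTime() { return fetchTime; }
    public void setFetchTime(int fetchTime) { this.fetchTime = fetchTime; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    public String getExceptionClass() { return exceptionClass; }
    public void setExceptionClass(String exceptionClass) { this.exceptionClass = exceptionClass; }

    @Override
    public int compareTo(FetchInfo o) {
        return Integer.compare(fetchTime, o.fetchTime);
    }
}

// Hypothetical helper: lists the properties that have both a getter and a setter.
class BeanCheck {
    static Set<String> readableWritableProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the synthetic "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints [exceptionClass, fetchTime, status]
        System.out.println(readableWritableProperties(FetchInfo.class));
    }
}
```

Since the plain version exposes the same properties as the struct in the error, this is consistent with the observation that removing Lombok did not change the outcome.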
