Getting "Task not serializable: java.io.NotSerializableException" when using a UDF together with to_json

juud5qan · posted 2021-05-27 in Spark

I have been trying to pin down exactly where the problem is, but haven't been able to. I'm trying to generate a JSON string as shown below, yet the behavior is inconsistent and I still can't make sense of the failure.
Here is my code snippet:

    val writingDataset = sparkSession
      .readStream
      .format("kafka")
      .option(kafkaBootstrapServers, urls)
      .option("subscribe", inputTopics)
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      // .withColumn("value", parser.parseUDF('value).as("value")) // this line and the next together don't work either
      // .withColumn("value", to_json('value).as("value"))
      .select(col("key"), to_json(parser.parseUDF('value)).as("value"))
      .writeStream
      .format("console")
      .start()

    writingDataset.awaitTermination()
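
A useful way to narrow a failure like this down is to run the same select against a small batch DataFrame: if that already throws the serialization error, the problem is in the UDF closure or the classpath rather than in the streaming source or sink. A minimal sketch, assuming CompanyDetail is a plain case class with two string fields (the real fields aren't shown in the post):

    import org.apache.spark.sql.functions.{col, to_json, udf}
    import sparkSession.implicits._ // for toDF on a local Seq

    // Assumed shape of CompanyDetail; field names here are illustrative.
    case class CompanyDetail(name: String, category: String)

    val parseUDF = udf((value: String) => Some(CompanyDetail("something", "something")))

    // Batch equivalent of the streaming select: if this already fails with
    // "Task not serializable", the stream is not the culprit.
    val df = Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")
    df.select(col("key"), to_json(parseUDF(col("value"))).as("value")).show(false)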

And here is my UDF code:

    // CompanyDetail and keyParse are defined elsewhere in the project
    val parse = (value: String) => {
      Some(CompanyDetail("something", "something"))
    }

    import org.apache.spark.sql.functions.udf
    val parseUDF = udf(parse)
    val keyUDF = udf(keyParse)
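
For reference, a self-contained sketch of what this block presumably expands to; CompanyDetail's fields and the body of keyParse are not shown in the post, so the definitions below are placeholders. The point to keep in mind is that udf() wraps the function in a closure that Spark Java-serializes and ships to the executors:

    import org.apache.spark.sql.functions.udf

    // Placeholder stand-ins for the pieces not shown in the post.
    case class CompanyDetail(name: String, category: String)
    val keyParse: String => String = key => key.trim

    // These are plain function values: the whole closure gets serialized,
    // so it must not drag in non-serializable state (e.g. members of an
    // enclosing class, or compiler-generated helpers for lazy vals).
    val parse: String => Option[CompanyDetail] =
      value => Some(CompanyDetail("something", "something"))

    val parseUDF = udf(parse)
    val keyUDF = udf(keyParse)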

I'm not sure what is going on here, but I keep getting the error below:

    org.apache.spark.SparkException: Writing job aborted.
      at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
      at org.apache.spark.sql.execution.SparkPlan$$Lambda$7251/0000000000000000.apply(Unknown Source)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
      at org.apache.spark.sql.execution.SparkPlan$$Lambda$7280/0000000000000000.apply(Unknown Source)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
      at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
      at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
      at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2782)
      at org.apache.spark.sql.Dataset$$Lambda$7166/000000006C38DB10.apply(Unknown Source)
      at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
      at org.apache.spark.sql.Dataset$$Lambda$7169/000000006C38F210.apply(Unknown Source)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
      at org.apache.spark.sql.execution.SQLExecution$$$Lambda$7140/000000006C25F080.apply(Unknown Source)
      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
      at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:540)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$7136/000000006C25E1B0.apply(Unknown Source)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
      at org.apache.spark.sql.execution.SQLExecution$$$Lambda$7140/000000006C25F080.apply(Unknown Source)
      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$7135/000000006C25DA80.apply(Unknown Source)
      at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
      at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
      at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:535)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$6895/000000006C02DE80.apply$mcV$sp(Unknown Source)
      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
      at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
      at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
      at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$6893/000000006C02CF10.apply$mcZ$sp(Unknown Source)
      at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
      at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.runtime.LazyRef

Answer 1 (iszxjhcz)

I figured it out myself. There was nothing wrong with the Spark code; it was a Scala version problem. It worked as soon as I downgraded to Scala version 2.11.0.
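
For context: scala.runtime.LazyRef is a helper class emitted by the Scala 2.12 compiler, and this exception typically shows up when the application is compiled against a Scala version that doesn't match the one the Spark artifacts were built for (or against an early 2.12 patch release in which LazyRef was not yet Serializable). A minimal build.sbt sketch of keeping the versions aligned; the version numbers below are illustrative, not taken from the post:

    // build.sbt (sketch): pin scalaVersion to the Scala line your Spark
    // distribution targets, and let %% pick the matching artifacts.
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      // %% appends the Scala binary suffix (_2.11) to the artifact name
      "org.apache.spark" %% "spark-sql"            % "2.4.5" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
    )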
