Kafka consumer fails with java.lang.AbstractMethodError

iq3niunx · posted on 2021-06-05 in Kafka

I am trying to query Kafka from the earliest offset to the latest offset and then write the result to Hadoop.
The problem is that I keep getting a java.lang.AbstractMethodError, even though I do not call any abstract method anywhere in my code.
This is the code I use to query Kafka:

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.json.JSONObject

case class Metric(timestamp: String, hostName: String)

object KafkaMetrcs {
  val logger = Logger.getLogger(this.getClass.getName)

  def getSparkConf(appName: String): SparkConf = {
    new SparkConf().setAppName(appName)
    //.setMaster("local[*]")
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    implicit val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel("INFO")
    import sparkSession.implicits._

    val bootstrapServers = "kafka-vip.com:1234"
    val outputLocation = "hdfs://kafka_metrics/output"

    val inputds = sparkSession.read.format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", "fpti.platform.metrics")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .option("fetchOffset.numRetries", 5)
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    val outputds = inputds.map(record => extractValuesFromRecord(record))
    outputds.write.mode(SaveMode.Overwrite).json(outputLocation)
  }

  def extractValuesFromRecord(record: String): Metric = {
    val jsonRecord = new JSONObject(record)
    val timestamp = jsonRecord.getString("timestamp")
    val hostName = jsonRecord.getString("hostName")
    Metric(timestamp, hostName)
  }
}
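
Since java.lang.AbstractMethodError is thrown by the JVM when compiled code calls a method that is missing from the class actually loaded at runtime, it normally points to a binary mismatch on the classpath rather than to an abstract method in my own code. A minimal way to see what is really loaded on the cluster (a sketch only; the expectation that the cluster runs the same Spark 2.3.0 / Scala version the job was built against is an assumption on my part) would be something like:

  // Minimal version check, meant to be run (or pasted into spark-shell)
  // on the same cluster that throws the AbstractMethodError.
  import org.apache.spark.sql.SparkSession

  object VersionCheck {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("version-check").getOrCreate()
      // Spark version actually loaded at runtime; it should match the 2.3.0
      // that spark-sql-kafka-0-10 in the build is compiled against.
      println(s"Spark version: ${spark.version}")
      // Scala library version on the classpath; a 2.11 vs 2.12 mix between the
      // cluster and the assembly jar produces the same kind of failure.
      println(s"Scala version: ${scala.util.Properties.versionNumberString}")
      spark.stop()
    }
  }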

Dependencies:

lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
lazy val excludeJpountz2 = ExclusionRule(organization = "org.lz4", name = "lz4-java")
lazy val excludeKafka = ExclusionRule(organization = "kafka-clients", name = "org.apache.kafka")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" excludeAll(excludeJpountz, excludeKafka),
  "org.apache.spark" %% "spark-sql" % "2.3.0" excludeAll(excludeJpountz, excludeJpountz2),
  "com.typesafe.play" %% "play-json" % "2.4.0" excludeAll(excludeJpountz, excludeJpountz2),
  "org.apache.kafka" %% "kafka" % "0.10.2.1" excludeAll(excludeJpountz),
  "org.apache.logging.log4j" % "log4j-api" % "2.8.2",
  "org.json" % "json" % "20190722"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
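
Note that the excludeKafka rule above has organization and name swapped (the kafka-clients artifact lives under the org.apache.kafka organization), so it does not actually exclude anything. For reference, a sketch of how the build could be aligned so that only one Spark and one Kafka client version end up on the classpath, assuming the cluster provides Spark 2.3.0; that version, the decision to drop the standalone kafka artifact, and the omission of libraries the posted code does not use are assumptions on my part:

  // build.sbt sketch: keep all Spark modules on the same version, mark spark-sql
  // "provided" so the cluster's own Spark jars are used at runtime, and let the
  // Kafka connector pull in its matching kafka-clients transitively.
  val sparkVersion = "2.3.0" // assumption: must match the Spark version on the cluster

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql"            % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
    "org.json"          % "json"                 % "20190722"
  )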

Stack trace:

java.lang.AbstractMethodError
  at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
  at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
  at org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
  at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:64)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:308)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:208)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:208)
  at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:208)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
  at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:526)
  ... 51 elided

It is a fairly compact piece of code and there is no abstract method anywhere in it, so it is confusing that I am getting this error. Any help is appreciated. Thanks a lot in advance.
