Spark Streaming error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI

dauxcl2d  posted on 2021-06-06 in Kafka

I am trying to stream data from a Kafka Avro topic.
Here is my code snippet:

    val sparkStreamingContext = new StreamingContext(sc, Durations.seconds(60))
    val brokers = "Broker info"
    val schemaRegistryURL = "URL schema registry "
    val subjectValueName = "topicname" + "-value"
    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
    val kafkaParam = new mutable.HashMap[String, String]()
    kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "streaming-kafka")
    kafkaParam.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val topicList = List("topicname")
    val messageStream = KafkaUtils.createDirectStream(sparkStreamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicList, kafkaParam))
    val TRANSACTION_SCHEMA: StructType = SchemaConverters.toSqlType(topicValueAvroSchema).dataType.asInstanceOf[StructType]
    messageStream.foreachRDD { rdd =>
      val streamData = spark.read.schema(TRANSACTION_SCHEMA).avro(rdd.map(x => x.value()).toString())
      streamData.repartition(1).write.format("com.databricks.spark.avro").mode("Append") saveAsTable ("tablename")
    }
    }
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()

But I get the error below. Can anyone help me resolve this?

    java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: MapPartitionsRDD[75] at map at <console>:54
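
A likely cause, judging from the message itself: the text after "Relative path in absolute URI:" is the `toString()` of an RDD, not a file path. `rdd.map(x => x.value()).toString()` returns the RDD's description (`MapPartitionsRDD[75] at map at <console>:54`), and `.avro(...)` on the reader treats its argument as a path. The sketch below uses only `java.net.URI` to reproduce the same kind of failure. The `RelativePathDemo` object and its `toUri` helper are hypothetical names, and the splitting logic only approximates how Hadoop's `Path` separates a scheme from a path.

```scala
import java.net.{URI, URISyntaxException}

object RelativePathDemo {
  // Assumption: approximates Hadoop's Path parsing. Text before the first ':'
  // is taken as the scheme when no '/' appears before that ':'.
  def toUri(pathString: String): URI = {
    val colon = pathString.indexOf(':')
    val slash = pathString.indexOf('/')
    if (colon != -1 && (slash == -1 || colon < slash)) {
      // Scheme present: java.net.URI rejects a non-empty path without a leading '/'.
      new URI(pathString.substring(0, colon), null, pathString.substring(colon + 1), null, null)
    } else {
      new URI(null, null, pathString, null, null)
    }
  }

  def main(args: Array[String]): Unit = {
    // The string that the snippet above ends up passing as a "path".
    val rddDescription = "MapPartitionsRDD[75] at map at <console>:54"
    try {
      toUri(rddDescription)
    } catch {
      case e: URISyntaxException => println(e.getMessage)
    }
  }
}
```

If this is what is happening, the `:54` at the end makes everything before it look like a URI scheme, leaving `54` as a relative path. The records would then need to be decoded from the Kafka message values themselves rather than handed to a path-based reader.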
