我有一个生产者使用java11/spring和apacheavro,另一个消费者使用scala2.12和akka。如果我没说错的话,avro的有效负载应该是语言不可知的。但是,当我试图在scala consumer中获取consumerrecord的值时,我得到了一个classcastexception:
java.lang.ClassCastException: class com.example.schema.DetailsSchema cannot be cast to class com.example.schema.scala.DetailsSchema (com.example.schema.DetailsSchema and com.example.schema.scala.DetailsSchema are in unnamed module of loader 'app'
我正在使用几个插件从我的模式中生成java和scala类
“com.commercehub.gradle.plugin.avro”版本“0.9.1”
“com.zlad.gradle.avrohugger”版本“0.5.0”
对于scala类,我只是在名称空间的末尾附加一个.scala包,以避免与java生成的类发生冲突。
avroschema和java/scala类位于使用上述插件的公共jar中。java类是使用com.example.schema包生成的,scala类使用com.example.schema.scala,两者都扩展org.apache.avro.specific.specificrecordbase
我的producer是一个使用springkafka&confluent的kafkaavro序列化程序的标准spring应用程序
我的消费者是akka应用程序,使用akka stream kafka消费代码:
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.common.serialization._
val kafkaAvroSerDeConfig = Map[String, Any](
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081",
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
)
val kafkaConsumerSettings: ConsumerSettings[String, DetailsChema] = {
val kafkaAvroDeserializer = new KafkaAvroDeserializer()
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[DetailsChema]]
ConsumerSettings(system, new StringDeserializer, deserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("xt-collector")
.withProperties((ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"),
(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"),
(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
}
val control = Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics("details"))
.map(msg => {
try {
val details = msg.value() // HERE I get the class cast exception
// java.lang.ClassCastException: class com.example.schema.DetailsSchema cannot be cast to class com.example.schema.scala.DetailsSchema (com.example.schema.DetailsSchema and com.example.schema.scala.DetailsSchema are in unnamed module of loader 'app'
log.info(s"Received ${details._id} to sink into mongo/elastic")
details
} catch {
case e: Exception => log.error("Error while deserializing details", e)
e.printStackTrace()
}
})
// .via(detailsCollection.flow) Just sink into mongo/elastic
.recover {
case t : Throwable => log.error("Error while storing in mongo/elastic", t)
}
.toMat(Sink.seq)(DrainingControl.apply)
.run()
我的问题是:如果两个avro记录都是从同一个schema生成的,那么是否有可能在java中生成一个avro记录并在另一端使用scala case类进行反序列化?
暂无答案!
目前还没有任何答案,快来回答吧!