我们有以下依赖关系:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
我们使用代码生成器从avro模式文件生成scala case类。一个这样生成的case类具有一个值,作为其字段之一。在avro模式中,这是用type=[t1,t2]表示的,因此生成似乎是合适的,即sum类型:可以是t1类型,也可以是t2类型。
问题是从主题到case类(binary->avro map->case类)的反序列化路径缺少什么。
基本上我现在遇到了这个错误:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")
第一个想法是kafka streams avro serde,但是这个库可能只确保avro map的serde[genericord],而不是case类。因此,另一个依赖关系是帮助avro genericord到case类的Map和返回。我们还有一些手工编写的代码,可以从模式中生成case类,这些代码似乎可以直接与spray json一起使用。
我认为在(binary<->avro genericrecord<->case类示例)转换中,有一个缺口,可能是case类中有一个字段?
我现在正在尝试创建一个serde[userevent]示例。因此,在我的理解中,需要在userevent和avro genericrecord之间进行转换,类似于map,然后在avro record和binary之间进行转换——这很可能包含在kafka streams avro serde依赖项中,就像应该有serde[genericrecord]或类似项一样。
进口方面,我们有以下几点需要进口:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
3条答案
按热度按时间yiytaume1#
对我来说,我必须更好地遵循指示,并添加一个隐式的serde实现。链接中的示例如下所示:
有关更完整的示例,请参见其avro lib的scala测试:
kwvwclae2#
你导入了相应的包吗?
查阅https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-数字用户线
ve7v8dk23#
事实上,缺少一个导入。现在可以编译了。以下是进口产品: