scala—为什么会出现这个编译错误:“找不到kstream.consumered的隐式值”,我该如何修复它?

nwlls2ji  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(444)

我们有以下依赖关系:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"

我们使用代码生成器从avro模式文件生成scala case类。一个这样生成的case类具有一个值,作为其字段之一。在avro模式中,这是用type=[t1,t2]表示的,因此生成似乎是合适的,即sum类型:可以是t1类型,也可以是t2类型。
问题是从主题到case类(binary->avro map->case类)的反序列化路径缺少什么。
基本上我现在遇到了这个错误:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")

第一个想法是kafka streams avro serde,但是这个库可能只确保avro map的serde[genericord],而不是case类。因此,另一个依赖关系是帮助avro genericord到case类的Map和返回。我们还有一些手工编写的代码,可以从模式中生成case类,这些代码似乎可以直接与spray json一起使用。
我认为在(binary<->avro genericrecord<->case类示例)转换中,有一个缺口,可能是case类中有一个字段?
我现在正在尝试创建一个serde[userevent]示例。因此,在我的理解中,需要在userevent和avro genericrecord之间进行转换,类似于map,然后在avro record和binary之间进行转换——这很可能包含在kafka streams avro serde依赖项中,就像应该有serde[genericrecord]或类似项一样。
进口方面,我们有以下几点需要进口:

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
yiytaume

yiytaume1#

对我来说,我必须更好地遵循指示,并添加一个隐式的serde实现。链接中的示例如下所示:

// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde

有关更完整的示例,请参见其avro lib的scala测试:

// Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
    implicit val genericAvroSerde: Serde[GenericRecord] = {
      val gas = new GenericAvroSerde
      val isKeySerde: Boolean = false
      gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl), isKeySerde)
      gas
    }
kwvwclae

kwvwclae2#

你导入了相应的包吗?

import org.apache.kafka.streams.scala.ImplicitConversions._

查阅https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-数字用户线

ve7v8dk2

ve7v8dk23#

事实上,缺少一个导入。现在可以编译了。以下是进口产品:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

相关问题