kafkajsonserializer:case类的默认构造函数

yjghlzjz  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(275)

我正在使用以下使用者设置使用Kafka主题中的数据:

val consumer = {
    val properties = new Properties()
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "some_consumer_group")
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, streamingConfig.getString("kafka.brokers"))
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaJsonDeserializer[Sample]])
    properties.put("json.value.type",classOf[Sample])
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, streamingConfig.getInt("kafka.maxPollRecords"): java.lang.Integer)
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, streamingConfig.getInt("kafka.sessionTimeoutMs"): java.lang.Integer)
    properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, streamingConfig.getInt("kafka.heartbeatIntervalMs"): java.lang.Integer)
    val consumer = new KafkaConsumer[String, Sample](properties)
    consumer.subscribe(util.Arrays.asList(topic))
    consumer
}

而有效负载由下面的case类表示

case class Sample(properties: Map[String, String],eventType: String)

问题是在反序列化过程中会产生以下错误

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.service.Sample` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{}"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.service.Sample` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{}"; line: 1, column: 2]

如果我使用的是java,那么解决方案就是straighforward:只需添加一个默认构造函数。但是在scala case类中会发生什么呢?我知道jackson对象Map器可以配置一个 jackson-module-scala 它负责处理案例类,但由于 KafkaJsonSerializer 配置为我不能弄乱它的对象Map器-这是它的 configure 方法:

protected void configure(KafkaJsonSerializerConfig config) {
    boolean prettyPrint = config.getBoolean("json.indent.output");
    this.objectMapper = new ObjectMapper();
    this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, prettyPrint);
}

我该怎么处理这个错误呢?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题