我正在使用以下使用者设置使用Kafka主题中的数据:
val consumer = {
val properties = new Properties()
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "some_consumer_group")
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, streamingConfig.getString("kafka.brokers"))
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaJsonDeserializer[Sample]])
properties.put("json.value.type",classOf[Sample])
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, streamingConfig.getInt("kafka.maxPollRecords"): java.lang.Integer)
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, streamingConfig.getInt("kafka.sessionTimeoutMs"): java.lang.Integer)
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, streamingConfig.getInt("kafka.heartbeatIntervalMs"): java.lang.Integer)
val consumer = new KafkaConsumer[String, Sample](properties)
consumer.subscribe(util.Arrays.asList(topic))
consumer
}
而有效负载由下面的case类表示
case class Sample(properties: Map[String, String],eventType: String)
问题是在反序列化过程中会产生以下错误
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.service.Sample` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: (byte[])"{}"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.service.Sample` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: (byte[])"{}"; line: 1, column: 2]
如果我使用的是java,那么解决方案就是straighforward:只需添加一个默认构造函数。但是在scala case类中会发生什么呢?我知道jackson对象Map器可以配置一个 jackson-module-scala
它负责处理案例类,但由于 KafkaJsonSerializer
配置为我不能弄乱它的对象Map器-这是它的 configure
方法:
protected void configure(KafkaJsonSerializerConfig config) {
boolean prettyPrint = config.getBoolean("json.indent.output");
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, prettyPrint);
}
我该怎么处理这个错误呢?
暂无答案!
目前还没有任何答案,快来回答吧!