I have the below Kafka stream config:
spring:
  cloud:
    stream:
      function:
        definition: handleCatalogEvent
      bindings:
        handleCatalogEvent-in-0:
          content-type: application/json
          destination: catalog_change
          group: back-group
          consumer:
            configuration:
              json.value.type: com.test.domain.ChangeNotification
              json.fail.invalid.schema: true
            useNativeEncoding: true
      kafka:
        binder:
          brokers: kafka-dev.test.priv:7887
          auto-create-topics: false
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 1000
            specific.avro.reader: true
            auto.register.schemas: false
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
            schema.registry.url: https://schema-registry.dev.test.priv:8787
I have the below bean to consume the data:
@Bean
public Consumer<Message<ChangeNotification>> handleCatalogEvent() {
    return event -> {
        log.info("- - - - - - - - - - - - - - - - - - - Nature Of Change : {} - - - - - - - - - - - - - - - - -", event.getPayload().getNatureOfChange());
        log.info("- - - - - - - - - - - - - - - - - - - - - - PAYLOAD : {} - - - - - - - - - - - - - - - - -", event.getPayload().getItem());
    };
}
When I receive the data, I get this error:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('i' (code 105)): was expecting double-quote to start field name
 at [Source: (String)"{idCatalog=5c824fc5b0efe060e87f056b, code=008, codeContext=[01], label=Spécialité PS, natureOfChange=INSERT, item={id=6373b315d9428a128d0ffc6c, code=kn17, label=string, startEffectiveDate=2022-11-15T10:25:23.21Z}}"; line: 1, column: 3]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735) ~[jackson-core-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659) ~[jackson-core-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddName(ReaderBasedJsonParser.java:1860) ~[jackson-core-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:734) ~[jackson-core-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176) ~[jackson-databind-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) ~[jackson-databind-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[jackson-databind-2.13.1.jar:2.13.1]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629) ~[jackson-databind-2.13.1.jar:2.13.1]
	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:232) ~[spring-messaging-5.3.23.jar:5.3.23]
	... 45 common frames omitted
1 Answer
If we look at the log, we can see that you have a String payload like {key1=value, key2=value}. That is not valid JSON, so you will not be able to parse it with a JSON parser, and especially not with Confluent's JSON Schema parser, since that data carries no schema.
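To make the difference concrete, here is a small illustration (the Map below is just an example, not the actual ChangeNotification payload from the question): a plain Java toString() produces the brace/equals text seen in the stack trace, while Jackson's ObjectMapper produces real JSON with quoted field names.

import java.util.LinkedHashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ToStringVsJson {
    public static void main(String[] args) throws Exception {
        // Illustration only; not the real ChangeNotification object
        Map<String, Object> item = new LinkedHashMap<>();
        item.put("code", "kn17");
        item.put("label", "string");

        // Prints: {code=kn17, label=string}
        // This is the format that arrived on the topic and cannot be parsed as JSON.
        System.out.println(item);

        // Prints: {"code":"kn17","label":"string"}
        // This is what a JSON (Schema) deserializer expects to receive.
        System.out.println(new ObjectMapper().writeValueAsString(item));
    }
}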
You will either need to use a StringDeserializer for your values and parse the String yourself, or fix your producer to actually use Confluent's KafkaJsonSchemaSerializer class (a producer-side sketch is shown below).
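If the producer is also a Spring Cloud Stream application, a minimal sketch of the producer side could look like the following. The binding name catalogEventSupplier-out-0 is an assumption (it is not in the question); the key points are useNativeEncoding on the producer binding and KafkaJsonSchemaSerializer as the value serializer, so the payload is written as real JSON backed by a schema in the registry.

spring:
  cloud:
    stream:
      bindings:
        # binding name is an assumption, not taken from the question
        catalogEventSupplier-out-0:
          destination: catalog_change
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: kafka-dev.test.priv:7887
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
            schema.registry.url: https://schema-registry.dev.test.priv:8787

With the value serialized this way, the existing KafkaJsonSchemaDeserializer configuration on your consumer should be able to map the payload to ChangeNotification.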