Message conversion exception in Kafka consumer

q3aa0525  posted on 2022-11-21  in Apache

I have the following Spring Cloud Stream configuration for Kafka:

spring:
  cloud:
    stream:
      function:
        definition: handleCatalogEvent
      bindings:
        handleCatalogEvent-in-0:
          content-type: application/json
          destination: catalog_change
          group: back-group
          consumer:
            configuration:
              json.value.type: com.test.domain.ChangeNotification
              json.fail.invalid.schema: true
            useNativeEncoding: true
      kafka:
        binder:
          brokers: kafka-dev.test.priv:7887
          auto-create-topics: false
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 1000
            specific.avro.reader: true
            auto.register.schemas: false
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
            schema.registry.url: https://schema-registry.dev.test.priv:8787

I have the following bean to consume the data:

@Bean
public Consumer<Message<ChangeNotification>> handleCatalogEvent() {
    return event -> {
        log.info("- - - - - - - - - - - - - - - - - - - Nature Of Change : {} - - - - - - - - - - - - - - - - -", event.getPayload().getNatureOfChange());
        log.info("- - - - - - - - - - - - - - - - - - - - - - PAYLOAD : {} - - - - - - - - - - - - - - - - -", event.getPayload().getItem());
    };
}

When I receive the data, I get this error:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('i' (code 105)): was expecting double-quote to start field name
 at [Source: (String)"{idCatalog=5c824fc5b0efe060e87f056b, code=008, codeContext=[01], label=Spécialité PS, natureOfChange=INSERT, item={id=6373b315d9428a128d0ffc6c, code=kn17, label=string, startEffectiveDate=2022-11-15T10:25:23.21Z}}"; line: 1, column: 3]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735) ~[jackson-core-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659) ~[jackson-core-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddName(ReaderBasedJsonParser.java:1860) ~[jackson-core-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:734) ~[jackson-core-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176) ~[jackson-databind-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) ~[jackson-databind-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[jackson-databind-2.13.1.jar:2.13.1]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629) ~[jackson-databind-2.13.1.jar:2.13.1]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:232) ~[spring-messaging-5.3.23.jar:5.3.23]
    ... 45 common frames omitted

dxxyhpgq  #1

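If we look at the log, we can see that you have a String payload such as {key1=value, key2=value}.
This is not valid JSON, so you won't be able to use a JSON parser, and especially not Confluent's JSON Schema one, because that data has no schema.
You will either need to use StringDeserializer for your value and parse the String yourself, or fix your producer so that it properly uses Confluent's KafkaJsonSchemaSerializer.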
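
For the first option (StringDeserializer plus manual parsing), here is a minimal sketch of what parsing such a String could look like. It assumes the payload has the shape that Java's Map.toString() and List.toString() produce, which is what the logged value resembles. The class name MapStringParser is hypothetical and not part of any library. The format is ambiguous when a value itself contains ',', '=', '}' or ']', so treat this as a stopgap rather than a robust solution.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: parses text shaped like Map.toString() / List.toString() output. */
final class MapStringParser {
    private final String text;
    private int pos = 0;

    private MapStringParser(String text) {
        this.text = text;
    }

    /** Parses a top-level "{key=value, ...}" string into nested maps, lists and strings. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> parse(String raw) {
        MapStringParser parser = new MapStringParser(raw.trim());
        Object value = parser.readValue();
        if (!(value instanceof Map) || parser.pos != parser.text.length()) {
            throw new IllegalArgumentException("Not a {key=value} payload: " + raw);
        }
        return (Map<String, Object>) value;
    }

    private char peek() {
        if (pos >= text.length()) {
            throw new IllegalArgumentException("Unexpected end of input");
        }
        return text.charAt(pos);
    }

    private Object readValue() {
        char c = peek();
        if (c == '{') return readMap();
        if (c == '[') return readList();
        return readScalar();
    }

    private Map<String, Object> readMap() {
        Map<String, Object> map = new LinkedHashMap<>();
        pos++; // skip '{'
        while (peek() != '}') {
            int eq = text.indexOf('=', pos);
            if (eq < 0) {
                throw new IllegalArgumentException("Missing '=' after position " + pos);
            }
            String key = text.substring(pos, eq).trim();
            pos = eq + 1;
            map.put(key, readValue());
            skipSeparator();
        }
        pos++; // skip '}'
        return map;
    }

    private List<Object> readList() {
        List<Object> list = new ArrayList<>();
        pos++; // skip '['
        while (peek() != ']') {
            int before = pos;
            list.add(readValue());
            skipSeparator();
            if (pos == before) { // no progress means malformed input
                throw new IllegalArgumentException("Unexpected character at position " + pos);
            }
        }
        pos++; // skip ']'
        return list;
    }

    /** Reads plain text up to the next ',', '}' or ']' and keeps it as a String. */
    private String readScalar() {
        int start = pos;
        while (pos < text.length() && ",}]".indexOf(text.charAt(pos)) < 0) {
            pos++;
        }
        return text.substring(start, pos).trim();
    }

    private void skipSeparator() {
        if (pos < text.length() && text.charAt(pos) == ',') {
            pos++;
            while (pos < text.length() && text.charAt(pos) == ' ') pos++;
        }
    }

    public static void main(String[] args) {
        // Payload trimmed from the one shown in the stack trace above.
        String payload = "{code=008, codeContext=[01], natureOfChange=INSERT, "
                + "item={id=6373b315d9428a128d0ffc6c, code=kn17}}";
        Map<String, Object> parsed = parse(payload);
        System.out.println(parsed.get("natureOfChange")); // prints INSERT
    }
}
```

All scalar values come back as Strings, so "008" keeps its leading zeros. If you go this route, the function would presumably have to accept the raw String instead of Message<ChangeNotification>, and the resulting map could then be mapped onto ChangeNotification, for example with Jackson's ObjectMapper.convertValue. Fixing the producer so that it emits real JSON is still the cleaner option.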
