Java exception during Spring Kafka deserialization

Posted by t3irkdon on 2021-06-06 in Kafka

I am writing a custom deserializer for a Kafka consumer, but I get the following exception:

2019-04-05 16:36:51.064 ERROR 13256 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = freshTopic, partition = 0, offset = 229860, CreateTime = 1554462411064, serialized key size = -1, serialized value size = 214, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"date":null,"deviceAddress":"10.95.251.8","iPAddress":" ","userName":"z-nbpvs1","group":" ","eventCategoryName":"User.Activity.Privileged Use.Successful","message":"User authentication succeeded: Uname: z-nbpvs1"})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.test.engine.RawEventConsumer.consume(com.test.models.CSVDataModel) throws java.io.IOException]
Bean [com.test.engine.RawEventConsumer@31e130bf]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.test.models.CSVDataModel] for GenericMessage [payload={"date":null,"deviceAddress":"10.95.251.8","iPAddress":" ","userName":"z-nbpvs1","group":" ","eventCategoryName":"User.Activity.Privileged Use.Successful","message":"User authentication succeeded: Uname: z-nbpvs1"}, headers={kafka_offset=229860, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7773cad2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=freshTopic, kafka_receivedTimestamp=1554462411064}], failedMessage=GenericMessage [payload={"date":null,"deviceAddress":"10.95.251.8","iPAddress":" ","userName":"z-nbpvs1","group":" ","eventCategoryName":"User.Activity.Privileged Use.Successful","message":"User authentication succeeded: Uname: z-nbpvs1"}, headers={kafka_offset=229860, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7773cad2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=freshTopic, kafka_receivedTimestamp=1554462411064}]

Kafka producer code:

Properties producerProperities = new Properties();
producerProperities.setProperty("bootstrap.servers", "127.0.0.1:9092");
producerProperities.setProperty("acks", "1");
producerProperities.setProperty("retries", "10");
producerProperities.setProperty("key.serializer", StringSerializer.class.getName());
producerProperities.setProperty("value.serializer", CSVDataModelSerializer.class.getName());

try (Producer<String, CSVDataModel> producer = new KafkaProducer<>(producerProperities)) {
    producer.send(new ProducerRecord<String, CSVDataModel>(TOPIC, dataModel));
} catch (Exception e) {
    e.printStackTrace();
}

Kafka consumer code:

@KafkaListener(topics = "freshTopic", groupId = "group_id")
public void consume(CSVDataModel dataModel) throws IOException {

    Properties producerProperities = new Properties();
    producerProperities.setProperty("key.deserializer", StringDeserializer.class.getName());
    producerProperities.setProperty("value.deserializer", CSVDataModelDeSerializer.class.getName());

    try (KafkaConsumer<String, CSVDataModel> consumer = new KafkaConsumer<>(producerProperities)) {
        // consumer.subscribe(Collections.singletonList("freshTopic"));
        while (true) {
            ConsumerRecords<String, CSVDataModel> messages = consumer.poll(100);
            for (ConsumerRecord<String, CSVDataModel> message : messages) {
                System.out.println("Message received " + message.value().toString());
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

I have also written a custom serializer and deserializer.
Serializer:

@Override
public byte[] serialize(String topic, CSVDataModel data) {
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        retVal = objectMapper.writeValueAsString(data).getBytes();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return retVal;
}
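
A side note on the serializer above, not necessarily related to the exception: `getBytes()` with no argument encodes with the JVM's default charset, which on older Java versions depends on the platform. Naming the charset explicitly on both sides avoids that dependency. The sketch below uses only the JDK; the class `Utf8RoundTrip` and its two methods are hypothetical names chosen for illustration and are not part of the original code.

```java
import java.nio.charset.StandardCharsets;

public class Utf8RoundTrip {

    // Encode JSON text with an explicit charset instead of the JVM default.
    public static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same charset that was used for encoding.
    public static String fromBytes(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00e9 is a non-ASCII character, so the chosen charset matters here.
        String json = "{\"userName\":\"z-nbpvs1\",\"group\":\"\u00e9\"}";
        byte[] wire = toBytes(json);
        System.out.println("Round trip preserved the text: " + fromBytes(wire).equals(json));
    }
}
```

In the serializer itself, Jackson's `ObjectMapper.writeValueAsBytes(data)` would be another way to get UTF-8 bytes directly, without the intermediate `String`.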

Deserializer:

@Override
public CSVDataModel deserialize(String topic, byte[] data) {
    ObjectMapper mapper = new ObjectMapper();
    CSVDataModel csvDataModel = null;
    try {
        csvDataModel = mapper.readValue(data, CSVDataModel.class);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return csvDataModel;
}

Can anyone tell me what I am doing wrong?
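
A possible explanation, read off the stack trace rather than confirmed by the author: the message `Cannot convert from [java.lang.String] to [com.test.models.CSVDataModel]` suggests that the listener container's own consumer delivered the value as a `String`, so the conversion fails before `consume` is ever invoked. The `KafkaConsumer` built inside the method body would then never be reached, and the deserializer set there would have no effect on the `@KafkaListener` container. Below is a minimal sketch of registering the custom deserializer on the container's consumer factory instead. The class name `KafkaConsumerConfig` is hypothetical, the broker address and group id are copied from the code above, and `CSVDataModelDeSerializer` is assumed to live in the same package as this class because its real package is not shown in the question. The sketch needs the spring-kafka and kafka-clients libraries on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.test.models.CSVDataModel; // package taken from the stack trace

// Assumption: CSVDataModelDeSerializer is in this class's package; otherwise add its import.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Consumer settings used by the listener container, including the custom value deserializer.
    @Bean
    public ConsumerFactory<String, CSVDataModel> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CSVDataModelDeSerializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // "kafkaListenerContainerFactory" is the bean name @KafkaListener looks up by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CSVDataModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, CSVDataModel> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

With a factory like this in place, the listener method would only need to handle its `CSVDataModel` argument, with no second consumer created inside it. In a Spring Boot application, setting the `spring.kafka.consumer.value-deserializer` property to the deserializer's fully qualified class name may achieve the same result without a configuration class.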
