使用org.springframework.messaging.Message时出现ClassCast错误,Protobuf对象为Kafka message

7eumitmz  于 2024-01-06  发布在  Apache
关注(0)|答案(1)|浏览(330)

我使用spring-kafka和producer属性,如下所示

key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

字符串
我的代码看起来像这样

Message<Event> message = MessageBuilder
                        .withPayload(protobufStruct)
                        .setHeader(MESSAGE_TIMESTAMP, Instant.now().toString())
                        .build();

                kafkaTemplate.send(topic, new MessageKey(key).toString(), message);


在生成消息时,我得到一个错误,
原因:java.lang.ClassCastException:class org.springframework.messaging.support.GenericMessage不能强制转换为class com.google.protobuf.Message(org.springframework.messaging.support.GenericMessage和com.google.protobuf.Message在loader 'app'的未命名模块中)at io.confluent.Kafka ka.serializers.protobuf.KafkaProtobufSerializer.serialize(KafkaProtobufSerializer.java:34)at org. apache. Kafka.common.Serializer.Serialize(Serializer.java:62)at org.apache.Kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015)
我犯了什么错?

gz5pxeao

gz5pxeao1#

没有重载的send(topic, key, Message<?>)方法。
正在使用send(topic, K, V),因此您正在发送GenericMessage作为有效负载(record.value())。
使用kafkaTemplate.send(topic, new MessageKey(key).toString(), protobufStruct);代替。
如果您想自己设置时间戳,请使用send(ProducerRecord<K, V>)变体。

相关问题