我使用spring-kafka
和producer属性,如下所示
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
字符串
我的代码看起来像这样
Message<Event> message = MessageBuilder
.withPayload(protobufStruct)
.setHeader(MESSAGE_TIMESTAMP, Instant.now().toString())
.build();
kafkaTemplate.send(topic, new MessageKey(key).toString(), message);
型
在生成消息时,我得到一个错误,
原因:java.lang.ClassCastException:class org.springframework.messaging.support.GenericMessage不能强制转换为class com.google.protobuf.Message(org.springframework.messaging.support.GenericMessage和com.google.protobuf.Message在loader 'app'的未命名模块中)at io.confluent.Kafka ka.serializers.protobuf.KafkaProtobufSerializer.serialize(KafkaProtobufSerializer.java:34)at org. apache. Kafka.common.Serializer.Serialize(Serializer.java:62)at org.apache.Kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015)
我犯了什么错?
1条答案
按热度按时间gz5pxeao1#
没有重载的
send(topic, key, Message<?>)
方法。正在使用
send(topic, K, V)
,因此您正在发送GenericMessage
作为有效负载(record.value()
)。使用
kafkaTemplate.send(topic, new MessageKey(key).toString(), protobufStruct);
代替。如果您想自己设置时间戳,请使用
send(ProducerRecord<K, V>)
变体。