目前,我在spring kafka中使用json来处理消息,这非常简单,几乎不需要编码:
@KafkaListener(topics = "foo.t",)
public void receive(Foo payload) {
LOG.info("received payload='{}'", payload);
doMagic(payload);
}
@KafkaListener(topics = "bar.t",)
public void receive(Bar payload) {
LOG.info("received payload='{}'", payload);
doMagic(payload);
}
还有小配置:
# Kafka Config
spring.kafka.bootstrap-servers=broker.kafka
spring.kafka.consumer.group-id=some-app
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.someapp.dto
这很有效,因为内容/类型信息是用json编码的,因此可以单独从字节中恢复(这也有问题,但它是有效的)然而在protobuf中,我没有这些元信息,或者至少我不知道在哪里可以找到它们。
问题:
有没有一种方法可以声明一个适用于多种类型的通用kafka messageconverter,而不必从spring中抛出所有漂亮的抽象/自动配置?
(我还想将其用于json,因为对消息的内容/数据类型进行编码i消息同时存在一些安全性和兼容性问题)
我想避免这样的解决方案:https://stackoverflow.com/a/46670801/4573065 .
选择
编写消息转换器/ Deserializer
它尝试了所有的Kafka课程。
@Override
public T deserialize(String topic, byte[] data) {
try {
return (T) Foo.parse(data);
} catch (Exception e) {
try {
return (T) Bar.parse(data);
} catch (Exception e1) {
throw e
}
}
}
不过,这可能会抵消我希望通过使用二进制格式获得的所有性能提升。
另一种选择是静态Maptopic->content type,但是这仍然是容易出错的,或者至少很难让spring为您进行配置。
编辑:我的制作人看起来像这样:
public void send(String message) {
kafkaTemplate.send("string.t", message);
}
暂无答案!
目前还没有任何答案,快来回答吧!