基于带注解的方法值类型定义泛型protobuf转换器

30byixjq  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(302)

目前,我在spring kafka中使用json来处理消息,这非常简单,几乎不需要编码:

@KafkaListener(topics = "foo.t",)
public void receive(Foo payload) {
    LOG.info("received payload='{}'", payload);
    doMagic(payload);
}

@KafkaListener(topics = "bar.t",)
public void receive(Bar payload) {
    LOG.info("received payload='{}'", payload);
    doMagic(payload);
}

还有小配置:


# Kafka Config

spring.kafka.bootstrap-servers=broker.kafka
spring.kafka.consumer.group-id=some-app
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.someapp.dto

这很有效,因为内容/类型信息是用json编码的,因此可以单独从字节中恢复(这也有问题,但它是有效的)然而在protobuf中,我没有这些元信息,或者至少我不知道在哪里可以找到它们。
问题:
有没有一种方法可以声明一个适用于多种类型的通用kafka messageconverter,而不必从spring中抛出所有漂亮的抽象/自动配置?
(我还想将其用于json,因为对消息的内容/数据类型进行编码i消息同时存在一些安全性和兼容性问题)
我想避免这样的解决方案:https://stackoverflow.com/a/46670801/4573065 .
选择
编写消息转换器/ Deserializer 它尝试了所有的Kafka课程。

@Override
public T deserialize(String topic, byte[] data) {
    try {
        return (T) Foo.parse(data);
    } catch (Exception e) {
        try {
            return (T) Bar.parse(data);
        } catch (Exception e1) {
            throw e
        }
    }
}

不过,这可能会抵消我希望通过使用二进制格式获得的所有性能提升。
另一种选择是静态Maptopic->content type,但是这仍然是容易出错的,或者至少很难让spring为您进行配置。
编辑:我的制作人看起来像这样:

public void send(String message) {
    kafkaTemplate.send("string.t", message);
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题