当从kafka输入主题中读取带有唯一avro模式的消息类型a时,服务a执行操作a;当从同一主题中读取消息类型b时,服务b执行操作b。
我不希望服务a知道b的模式,反之亦然。如果我有两个独立的主题,我会很容易地设置 spring.kafka.properties.specific.avro.reader
至 true
并在特定记录中使用,但由于我不希望模式b出现在服务a中,反之亦然(解耦原因),所以我理想地寻找的是如下所示的内容。
内部服务a:
@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='A'")
public void consumeInSpecificRecord(TypeACompiledAvroClass a){
// Some logic
}
@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='B'")
public void consumeInGenericRecord(GenericRecord b){
// Log and ignore (leave it for service B to process)
}
考虑到设置 specific.avro.reader
标记为true将导致消息b的反序列化错误,并将其设置为false将强制我使用泛型记录,即使对于我尝试避免的第一个流侦听器也是如此。
如果不可能,这里还可以提出什么其他的解决办法?
Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1
1条答案
按热度按时间kx7yvsdv1#
根据gary的评论,我可以设计一个定制的反序列化程序,它可以根据标题中的内容在通用记录和特定记录之间切换。这是它的代码
A
.以及
spring.kafka.cloud.stream.kafka.binder.configuration.value.deserializer
在你的应用程序配置必须设置为GenericSpecificFlexibleDeserializer
当然。