我在我的项目中使用了Spring Cloud(Hoxton.SR8)Stream和Kafka-Streams活页夹。
我们可以在反序列化有效负载之前检查消息头吗?我想这样做是为了实现基于报头的传入消息过滤。反序列化可能会占用一些CPU,在此之前,如果Header中的某个值与条件不匹配,我们是否可以检查Header并丢弃消息?
我尝试这样使用ListernContainerCustomizer。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
return (container, dest, group) -> {
container.setRecordInterceptor(record -> {
System.out.println(">>>> Received record, checking headers");
Headers headers = record.headers();
System.out.println(">>>> Header length: "+headers.toArray().length);
for(Header header: headers)
{
if(header.key().equalsIgnoreCase("eventtype"))
{
String value = String.valueOf(header.value());
if(!value.equalsIgnoreCase("PUBLISHED")) {
System.out.println("Event type from header not PUBLISHED, skipping record");
return null;
}
}
}
System.out.println("Processing record");
return record;
});
};
}
但它什么也做不了。我尝试打印加载的Bean,这个定制器Bean确实加载了,但它什么也不做。
请帮帮忙。
1条答案
按热度按时间rjzwgtxy1#
您可以使用setRecordFilterStrategy来代替使用setRecordInterceptor:
您可以按消息的参数进行筛选:
或按标题: