Spring Cloud Stream Kafka-Streams绑定器,反序列化消息前检查报头

hfyxw5xn  于 2022-09-21  发布在  Spring
关注(0)|答案(1)|浏览(167)

我在我的项目中使用了Spring Cloud(Hoxton.SR8)Stream和Kafka-Streams活页夹。

我们可以在反序列化有效负载之前检查消息头吗?我想这样做是为了实现基于报头的传入消息过滤。反序列化可能会占用一些CPU,在此之前,如果Header中的某个值与条件不匹配,我们是否可以检查Header并丢弃消息?

我尝试这样使用ListernContainerCustomizer。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() 
{
        return (container, dest, group) -> {
                container.setRecordInterceptor(record -> {
                    System.out.println(">>>> Received record, checking headers");
                    Headers headers = record.headers();
                    System.out.println(">>>> Header length: "+headers.toArray().length);
                    for(Header header: headers)
                    {
                        if(header.key().equalsIgnoreCase("eventtype"))
                        {
                            String value = String.valueOf(header.value());
                            if(!value.equalsIgnoreCase("PUBLISHED")) {
                                System.out.println("Event type from header not PUBLISHED, skipping record");
                            return null;
                        }
                    }
                }
                System.out.println("Processing record");
                return record;
            });
        };
    }

但它什么也做不了。我尝试打印加载的Bean,这个定制器Bean确实加载了,但它什么也不做。

请帮帮忙。

rjzwgtxy

rjzwgtxy1#

您可以使用setRecordFilterStrategy来代替使用setRecordInterceptor:

您可以按消息的参数进行筛选:

factory.setRecordFilterStrategy(message -> {

          String messageValue = (String) message.value().getParamName();

          if ( // Your business logic) {
            return false;
          }

          LOGGER.info("Message discarded : {}", messageValue);
          return true;
        });

或按标题:

factory.recordFilterStrategy(message -> {
                Header header = message.headers().lastHeader("XXX");

                if(String.valueOf(header.value()).equals(// Your business logic)){
                   return false;
                }

                LOGGER.info("Message discarded : {}", String.valueOf(header.value()));
                return true;
     });

相关问题