无法在spring cloud stream@streamlistener注解中使用condition属性筛选接收到的消息

cwxwcias  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(584)

我正在尝试创建一个基于事件的系统,用于在使用apachekafka作为消息传递系统的服务和spring云流kafka之间进行通信。
我已经编写了如下的receiver类方法,

  1. @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
  2. public void handleEmployeeCreatedEvent(@Payload String payload) {
  3. logger.info("Received EmployeeCreatedEvent: " + payload);
  4. }

此方法专门用于捕获与employeecreatedevent相关的消息或事件。

  1. @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
  2. public void handleEmployeeTransferredEvent(@Payload String payload) {
  3. logger.info("Received EmployeeTransferredEvent: " + payload);
  4. }

此方法专门用于捕获与employeetransferredevent相关的消息或事件。

  1. @StreamListener(target = Sink.INPUT)
  2. public void handleDefaultEvent(@Payload String payload) {
  3. logger.info("Received payload: " + payload);
  4. }

这是默认方法。
当我运行应用程序时,我无法看到使用condition属性注解的方法被调用。我只看到handledefaultevent方法被调用。
我正在使用下面的custommessagesource类从发送/源应用程序向此接收器应用程序发送消息,如下所示,

  1. @Component
  2. @EnableBinding(Source.class)
  3. public class CustomMessageSource {
  4. @Autowired
  5. private Source source;
  6. public void sendMessage(String payload,String eventType) {
  7. Message<String> myMessage = MessageBuilder.withPayload(payload)
  8. .setHeader("eventType", eventType)
  9. .build();
  10. source.output().send(myMessage);
  11. }
  12. }

我在源应用程序中从我的控制器调用方法,如下所示,

  1. customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");

custommessagesource示例如下所示自动连接,

  1. @Autowired
  2. CustomMessageSource customMessageSource;

基本上,我想过滤sink/receiver应用程序接收到的消息并相应地处理它们。
为此,我使用@streamlistener注解with condition属性来模拟处理不同事件的行为。
我使用的是spring cloud stream chelsea.sr2版本。
有人能帮我解决这个问题吗。

ncgqoxb0

ncgqoxb01#

似乎头没有传播。确保在中包含自定义标题 spring.cloud.stream.kafka.binder.headers http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/chelsea.sr2/reference/htmlsingle/#_kafka_binder_properties .

相关问题