我正在尝试创建一个基于事件的系统,用于在使用apachekafka作为消息传递系统的服务和spring云流kafka之间进行通信。
我已经编写了如下的receiver类方法,
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
logger.info("Received EmployeeCreatedEvent: " + payload);
}
此方法专门用于捕获与employeecreatedevent相关的消息或事件。
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
logger.info("Received EmployeeTransferredEvent: " + payload);
}
此方法专门用于捕获与employeetransferredevent相关的消息或事件。
@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
logger.info("Received payload: " + payload);
}
这是默认方法。
当我运行应用程序时,我无法看到使用condition属性注解的方法被调用。我只看到handledefaultevent方法被调用。
我正在使用下面的custommessagesource类从发送/源应用程序向此接收器应用程序发送消息,如下所示,
@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
@Autowired
private Source source;
public void sendMessage(String payload,String eventType) {
Message<String> myMessage = MessageBuilder.withPayload(payload)
.setHeader("eventType", eventType)
.build();
source.output().send(myMessage);
}
}
我在源应用程序中从我的控制器调用方法,如下所示,
customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");
custommessagesource示例如下所示自动连接,
@Autowired
CustomMessageSource customMessageSource;
基本上,我想过滤sink/receiver应用程序接收到的消息并相应地处理它们。
为此,我使用@streamlistener注解with condition属性来模拟处理不同事件的行为。
我使用的是spring cloud stream chelsea.sr2版本。
有人能帮我解决这个问题吗。
1条答案
按热度按时间ncgqoxb01#
似乎头没有传播。确保在中包含自定义标题
spring.cloud.stream.kafka.binder.headers
http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/chelsea.sr2/reference/htmlsingle/#_kafka_binder_properties .