java 使用Spring Cloud Streams功能的路由条件

cig3rfwq  于 2024-01-05  发布在  Java
关注(0)|答案(1)|浏览(183)

在旧的命令式编程类型被弃用后,我遇到了一些问题。我有两个微服务(一个作为发布者,另一个作为订阅者),在旧的方式下,通过注解@StreamListener(target = "events", condition = "headers['type']=='consumerPermissionEvent'"),我可以让两个函数只监听记录,现在我不知道如何做到这一点。
我正在阅读所有文档event routing并尝试使用routing-expression,但两个消费者正在阅读所有记录。
第一个微服务器的应用yaml:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. output:
  6. destination: topicEvents

字符串
第二个应用yaml是:

  1. spring:
  2. cloud:
  3. function:
  4. routing-expression: headers['type']
  5. definition: consumerPermissionEvent;consumerApiEvent
  6. stream:
  7. bindings:
  8. consumerPermissionEvent-in-0:
  9. destination: topicUsers
  10. consumerApiEvent-in-0:
  11. destination: topicUsers


我从第一个微服务发送这样的消息:

  1. @Autowired
  2. private StreamBridge bridge;
  3. public void send(PermissionEvent event){
  4. Message<PermissionEvent> message = MessageBuilder.withPayload(event)
  5. .setHeader("type","consumerPermissionEvent").build();
  6. bridge.send("output", message);
  7. }


第二个微服务有两个消费者:

  1. @Bean
  2. public Consumer<Message<ApiEvent>> consumerApiEvent() {
  3. return e -> log.debug("READED API EVENT: {}", e.getPayload());
  4. }
  5. @Bean
  6. public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
  7. return e -> log.debug("READED PERMISSION EVENT: {}", e.getPayload());
  8. }


第二个微服务的输出日志:

  1. [KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
  2. [KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)


有什么办法吗?
Thanks in advance

q43xntqr

q43xntqr1#

你将需要启用路由首先通过使用以下属性:

  1. --spring.cloud.stream.function.routing.enabled=true

字符串
有关更多详细信息,请参阅https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#spring_cloud_function

相关问题