java Spring云流的功能消费者和生产者

kpbwa7wx  于 2023-03-16  发布在  Java
关注(0)|答案(1)|浏览(131)

在我的springboot应用程序中,我需要使用来自2个不同Kafka主题的消息。
在第一个主题中,所有消息都遵循相同的详细说明。在第二个主题中,我需要通过一个头值来做两个不同的详细说明。
过去我使用@StreamListener,但现在已经弃用了。我如何使用干净的代码在application.yml中配置新的消费者?
我只看到了一个MessageRoutingCallback的示例,但是我不喜欢对所有主题使用单个Router,因为我不想保留所有的“if”语句来划分详细说明。
我试图定义2个路由器和2个消费者,但应用程序没有启动,因为不能接受两个bean。

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 3 of method functionRouter in org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration required a single bean, but 2 were found:
    - firstMessageRouter: defined in file [C:\Workspace-IntelliJ\TEST\test-kafka-cloud-stream\target\classes\org\example\router\FirstMessageRouter.class]
    - secondMessageRouter: defined in file [C:\Workspace-IntelliJ\TEST\test-kafka-cloud-stream\target\classes\org\example\router\SecondMessageRouter.class]

Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed
@Component
@Slf4j
public class FirstMessageRouter implements MessageRoutingCallback {
    @Override
    public FunctionRoutingResult routingResult(Message<?> message) {
        log.info("First router");
        log.info("Headers: " + message.getHeaders());
        log.info("Payload: " + message.getPayload());

        return new MessageRoutingCallback.FunctionRoutingResult("firstconsumer");
    }
}
@Component
@Slf4j
public class SecondMessageRouter implements MessageRoutingCallback {
    @Override
    public FunctionRoutingResult routingResult(Message<?> message) {
        log.info("Second router");
        log.info("Headers: " + message.getHeaders());
        log.info("Payload: " + message.getPayload());

        return new FunctionRoutingResult("second-consumer");
    }
}
@Slf4j
@Component
public class FirstConsumer implements Consumer<Message<?>> {
    @Override
    public void accept(Message<?> message) {
        log.info("Received message in FirstConsumer with payload: " + message.getPayload() + " and headers: " + message.getHeaders());
    }
}
@Slf4j
@Component
public class SecondConsumer implements Consumer<Message<?>> {
    @Override
    public void accept(Message<?> message) {
        log.info("Received message in SecondConsumer with payload: " + message.getPayload() + " and headers: " + message.getHeaders());
    }
}

我的配置如下:

spring:
  cloud:
    stream:
      bindings:
        firstMessageRouter-in-0:
          destination: topic-1-input
          group: group-3
        secondMessageRouter-in-0:
          destination: topic-2-input
          group: group-4
      kafka:
        streams:
          defaultBrokerPort: 9092
          brokers: localhost

我肯定我错过了一些东西,尤其是函数式编程。有人能解释一下用什么方法可以实现不同的消费者吗?
我也尝试过在Configuration类中使用这个application.yml声明消费者,但是如果我需要不同的详细说明,我需要在这里导入所有的服务,我宁愿不这样做。

@Configuration
@Slf4j
public class KafkaConfiguration {

    @Bean("first-process")
    public Consumer<Message<?>> process() {
        return message -> {
            log.info("Received headers: " + message.getHeaders());
            log.info("Received payload: " + message.getPayload());
            
        };
    }

    @Bean("second-process")
    Consumer<String> topic1() {
        return str -> {
            log.info("Received message in topic1: " + str);
        };
    }
}
spring:
  cloud:
    stream:
      function:
        definition: first-process; second-process
      bindings:
        first-process-in-0:
          destination: topic.input
          group: group-1
        second-process-in-0:
          destination: topic-1
          group: group-2
      kafka:
        streams:
          defaultBrokerPort: 9092
          brokers: localhost
kgqe7b3p

kgqe7b3p1#

我不明白,任何路由器的关键是提供路由到其他功能,所以它只能通过IF/ELSE语句来完成。在StreamListener中,您暴露于相同的IF/ELSE机制,尽管是通过直接连接到消息通道,这是一个内部细节,一开始就不应该暴露。
除了支持MessageRoutingCallback之外,我们还支持SpEL表达式,当基于头值进行路由时,SpEL表达式工作得非常好,但如果您的路由逻辑很复杂,则MessageRoutingCallback是一个不错的选择。-https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_routing_to_consumer拥有多个MessageRoutingCallback示例只会污染您的代码,感觉像是反模式,因为您根本没有在那里进行任何路由。

相关问题