在我的springboot应用程序中,我需要使用来自2个不同Kafka主题的消息。
在第一个主题中,所有消息都遵循相同的详细说明。在第二个主题中,我需要通过一个头值来做两个不同的详细说明。
过去我使用@StreamListener,但现在已经弃用了。我如何使用干净的代码在application.yml中配置新的消费者?
我只看到了一个MessageRoutingCallback的示例,但是我不喜欢对所有主题使用单个Router,因为我不想保留所有的“if”语句来划分详细说明。
我试图定义2个路由器和2个消费者,但应用程序没有启动,因为不能接受两个bean。
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 3 of method functionRouter in org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration required a single bean, but 2 were found:
- firstMessageRouter: defined in file [C:\Workspace-IntelliJ\TEST\test-kafka-cloud-stream\target\classes\org\example\router\FirstMessageRouter.class]
- secondMessageRouter: defined in file [C:\Workspace-IntelliJ\TEST\test-kafka-cloud-stream\target\classes\org\example\router\SecondMessageRouter.class]
Action:
Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed
@Component
@Slf4j
public class FirstMessageRouter implements MessageRoutingCallback {
@Override
public FunctionRoutingResult routingResult(Message<?> message) {
log.info("First router");
log.info("Headers: " + message.getHeaders());
log.info("Payload: " + message.getPayload());
return new MessageRoutingCallback.FunctionRoutingResult("firstconsumer");
}
}
@Component
@Slf4j
public class SecondMessageRouter implements MessageRoutingCallback {
@Override
public FunctionRoutingResult routingResult(Message<?> message) {
log.info("Second router");
log.info("Headers: " + message.getHeaders());
log.info("Payload: " + message.getPayload());
return new FunctionRoutingResult("second-consumer");
}
}
@Slf4j
@Component
public class FirstConsumer implements Consumer<Message<?>> {
@Override
public void accept(Message<?> message) {
log.info("Received message in FirstConsumer with payload: " + message.getPayload() + " and headers: " + message.getHeaders());
}
}
@Slf4j
@Component
public class SecondConsumer implements Consumer<Message<?>> {
@Override
public void accept(Message<?> message) {
log.info("Received message in SecondConsumer with payload: " + message.getPayload() + " and headers: " + message.getHeaders());
}
}
我的配置如下:
spring:
cloud:
stream:
bindings:
firstMessageRouter-in-0:
destination: topic-1-input
group: group-3
secondMessageRouter-in-0:
destination: topic-2-input
group: group-4
kafka:
streams:
defaultBrokerPort: 9092
brokers: localhost
我肯定我错过了一些东西,尤其是函数式编程。有人能解释一下用什么方法可以实现不同的消费者吗?
我也尝试过在Configuration类中使用这个application.yml声明消费者,但是如果我需要不同的详细说明,我需要在这里导入所有的服务,我宁愿不这样做。
@Configuration
@Slf4j
public class KafkaConfiguration {
@Bean("first-process")
public Consumer<Message<?>> process() {
return message -> {
log.info("Received headers: " + message.getHeaders());
log.info("Received payload: " + message.getPayload());
};
}
@Bean("second-process")
Consumer<String> topic1() {
return str -> {
log.info("Received message in topic1: " + str);
};
}
}
spring:
cloud:
stream:
function:
definition: first-process; second-process
bindings:
first-process-in-0:
destination: topic.input
group: group-1
second-process-in-0:
destination: topic-1
group: group-2
kafka:
streams:
defaultBrokerPort: 9092
brokers: localhost
1条答案
按热度按时间kgqe7b3p1#
我不明白,任何路由器的关键是提供路由到其他功能,所以它只能通过
IF/ELSE
语句来完成。在StreamListener中,您暴露于相同的IF/ELSE
机制,尽管是通过直接连接到消息通道,这是一个内部细节,一开始就不应该暴露。除了支持
MessageRoutingCallback
之外,我们还支持SpEL表达式,当基于头值进行路由时,SpEL表达式工作得非常好,但如果您的路由逻辑很复杂,则MessageRoutingCallback
是一个不错的选择。-https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_routing_to_consumer拥有多个MessageRoutingCallback
示例只会污染您的代码,感觉像是反模式,因为您根本没有在那里进行任何路由。