目前我有一个spring clound函数,它消耗一个主题并发布到另一个主题。现在我有多个主题,需要根据spring cloud函数的某些检查将消息发布到多个主题中的一个。我该如何实现呢?这里是当前的实现。
@Bean("producerBean")
public Function<Message<SourceMessage>, Message<SinkMessage>> producerBean(SinkService<SourceMessage> sinkService) {
return sinkService::processMessage;
}
@Service("SinkService")
public class SinkService<T> {
public Message<SinkMessage> processMessage(Message<SourceMessage> message) {
log.info("Message consumed at {} \n{}", message.getHeaders().getTimestamp(), message.getPayload());
try {
if (message.getPayload().isManaged()) {
/*
Need to add one more check here.
if (type==2)
send to topic1
else if(type==4)
send to topic2
else
Just log the type, do not send to any topic.
*/
Message<SinkMessage> output = new GenericMessage<>(new SinkMessage());
output.getPayload().setPayload(message.getPayload());
return output;
}
} catch (Exception exception) {
exception.printStackTrace();
}
return null;
}
}
- 应用程序.属性**
spring.cloud.stream.kafka.binder.brokers=${bootstrap.servers}
spring.cloud.stream.kafka.binder.configuration.enable.idempotence=false
spring.cloud.stream.binders.test_binder.type=kafka
spring.cloud.stream.bindings.producerBean.binder=test_binder
spring.cloud.stream.bindings.producerBean-in-0.destination=${input-destination}
spring.cloud.stream.bindings.producerBean-in-0.group=${input-group}
spring.cloud.stream.bindings.producerBean-out-0.destination=topic1
spring.cloud.stream.bindings.producerBean-out-1.destination=topic2
- 聚合物. xml**
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>3.2.5</version>
</dependency>
1条答案
按热度按时间gcxthw6b1#
你可以使用带kafka-topicname的StreamBridge,spring-cloud会在运行时自动绑定它。这种方法也会自动创建主题,如果不存在,你可以关闭它。
@自动连线专用最终StreamBridge;
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_streambridge_and_dynamic_destinations