如何在Spring Cloud Stream Kafka应用程序中根据条件向多个主题之一发送消息

hjzp0vay  于 2022-12-28  发布在  Spring
关注(0)|答案(1)|浏览(159)

目前我有一个spring clound函数,它消耗一个主题并发布到另一个主题。现在我有多个主题,需要根据spring cloud函数的某些检查将消息发布到多个主题中的一个。我该如何实现呢?这里是当前的实现。

@Bean("producerBean")
    public Function<Message<SourceMessage>, Message<SinkMessage>> producerBean(SinkService<SourceMessage> sinkService) {
        return sinkService::processMessage;
    }

@Service("SinkService")
public class SinkService<T> {

    public Message<SinkMessage> processMessage(Message<SourceMessage> message) {
        log.info("Message consumed at {} \n{}", message.getHeaders().getTimestamp(), message.getPayload());
        try {
            if (message.getPayload().isManaged()) {
                /*
                Need to add one more check here.
                if (type==2)
                    send to topic1
                else if(type==4)
                    send to topic2
                else
                    Just log the type, do not send to any topic.
                 */
                Message<SinkMessage> output = new GenericMessage<>(new SinkMessage());
                output.getPayload().setPayload(message.getPayload());
                return output;
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return null;
    }
}
    • 应用程序.属性**
spring.cloud.stream.kafka.binder.brokers=${bootstrap.servers}
spring.cloud.stream.kafka.binder.configuration.enable.idempotence=false
spring.cloud.stream.binders.test_binder.type=kafka

spring.cloud.stream.bindings.producerBean.binder=test_binder
spring.cloud.stream.bindings.producerBean-in-0.destination=${input-destination}
spring.cloud.stream.bindings.producerBean-in-0.group=${input-group}
spring.cloud.stream.bindings.producerBean-out-0.destination=topic1
spring.cloud.stream.bindings.producerBean-out-1.destination=topic2
    • 聚合物. xml**
<dependency>
   <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>3.2.5</version>
</dependency>
gcxthw6b

gcxthw6b1#

你可以使用带kafka-topicname的StreamBridge,spring-cloud会在运行时自动绑定它。这种方法也会自动创建主题,如果不存在,你可以关闭它。
@自动连线专用最终StreamBridge;

public void sendDynamically(Message message, String topicName) {
    streamBridge.send(route, topicName);
}

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_streambridge_and_dynamic_destinations

相关问题