get使用streambridge处理kafka消息的分区和偏移量

cngwdvgl  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(709)

我需要打印/记录/存储Kafka分区和偏移量,在其中我的消息正在被处理。我怎样才能做到这一点?我使用streambridge从producer发送消息,还使用函数spring-kafka-streams方法

Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}
kd3sttzy

kd3sttzy1#

记录元数据通过元数据通道(异步)可用:

@SpringBootApplication
public class So66436499Application {

    public static void main(String[] args) {
        SpringApplication.run(So66436499Application.class, args);
    }

    @Autowired
    StreamBridge bridge;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            this.bridge.send("myBinding", "test");
            Thread.sleep(5000);
        };
    }

    @ServiceActivator(inputChannel = "meta")
    void meta(Message<?> sent) {
        System.out.println("Sent: " + sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
    }

}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5

相关问题