我正在开发一个示例ReactKafka应用程序,它将从Kafka主题的多个分区(在我的例子中是5个分区)中读取数据,同时处理必须按分区排序的记录,然后将其发布到另一个主题。
请参考下面的示例代码:
@Bean
Map<String, Object> kafkaConsumerConfiguration() {
Map<String, Object> configuration = new HashMap<>();
configuration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configuration.put(ConsumerConfig.GROUP_ID_CONFIG, "sampleGroupId");
configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configuration;
}
@Bean
ReceiverOptions kafkaReceiverOptions(@Value("${kafka.topic.in}") String inTopicName) {
ReceiverOptions<String, String> options = ReceiverOptions.create(kafkaConsumerConfiguration());
return options.addAssignListener(assignments -> log.info("Assigned: " + assignments))
.subscription(Collections.singletonList(inTopicName));
}
@Bean
KafkaReceiver<String, String> reactiveKafkaReceiver(ReceiverOptions<String, String> kafkaReceiverOptions) {
return KafkaReceiver.create(kafkaReceiverOptions);
}
@EventListener(ApplicationStartedEvent.class)
public void onMessage() {
reactiveKafkaReceiver
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()))
.subscribe();
}
在运行应用程序时,从日志中,我注意到调度程序为每个分区创建了5个线程,每个线程负责使用来自该分区的事件。
我现在面临的问题是所有5个线程不是同时运行的,因此即使每个分区都有1000条记录需要处理,也无法并行地使用所有分区。这会导致总处理时间的显著增加。
有谁能帮我解决一下我在这里遗漏了什么吗?或者我们如何并行地从所有分区中读取数据,同时还要处理分区内的排序。
1条答案
按热度按时间ffdz8vbo1#
请尝试使用
ReactiveKafkaConsumerTemplate
创建多个消费者,同时使用一个公共的Scheduler
执行业务流,这样每个消费者将独立消费数据。配置:
代码