Java: Reactor Kafka partition-ordered concurrent processing not working as expected

yjghlzjz · posted 2022-12-21 in Java

I am building a sample Reactor Kafka application that reads from multiple partitions of a Kafka topic (5 partitions in my case), processes the records concurrently while preserving per-partition ordering, and then publishes them to another topic.
Please see the sample code below:

    @Bean
    Map<String, Object> kafkaConsumerConfiguration() {
        Map<String, Object> configuration = new HashMap<>();
        configuration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configuration.put(ConsumerConfig.GROUP_ID_CONFIG, "sampleGroupId");
        configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return configuration;
    }

    @Bean
    ReceiverOptions<String, String> kafkaReceiverOptions(@Value("${kafka.topic.in}") String inTopicName) {
        ReceiverOptions<String, String> options = ReceiverOptions.create(kafkaConsumerConfiguration());
        return options.addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .subscription(Collections.singletonList(inTopicName));
    }

    @Bean
    KafkaReceiver<String, String> reactiveKafkaReceiver(ReceiverOptions<String, String> kafkaReceiverOptions) {
        return KafkaReceiver.create(kafkaReceiverOptions);
    }

    @EventListener(ApplicationStartedEvent.class)
    public void onMessage() {
        reactiveKafkaReceiver
                .receive()
                // one inner Flux per topic partition, so ordering is preserved within each group
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                        partitionFlux.publishOn(scheduler)
                                // processRecord(...) returns the record's ReceiverOffset
                                .map(r -> processRecord(partitionFlux.key(), r))
                                // emit the latest offset at most once every 5 seconds
                                .sample(Duration.ofMillis(5000))
                                .concatMap(offset -> offset.commit()))
                .subscribe();
    }

While running the application, I noticed from the logs that the scheduler created 5 threads, one per partition, each responsible for consuming events from its partition.
The problem I am facing is that the 5 threads do not run concurrently: even though each partition has 1000 records to process, the partitions are not consumed in parallel, which significantly increases the total processing time.
Can anyone help me figure out what I am missing here, or how we can read from all partitions in parallel while still preserving the ordering within each partition?
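One thing worth checking (the scheduler bean is not shown in the question, so this is an assumption): publishOn pins each grouped partition Flux to a single worker of the Scheduler, so if the Scheduler has fewer threads than the topic has partitions, the partition groups share threads and serialize behind each other. A minimal sketch of a scheduler sized to the partition count:

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Bean
Scheduler scheduler() {
    // Hypothetical sizing: 5 workers to match the 5 partitions of the input topic,
    // so each grouped partition Flux can run on a dedicated thread.
    return Schedulers.newParallel("partition-processor", 5);
}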

ffdz8vbo · answer #1

Try creating multiple consumers with ReactiveKafkaConsumerTemplate while sharing a common Scheduler for the processing flow; each consumer will then consume data independently.
Configuration:

@Bean
@MessageFlowScheduler // custom qualifier annotation for injecting this Scheduler
public Scheduler messageFlowScheduler() {
    return Schedulers.newParallel("message-flow-scheduler", customKafkaProperties.getFlowSchedulerConcurrency());
}

@Bean
public List<ReactiveKafkaConsumerTemplate<String, RawMessage>> kafkaConsumerTemplates(@MessageFlowScheduler Scheduler messageFlowScheduler) {
    final var consumerConcurrency = customKafkaProperties.getReactiveConsumer().getConsumerConcurrency();
    LOG.info("Kafka consumer concurrency level is {}", consumerConcurrency);
    final var consumers = new ArrayList<ReactiveKafkaConsumerTemplate<String, RawMessage>>();
    for (int i = 0; i < consumerConcurrency; i++) {
        final var kafkaReceiverOptions = ReceiverOptions.<String, RawMessage>create(kafkaProperties.buildConsumerProperties())
                                             // publish received records on the shared scheduler
                                             .schedulerSupplier(() -> messageFlowScheduler)
                                             .subscription(singleton(topicsProperties.getGlobalRawTopic()));
        consumers.add(new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions));
    }
    return consumers;
}
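
Each ReactiveKafkaConsumerTemplate wraps its own underlying KafkaConsumer, so as long as all the templates share the same group.id, the consumer group protocol distributes the topic's partitions across them and their poll loops run independently of one another.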

Code:

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaMessageFlowListener {
    private final List<ReactiveKafkaConsumerTemplate<String, RawMessage>> consumers;

    @EventListener(ApplicationStartedEvent.class)
    public void startFlow() {
        consumers.forEach(consumer -> {
            consumer
                .receive()
                // ...
                .flatMap(receiverRecord -> receiverRecord.receiverOffset().commit())
                .subscribe();
        });
    }
}
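
Note that the snippet above elides the processing stage, and committing every record through flatMap does not by itself preserve per-partition ordering. To keep the original requirement, the groupBy from the question can be applied inside each consumer's flow. A minimal sketch of such a startFlow (processRecord is carried over from the question and assumed to return the record's ReceiverOffset):

// inside KafkaMessageFlowListener above; additionally imports reactor.kafka.receiver.ReceiverOffset
@EventListener(ApplicationStartedEvent.class)
public void startFlow() {
    consumers.forEach(consumer -> consumer
            .receive()
            // records are already published on the shared scheduler via schedulerSupplier
            .groupBy(record -> record.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux
                    .map(record -> processRecord(partitionFlux.key(), record)) // assumed to return ReceiverOffset
                    .concatMap(ReceiverOffset::commit))                        // commit sequentially within each partition
            .subscribe());
}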
