问题,同时发送数据流量的ReactKafka

vzgqcmou  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(218)

我使用reactor库从网络中获取大量数据流,并使用reactive kafka方法将其发送给kafka代理。
下面是我正在使用的Kafka制作人

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) {

        AtomicInteger sentCount = new AtomicInteger(0);
        AtomicInteger fCount = new AtomicInteger(0);

        records.doOnNext(r -> fCount.incrementAndGet()).subscribe();
        System.out.println("Total Records: " + fCount);

        sender.send(records.doOnNext(r -> sentCount.incrementAndGet())
                .map(record -> {
                    LogRecord lrec = record.getRecords().get(0);
                    String id = lrec.getId();
                    return SenderRecord.create(new ProducerRecord<>(topic, id,
                            lrec.toString()), id);
                })).then()
                .doOnError(e -> {
                    log.error("[FAIL]: Send to the topic: '{}' failed. "
                            + e, topic);
                })
                .doOnSuccess(s -> {
                    log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
                })
                .subscribe();
    }

}

flux中的记录总数(fcount)和发送到kafka主题的记录总数(sentcount)不匹配,它不会给出任何错误并成功完成。
例如:在其中一种情况下,通量中的记录总数是2758,而发送到Kafka的计数是256。有没有Kafka的配置,需要修改,还是我遗漏了什么?

根据评论更新

sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            sleep(5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        })).then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' failed. "
                    + e, topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
sleep(10); // sleep for 10 ns

上面的代码在一个系统中运行良好,但在另一个系统中无法发送所有消息。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题