Kafka继续生产的要求,即使经纪人是关闭

7fhtutme  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(327)

当前,当我创建producer来发送我的记录时,例如由于某些原因kafka不可用,producer会无限期地发送相同的消息。如何停止生成消息(例如,在我收到此错误3次后):

Connection to node -1 could not be established. Broker may not be available.

我用的是React堆Kafka制作人:

@Bean
    public KafkaSender<String, String> createSender() {
        return KafkaSender.create(senderOptions());
    }

    private SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducerRetries());
        return SenderOptions.create(props);
    }

然后用它来发送记录:

sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
            .flatMap(result -> {
                if (result.exception() != null) {
                    return Flux.just(ResponseEntity.badRequest()
                        .body(result.exception().getMessage()));
                }
                return Flux.just(ResponseEntity.ok().build());
            })
            .next();
olhwl3o2

olhwl3o21#

而不是专注于错误。解决问题-它没有连接到代理
您没有在撰写文件中覆盖此项,因此您的应用程序正在尝试连接到自身

bootstrap-servers: ${KAFKA_BOOTSTRAP_URL:localhost:9092}

在作曲中,你好像忘了这个

rest-proxy:
   environment:
       KAFKA_BOOTSTRAP_URL: kafka:9092

或者,如果可能,可以使用现有的融合rest代理docker映像,而不是重新创建轮子

bxgwgixi

bxgwgixi2#

您可以使用断路器模式来解决此类问题,但在应用此模式之前,请尝试查找根本原因,并且您的producerconfig.retries\u config属性似乎在某个地方被重写。

dvtswwa3

dvtswwa33#

我担心 clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); 不参与重试,它将迭代到 maxBlockTimeMs = 60000 默认情况下。您可以通过 ProducerConfig.MAX_BLOCK_MS_CONFIG 属性:

public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
                                                    + "These methods can be blocked either because the buffer is full or metadata unavailable."
                                                    + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";

更新
我们可以这样解决问题:

@PostMapping(path = "/v1/{topicName}")
public Mono<ResponseEntity<?>> postData(
    @PathVariable("topicName") String topicName, String message) {
    return sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
        .flatMap(result -> {
            if (result.exception() != null) {
                sender.close();
                return Flux.just(ResponseEntity.badRequest()
                    .body(result.exception().getMessage()));
            }
            return Flux.just(ResponseEntity.ok().build());
        })
        .next();
}

注意安全 sender.close(); 如果出现错误。
我认为是时候提出一个反对KafkaReact堆项目的问题,以允许关闭错误的生产者。

相关问题