I have created a Kafka producer using Reactor Kafka (a functional Java API for Kafka), i.e. reactor.kafka.sender.KafkaSender, with the following producer configuration:
max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all
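For reference, a minimal sketch (not the asker's actual setup) of how a sender with these settings could be built via Reactor Kafka's SenderOptions; Event is the question's own type, while the bootstrap address and the EventSerializer class are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, EventSerializer.class); // hypothetical serializer for Event
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 8000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4000);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 2000);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 512);
props.put(ProducerConfig.ACKS_CONFIG, "all");

SenderOptions<String, Event> senderOptions = SenderOptions.create(props);
KafkaSender<String, Event> kafkaSender = KafkaSender.create(senderOptions);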
When I try to publish a record to an invalid topic, I get a timeout exception:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms
as expected. But the retries I have configured are not happening. My assumption was that once max.block.ms / request.timeout.ms expires, the send would be retried after retry.backoff.ms until metadata.max.age.ms or retries is exhausted. FYI, the code:
import org.apache.kafka.clients.producer.ProducerRecord;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.SenderRecord;

String topic = "order/"; // '/' is not a legal character in a topic name, so this topic is invalid
int count = 1;
Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
    Event event = new Event();
    return SenderRecord.create(
            new ProducerRecord<String, Event>(topic, event.getX(), event),
            event.getEvent()); // correlation metadata returned with the send result
});
kafkaSender.send(source).subscribe(x -> System.out.println(x));
kafkaSender.close();
Is the configuration to enable retries correct?
When does a retry happen after request.timeout.ms / max.block.ms?
What changes need to be made to the above code to allow retries?
1 Answer
I believe you should also set delivery.timeout.ms.
See the documentation here: https://docs.confluent.io/current/installation/configuration/producer-configs.html#retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.
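Applied to the producer properties above, that suggestion would look something like this minimal sketch (props is the properties map from the sketch in the question; the 120000 ms value is Kafka's documented default, shown here only for illustration):

// delivery.timeout.ms bounds the total time to report success or failure of a
// send, including retries; it must be at least request.timeout.ms + linger.ms.
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // illustrative value (Kafka's default)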