我的apachekafka生产者(0.9.0.1)间歇性地抛出
org.apache.kafka.common.errors.NotLeaderForPartitionException
我执行Kafka发送的代码类似于
final Future<RecordMetadata> futureRecordMetadata = KAFKA_PRODUCER.send(new ProducerRecord<String, String>(kafkaTopic, UUID.randomUUID().toString(), jsonMessage));
try {
futureRecordMetadata.get();
} catch (final InterruptedException interruptedException) {
interruptedException.printStackTrace();
throw new RuntimeException("sendKafkaMessage(): Failed due to InterruptedException(): " + sourceTableName + " " + interruptedException.getMessage());
} catch (final ExecutionException executionException) {
executionException.printStackTrace();
throw new RuntimeException("sendKafkaMessage(): Failed due to ExecutionException(): " + sourceTableName + " " + executionException.getMessage());
}
我接住了 NotLeaderForPartitionException
在 catch (final ExecutionException executionException) {}
阻止。
忽略这个特殊的例外可以吗?
我的Kafka信息发送成功了吗?
1条答案
按热度按时间vcirk6k61#
如果你收到
NotLeaderForPartitionException
,未成功写入数据。每个主题分区由一个或多个代理存储(有一个leader;其余的代理称为追随者(followers),具体取决于您的复制因子。生产者需要向leader broker发送新消息(到follower的数据复制在内部进行)。
您的producer客户机没有连接到正确的代理,即连接到跟随者而不是领导者(或者连接到不再是跟随者的代理),并且此代理拒绝您的发送请求。如果领导者发生了变化,但是生产者仍然有过时的缓存元数据,关于哪个代理是分区的领导者,就会发生这种情况。