Debugging ("Coordinator discovery failed for group {}, refreshing metadata", groupId) with Kafka 0.11.0.x

Posted by inb24sb2 on 2021-06-07 in Kafka

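I am using the Kafka (version 0.11.0.2) server API to start a Kafka broker on localhost. The broker starts and runs without any problems, and the producer can send messages successfully. However, the consumer cannot fetch any messages, and there are no error logs in the console. So I debugged the code and found that it keeps looping on "refreshing metadata".
Here is the source code: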

while (coordinatorUnknown()) {
    RequestFuture<Void> future = lookupCoordinator();
    client.poll(future, remainingMs);

    if (future.failed()) {
        if (future.isRetriable()) {
            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
            if (remainingMs <= 0)
                break;

            log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId);
            client.awaitMetadataUpdate(remainingMs);
        } else
            throw future.exception();
    } else if (coordinator != null && client.connectionFailed(coordinator)) {
        // we found the coordinator, but the connection has failed, so mark
        // it dead and backoff before retrying discovery
        coordinatorDead();
        time.sleep(retryBackoffMs);
    }

    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
    if (remainingMs <= 0)
        break;
}

Addendum: when I changed the Kafka version to 0.10.x, it worked fine.
Here is my Kafka server code:

private static void startKafkaLocal() throws Exception {
    final File kafkaTmpLogsDir = File.createTempFile("zk_kafka", "2");
    if (kafkaTmpLogsDir.delete() && kafkaTmpLogsDir.mkdir()) {
        Properties props = new Properties();
        props.setProperty("host.name", KafkaProperties.HOSTNAME);
        props.setProperty("port", String.valueOf(KafkaProperties.KAFKA_SERVER_PORT));
        props.setProperty("broker.id", String.valueOf(KafkaProperties.BROKER_ID));
        props.setProperty("zookeeper.connect", KafkaProperties.ZOOKEEPER_CONNECT);
        props.setProperty("log.dirs", kafkaTmpLogsDir.getAbsolutePath());
        //advertised.listeners=PLAINTEXT://xxx.xx.xx.xx:por
        // flush every 1 ms
        props.setProperty("log.default.flush.scheduler.interval.ms", "1");
        // flush after every message
        props.setProperty("log.flush.interval", "1");
        props.setProperty("log.flush.interval.messages", "1");
        props.setProperty("replica.socket.timeout.ms", "1500");
        props.setProperty("auto.create.topics.enable", "true");
        props.setProperty("num.partitions", "1");

        KafkaConfig kafkaConfig = new KafkaConfig(props);

        KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig);
        kafka.startup();
        System.out.println("start kafka ok "+kafka.serverConfig().numPartitions());
    }
}

Thanks.

Answer #1 (c3frrgcw)

With Kafka 0.11, if you set num.partitions to 1, you also need to set the following three settings:

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

This is evident from the server logs when running 0.11.
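
For an embedded broker configured through a Properties object, as in the question, the three settings can be applied in code. The following is a minimal sketch, not a definitive fix: the class name SingleBrokerOverrides and the helper withSingleBrokerOverrides are hypothetical names introduced here, and the sketch only builds the Properties, so it runs without Kafka on the classpath. As far as I know, these settings default to 3, 3, and 2 respectively, which a single broker cannot satisfy.

```java
import java.util.Properties;

public class SingleBrokerOverrides {

    /**
     * Returns a copy of the given broker properties with the three
     * single-broker overrides from the answer applied.
     * (Hypothetical helper, not part of the Kafka API.)
     */
    static Properties withSingleBrokerOverrides(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Internal offsets topic: one replica is all a single broker can host.
        props.setProperty("offsets.topic.replication.factor", "1");
        // Transaction state log: same reasoning as above.
        props.setProperty("transaction.state.log.replication.factor", "1");
        props.setProperty("transaction.state.log.min.isr", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("num.partitions", "1");

        Properties props = withSingleBrokerOverrides(base);
        // Print the resulting settings in a stable order.
        props.stringPropertyNames().stream()
             .sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

In the question's startKafkaLocal(), the equivalent would be three more props.setProperty(...) calls before new KafkaConfig(props) is constructed.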
