Kafka consumer tries to connect to a random hostname instead of the correct one

Posted by k5ifujac on 2021-06-08 in Kafka

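I am new to Kafka and have started exploring it with sample programs. Everything worked without problems until, suddenly, the consumer.poll() call began to hang and never return. A Google search suggested checking whether the servers are reachable. The producer and consumer Java code run on the same machine; the producer can publish records to Kafka, but the consumer's poll() hangs.
Environment:
Kafka version: 1.1.0
Client: Java
Running in an Ubuntu Docker container on Windows
ZooKeeper and two brokers run in the same container
When I enable logging for the client code, I see the following exception: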

  2018-07-06 21:24:18 DEBUG NetworkClient:802 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Error connecting to node 4bdce773eb74:9095 (id: 2 rack: null)
  java.io.IOException: Can't resolve address: 4bdce773eb74:9095
      at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
      at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
      .................
      .................

I don't understand why the consumer tries to connect to 4bdce773eb74 when my brokers are 192.168.99.100:9094 and 192.168.99.100:9095. Here is my complete consumer code:

    final String BOOTSTRAP_SERVERS = "192.168.99.100:9094,192.168.99.100:9095";
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Event_Consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
    // Partitions are assigned manually rather than through subscribe().
    TopicPartition tpLogin = new TopicPartition("login1", 0);
    TopicPartition tpLogout = new TopicPartition("logout1", 1);
    List<TopicPartition> tps = Arrays.asList(tpLogin, tpLogout);
    consumer.assign(tps);
    while (true) {
        // This is the call that hangs and never returns.
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
        if (consumerRecords.count() == 0) {
            continue;
        }
        consumerRecords.forEach(record -> {
            System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),
                    record.partition(), record.offset());
        });
        consumer.commitAsync();
        Thread.sleep(5000);
    }
  } // closes the enclosing method, whose opening is not shown

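Please help me resolve this issue.
As mentioned, I have two brokers, say broker-1 and broker-2. If I stop broker-1, the exception above is no longer logged, but poll() still does not return. Instead, the following messages are logged indefinitely: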

  2018-07-07 11:31:24 DEBUG AbstractCoordinator:579 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Sending FindCoordinator request to broker 192.168.99.100:9094 (id: 1 rack: null)
  2018-07-07 11:31:24 DEBUG AbstractCoordinator:590 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Received FindCoordinator response ClientResponse(receivedTimeMs=1530943284196, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=573), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
  2018-07-07 11:31:24 DEBUG AbstractCoordinator:613 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Group coordinator lookup failed: The coordinator is not available.
  2018-07-07 11:31:24 DEBUG AbstractCoordinator:227 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Coordinator discovery failed, refreshing metadata

Thanks in advance, Soman

Answer #1 (jc3wubiy)

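I found the problem. When I created the topics, broker-0 (running on port 9093, broker id 0) and broker-2 (running on port 9094, broker id 2) were running. Today I mistakenly started broker-1 (running on port 9095, broker id 1) together with broker-2. After stopping broker-1 and starting broker-0, the problem went away, and the consumer now receives events.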
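A possible explanation for why poll() kept hanging even though broker-2 was up, offered as an assumption rather than something confirmed in this thread: a consumer group's coordinator is the broker that leads one particular partition of the internal `__consumer_offsets` topic. If every replica of that partition lives on a broker that is stopped, FindCoordinator keeps answering COORDINATOR_NOT_AVAILABLE, as in the log above. The sketch below mirrors, to the best of my knowledge, how a group id is mapped to that partition. The class name `CoordinatorPartition` is made up for illustration, and 50 is the default value of `offsets.topic.num.partitions`, which a cluster may override.

```java
public class CoordinatorPartition {
    // Assumed mapping: abs(groupId.hashCode()) % partition count of __consumer_offsets.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Integer.MIN_VALUE has no positive counterpart, so it is mapped to 0.
        int positive = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return positive % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default for offsets.topic.num.partitions (assumption: not overridden).
        int partition = partitionFor("IDCS_Audit_Event_Consumer", 50);
        System.out.println("__consumer_offsets partition: " + partition);
    }
}
```

With the partition number in hand, describing the `__consumer_offsets` topic with kafka-topics.sh should show whether that partition currently has a leader among the running brokers.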
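This was definitely human error on my side, but I have two remarks:
I think Kafka should fall back gracefully to broker-2 (port 9094) and ignore broker-1 (port 9095).
Why does Kafka try to contact 4bdce773eb74:9095 rather than the correct IP address (192.168.99.100)?
Thanks.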
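On the second remark, a likely explanation (not confirmed in this thread): the client uses `bootstrap.servers` only for its first connection. After that it connects to whatever host and port each broker advertises in the cluster metadata. If a broker's `advertised.listeners` setting does not name an address the client can reach, the broker may advertise its own host name, and inside Docker that is typically the container ID, which looks like 4bdce773eb74. The client machine cannot resolve that name, hence "Can't resolve address". The sketch below uses only the Java standard library to check whether an advertised host:port resolves from the client machine. The class name `ResolveCheck` and its method are hypothetical.

```java
import java.net.InetSocketAddress;

public class ResolveCheck {
    // Returns true if the host part of "host:port" resolves to an IP address on this machine.
    static boolean resolves(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon < 1) {
            throw new IllegalArgumentException("expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        // The constructor attempts the lookup; isUnresolved() reports whether it failed.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    public static void main(String[] args) {
        // One address from bootstrap.servers and the node name from the exception.
        String[] nodes = {"192.168.99.100:9094", "4bdce773eb74:9095"};
        for (String node : nodes) {
            System.out.println(node + " -> " + (resolves(node) ? "resolves" : "cannot be resolved"));
        }
    }
}
```

If the advertised name does not resolve, two common remedies are to set `advertised.listeners` in each broker's server.properties to an address the client can reach (for example `PLAINTEXT://192.168.99.100:9095`), or to map the container host name to that IP in the client machine's hosts file.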
