我在kubernetes(aws)上运行了一个kafka集群。每个代理都有一个对应的外部负载平衡器(elb)和afaict,kafka的 advertised.listeners
已适当设置,以便在客户端查询代理信息时返回elb的dns名称。大多数设置与这里提到的类似。
我创建了一个kafka消费者,没有指定任何group-id。对于这个消费者,从一个主题中读取消息就可以了。但是,如果在创建kafka使用者时设置了组id,则返回以下错误消息:
2018-01-30 22:04:16,763.763.313055038:kafka.cluster:140735643595584:INFO:74479:Group coordinator for my-group-id is BrokerMetadata(nodeId=2, host=u'a17ee9a8a032411e8a3c902beb474154-867008169.us-west-2.elb.amazonaws.com', port=32402, rack=None)
2018-01-30 22:04:16,763.763.804912567:kafka.coordinator:140735643595584:INFO:74479:Discovered coordinator 2 for group my-group-id
2018-01-30 22:04:16,764.764.270067215:kafka.coordinator.consumer:140735643595584:INFO:74479:Revoking previously assigned partitions set([]) for group my-group-id
2018-01-30 22:04:16,866.866.26291275:kafka.coordinator:140735643595584:INFO:74479:(Re-)joining group my-group-id
2018-01-30 22:04:16,898.898.787975311:kafka.coordinator:140735643595584:INFO:74479:Joined group 'my-group-id' (generation 1) with member_id kafka-python-1.3.5-e31607c2-45ec-4461-8691-260bb84c76ba
2018-01-30 22:04:16,899.899.425029755:kafka.coordinator:140735643595584:INFO:74479:Elected group leader -- performing partition assignments using range
2018-01-30 22:04:16,936.936.614990234:kafka.coordinator:140735643595584:WARNING:74479:Marking the coordinator dead (node 2) for group my-group-id: [Error 15] GroupCoordinatorNotAvailableError.
2018-01-30 22:04:17,069.69.8890686035:kafka.cluster:140735643595584:INFO:74479:Group coordinator for my-group-id is BrokerMetadata(nodeId=2, host=u'my-elb.us-west-2.elb.amazonaws.com', port=32402, rack=None)
``` `my-elb.us-west-2.elb.amazonaws.com:32402` 可从客户端访问。我曾经 `kafkacat` 并设置 `my-elb.us-west-2.elb.amazonaws.com:32402` 作为代理地址,它能够列出主题、消费主题等。
有什么问题吗?
2条答案
按热度按时间ubof19bj1#
问题是server.properties中的3个配置设置设置不正确。
默认的最小同步副本数为2(
min.insync.replicas=2
). 但是,内部主题设置的复制因子为1(offsets.topic.replication.factor=1
).当消费者连接到一个组ip时,它的相应条目必须是
__consumer_offsets
主题。更新此主题时,只编写了一个副本。这引发了同步副本数量低于所需数量的错误。我将所需的同步副本数更改为1,然后一切都开始正常工作。
xoefb8l82#
当使用者客户端和协调器之间存在网络通信错误时,会将协调器标记为死亡(当协调器死亡并且组需要重新平衡时,也会发生这种情况)。有多种情况(偏移提交请求、获取偏移量等)会导致此问题。因此,要找到根本原因问题,需要将日志记录级别设置为跟踪和调试:
logging.level.org.apache.kafka=跟踪