Kafka consumer group keeps rebalancing

Posted by bnl4lu3b on 2021-06-04 in Kafka

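I have a simple Kafka setup, but in the broker logs I see the consumer group rebalancing constantly because of metadata updates. I also see a lot of consumer failures in the logs.
Several consumers run in separate screen sessions, each subscribed to different topics, and all of them use the same group id.
Could this be related to the consumers sharing the same group id? Also, why do I keep seeing these metadata updates in the Kafka logs? It happens every few minutes.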

[2020-11-18 18:49:32,489] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-18 18:51:14,303] INFO [GroupCoordinator 1001]: Stabilized group production generation 9283 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 18:51:14,320] INFO [GroupCoordinator 1001]: Assignment received from leader for group production for generation 9283 (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 18:57:56,868] INFO [GroupCoordinator 1001]: Preparing to rebalance group production in state PreparingRebalance with old generation 9283 (__consumer_offsets-3) (reason: removing member kafka-python-1.4.5-5e907847-8122-4687-94b6-75515611aa35 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 18:59:32,489] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-18 18:59:39,292] INFO [GroupCoordinator 1001]: Stabilized group production generation 9284 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 18:59:39,305] INFO [GroupCoordinator 1001]: Assignment received from leader for group production for generation 9284 (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 18:59:39,586] INFO [GroupCoordinator 1001]: Preparing to rebalance group production in state PreparingRebalance with old generation 9284 (__consumer_offsets-3) (reason: Updating metadata for member kafka-python-1.4.5-c17e3d67-0746-4f11-93e5-acbf5a4c63bc) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:00:06,623] INFO [GroupCoordinator 1001]: Stabilized group production generation 9285 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:00:06,640] INFO [GroupCoordinator 1001]: Assignment received from leader for group production for generation 9285 (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:00:28,892] INFO [GroupCoordinator 1001]: Preparing to rebalance group production in state PreparingRebalance with old generation 9285 (__consumer_offsets-3) (reason: Adding new member kafka-python-1.4.5-ea59412c-a4bb-4e1b-8f87-77d91d1d20a5) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:00:43,025] INFO [GroupCoordinator 1001]: Stabilized group production generation 9286 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:00:43,037] INFO [GroupCoordinator 1001]: Assignment received from leader for group production for generation 9286 (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:00:43,347] INFO [GroupCoordinator 1001]: Preparing to rebalance group production in state PreparingRebalance with old generation 9286 (__consumer_offsets-3) (reason: Updating metadata for member kafka-python-1.4.5-c17e3d67-0746-4f11-93e5-acbf5a4c63bc) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:01:58,125] INFO [GroupCoordinator 1001]: Stabilized group production generation 9287 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2020-11-18 19:01:58,140] INFO [GroupCoordinator 1001]: Assignment received from leader for group production for generation 9287 (kafka.coordinator.group.GroupCoordinator)
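
The excerpt above shows three different rebalance triggers: a member removed on LeaveGroup, a member updating its metadata, and a new member being added. To see which one dominates over a longer window, the coordinator lines can be tallied by reason. This is a minimal sketch that assumes the broker log keeps the same format as the excerpt; the function name `tally_rebalance_reasons` and the `<member>` placeholder are illustrative and not part of Kafka.

```python
import re
from collections import Counter

# Captures the group name and the "(reason: ...)" text of
# GroupCoordinator "Preparing to rebalance" lines.
REASON_RE = re.compile(
    r"Preparing to rebalance group (\S+) .*?\(reason: (.*?)\) \(kafka\."
)

# Member ids are unique per consumer instance; replace them with a
# placeholder so that identical causes are counted together.
MEMBER_RE = re.compile(r"kafka-python-\S+")


def tally_rebalance_reasons(lines):
    """Count rebalances per (group, reason) in an iterable of log lines."""
    counts = Counter()
    for line in lines:
        match = REASON_RE.search(line)
        if match is None:
            continue  # not a "Preparing to rebalance" line
        group, reason = match.groups()
        counts[(group, MEMBER_RE.sub("<member>", reason))] += 1
    return counts
```

It can be fed an open log file directly, for example `tally_rebalance_reasons(open(path))` where `path` points at the broker's server log, and the result printed with `counts.most_common()`.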

Here is my producer code:

import json

from kafka import KafkaProducer

def get_kafka_producer(**kwargs):
    # get_bootstrap_servers() is a project helper not shown here.
    bootstrap_servers = get_bootstrap_servers()
    return KafkaProducer(bootstrap_servers=bootstrap_servers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         **kwargs)

kafka_producer = kafka_client.get_kafka_producer(api_version=api_version)

topic_key = bytes(tag_id, 'utf-8')
res = kafka_producer.send(topic, kafka_stream_data, key=topic_key).add_callback(
                    on_send_success).add_errback(on_send_error)
kafka_producer.close()

Here is the consumer code:

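import json
import logging

from kafka import KafkaConsumer
from kafka.errors import CommitFailedError

logger = logging.getLogger(__name__)

group_id = "Production"
topics = "TEST.*"  # regex pattern, passed to subscribe(pattern=...)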

def get_consumer(group_id, configurations):
    '''returns KafkaConsumer object.'''
    return KafkaConsumer(group_id=group_id, value_deserializer=lambda v: json.loads(v.decode('utf-8')),**configurations)

def get_record(group_id, topics,**kwargs):
    '''Poll the Kafka cluster and yield the value of each record.
    The default value deserializer is JSON.
    Pass either a list of topic names (strings) or a single regex string to select the topics to consume.'''

    configurations = get_consumer_configurations()
    configurations.update(kwargs)
    consumer = get_consumer(group_id=group_id, configurations=configurations)

    # pass list of individual topic names(strings) or a string with regex
    if isinstance(topics, list):
        consumer.subscribe(topics=topics)
    elif isinstance(topics, str):
        consumer.subscribe(pattern=topics)

    # iterator for record generation
    for record in consumer:
        logger.info('partition: {} | offset: {} | key: {}'.format(
            record.partition, record.offset, record.key))
        yield record.value
        if not configurations.get('enable_auto_commit'):
            try:
                consumer.commit()
            except CommitFailedError:
                logger.info('Commit failed on the client; CommitFailedError handled.')


def processor():
    for record in record_generator.get_record(group_id, topics):
        process_data(json.loads(record))
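
Two things may be worth testing here, offered as hypotheses and not as a confirmed diagnosis. First, every member of a Kafka consumer group takes part in each rebalance of that group, so when unrelated consumers share one group id, any of them joining, leaving, or changing its subscription disturbs all the others. Second, the generator yields a record and the caller processes it before the next poll; if that processing is slow, the client can exceed its poll interval and leave the group, which would match the LeaveGroup lines in the log. The sketch below assumes a hypothetical helper `group_id_for` and purely illustrative timeout values; `max_poll_interval_ms` and `max_poll_records` are `KafkaConsumer` options in recent kafka-python releases, so check that the installed version accepts them.

```python
import re


def group_id_for(base_group, topics):
    """Derive a readable group id per subscription (hypothetical helper).

    `topics` is either a regex string or a list of topic names, as in
    get_record(). Distinct patterns can map to the same id; this is only
    a sketch.
    """
    key = topics if isinstance(topics, str) else ",".join(sorted(topics))
    # Collapse every run of non-alphanumeric characters into one underscore.
    slug = re.sub(r"[^A-Za-z0-9]+", "_", key).strip("_")
    return "{}-{}".format(base_group, slug)


# Illustrative values only; tune them to how long process_data() takes.
consumer_overrides = {
    "max_poll_interval_ms": 600000,  # allow up to 10 minutes between polls
    "max_poll_records": 50,          # fetch smaller batches per poll
}

# Intended use with the question's get_record() (not executed here):
# for value in get_record(group_id_for("Production", "TEST.*"), "TEST.*",
#                         **consumer_overrides):
#     process_data(value)
```

Because `get_record()` merges its keyword arguments into the consumer configuration, the overrides reach `KafkaConsumer` without any change to the function itself.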

Here is the YAML file for my Kafka installation:

version: "3.2"
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    # volumes:
    #   - /home/ubuntu/docker-volumes/zookeeper/data:/opt/zookeeper-3.4.13/data
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka:2.12-2.1.1
    ports:
      - 9092:9092
      - 9094:9094
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://kafka.nanoprecisedataservices.com:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    volumes:
      - /home/ubuntu/docker-volumes/kafka/docker.sock:/var/run/docker.sock
      - /home/ubuntu/docker-volumes/kafka:/kafka