我有以下Kafka消费者,它工作得很好,如果分配 group_id
到无-它收到了所有的历史消息和我的新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
但是,如果我设置 group_id
有点价值。我试着运行测试生成器来发送新消息,但是没有收到任何消息。
使用者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py (Re-)joining group my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py Successfully joined group my_group with generation 497
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py Updated partition assignment: []
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py Setting newly assigned partitions set() for group my_group
1条答案
按热度按时间cuxqih211#
一个主题的一个分区只能由同一个consumergroup中的一个使用者使用。
如果不设置group.id,kafkaconsumer将为您生成一个新的随机group.id。由于group.id是唯一的,您将看到数据正在被消耗。
如果有多个使用者使用相同的group.id运行,则只有一个使用者将读取数据,而另一个使用者将保持空闲,不使用任何内容。