Strange difference between reading from a Kafka topic with subscribe() and with assign()

Posted by svujldwt on 2023-10-15 in Apache

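My task is to count the messages in Kafka topics (some have a single partition, some have many). I tried two techniques: one uses subscribe(), the other uses assign().
Full code: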

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import kafka
from kafka.structs import TopicPartition

def count_messages(consumer):
    i = 0
    while True:
        records = consumer.poll(50)     # timeout in millis
        if not records:
            break
        for _, consumer_records in records.items():
            for _ in consumer_records:
                i += 1
    return i

def show_messages_assign(kafka_server, topic):
    cnt = 0
    consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
    try:
        partitions = consumer.partitions_for_topic(topic)
        if partitions:
            # read the partitions one at a time, each from offset 0
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                consumer.assign([tp])
                consumer.seek(partition=tp, offset=0)
                cnt += count_messages(consumer)
    finally:
        consumer.close()
    print('%s assign, partitions: %s, msg cnt=%d' % (topic, partitions, cnt))

def show_messages_subscribe(kafka_server, topic):
    cnt = 0
    consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
    try:
        consumer.subscribe([topic])
        cnt += count_messages(consumer)
    finally:
        consumer.close()
    print('%s, subscribe, msg cnt=%d' % (topic, cnt))

def test_topic(kafka_server, topic):
    show_messages_assign(kafka_server, topic)
    show_messages_subscribe(kafka_server, topic)
    print('')

def main():
    kafka_server = '169.0.1.77:9092'
    test_topic(kafka_server, 'gc.ifd.analyse.fdp')
    test_topic(kafka_server, 'gc.ifd.result.fdp')

if __name__ == '__main__':
    main()

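Problems:

  1. subscribe() does not work on some machines. On those same machines assign() works, but not very reliably.
  2. assign() works on all of my test machines, but when a topic has many partitions I think it does not read data from all of them.

The results are as follows.
After the test run, both topics have 1435 messages. I can see them in the "UI for Apache Kafka" web interface.
Results on my 4 test machines:
Machines 1 and 2: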
[root@test-kafka emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435

[root@igg emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435

Machines 3 and 4:

[mn: emulator] python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0

[root@test-gc emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0

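As you can see, with assign() I can read the 4-partition topic on every test machine, but not all of the messages are read.
On two machines subscribe() reads no messages at all, while on the other two it reads all of them.
Is something wrong with my code or my environment?
I changed how I use poll() following StephaneD's comment: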

def count_messages(consumer):
    i = 0
    try_cnt = 0
    while True:
        records = consumer.poll(50)     # timeout in millis
        if not records:
            try_cnt += 1
            if try_cnt > 10:
                break
        for _, consumer_records in records.items():
            for _ in consumer_records:
                i += 1
    return i

With this change, my program seems to read all messages with both subscribe() and assign().
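
A quick way to cross-check counts like these without consuming anything is to compare them with the partition offsets. The sketch below is not part of the original post. It assumes kafka-python's `beginning_offsets()` and `end_offsets()` methods on the consumer, and the function name `estimate_message_count` is made up for illustration. The difference between the two offsets is an upper bound: on compacted topics, or topics written with transactions, it can be larger than the number of records a consumer actually receives.

```python
def estimate_message_count(consumer, partitions):
    """Estimate record counts from offsets alone, without polling.

    `consumer` is expected to behave like kafka.KafkaConsumer and
    `partitions` to be a list of kafka.structs.TopicPartition.
    Returns (per-partition dict, total).
    """
    # First offset still available in each partition.
    beginning = consumer.beginning_offsets(partitions)
    # Offset that the next written record will get (one past the last record).
    end = consumer.end_offsets(partitions)
    per_partition = {tp: end[tp] - beginning[tp] for tp in partitions}
    return per_partition, sum(per_partition.values())
```

With a real consumer, `partitions` could be built as `[TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]`. The per-partition breakdown makes it easier to see which partition a short count comes from.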


vatpfxk5 #1

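Do not stop as soon as consumer.poll() returns no records: add a retry policy that checks that at least X poll() calls have come back empty before stopping.
I don't think Kafka guarantees that poll() always returns something (even when you know records are there); this is due to the Kafka broker's algorithms and optimizations.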
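
Since an empty poll() says little about whether data remains, an alternative to guessing a retry count is to stop on offsets instead. The following is a minimal sketch of that idea, not the method from this answer. It assumes kafka-python's `assign()`, `seek_to_beginning()`, `end_offsets()`, `position()` and `poll()` on the consumer and plain, non-transactional topics. The name `count_until_end` and its parameters are hypothetical.

```python
import time


def count_until_end(consumer, partitions, poll_timeout_ms=500, max_wait_s=30.0):
    """Count records from the beginning of `partitions` up to the end
    offsets observed when the function starts.

    `consumer` is expected to behave like kafka.KafkaConsumer and
    `partitions` to be a list of kafka.structs.TopicPartition.
    """
    consumer.assign(partitions)
    consumer.seek_to_beginning(*partitions)
    # end_offsets() maps each partition to the offset of the next record
    # that will be written, i.e. one past the last existing record.
    end_offsets = consumer.end_offsets(partitions)

    deadline = time.monotonic() + max_wait_s
    count = 0
    # An empty poll() is not taken as "no more data". Keep polling until the
    # consumer's position has reached the end offset of every partition.
    while any(consumer.position(tp) < end_offsets[tp] for tp in partitions):
        if time.monotonic() > deadline:
            raise TimeoutError('end offsets not reached within %.1f s' % max_wait_s)
        records = consumer.poll(timeout_ms=poll_timeout_ms)
        for tp, batch in records.items():
            # Ignore records appended after the end offsets were captured.
            count += sum(1 for record in batch if record.offset < end_offsets[tp])
    return count
```

Assigning all partitions at once, rather than one per loop iteration as in the question, lets a single poll loop drain every partition. The deadline is only a safety net so that an unreachable broker does not make the loop spin forever.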
