我的任务是统计Kafka主题中的消息(有些有一个分区,有些有许多分区)。我尝试了两种技术:一个是subscribe()
,另一个是assign()
。
完整代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import kafka
from kafka.structs import TopicPartition
def count_messages(consumer):
i = 0
while True:
records = consumer.poll(50) # timeout in millis
if not records:
break
for _, consumer_records in records.items():
for _ in consumer_records:
i += 1
return i
def show_messageges_assign(kafka_server, topic):
cnt = 0
consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
try:
partitions = consumer.partitions_for_topic(topic)
if partitions:
for partition in partitions:
tp = TopicPartition(topic, partition)
consumer.assign([tp])
consumer.seek(partition=tp, offset=0)
cnt += count_messages(consumer)
finally:
consumer.close()
print('%s assign, partitions: %s, msg cnt=%d' % (topic, partitions, cnt))
def show_messageges_subscribe(kafka_server, topic):
cnt = 0
consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
try:
consumer.subscribe([topic])
cnt += count_messages(consumer)
finally:
consumer.close()
print('%s, subscribe, msg cnt=%d' % (topic, cnt))
def test_topic(kafka_server, topic):
show_messageges_assign(kafka_server, topic)
show_messageges_subscribe(kafka_server, topic)
print('')
def main():
kafka_server = '169.0.1.77:9092'
test_topic(kafka_server, 'gc.ifd.analyse.fdp')
test_topic(kafka_server, 'gc.ifd.result.fdp')
if __name__ == '__main__':
main()
问题:
subscribe()
在某些机器上不工作。在同一台机器上assign()
工作,但不是很可靠。assign()
在我所有的测试机器上都可以工作,但是当有很多分区时,我认为它不会从所有分区读取数据。
结果如下:
经过测试,两个主题共有1435条消息。我可以在Web浏览器“Apache Kafka的UI”中看到它们。
在我的4台测试机器上的结果:
机器1和2:
[root@test-kafka emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435
[root@igg emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435
机器3和4:
[mn: emulator] python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0
[root@test-gc emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0
正如你所看到的,使用assign()
,我可以在所有测试机器上读取4个分区的主题,但不是所有的消息都被读取。
在两台机器上subscribe()
不读取任何消息,而在另外两台机器上它读取所有消息。
我的代码或环境有问题吗?
我根据StephaneD改变了poll()
的用法。评论:
def count_messages(consumer):
i = 0
try_cnt = 0
while True:
records = consumer.poll(50) # timeout in millis
if not records:
try_cnt += 1
if try_cnt > 10:
break
for _, consumer_records in records.items():
for _ in consumer_records:
i += 1
return i
有了这个变化,我的程序似乎可以读取所有带有subscribe()
和assign()
的消息。
1条答案
按热度按时间vatpfxk51#
如果没有来自
consumer.poll()
的记录,请不要停止:增加一个重试策略,至少检查Xpoll()
是否为空,然后停止。我不认为Kafka中有任何保证
poll()
总是返回一些东西(即使你知道有一些东西),这是由于Kafka代理算法/优化。