kafka client poll()在每个接收到的消息之后抛出eoferror

xqkwcwgp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(580)

我使用的是在合流github中找到的示例客户端轮询代码的一个轻微变体:

c = Consumer({'bootstrap.servers':'localhost:9092','group.id':'devops','auto.offset.reset':'earliest'})
c.subscribe(['system-diskio-write-bytes','system-cpu-user-pct'])

try:
    while True:
        msg = c.poll(timeout=1000.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
        else:
            print('topic: %s key: %s value: %s' % (msg.topic(), msg.key(), msg.value()))

except KeyboardInterrupt:
    print('Polling interrupted by consumer')

每次收到消息后,都会引发eof kafkaerror:

topic: system-diskio-write-bytes key: None value: b'{"route" : "system-diskio-write-bytes", "timestamp" : 2019-03-06T13:46:25.244, "value" : 655002980352.0}'
KafkaError{code=_PARTITION_EOF,val=-191,str="Broker: No more messages"}

我不明白为什么会发生这种情况——关于为什么会抛出这个错误,有什么想法吗?如何修复?任何想法都非常感谢——谢谢!

m3eecexj

m3eecexj1#

好的,答案在这里很详细。解释如下:
每次使用者到达新的结束偏移量时,都会在内部消息队列(由poll()提供服务)上推送一个eof事件,无论应用程序是否在10秒内未调用poll()。
可以按如下方式禁用eof事件:

enable.partition.eof=false

相关问题