Kafkapython优雅关闭消费者

edqdpe6u  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(283)

我试图优雅地关闭一个kafka消费者,但是脚本阻止了心跳线程的停止。我如何才能优雅地关闭消费者与Kafkapython的sigterm。这就是我所做的

  1. import logger as logging
  2. import time
  3. import sys
  4. from kafka import KafkaConsumer
  5. import numpy as np
  6. import signal
  7. log = logging.getLogger(__name__)
  8. class Cons:
  9. def __init__(self):
  10. signal.signal(signal.SIGINT, self.sigterm_handler)
  11. signal.signal(signal.SIGTERM, self.sigterm_handler)
  12. self.consumer = KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1'])
  13. def sigterm_handler(self, signum, frame):
  14. log.info("Sigterm handler")
  15. self.consumer.close(autocommit=False)
  16. sys.exit(0)
  17. def consume(self):
  18. try:
  19. while True:
  20. records = self.consumer.poll(timeout_ms=500, max_records=500)
  21. for topic_partition, consumer_records in records.items():
  22. for record in consumer_records:
  23. log.info("Got Record - {}".format(record))
  24. #code to manually commit
  25. except ValueError as e:
  26. log.exception("exception")
  27. if __name__ == '__main__':
  28. c=Cons()
  29. c.consume()

启用调试日志后,这就是我得到的输出,代码在此日志上被阻止。

  1. ^C2020-04-28 07:18:33,050 - MainThread - __main__ - INFO - Sigterm handler
  2. 2020-04-28 07:18:33,050 - MainThread - kafka.consumer.group - DEBUG - Closing the KafkaConsumer.
  3. 2020-04-28 07:18:33,051 - MainThread - kafka.coordinator - INFO - Stopping heartbeat thread

这背后的原因是什么?什么是关闭sigterm或sigint消费者的正确方法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题