我在python脚本中有以下代码
from kafka import KafkaProducer
kafka_producer = KafkaProducer(....)
kafka_producer.send(topic, value=message)
kafka_producer.flush()
logger.info('Done!') # this message is displayed
不过,我还是看到了下面的消息。看来消息发送成功了。为什么即使在 flush()
叫什么名字?
info:root:完成!
info:kafka.producer.kafka:关闭Kafka制作程序,超时0秒。
info:kafka.producer.kafka:继续强制关闭生产者,因为挂起的请求无法在超时0内完成。
info:kafka.conn:<brokerconnection node\u id=2主机=[ipv4('..',9092)]>:正在关闭连接。
1条答案
按热度按时间vs91vp4v1#
在这种情况下,我认为错误消息是不正确的。github上的相关代码源代码
我对守则的解释:如果
timeout
是0
我们跳过优美的关闭代码,直接进入强制关闭。检查的唯一条件_sender
在登录之前,请确认它是否存在,以及它是否为\u alive()。当然是存在的,它是活着的,因为它从来没有被告知关闭尚未。它从不检查是否有任何事情可以或不能完成,如果
timeout
是0
. 所以在这种情况下,日志记录是不正确的。在这种情况下
timeout > 0
那么日志记录就有意义了。initiate_close()
与join()一样,join()表示只有通过检查才能知道join是否成功is_alive()
以后再说。如果在那次尝试后它还活着join()
然后它将强制关闭它,请求无法在超时时间内完成。