sync kafka producer send仍有0秒超时?

f4t66c6m  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(1049)

我在python脚本中有以下代码

from kafka import KafkaProducer

kafka_producer = KafkaProducer(....)

kafka_producer.send(topic, value=message)
kafka_producer.flush()

logger.info('Done!') # this message is displayed

不过,我还是看到了下面的消息。看来消息发送成功了。为什么即使在 flush() 叫什么名字?
info:root:完成!
info:kafka.producer.kafka:关闭Kafka制作程序,超时0秒。
info:kafka.producer.kafka:继续强制关闭生产者,因为挂起的请求无法在超时0内完成。
info:kafka.conn:<brokerconnection node\u id=2主机=[ipv4('..',9092)]>:正在关闭连接。

vs91vp4v

vs91vp4v1#

在这种情况下,我认为错误消息是不正确的。github上的相关代码源代码

if timeout > 0:
            if invoked_from_callback:
                log.warning("Overriding close timeout %s secs to 0 in order to"
                            " prevent useless blocking due to self-join. This"
                            " means you have incorrectly invoked close with a"
                            " non-zero timeout from the producer call-back.",
                            timeout)
            else:
                # Try to close gracefully.
                if self._sender is not None:
                    self._sender.initiate_close()
                    self._sender.join(timeout)

        if self._sender is not None and self._sender.is_alive():
            log.info("Proceeding to force close the producer since pending"
                     " requests could not be completed within timeout %s.",
                     timeout)
            self._sender.force_close()

我对守则的解释:如果 timeout0 我们跳过优美的关闭代码,直接进入强制关闭。检查的唯一条件 _sender 在登录之前,请确认它是否存在,以及它是否为\u alive()。当然是存在的,它是活着的,因为它从来没有被告知关闭尚未。
它从不检查是否有任何事情可以或不能完成,如果 timeout0 . 所以在这种情况下,日志记录是不正确的。
在这种情况下 timeout > 0 那么日志记录就有意义了。 initiate_close() 与join()一样,join()表示只有通过检查才能知道join是否成功 is_alive() 以后再说。如果在那次尝试后它还活着 join() 然后它将强制关闭它,请求无法在超时时间内完成。

相关问题