我是一个新手,正在尝试编写一个涉及到Kafka的单元测试。我有一个带有函数的类,该函数调用另一个函数来初始化Kafka Producer,然后调用Producer.end()。我想在我的单元测试中嘲笑Kafka制作人。
下面是我的代码,我想为Product_Kafka_Message方法编写单元测试。
class KafkaProducerIntface
def __init__(self, topic, .....)
self.bootstrap_server = bootstrap_server
self.producer = None
self.post_topic = topic
....
def produce_kafka_message(self, key, value, headers)
self.__initialize_producer__(retries=3)
future = self.producer.send(self.topic, key=key, value=value, headers=headers)
self.producer.flush()
def __initialize_producer__(self, retries=3)
self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_server, acks='all', retries=retries)
1条答案
按热度按时间f87krz0w1#
您可以对其使用mock lib并模仿
produce_kafka_message
方法:您也可以对KafkaProducer类本身执行相同的操作:
顺便说一句。另外,请检查您是要在发送后运行刷新,还是只运行.poll(0):)