from kafka import KafkaProducer, errors, admin, KafkaConsumer
SERVERS = ['localhost:9092']
TEST_TOPIC = 'test-topic'
DATA = [{'A':'A'}, {'A':'A'}, {'A':'A'}]
class TestKafkaConsumer(unittest.TestCase):
@classmethod
def setUpClass(self):
self._producer = KafkaProducer(bootstrap_servers=SERVERS, value_serializer=lambda x:dumps(x).encode('utf-8'))
def _send_data(self):
for data in DATA:
print(self._producer.send(TEST_TOPIC, value=data))
def test_basic_processing(self):
self._send_data()
received = []
consumer = KafkaConsumer(TEST_TOPIC, bootstrap_servers=SERVERS)
for msg in consumer:
message = json.loads(msg.value.decode('utf-8'))
received.append(message)
if (len(received) >= len(DATA)):
self.assertEqual(received, DATA)
这应该会很快成功,因为它只是以一种非常简单的方式将数据发送给kafka代理。但是,它超时了;消费者从不阅读任何信息。如果我将使用者部分移动到另一个文件并在另一个终端窗口中运行它,消息会立即被“消费”。为什么在这个unittest中unittest不适用于使用者?
1条答案
按热度按时间acruukt91#
你和制作人一起制作唱片,然后你在阅读,这可能是你的问题。当您的消费者启动时,您已经生成了记录,因此,从消费者的Angular 来看,没有新的消息。在生产商开始生产之前,您应该在不同的线程中运行您的消费者。
扬尼克