kafka使用者启动后未接收从不同线程发送的消息

sg24os4d  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(202)

因此,我有一个集成测试,我正在尝试运行的地方,我想看看我是否发送一个消息,它是由消费者拿起。逻辑是我启动一个新线程,在5秒后发送消息,然后启动消费者。因为它是非阻塞的,所以消息应该在消费者启动之后发送。但是消费者没有收到任何消息,它只是继续运行,这很好,但它应该打印出我发送的消息。
这是我的密码

import time
import threading
from kafka import KafkaConsumer
from kafka import KafkaProducer

def send_message(message, topic="facts", sleep=0.5):
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092']
    )

    time.sleep(sleep)
    message = bytes(message, encoding="utf8")
    producer.send(topic, key=b'PayloadKey', value=message)

def consume():
    consumer = KafkaConsumer(
        "facts",
        enable_auto_commit=False,
        group_id='lelu',
        bootstrap_servers=['localhost:9092']
    )

    for message in consumer:
        consumer.commit()
        yield message

messages = consume()

thread = threading.Thread(target=send_message, args=('hello', 5))
thread.daemon = True
thread.start()

for msg in messages:
    print(msg)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题