kafka product.send从不发送消息

k0pti3hp  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(546)

我使用kafka2.12和kafkapython模块作为kafka客户端。我试图测试一个简单的生产者:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))

当这个过程被示例化时,消费者永远不会收到消息
如果刷新producer并更改linger_ms param(使其同步),则消息将由使用者发送和读取:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))
    producer.flush()

在以前的kafka版本中,有param queue.buffering.max.ms来指定生产者在发送队列中的消息之前要等待多长时间,但在最新版本(kafka python 1.3.3)中没有该参数。如何在较新的kafka版本中指定它以保持通信异步?
谢谢!

tpxzln5u

tpxzln5u1#

正如您所观察到的,消息排队等待异步发送,并且不能保证它会立即发送。因此,如果要强制将消息发送到代理,则需要显式调用 producer.flush() 它将阻止直到消息被发送(尽管 flush() 不保证确认)。
注:因为 flush() 是一种阻塞调用,通常只建议用于低吞吐量系统或应用程序关闭时使用。对于大容量系统,同步发送与异步发送的吞吐量冲击通常是不可行的。我的经验是,除了需要立即执行的测试套件/开发之外,生产者通常发送的速度非常快,而不需要调用flush()。
我很肯定 queue.buffering.max.ms 被替换为 linger_ms : https://kafka-python.readthedocs.io/en/master/apidoc/kafkaproducer.html#kafka.kafkaproducer
所以您已经在工作示例中使用了该参数。

abithluo

abithluo2#

producer = KafkaProducer(bootstrap_servers='kafkaIp:kafkaPort')
producer.send("topic_name", b'Your string here')
producer.flush()

使用send和flush。

相关问题