我使用kafka2.12和kafkapython模块作为kafka客户端。我试图测试一个简单的生产者:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
当这个过程被示例化时,消费者永远不会收到消息
如果刷新producer并更改linger_ms param(使其同步),则消息将由使用者发送和读取:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()
在以前的kafka版本中,有param queue.buffering.max.ms来指定生产者在发送队列中的消息之前要等待多长时间,但在最新版本(kafka python 1.3.3)中没有该参数。如何在较新的kafka版本中指定它以保持通信异步?
谢谢!
2条答案
按热度按时间tpxzln5u1#
正如您所观察到的,消息排队等待异步发送,并且不能保证它会立即发送。因此,如果要强制将消息发送到代理,则需要显式调用
producer.flush()
它将阻止直到消息被发送(尽管flush()
不保证确认)。注:因为
flush()
是一种阻塞调用,通常只建议用于低吞吐量系统或应用程序关闭时使用。对于大容量系统,同步发送与异步发送的吞吐量冲击通常是不可行的。我的经验是,除了需要立即执行的测试套件/开发之外,生产者通常发送的速度非常快,而不需要调用flush()。我很肯定
queue.buffering.max.ms
被替换为linger_ms
: https://kafka-python.readthedocs.io/en/master/apidoc/kafkaproducer.html#kafka.kafkaproducer所以您已经在工作示例中使用了该参数。
abithluo2#
使用send和flush。