一种实现方法是设置properties参数max.in.flight.requests.per.connection = 1.但我想知道在Kafka中是否有一种甚至是直接的或替代的同步发送信息的方式,类似于producer.syncSend(...)。
max.in.flight.requests.per.connection = 1
producer.syncSend(...)
huus2vyu1#
生产者API从send返回一个Future。您可以调用Future#get来阻塞,直到发送完成。请参见此example from the Javadocs:如果你想模拟一个简单的阻塞调用,你可以立即调用get()方法:
send
Future
Future#get
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get();
wn9m85ua2#
正如Thilo所建议的,您可以调用Future#get来阻塞,直到发送完成,但是您可能会遇到一些性能问题,因为当生产者队列有batch.size元素时,当大小为buffer.memory的缓冲区已满或max.block.ms毫秒后,生产者开始发送。如果你的Kafka推送线程数量有限,那么你每次都要等待max.block.ms,所以在某些情况下,你会更喜欢使用:
batch.size
buffer.memory
max.block.ms
// send message to producer queue Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message)); // flush producer queue to spare queuing time producer.flush(); // throw error when kafka is unreachable future.get(10, TimeUnit.SECONDS);
58wvjzkj3#
Thilo提出的答案是可行的。一般来说,你关于使用max.in.flight.requests.per.connection = 1的建议用于启用仍然重试,但不会丢失消息排序。它不用于同步生成器。
pieyvz9o4#
从我的Kafka历险记:-)只有当您有一个Producer线程并设置max.in.flight.requests.per.connection = 1(或retries的旋转,即retries = 0或两者)时,消息生成的顺序才能得到保证。如果要扩展到多个Producer,则必须“确保”将存储到同一分区的消息将由同一Producer示例生成。
max.in.flight.requests.per.connection
retries
gkn4icbw5#
当max.in.flight.requests.per.connection = 1时,这仅仅意味着消息的排序在一个分区内得到保证,这与同步无关。对于同步发送,请确保在未来阻塞一个好的超时。
from kafka import KafkaProducer from kafka.errors import KafkaError #by default ack = 1, if ack = 'all' --> waits for acks from replicas producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], ack= 'all') key = b'key' value = b'value' future = producer.send("my-topic", key=key, value=value) # block on this future for sync sends try: record_metadata = future.get(timeout=10) except KafkaError: log.exception() pass print (record_metadata.topic) print (record_metadata.partition) print (record_metadata.offset) producer.flush() producer.close()
aelbi1ox6#
如果您不需要企业解决方案,请参阅:https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1
6条答案
按热度按时间huus2vyu1#
生产者API从
send
返回一个Future
。您可以调用Future#get
来阻塞,直到发送完成。请参见此example from the Javadocs:
如果你想模拟一个简单的阻塞调用,你可以立即调用get()方法:
wn9m85ua2#
正如Thilo所建议的,您可以调用
Future#get
来阻塞,直到发送完成,但是您可能会遇到一些性能问题,因为当生产者队列有batch.size
元素时,当大小为buffer.memory
的缓冲区已满或max.block.ms
毫秒后,生产者开始发送。如果你的Kafka推送线程数量有限,那么你每次都要等待
max.block.ms
,所以在某些情况下,你会更喜欢使用:58wvjzkj3#
Thilo提出的答案是可行的。一般来说,你关于使用max.in.flight.requests.per.connection = 1的建议用于启用仍然重试,但不会丢失消息排序。它不用于同步生成器。
pieyvz9o4#
从我的Kafka历险记:-)只有当您有一个Producer线程并设置
max.in.flight.requests.per.connection
= 1(或retries
的旋转,即retries
= 0或两者)时,消息生成的顺序才能得到保证。如果要扩展到多个Producer,则必须“确保”将存储到同一分区的消息将由同一Producer示例生成。
gkn4icbw5#
当max.in.flight.requests.per.connection = 1时,这仅仅意味着消息的排序在一个分区内得到保证,这与同步无关。
对于同步发送,请确保在未来阻塞一个好的超时。
aelbi1ox6#
如果您不需要企业解决方案,请参阅:https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1