如何在Kafka中发送同步消息?实现它的一种方法是设置properties参数 max.in.flight.requests.per.connection = 1 .但我想知道,在Kafka中,是否有一种甚至直接或替代的同步信息发送方式(类似producer.syncsend(…)等)。
max.in.flight.requests.per.connection = 1
p1tboqfb1#
当max.in.flight.requests.per.connection=1时,这意味着在分区内保证消息的顺序,而与同步无关。python代码。对于同步发送,请确保使用良好的超时阻止未来。
from kafka import KafkaProducer from kafka.errors import KafkaError # by default ack = 1, if ack = 'all' --> waits for acks from replicas producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], ack= 'all') key = b'key' value = b'value' future = producer.send("my-topic", key=key, value=value) # block on this future for sync sends try: record_metadata = future.get(timeout=10) except KafkaError: log.exception() pass print (record_metadata.topic) print (record_metadata.partition) print (record_metadata.offset) producer.flush() producer.close()
v64noz0r2#
正如蒂洛所建议的,你可以打电话给我 Future#get 阻止直到发送完成。但是,您可能会遇到一些性能问题,因为生产者在生产者队列发生故障时开始发送 batch.size 元素,当缓冲区的大小 buffer.memory 已满或已满 max.block.ms 毫秒。如果你有一个线程推到Kafka数量有限,你将不得不等待 max.block.ms 每次你的信息被发送。因此,在某些情况下,您更喜欢使用:
Future#get
batch.size
buffer.memory
max.block.ms
// send message to producer queue Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message)); // flush producer queue to spare queuing time producer.flush(); // throw error when kafka is unreachable future.get(10, TimeUnit.SECONDS);
wqsoz72f3#
producer api返回 Future 从 send . 你可以打电话 Future#get 阻止直到发送完成。参见javadocs中的示例:如果要模拟简单的阻塞调用,可以立即调用get()方法:
Future
send
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get();
eh57zj3b4#
蒂洛提出的答案是正确的。通常,您关于使用max.in.flight.requests.per.connection=1的建议用于启用仍然重试,但不会丢失消息顺序。它不是用来有一个同步生产者。
ut6juiuv5#
从我的Kafka历险记:-)只有当您有一个生产者线程和设置时,才能保证消息生成的顺序 max.in.flight.requests.per.connection =1(或旋转 retries ,即。 retries =0或两者)。如果您需要扩展到多个生产者,那么您必须“确保”将存储到同一分区的消息将由同一生产者示例生成。
max.in.flight.requests.per.connection
retries
5条答案
按热度按时间p1tboqfb1#
当max.in.flight.requests.per.connection=1时,这意味着在分区内保证消息的顺序,而与同步无关。
python代码。对于同步发送,请确保使用良好的超时阻止未来。
v64noz0r2#
正如蒂洛所建议的,你可以打电话给我
Future#get
阻止直到发送完成。但是,您可能会遇到一些性能问题,因为生产者在生产者队列发生故障时开始发送batch.size
元素,当缓冲区的大小buffer.memory
已满或已满max.block.ms
毫秒。如果你有一个线程推到Kafka数量有限,你将不得不等待
max.block.ms
每次你的信息被发送。因此,在某些情况下,您更喜欢使用:wqsoz72f3#
producer api返回
Future
从send
. 你可以打电话Future#get
阻止直到发送完成。参见javadocs中的示例:
如果要模拟简单的阻塞调用,可以立即调用get()方法:
eh57zj3b4#
蒂洛提出的答案是正确的。通常,您关于使用max.in.flight.requests.per.connection=1的建议用于启用仍然重试,但不会丢失消息顺序。它不是用来有一个同步生产者。
ut6juiuv5#
从我的Kafka历险记:-)只有当您有一个生产者线程和设置时,才能保证消息生成的顺序
max.in.flight.requests.per.connection
=1(或旋转retries
,即。retries
=0或两者)。如果您需要扩展到多个生产者,那么您必须“确保”将存储到同一分区的消息将由同一生产者示例生成。