如何在Kafka中同步传递信息?

kadbb459  于 2023-01-01  发布在  Apache
关注(0)|答案(6)|浏览(155)

一种实现方法是设置properties参数
max.in.flight.requests.per.connection = 1.
但我想知道在Kafka中是否有一种甚至是直接的或替代的同步发送信息的方式,类似于producer.syncSend(...)

huus2vyu

huus2vyu1#

生产者API从send返回一个Future。您可以调用Future#get来阻塞,直到发送完成。
请参见此example from the Javadocs
如果你想模拟一个简单的阻塞调用,你可以立即调用get()方法:

byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = 
     new ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();
wn9m85ua

wn9m85ua2#

正如Thilo所建议的,您可以调用Future#get来阻塞,直到发送完成,但是您可能会遇到一些性能问题,因为当生产者队列有batch.size元素时,当大小为buffer.memory的缓冲区已满或max.block.ms毫秒后,生产者开始发送。
如果你的Kafka推送线程数量有限,那么你每次都要等待max.block.ms,所以在某些情况下,你会更喜欢使用:

// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);
58wvjzkj

58wvjzkj3#

Thilo提出的答案是可行的。一般来说,你关于使用max.in.flight.requests.per.connection = 1的建议用于启用仍然重试,但不会丢失消息排序。它不用于同步生成器。

pieyvz9o

pieyvz9o4#

从我的Kafka历险记:-)只有当您有一个Producer线程并设置max.in.flight.requests.per.connection = 1(或retries的旋转,即retries = 0或两者)时,消息生成的顺序才能得到保证。
如果要扩展到多个Producer,则必须“确保”将存储到同一分区的消息将由同一Producer示例生成。

gkn4icbw

gkn4icbw5#

当max.in.flight.requests.per.connection = 1时,这仅仅意味着消息的排序在一个分区内得到保证,这与同步无关。
对于同步发送,请确保在未来阻塞一个好的超时。

from kafka import KafkaProducer
from kafka.errors import KafkaError

#by default ack = 1, if ack = 'all' --> waits for acks from replicas 
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], ack= 'all')

key = b'key'
value = b'value'

future = producer.send("my-topic", key=key, value=value)

# block on this future for sync sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    log.exception()
    pass

print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

producer.flush()
producer.close()

相关问题