我一直在尝试配置一个kafka代理,一个主题,一个生产者,一个消费者。当生产者生产时,如果经纪人倒下,数据就会丢失,例如:
In Buffer:
Datum 1 - published
Datum 2 - published
.
. ---->(Broker goes down for a while and reconnects...)
.
Datum 4 - published
Datum 5 - published
为生产者配置的属性包括:
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=2
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.type=sync
buffer.size=102400
reconnect.interval=30000
request.required.acks=1
数据大小小于配置的缓冲区大小。。帮我知道我哪里出错了。。。!
1条答案
按热度按时间yyyllmsg1#
不知道你到底在做什么。我假设您在代理关闭时尝试写入kafka的消息没有被kafka确认。如果消息未确认,则表示消息未写入Kafka,生产者需要重新尝试写入消息。
最简单的方法是设置配置参数
retries
以及retry.backoff.ms
相应地。在应用程序级别,还可以注册
Callback
在send(..., Callback)
获得成功/失败的信息。如果失败,您可以通过调用send()
再一次。