kafka滚动重启:数据丢失

y3bcpkx1  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(736)

作为我们当前kafka集群的一部分,高可用性测试(ha)正在进行中。目标是,当生产者作业将数据推送到某个主题的特定分区时,kafka集群中的所有代理都会依次重新启动(停止第一个代理-重新启动它,在第一个代理出现后,对第二个代理执行相同的步骤,依此类推)。在测试进行期间,制作人的工作将在大约30分钟内推送700万张唱片。在作业结束时,发现大约有1000条记录丢失。
下面是我们Kafka集群的细节:(Kafka2.10-0.8.2.0)
-3个kafka代理,每个代理有2个100gb挂载
创建主题时使用:-复制因子3-min.insync.replica=2
服务器属性:

broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/drive1,/drive2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=1800000
log.cleaner.enable=false
zookeeper.connect=ZK1:2181,ZK2:2181,ZK3:2181
zookeeper.connection.timeout.ms=10000
advertised.host.name=XXXX
auto.leader.rebalance.enable=true
auto.create.topics.enable=false
queued.max.requests=500
delete.topic.enable=true
controlled.shutdown.enable=true
unclean.leader.election=false
num.replica.fetchers=4
controller.message.queue.size=10

producer.properties(具有新producer api的aync producer)

bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
acks=all
buffer.memory=33554432
compression.type=snappy
batch.size=32768
linger.ms=5
max.request.size=1048576
block.on.buffer.full=true
reconnect.backoff.ms=10
retry.backoff.ms=100
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

是否有人可以共享有关kafka集群和ha的任何信息,以确保在启动kafka代理时不会丢失数据?
还有,这是我的制片人代码。这是一个火和忘记类型的生产者。到目前为止,我们还没有明确地处理故障。几乎数百万张唱片都能正常工作。我看到的问题,只有当Kafka经纪人重新启动如上所述。

public void sendMessage(List<byte[]> messages, String destination, Integer parition, String kafkaDBKey) {

    for(byte[] message : messages) {

        producer.send(new ProducerRecord<byte[], byte[]>(destination, parition, kafkaDBKey.getBytes(), message));

    }

}
wr98u20j

wr98u20j1#

通过在生产者端将默认重试值从0增加到4000,我们能够成功地发送数据而不会丢失数据。 retries=4000 由于此设置,有可能发送两次相同的消息,并且消息在使用者接收到消息时顺序不一致(第二个消息可能在第一个消息之前到达)。但对于我们目前的问题,这不是一个问题,而是在消费者方面处理的,以确保一切正常。

相关问题