当Kafka经纪人倒下又回来时,Kafka制作人的数据丢失

zmeyuzjn  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(443)

每当一个Kafka经纪人倒下并重新加入时,我就会面临一些数据丢失的问题。我猜每当代理加入集群时,就会触发一个重新平衡,在这一点上,我观察到我的kafka producer中有一些错误。
生产者写入一个带有40个分区的kafka主题,下面是每当触发重新平衡时我看到的日志序列。

[WARN ] 2019-06-05 20:39:08 WARN  Sender:521 - [Producer clientId=producer-1] Got error produce response with correlation id 133054 on topic-partition test_ve-17, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION
...
...
[WARN ] 2019-06-05 20:39:31 WARN  Sender:521 - [Producer clientId=producer-1] Got error produce response with correlation id 133082 on topic-partition test_ve-12, retrying (1 attempts left). Error: NOT_ENOUGH_REPLICAS
...
...
[ERROR] 2019-06-05 20:39:43 ERROR GlobalsKafkaProducer:297 - org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
...
...
[WARN ] 2019-06-05 20:39:48 WARN  Sender:521 - [Producer clientId=producer-1] Got error produce response with correlation id 133094 on topic-partition test_ve-22, retrying (1 attempts left). Error: NOT_ENOUGH_REPLICAS
[ERROR] 2019-06-05 20:39:53 ERROR Sender:604 - [Producer clientId=producer-1] The broker returned org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number for topic-partition test_ve-37 at offset -1. This indicates data loss on the broker, and should be investigated.
[INFO ] 2019-06-05 20:39:53 INFO  TransactionManager:372 - [Producer clientId=producer-1] ProducerId set to -1 with epoch -1
[ERROR] 2019-06-05 20:39:53 ERROR GlobalsKafkaProducer:297 - org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number
...
...
RROR] 2019-06-05 20:39:53 ERROR GlobalsKafkaProducer:297 - org.apache.kafka.common.errors.OutOfOrderSequenceException: Attempted to retry sending a batch but the producer id changed from 417002 to 418001 in the mean time. This batch will be dropped.

我们有的Kafka配置是

acks = all
min.insync.replicas=2
unclean.leader.election.enable=false
linger.ms=250
retries = 3

我在生成每3000条记录后调用flush()。有什么我做错了的吗,请指点一下?

bvjveswy

bvjveswy1#

让我假设一些事情,您有3个kafka代理节点,所有主题的复制因子也是3。你不能随意创造主题。
正如你所说:

acks = all
min.insync.replicas=2
unclean.leader.election.enable=false

在这种情况下,如果同步的两个复制副本都停止,您肯定会删除数据。因为最后一个剩余的复制副本没有资格选择作为集群的领导者,因为 unclean.leader.election.enable=false 并且没有领导者接收发送请求。从你开始 linger.ms= 250 其中一个insync副本在短时间内恢复为活动副本,并再次被选为主题领导者,您将避免数据丢失。但警告是 linger.ms 与一起工作 batch.size . 如果你把 batch.size 要发送的消息数达到批量大小生产者可能不会等到达到linger.ms设置。
所以我建议的一个明确的改变是增加 retries . 检查参数的配置 request.timeout.ms . 找出你的平均时间所采取的经纪人回来后关闭。如果有因果关系,你的重试应该包括经纪人活过来所花的时间。如果所有其他折衷措施都到位以减少数据丢失的机会,那么这肯定会帮助您避免数据丢失。

相关问题