ACK机制
ACK 是acknowledge
的缩写,意思就是确认,这里的指的是producer 需要接受到来自Broker 的ack 信息,其实更准确的说法是接收到来自Leader partition 的ack
信息 ,需要注意的是ACK机制的开启,会直接影响Kafka集群的吞吐量和消息可靠性,而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡,所以要根据自己的业务特点进行取舍。
ACK 是kafka 保证数据数据不丢失的重要手段。但是可能导致数据重复,当然这取决于你的配置。在开始之前我们先了解另外一个概念IRS
ISR机制
背景
- 每个topic的分区可以设置若干个副本(Leader、Follower),其中Follower同步Leader的数据形成副本。为了保证生产者发送的数据,能可靠的发送到指定的 topic,topic 的每个分区收到生产者发送的数据后,都需要向生产者发送应答 ack(acknowledgement),如果生产者收到ack,就会进行下一轮的发送,否则重新发送数据。
- 在Kafka中必须要所有Follower都完成同步时才会发送ack,这样的好处是当重新选举Leader时,只需要有n+1个副本即可容忍n台节点故障,但缺点也很明显就是延迟很高,因为必须等待所有follower都完成同步才行。
- 但这样又会带来新的问题,如果其中有一个follower由于网络延迟或者某种原因,迟迟不能完成数据同步,Leader就会一直阻塞等待直到该follower完成同步,这非常的影响性能
- 于是Kafka引入了一个新的机制:ISR(In-Sync Replica),副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性,所以说kafka 的数据同步方式不是完全同步的,也不是完全异步的,而是这种基于ISR的同步机制
ISR
和Leader保持同步的follower集合,Leader不需要等待所有Follower都完成同步,只要在ISR中的Follower完成数据同步就可以发送ack 给生产者
如果ISR集合里的follower 的延迟时间超过配置的参数(replica.lag.time.max.ms)就会从ISR 内剔除,只需要保证Leader 数据能够发送到这些ISR集合里的follower即可。一旦Leader发生故障,就会从ISR集合里选举一个Follower作为新的Leader。
AR
所有的副本(replicas)统称为Assigned Replicas,即AR。
ISR是AR中的一个子集,每个Partition都会有一个ISR,而且是由leader动态维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度)
任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
特性
- 追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,就存在着不可能与 Leader 实时同步的风险。
- leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-syncReplica),每个Partition都会有一个ISR,而且是由leader动态维护
- Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本
- 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
- 当ISR中所有Replica都向Leader发送ACK时,leader才commit
判断标准(replica.lag.time.max.ms)
- 这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。
- 这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。
- 这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
- Follower 副本唯一的工作就是不断地从 Leader 副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
- 倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的
在版本0.90以前还有一个参数,根据follower和leader之间相差的数据量来控制是否在ISR集合内,大于配置的阈值则踢出ISR集合。但是这个有一个很明显的问题,那就是如果流量增加了,可能就是使得正常同步的副本被剔除ISR了
Kafka 的ACK
其实到这里大家都应该知道了,Kafka 的Ack 是基于kafka 的IRS 而言的,其实可以看出来,这就是kafka 在吞吐量和消息可靠性之间做的取舍,通过IRS 来保证可靠性的同时最大程度的保证了吞吐。对于某些不太重要的数据,我们能容忍少量数据的丢失,不需要等待ISR内的Follower都完成同步再发送ack,这样的话可以牺牲可靠性来换来数据吞吐量的提升
ACK 是一个Producer端的配置参数,你可以在创建Producer的时候传进去
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "0");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", CustomerSerializer.class.getName());
producer = new KafkaProducer<String, Customer>(props);
acks = 0(至多一次)
- 生产者只负责发消息,不管Leader 和Follower 是否完成落盘就会发送ack 。这样能够最大降低延迟,但当Leader还未落盘时发生故障就会造成数据丢失;
- 这里其实就是相当于是异步的,不需要leader给予回复,producer立即返回,
acks = 1(至多一次)
- Leader将数据落盘后,不管Follower 是否落盘就会发送ack 。这样可以保证Leader节点内有一份数据,但当Follower还未同步时Leader发生故障就会造成数据丢失;
acks = -1(all 至少一次)
- 生产者等待Leader和ISR集合内的所有Follower 都完成同步才会发送ack 。
- 但当Follower同步完之后,Leader发送ack之前,Leader发生故障时,此时会重新从ISR内选举一个新的Leader,此时由于生产者没收到ack,于是生产者会重新发消息给新的Leader,此时就会造成数据重复;