分区 Partition
分区的意义
提高负载均衡的能力
kafak 通过分区来提高系统的负载均衡能力,主要通过以下两个方面进行保证的
- Kafka 创建Topic 的时候使得分区均匀的分布在各个Broker(集群节点)上
- kafka 在生产者发送消息到kafka 集群的时候,通过一定的负载均衡策略,使得数据均匀的分布在各个分区上
这样通过在两个层面上的保证,从而保证了集群整体的负载均衡
实现系统的高伸缩性(Scalability)
不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变
- kafka 的伸缩性也可以通过Partition来体现,因为我们可以对Topic的分区进行动态扩容,从而提高整个系统的吞吐。
分区策略
- 默认策略为:如果指定了partition就直接发送到该分区
- 如果没有指定分区但是指定了key,就按照key的hash值选择分区
- 如果partition和key都没有指定就使用轮询策略
自定义分区策略
- 具体可以参考前面一节
- 你需要显式地配置生产者端的参数partitioner.class,这个参数该的设定,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close()
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- 这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中
跨地域机房案例
- 对于跨地域机房,我们希望就近发生,即当地的Produce 只需要将数据发送到kafka 集群对应的当地分区即可
- kafka-topics支持在创建topic时指定partition放在那些broker上(北京的partition 放在北京机房的broker上)
- 消费的时候,也可以采取本地化的策略,即本地的程序只消费本地的分区,使用这个方法:consumer.assign()直接消息指定分区
轮询策略
- 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,
- 轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地发送消息。
- 轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一
随机策略
- 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
按消息键保序策略
- Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据
- 一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
- 其实一眼就看到这个分区方法存在的问题,如果分区数目变了就不能保证key 相同的数据放在同一个策略了
存在的问题
- 假如我现在的业务数据定义了三个key,但是这三个key对应的消息生产速率不一致,按照老师上面的示意图展示的是,特定的key只会存储在特定的一个分区中,那岂不是牺牲了拓展性么,如果其中一个key的生产速率非常大,而另外2个key没那么大,就会导致分区数据不均衡
- 是有这样的问题。所以其实在生产环境中用key做逻辑区分并不太常见。如果不同key速率相差很大,可以考虑使用不同的topic
例子
- 有很多时候需要数据有序——全局有序,但是当我们仔细去思考的时候,发现不需要全局有序只需要满足共同特征的消息有序即可,所以我们可以采取将共同特征提升为key 的方式,来保证有序。
- 在同步mysql 数据到kafka 的时候,我们希望数据可以被还原成和mysql 一致的数据,这个时候我们可以使用 tableName+主键,来保证消息的不乱序,从而实现正确还原。
备份 Repcation
为了提高消息的可靠性,Kafka topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个broker失效情况下仍然保证服务可用。
在Kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求。
Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,所有Partition的Replica默认情况会均匀分布到所有Broker上。
副本的工作机制
- 产者总是向领导者副本写消息,消费者总是从领导者副本读消息。也就是说Leader负责读写请求
- 至于追随者副本(follower),它只做一件事:定期向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步,也就是说flower只是数据的一个备份,保证leader被挂掉后顶上来,并不对外提供服务
ISR机制
背景
- 每个topic的分区可以设置若干个副本(Leader、Follower),其中Follower同步Leader的数据形成副本。为了保证生产者发送的数据,能可靠的发送到指定的 topic,topic 的每个分区收到生产者发送的数据后,都需要向生产者发送应答 ack(acknowledgement),如果生产者收到ack,就会进行下一轮的发送,否则重新发送数据。
- 在Kafka中必须要所有Follower都完成同步时才会发送ack,这样的好处是当重新选举Leader时,只需要有n+1个副本即可容忍n台节点故障,但缺点也很明显就是延迟很高,因为必须等待所有follower都完成同步才行。
- 但这样又会带来新的问题,如果其中有一个follower由于网络延迟或者某种原因,迟迟不能完成数据同步,Leader就会一直阻塞等待直到该follower完成同步,这非常的影响性能
- 于是Kafka引入了一个新的机制:ISR(In-Sync Replica),副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性
- 所以说kafka 的数据同步方式不是完全同步的,也不是完全异步的,而是这种基于ISR的同步机制
ISR
和Leader保持同步的follower集合,Leader不需要等待所有Follower都完成同步,只要在ISR中的Follower完成数据同步就可以发送ack 给生产者
如果ISR集合里的follower 的延迟时间超过配置的参数(replica.lag.time.max.ms)就会从ISR 内剔除,只需要保证Leader 数据能够发送到这些ISR集合里的follower即可。一旦Leader发生故障,就会从ISR集合里选举一个Follower作为新的Leader。
AR
所有的副本(replicas)统称为Assigned Replicas,即AR。
ISR是AR中的一个子集,每个Partition都会有一个ISR,而且是由leader动态维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度)
任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
特性
- 追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,就存在着不可能与 Leader 实时同步的风险。
- leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-syncReplica),每个Partition都会有一个ISR,而且是由leader动态维护
- Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本
- 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
- 当ISR中所有Replica都向Leader发送ACK时,leader才commit
判断标准(replica.lag.time.max.ms)
- 这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。
- 这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。
- 这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
- Follower 副本唯一的工作就是不断地从 Leader 副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
- 倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的
在版本0.90以前还有一个参数,根据follower和leader之间相差的数据量来控制是否在ISR集合内,大于配置的阈值则踢出ISR集合。但是这个有一个很明显的问题,那就是如果流量增加了,可能就是使得正常同步的副本被剔除ISR了
Producer配置
request.required.asks=0
- 0 相当于异步的,不需要leader给予回复,producer立即返回,发送就是成功,那么发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步)
- 1 当leader接收到消息之后发送ack,丢会重发,丢的概率很小
- -1 当所有的follower都同步消息成功后leader 发送ack. 丢失消息可能性比较低
repcation机制的意义
提供数据冗余
- 即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
提供高伸缩性
- 支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
- 其实这个不仅仅依赖于副本机制,还依赖于分区机制(分片)
改善数据局部性
- 允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
- 在改善数据局部性上,Kafka 的追随者副本没有任何作用,它既不能像 MySQL 那样帮助领导者副本“抗读”,也不能实现将某些副本放到离客户端近的地方来改善数据局部性。
副本角色(数据一致性保证)
领导者副本(Leader Replica)
- 领导者副本对外提供服务,这里的对外指的是与客户端程序进行交互
领导者副本的选举
- 当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,
- Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。
- 老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
- Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
- 开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性
- 当ISR中的副本的LEO不一致时,如果此时leader挂掉,选举新的leader时并不是按照LEO的高低进行选举,而是按照ISR中的顺序选举
追随者副本(Follower Replica)
- 追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步
在很多其他系统中追随者副本是可以对外提供服务的,比如 MySQL 的从库是可以处理读操作的
设计理念
- 方便实现“Read-your-writes”。所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。
比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。
如果允许追随者副本对外提供服务,由于副本同步是异步的,因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
- 方便实现单调读(Monotonic Reads),什么是单调读呢,就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。
高可用的实现
- 多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务。
- 实现高可用的另一个手段就是备份机制(Replication)。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。
- 副本的数量是可以配置的,这些副本保存着相同的数据,但却有不同的角色和作用。Kafka 定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)
伸缩性
- 副本机制可以保证数据的持久化或消息不丢失,但没有解决伸缩性的问题
- Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,
- 副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有 1 个领导者副本和 N-1 个追随者副本。
kafka 为什么不设计读写分离呢
- kafka的分区已经让读是从多个broker读从而负载均衡,不是MySQL的主从,压力都在主上
- kafka保存的数据和数据库的性质有实质的区别就是数据具有消费的概念,是流数据,kafka是消息队列,所以消费需要位移,而数据库是实体数据不存在这个概念,如果从kafka的follower读,消费端offset控制更复杂;
- 生产者来说,kafka可以通过配置来控制是否等待follower对消息确认的,如果从上面读,也需要所有的follower都确认了才可以回复生产者,造成性能下降,如果follower出问题了也不好处理
- Redis和MySQL都支持主从读写分离,我个人觉得这和它们的使用场景有关。对于那种读操作很多而写操作相对不频繁的负载类型而言,采用读写分离是非常不错的方案——我们可以添加很多follower横向扩展,提升读操作性能。反观Kafka,**它的主要场景还是在消息引擎而不是以数据存储的方式对外提供读服务,通常涉及频繁地生产消息和消费消息,**这不属于典型的读多写少场景,因此读写分离方案在这个场景下并不太适合。