我正在测试Kafka(Apache;Kafka2.12-1.1.0)。我所期望的是,当一个节点崩溃时,一个主题的isr应该增加它自己(即复制到可用节点)。我花了4天时间在谷歌上搜索可能的解决方案,但没有用。
使用docker(wurstmeister)在server.properties中更新了下面的内容,创建了3个代理,3个Zookeeper(1node=1broker+1 zookeeper)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
default.replication.factor=3
启动所有经纪人;等了一会儿;已创建主题,其中replication3,min in sync replication 2
bin/kafka-topics.sh --create --zookeeper 172.31.31.142:2181,172.31.26.102:2181,172.31.17.252:2181 --config 'min.insync.replicas=2' --replication-factor 3 --partitions 1 --topic test2
当我描述这个主题时,我看到了下面的数据
bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2 PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2
Topic: test2 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
到目前为止还不错,现在我开始考虑;接着是制片人。当消费开足马力时,我杀了经纪人。现在,当我描述相同的主题时,我看到了下面的([edit-1])
bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2 PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2
Topic: test2 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1
bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic __consumer_offsets
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3
Topic: __consumer_offsets Partition: 1 Leader: 3 Replicas: 2,3,1 Isr: 1,3
.. .. ..
[编辑结束-1]
我让Kafka的制作者、消费者继续几分钟;问题1:为什么当代理2关闭时副本仍然显示2?
现在我又向集群添加了2个代理。在生产者、消费者继续观察isr的同时;isr副本的数量不会增加,它们只会保持在3,1。问题2:为什么isr没有增加,即使有两个以上的经纪人?。
然后我停止了生产者,消费者;等了几分钟;再次运行descripe命令--仍然是相同的结果。isr何时扩展其复制?。如果还有2个节点可用,为什么isr没有复制?
我对我的制作人的评价如下
props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", new Integer(args[2]));// 60384
props.put("linger.ms", new Integer(args[3]));// 1
props.put("buffer.memory", args[4]);// 33554432
props.put("bootstrap.servers", args[6]);// host:port,host:port,host:port etc
props.put("max.request.size", "10485760");// 1048576
和消费者如下
props.put("group.id", "testgroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", args[2]);// 1000
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("max.partition.fetch.bytes", args[3]);// 52428800
props.put("fetch.max.bytes", args[4]);// 1048576
props.put("fetch.message.max.bytes", args[5]);// 1048576
props.put("bootstrap.servers", args[6]);
props.put("max.poll.records", args[7]);
props.put("max.poll.interval.ms", "30000");
props.put("auto.offset.reset", "latest");
在另一个实验中,当我删除另一个代理时,我开始看到同步复制的总数小于所需的最小值的错误。令人惊讶的是,在这个州,生产商没有被封锁;但是我在broker server.log上看到了错误。没有新邮件进入队列。问题4:制片人不应该被封杀吗?而不是在代理端抛出错误?还是我的理解错了?
需要帮忙吗?
2条答案
按热度按时间y1aodyip1#
重述副本的含义:所有分区副本都是副本,甚至是前导副本;换句话说,两个复制品意味着你有一个领导者和一个追随者。
当您描述主题时,对于您唯一的分区,您会看到:“replicas:2,3,1 isr:3,1”,这意味着在创建主题时,将leader分区分配给broker 2(replicas列表中的第一个),并将follower分配给broker 3和1;现在代理2是该分区的“首选领导者”。
此任务本身不会更改(领导者可能会更改,但“首选领导者”不会更改),因此您的追随者不会移动到其他代理,只能将领导者角色分配给另一个同步副本(有一个属性auto.leader.rebalance.enable,如果该属性设置为true,将允许领导者角色在再次启动时返回到首选领导者,否则该领导者角色将由新当选的领导者保留。。。
下一次尝试杀死领导人经纪人,你会看到一个新的领导人将被选举和使用,但“副本:2,3,1”将保留。
如果您将复制因子设置为3 acks=all和min.insync.replicas=2,则只要有2个复制副本确认写入(leader和一个follower),就可以生成,但如果无法维护3个isr,则可以在代理上获取日志。。。
希望这有帮助。。。
2nbm6dog2#
如果我理解正确,Kafka不会自动重新平衡时,你添加经纪人。除非使用重新分区工具,否则不会重新分配已关闭的复制副本
现在还不清楚你的环境之间有什么区别,但如果一个经纪人仍然被列为领导者,那么看起来你并没有真的杀了他。
如果您有两个代理,minisr为2,那么,是的,您将看到错误。但是,生产者应该仍然能够联系到至少一个代理,因此我认为除非将ack值设置为all,否则它不会被完全阻止。代理端的错误更多地与放置副本有关