kafka分区中消息的不均匀分布

w8ntj3qf  于 2021-06-08  发布在  Kafka
关注(0)|答案(7)|浏览(1333)

我有一个主题,有10个分区,1个用户组,4个用户,工作区大小是3。
我可以看到分区中的消息分布不均匀,一个分区有如此多的数据,而另一个分区是免费的。
如何使我的生产者将负载均匀地分配到所有分区中,以便所有分区都得到正确利用?

brgchamk

brgchamk1#

在我的例子中,我使用了默认的分区器,但是一个分区中的记录仍然比其他分区中的记录多得多。问题是我意外地有许多记录使用相同的密钥。检查你的钥匙!

5lwkijsr

5lwkijsr2#

您可以使用producer记录的关键参数。对于一个特定的键,数据总是进入同一个分区,我不知道你的生产者记录的结构,但是正如你说的,你有10个分区,那么你可以简单地使用n%10作为你的生产者记录键。如果n是0到9,那么记录0的密钥将是0,然后kafka将生成一个散列密钥并将其放入某个分区,比如分区0,记录1的密钥将是1,然后它将进入第一个分区,依此类推。这样您就可以在producer记录上应用循环,您的密钥将独立于记录中的字段,因此您可以将变量n和密钥设置为n%10。
或者可以在producer记录中指定分区。因此,要么使用producer记录的key或partition字段。

0s0u357o

0s0u357o3#

由于我无法与浮士德解决这个问题,我使用的方法是自己实现“循环”分发。
我反复查看我的记录以生成并执行以下操作,例如:

for index, message in enumerate(messages):
    topic.send(message, partition=index % num_partitions)

i、 e.将我的索引绑定到我拥有的分区范围内。
仍然可能存在不均匀性-考虑您重复运行此操作,但您的记录数少于您的记录数 num_partitions -然后您的第一个分区将继续获得消息的主要份额。可以通过添加随机偏移量来避免此问题:

import random
initial_partition = random.randrange(0, num_partitions)
for index, message in enumerate(messages):
    topic.send(message, partition=(initial_partition + index) % num_partitions)
pokxtpni

pokxtpni4#

如果您已经从记录中定义了partitioner,比如在kafka中,key是string,value是student pojo。
在student pojo中,假设基于student country字段,我想在一个特定的分区中。假设一个主题有10个分区,例如,在值上,“印度”是一个国家,基于“印度”,我们得到了分区号5。
每当国家是“印度”时,Kafka就会分配5号分区,并且该记录总是转到5号分区(如果分区没有改变的话)。
假设在您的管道中有许多记录即将到来,并且有一个国家“印度”,所有这些记录都将转到第5分区,您将看到Kafka分区中的不均匀分布。

x4shl7ld

x4shl7ld5#

根据defaultpartitioner类本身中的javadoc注解,默认的分区策略是:
如果在记录中指定了分区,请使用它。
如果未指定分区,但存在密钥,请根据密钥的哈希值选择分区。
如果没有分区或密钥,则以循环方式选择分区。
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/defaultpartitioner.java
因此,这里有两个可能导致分布不均匀的原因,具体取决于您是否在生成消息时指定了密钥:
如果您指定一个键,并且使用defaultpartitioner得到的分布不均匀,最明显的解释是您多次指定同一个键。
如果未指定键并使用defaultpartitioner,则可能会发生不明显的行为。根据上面的内容,您可能希望消息的循环分发,但情况未必如此。0.8.0中引入的优化可能会导致使用相同的分区。有关更详细的说明,请查看此链接:https://cwiki.apache.org/confluence/display/kafka/faq#faq-如果没有指定分区键,为什么数据不会在分区中分发。

qvtsj1bj

qvtsj1bj6#

不必使用默认的分区器类,您可以为生产者分配一个分区号,这样消息就可以直接到达指定的分区,

ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, partitionNumber,key, value);
disho6za

disho6za7#

似乎你的问题是不均衡的信息消费,而不是不均衡的信息生产Kafka主题。换句话说,读取线程的数量与拥有的分区数量不匹配(尽管它们不需要匹配1:1,但每个使用者线程读取的分区数量必须相同)。
有关详细信息,请参阅简短的说明。

相关问题