横向缩放spring kafka消费者应用程序

nx7onnlm  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(363)

我想知道什么是一个好的方法来配置分区的数量相对于最大数量的水平扩展的示例。
假设我有一个有6个分区的主题。
我有一个应用程序使用 ConcurrentKafkaListenerContainerFactorysetConcurrency 第6页。那就意味着我有6个 KafkaMessageListenerContainer 每一个都使用一个线程,并且消耗来自所有分区的消息。
如果以上是正确的,那么我想知道如果我通过添加另一个示例水平扩展应用程序会发生什么?如果新示例具有相同的并发配置6,当然还有相同的使用者组,我相信第二个示例不会使用任何消息。因为不会发生重新平衡,因为每个现有的使用者将有一个分配给它的分区。
但是如果我们回到第一个例子,有6个分区,其中一个示例的并发性为3,那么每个使用者线程呢/ KafkaMessageListenerContainer 将分配2个分区。如果我们扩展这个应用程序(相同的用户组id和3的并发性),我相信会发生一个重新平衡,两个示例将分别使用3个分区。
这些假设正确吗?如果不正确,你应该如何处理这种情况?

dsekswqp

dsekswqp1#

通常,您的假设对于默认行为是正确的,默认行为基于:

/**
 * <p>The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and
 * <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>,
 * <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
 *
 * <p>The assignment will be:
 * <ul>
 * <li><code>C0: [t0p0, t0p1, t1p0, t1p1]</code></li>
 * <li><code>C1: [t0p2, t1p2]</code></li>
 * </ul>
 *
 * Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
 * For the above example, after one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
 * for example <code>C0</code> -&gt; <code>C3</code> <code>C1</code> -&gt; <code>C2</code>.
 *
 * <p>The assignment could be completely shuffled to:
 * <ul>
 * <li><code>C3 (was C0): [t0p2, t1p2] (before was [t0p0, t0p1, t1p0, t1p1])</code>
 * <li><code>C2 (was C1): [t0p0, t0p1, t1p0, t1p1] (before was [t0p2, t1p2])</code>
 * </ul>
 *
 * The assignment change was caused by the change of <code>member.id</code> relative order, and
 * can be avoided by setting the group.instance.id.
 * Consumers will have individual instance ids <code>I1</code>, <code>I2</code>. As long as
 * 1. Number of members remain the same across generation
 * 2. Static members' identities persist across generation
 * 3. Subscription pattern doesn't change for any member
 *
 * <p>The assignment will always be:
 * <ul>
 * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
 * <li><code>I1: [t0p2, t1p2]</code>
 * </ul>
 */
public class RangeAssignor extends AbstractPartitionAssignor {

但是你可以插入任何 ConsumerPartitionAssignor 通过 partition.assignment.strategy 消费者财产:https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
另请参见 ConsumerPartitionAssignor javadocs获取更多信息及其实现,以便为您的用例做出选择。

相关问题