kafkaconsumer java api subscribe()vs assign()

ibrsph3r  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我是kafkajavaapi的新手,我正在使用来自某个kafka主题的记录。
我知道我可以用这个方法 subscribe() 从主题开始轮询记录。Kafka也提供了方法 assign() 如果我想从主题的选定分区开始轮询记录。
我想知道这是不是两者唯一的区别?

mwg9r5ms

mwg9r5ms1#

subscribe 需要 group.id 因为一个组中的每个使用者将动态地分配给subscribe方法中提供的主题列表的分区,并且每个分区可以由该组中的一个使用者线程使用。这是通过平衡使用者组中所有成员之间的分区来实现的,这样每个分区就只分配给组中的一个使用者 assign 将手动为此使用者分配分区列表。而且此方法不使用使用者的组管理功能(不需要 group.id )
主要区别是 assign(Collection) 会使控制器在动态分区分配和用户组协调上失去控制
使用者还可以使用assign(collection)手动分配特定的分区(类似于旧的“简单”使用者)。在这种情况下,将禁用动态分区分配和使用者组协调。
订阅

public void subscribe(java.util.Collection<java.lang.String> topics)

subscribe方法订阅给定的主题列表以获得动态分配的分区。如果给定的主题列表为空,则将其视为 unsubscribe(). 作为组管理的一部分,使用者将跟踪属于特定组的使用者列表,并在触发以下事件之一时触发重新平衡操作-

Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API

分配

public void assign(java.util.Collection<TopicPartition> partitions)

assign方法手动将分区列表分配给这个使用者。如果给定的主题分区列表为空,则将其视为unsubscribe()。
通过此方法手动分配主题不使用使用者的组管理功能。因此,当组成员身份或集群和主题元数据更改时,不会触发重新平衡操作。

相关问题