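I'm new to Kafka. Here is my requirement: I have two partitions, say partition-0 and partition-1, and a list of values that also carry keys. I want to store the data based on the key, for example key-1 goes to partition-0 and key-2 goes to partition-1. With the old API there was a way to do this by implementing the partitioner interface, but how do I achieve this with the new API? Thank you.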
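For a fixed key-to-partition mapping like this, one option is to skip custom partitioners and choose the partition yourself before sending. The Java client's ProducerRecord(topic, partition, key, value) constructor accepts an explicit partition, and an explicit partition always wins over the key. The sketch below only shows the lookup logic; KEY_TO_PARTITION and partition_for are hypothetical names, and falling back to a hash for unknown keys is an assumption the question does not specify.

```python
import zlib

# Hypothetical fixed mapping from the question: key-1 -> partition 0, key-2 -> partition 1
KEY_TO_PARTITION = {"key-1": 0, "key-2": 1}


def partition_for(key: str, num_partitions: int) -> int:
    """Return the partition for a key from a fixed lookup table.

    Unknown keys fall back to a simple hash. crc32 is only a stand-in here;
    Kafka's own default partitioner hashes keys with murmur2.
    """
    if key in KEY_TO_PARTITION:
        return KEY_TO_PARTITION[key]
    return zlib.crc32(key.encode("utf-8")) % num_partitions


records = [("key-1", "a"), ("key-2", "b"), ("key-1", "c")]
for key, value in records:
    # The chosen number would be passed as the explicit partition of each record
    print(key, value, "->", partition_for(key, 2))
```

Because the table is consulted first, the same key always maps to the same partition, which also keeps records for a given key in order.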
Answer 1
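Starting with Kafka 2.4.0, you can opt into "always round-robin" partitioning by setting the producer's partitioner.class to org.apache.kafka.clients.producer.RoundRobinPartitioner. https://issues.apache.org/jira/browse/kafka-3333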
Answer 2
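If you want round-robin behavior, simply don't pass a key when sending records with the producer, and the DefaultPartitioner will handle it for you. You don't need to write a custom implementation. (This describes clients before 2.4.0; since 2.4.0 the DefaultPartitioner uses "sticky" partitioning for keyless records, filling a batch for one partition before moving to the next, as introduced by KIP-480.) From the javadocs: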
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
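The three rules in that javadoc can be sketched as a small decision function. This is an illustration, not Kafka's code: crc32 stands in for Kafka's murmur2 key hash, and the keyless branch uses a plain counter (the classic round-robin the javadoc describes), which is not how newer clients treat keyless records. The class name DefaultStrategySketch is hypothetical.

```python
import itertools
import zlib
from typing import Optional


class DefaultStrategySketch:
    """Illustrative version of the three default partitioning rules.

    Assumptions: crc32 replaces Kafka's murmur2 hash, and keyless records
    rotate through partitions with a simple counter.
    """

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self._counter = itertools.count()

    def partition(self, key: Optional[bytes], partition: Optional[int] = None) -> int:
        # Rule 1: an explicit partition on the record is used as-is
        if partition is not None:
            return partition
        # Rule 2: a key is present, so hash it (same key -> same partition)
        if key is not None:
            return zlib.crc32(key) % self.num_partitions
        # Rule 3: no partition and no key, so rotate through the partitions
        return next(self._counter) % self.num_partitions


p = DefaultStrategySketch(num_partitions=2)
print(p.partition(None, partition=1))                  # 1
print(p.partition(b"key-1") == p.partition(b"key-1"))  # True
print([p.partition(None) for _ in range(4)])           # [0, 1, 0, 1]
```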
Answer 3
With the new producer you can also implement the Partitioner interface (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/partitioner.java) to get round-robin assignment. You can use the DefaultPartitioner as a reference: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/defaultpartitioner.java
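The Partitioner interface itself is Java, so the following is only a Python sketch of the idea behind such an implementation: keep one counter per topic and hand out partitions in turn, guarding the counters with a lock because a producer may be called from several threads. The parameter list loosely mirrors the Java partition(topic, key, keyBytes, value, valueBytes, cluster) method, with num_partitions standing in for the cluster metadata (an assumption); RoundRobinByTopic is a hypothetical name.

```python
import threading
from collections import defaultdict
from typing import Optional


class RoundRobinByTopic:
    """Round-robin partitioner that keeps a separate counter per topic."""

    def __init__(self):
        self._counters = defaultdict(int)  # topic -> index of the next record
        self._lock = threading.Lock()      # callers may be on different threads

    def partition(self, topic: str, key: Optional[bytes], value: bytes,
                  num_partitions: int) -> int:
        # The key is deliberately ignored: each record simply takes the next slot
        with self._lock:
            index = self._counters[topic]
            self._counters[topic] = index + 1
        return index % num_partitions


rr = RoundRobinByTopic()
print([rr.partition("orders", b"key-1", b"v", 2) for _ in range(3)])  # [0, 1, 0]
print(rr.partition("payments", None, b"v", 3))                         # 0
```

Per-topic counters keep one busy topic from skewing the rotation of another topic sent through the same producer.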
Answer 4
You can produce to Kafka in a round-robin fashion by overriding the producer's default partitioner. A pseudo-implementation:
class RRPartitioner:
    """Pure round-robin: ignores the key and cycles through all partitions."""

    def __init__(self, total_partitions):
        # In a real producer, read this from the topic metadata
        self.total_partitions = total_partitions
        self.part_offset = 0

    def partitioner(self, key, msg):
        # Return the current partition, then advance and wrap back to 0
        # (valid partition numbers are 0 .. total_partitions - 1)
        partition = self.part_offset
        self.part_offset = (self.part_offset + 1) % self.total_partitions
        return partition
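The implementation above is pure round-robin. If you want messages grouped by key (so that ordering per key is preserved) while still distributing them round-robin, the custom partitioner has to do more work.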
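One possible reading of "doing more work" is sketched below: each new key is given the next partition in round-robin order, and that assignment is remembered, so all records for a key stay on one partition (and in order) while distinct keys spread evenly. With two partitions this also reproduces the question's key-1 to partition-0, key-2 to partition-1 layout. This is an assumption-laden sketch: StickyKeyRoundRobin is a hypothetical name, and the assignments live only in this process's memory, so a restart or a second producer instance could map keys differently, and memory grows with the number of distinct keys.

```python
import threading
from typing import Dict


class StickyKeyRoundRobin:
    """Give each new key the next partition in rotation, then keep it there."""

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self._assigned: Dict[str, int] = {}  # key -> partition it was given
        self._next = 0                       # rotation position for the next new key
        self._lock = threading.Lock()

    def partition(self, key: str) -> int:
        with self._lock:
            if key not in self._assigned:
                # First time this key is seen: take the next slot in the rotation
                self._assigned[key] = self._next % self.num_partitions
                self._next += 1
            return self._assigned[key]


p = StickyKeyRoundRobin(num_partitions=2)
print([p.partition(k) for k in ["key-1", "key-2", "key-1", "key-3"]])  # [0, 1, 0, 0]
```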