如何在kafka中均匀分布数据,通过spark生成消息?

z18hc3ub  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(490)

我有一个将数据写入kafka的流式工作,我注意到其中一个kafka分区(#3)比其他分区占用更多的数据。

+-----------------------------------------------------+
| partition | messages  | earlist offset | next offset|
+-----------------------------------------------------+
|1          | 166522754 | 5861603324     | 6028126078 |
|2          | 152251127 | 6010226633     | 6162477760 |
|3          | 382935293 | 6332944925     | 6715880218 |
|4          | 188126274 | 6171311709     | 6359437983 |
|5          | 188270700 | 6100140089     | 6288410789 |
+-----------------------------------------------------+

我找到了一个选项-使用kafka分区数(5)重新划分输出数据集。
有没有其他方法来平均分配数据?

ymzxtsji

ymzxtsji1#

kafka中数据的分区方式与spark及其数据集中数据的分区方式无关。从kafka的Angular 来看,它取决于消息的键,或者在写入kafka时应用自定义的partitioner类。
kafka中的数据分区有以下几种情况:

消息键为空且没有自定义分区器

如果kafka消息中没有定义密钥,则kafka将以循环方式将消息分发到所有分区。

消息键不为null且没有自定义分区器

如果您提供消息密钥,默认情况下,kafka将根据

hash(key) % numer_of_partitions

提供自定义分区器

如果您想完全控制kafka在主题分区中存储消息的方式,可以编写自己的partitioner类并将其设置为 partitioner.class 在生产者配置中。
下面是一个customer partitioner类的示例

public class MyPartitioner implements Partitioner {
  public void configure(Map<String, ?> configs) {}
  public void close() {}

  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String)))
      throw new InvalidRecordException("Record did not have a string Key");

    if (((String) key).equals("myKey"))
       return 0; // This key will always go to Partition 0

    // Other records will go to the rest of the Partitions using a hashing function
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
  }
}

相关问题