[Flink] Uneven data distribution with Flink keyBy

Reposted by x33g5p2x on 2022-04-13 under Flink

1. Overview

Reposted, with additions, from the blog post "Uneven distribution with Flink keyBy".

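I used a random number, random.nextInt(8), as the key, built a KeyedStream from it, and sank the stream straight into storage. However, only four of the sink operator's slots were in use, the other four were idle, and the data skew was severe. What could be the cause?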

My first thought was that the random numbers might not be uniform. I tested that and found that they are in fact quite uniform:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class RandomDistributionTest {
    /**
     * Test: are the random values really random?
     *
     * Sample output:
     * {0=12531, 1=12448, 2=12538, 3=12466, 4=12530, 5=12563, 6=12462, 7=12462}
     *
     * The values are clearly quite evenly distributed.
     */
    public static void randomIsRandom() {
        Map<Integer, Long> map = new HashMap<>();
        Random random = new Random();
        for (int i = 0; i < 100000; i++) {
            int value = random.nextInt(8);
            // Count how often each value in [0, 8) occurs
            map.merge(value, 1L, Long::sum);
        }
        System.out.println(map);
    }

    public static void main(String[] args) {
        randomIsRandom();
    }
}
```

2. Case 2

Later I found the same problem described in the blog post "Uneven distribution with Flink keyBy".

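Flink reads data from Kafka, applies keyBy, and then uses .timeWindow(Time.seconds(1)) to process the data once per second. The goal is to spread the data evenly, according to the parallelism, across the different subtasks of the same window operator. To do that, a random integer is used as the key: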

```java
dataStream.map(
        new MapFunction<String, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> map(String value) throws Exception {
                // Random key in [0, parallelism); JDK equivalent of RandomUtils.nextInt(parallelism)
                int key = ThreadLocalRandom.current().nextInt(parallelism);
                return new Tuple2<>(key, value);
            }
        })
        .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple2 -> tuple2.f0)
        // timeWindow is deprecated in newer Flink versions in favor of window(...)
        .timeWindow(Time.seconds(1));
```

2.1 Problem

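With a parallelism of 2, all the data is sent to one subtask and the other subtask receives nothing.
With a parallelism greater than 2, some subtasks still receive far more data than others. This is data skew.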

2.2 Cause

This is related to Flink's key groups. For background on key groups, see:

Key Groups and maximum parallelism in Flink

Rescaling Flink state and the design of key groups (Key Group)

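When grouping by key, Flink applies MurmurHash to the key's hashCode() once more. The purpose is to scatter the data as much as possible in practice and reduce collisions. The resulting hash, modulo maxParallelism, selects a key group, and the key group determines the subtask. This balances load well only when there are many distinct keys. A hand-generated integer key has only as many distinct values as the parallelism, and nothing guarantees that these few values are hashed onto different subtasks. In practice several of them end up with the same subtask index, so some subtasks receive no data at all.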
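
To see why a good hash still leaves subtasks idle, here is an idealized calculation. It is my own addition, and it assumes the hash sends each key to a uniformly random subtask, independently of the other keys. With n distinct keys and n subtasks, the probability that all keys land on different subtasks is n!/n^n. The expected number of idle subtasks is n * (1 - 1/n)^n.

- For n = 2, there is a 50% chance that one subtask stays idle.
- For n = 8, the chance of using every subtask is only about 0.24%, and on average about 2.75 subtasks stay idle. So the four idle slots in case 1 are not unusual.

The class name IdleSubtasks is just for illustration.

```java
public class IdleSubtasks {

    // Idealized model: each of n keys goes to one of n subtasks uniformly at random.
    // Probability that all n keys land on different subtasks: n! / n^n
    public static double allDistinctProbability(int n) {
        double p = 1.0;
        for (int i = 0; i < n; i++) {
            p *= (double) (n - i) / n;
        }
        return p;
    }

    // Expected number of subtasks that receive no key: n * (1 - 1/n)^n
    public static double expectedIdleSubtasks(int n) {
        return n * Math.pow(1.0 - 1.0 / n, n);
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 4, 8, 16}) {
            System.out.printf("parallelism=%d  P(no idle subtask)=%.4f  expected idle=%.2f%n",
                    n, allDistinctProbability(n), expectedIdleSubtasks(n));
        }
    }
}
```

The real MurmurHash mapping is deterministic, so a given set of keys always gives the same layout. The model only shows that "one key per subtask" is the unlikely outcome.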

Formula:

```java
// maxParallelism defaults to 128 for small parallelism; parallelism is the user-defined parallelism
subtaskIndex = (MathUtils.murmurHash(key.hashCode()) % maxParallelism) * parallelism / maxParallelism;
```
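
The two steps of the formula can be reproduced without a Flink dependency. The sketch below is a re-implementation written for illustration. murmurHash and bitMix are my transcription of Flink's MathUtils.murmurHash(int), and KeyToSubtask is a hypothetical class name. Compare its numbers against the real MathUtils and KeyGroupRangeAssignment classes before relying on them. For parallelism 2 and 8, it prints which subtask each of the keys 0 .. parallelism-1 lands on and how many subtasks receive any key at all.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyToSubtask {

    // Transcription of Flink's MathUtils.murmurHash(int); an assumption, verify
    // against org.apache.flink.util.MathUtils before trusting exact values.
    public static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        // Flink returns a non-negative hash
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of MurmurHash3
    public static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Step 1: key -> key group
    public static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> subtask index
    public static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {2, 8}) {
            // Where do the keys 0 .. parallelism-1 (what nextInt(parallelism) produces) go?
            Map<Integer, Integer> keyToSubtask = new TreeMap<>();
            for (int key = 0; key < parallelism; key++) {
                keyToSubtask.put(key, subtaskIndex(key, maxParallelism, parallelism));
            }
            long busy = keyToSubtask.values().stream().distinct().count();
            System.out.println("parallelism=" + parallelism + " key->subtask=" + keyToSubtask
                    + " busy subtasks=" + busy + "/" + parallelism);
        }
    }
}
```

If the transcription is faithful, the parallelism = 2 line should match the enumeration output in the solution section, where keys 0 and 1 both map to subtask 1.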

The actual code is in the KeyGroupRangeAssignment class:

```java
/**
 * Assigns the given key to a parallel operator index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @param parallelism the current parallelism of the operator
 * @return the index of the parallel operator to which the given key should be routed.
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int assignToKeyGroup(Object key, int maxParallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param keyHash the hash of the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

/**
 * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
 * parallelism.
 *
 * <p>IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
 * to go beyond this boundary, this method must perform arithmetic on long values.
 *
 * @param maxParallelism Maximal parallelism that the job was initially created with.
 *                       0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
 * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
 * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism.
 * @return The index of the operator to which elements from the given key-group should be routed under the given
 * parallelism and maxParallelism.
 */
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
```
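
computeOperatorIndexForKeyGroup cuts the maxParallelism key groups into contiguous ranges, one per subtask. The sketch below applies only that one line of arithmetic to print the range owned by each subtask; KeyGroupRanges is a hypothetical class name. With maxParallelism = 128 and parallelism = 2, key groups 0..63 go to subtask 0 and key groups 64..127 go to subtask 1. A subtask therefore receives data only if at least one key's murmurHash % 128 falls into its range. With only two keys, both can easily fall into the same half.

```java
public class KeyGroupRanges {

    // Same arithmetic as KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup
    public static int operatorIndex(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // For each subtask, returns {firstKeyGroup, lastKeyGroup} (both inclusive).
    // Assumes 0 < parallelism <= maxParallelism, so every subtask owns at least one key group.
    public static int[][] ranges(int maxParallelism, int parallelism) {
        int[][] result = new int[parallelism][];
        for (int kg = 0; kg < maxParallelism; kg++) {
            int op = operatorIndex(maxParallelism, parallelism, kg);
            if (result[op] == null) {
                result[op] = new int[] {kg, kg};  // first key group seen for this subtask
            } else {
                result[op][1] = kg;               // key groups arrive in order: extend the range
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {2, 3, 8}) {
            int[][] r = ranges(maxParallelism, parallelism);
            StringBuilder sb = new StringBuilder("parallelism=" + parallelism + ":");
            for (int op = 0; op < parallelism; op++) {
                sb.append(" subtask ").append(op)
                  .append(" -> [").append(r[op][0]).append("..").append(r[op][1]).append("]");
            }
            System.out.println(sb);
        }
    }
}
```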

2.3 Solution

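For skew caused by hand-assigned keys like this, enumerate a few key values by hand and use them instead of the random numbers, so that the subtaskIndex values are spread evenly.

Generate the required keys: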

```java
import java.util.HashMap;
import java.util.LinkedHashSet;

import org.apache.flink.util.MathUtils;

public class RebalanceKeyFinder {
    public static void main(String[] args) {
        HashMap<Integer, LinkedHashSet<Integer>> result = new HashMap<>();
        int parallelism = 2;      // the parallelism you intend to use
        int maxParallelism = 128; // Flink's default max parallelism for small parallelism
        int maxRandomKey = parallelism * 10; // enough candidates that every subtask should get some
        for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
            // Same computation Flink performs: murmurHash(hashCode) % maxParallelism -> key group -> subtask
            int subtaskIndex = (MathUtils.murmurHash(Integer.valueOf(randomKey).hashCode()) % maxParallelism)
                    * parallelism / maxParallelism;
            LinkedHashSet<Integer> randomKeys = result.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
            randomKeys.add(randomKey);
        }
        result.forEach((k, v) -> System.out.println("subtaskIndex: " + k + ", randomKeys: " + v));
    }
}
```
Output:

```
subtaskIndex: 0, randomKeys: [4, 6, 8, 9, 12, 13, 14]
subtaskIndex: 1, randomKeys: [0, 1, 2, 3, 5, 7, 10, 11, 15, 16, 17, 18, 19]
```

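The output confirms the problem: with parallelism 2, the keys 0 and 1 produced by a random nextInt(2) both map to subtaskIndex 1. Instead, pick one value from each subtaskIndex, for example 4 for subtaskIndex 0 and 5 for subtaskIndex 1.

Then change the key used in the original keyBy: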

```java
Integer[] rebalanceKeys = new Integer[]{4, 5};
dataStream.map(
        new MapFunction<String, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> map(String value) throws Exception {
                // The index into rebalanceKeys is the subtaskIndex that key is routed to
                int key = rebalanceKeys[ThreadLocalRandom.current().nextInt(parallelism)];
                return new Tuple2<>(key, value);
            }
        })
        .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple2 -> tuple2.f0)
        .timeWindow(Time.seconds(1));
```

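As a further improvement, the code that generates the rebalanceKeys array can be extracted into a utility method. I'm sure you can manage that.

If you don't have time to try, see the example below. The createRebalanceKeys(int parallelism) method is the extracted utility method.

Here is a complete example: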

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Test {
    /**
     * Task: split the input words into `parallelism` groups and collect
     * each group's data in a 5-second window.
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int parallelism = 5;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .setParallelism(parallelism)
                .enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        DataStream<List<String>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter(parallelism))
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                })
                // Use the values from rebalanceKeys as the grouping key
                .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple -> tuple.f0)
                // Build the window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<Integer, String>, List<String>, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window,
                                      Iterable<Tuple2<Integer, String>> input, Collector<List<String>> out) throws Exception {
                        List<String> lists = new ArrayList<>();
                        // value.f0 is the key assigned earlier, value.f1 is the word
                        input.forEach(value -> lists.add(value.f1));
                        out.collect(lists);
                    }
                })
                // TODO: process each window's data, e.g. batch-write it into HBase
                // .addSink(...)
                ;
        env.execute("job name");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<Integer, String>> {
        private int parallelism;
        private Integer[] rebalanceKeys;

        public Splitter(int parallelism) {
            this.parallelism = parallelism;
            // Array of keys used for load balancing
            this.rebalanceKeys = createRebalanceKeys(parallelism);
        }

        @Override
        public void flatMap(String sentence, Collector<Tuple2<Integer, String>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                // Hash the word and take it modulo the parallelism to pick an entry of
                // rebalanceKeys, which is then used as the keyBy key.
                // Using the word's hash directly as the keyBy key could easily give uneven groups.
                int rebalanceKeyIndex = Math.abs(word.hashCode() % parallelism);
                Integer key = rebalanceKeys[rebalanceKeyIndex];
                out.collect(new Tuple2<Integer, String>(key, word));
            }
        }
    }

    /**
     * Builds the balanced key array.
     *
     * @param parallelism the parallelism
     * @return one key per subtask: result[i] is routed to subtask i
     */
    public static Integer[] createRebalanceKeys(int parallelism) {
        HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new HashMap<>();
        // Assumes the job uses Flink's default max parallelism (no setMaxParallelism call)
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        // Try enough candidate keys so that every subtask index is covered
        int maxRandomKey = parallelism * 10;
        for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
            int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism);
            LinkedHashSet<Integer> randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
            randomKeys.add(randomKey);
        }
        Integer[] result = new Integer[parallelism];
        for (int i = 0; i < parallelism; i++) {
            LinkedHashSet<Integer> ranges = groupRanges.get(i);
            if (ranges == null || ranges.isEmpty()) {
                throw new RuntimeException("create rebalance keys error");
            }
            // Take the first candidate key that lands on subtask i
            result[i] = ranges.stream().findFirst().get();
        }
        return result;
    }
}
```
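
One caveat: createRebalanceKeys computes the keys for KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism). The keys must be regenerated with the matching values if:

- the job sets its own max parallelism (for example via setMaxParallelism), or
- the parallelism changes.

Otherwise the keys no longer map one-to-one onto subtasks. According to the Flink documentation, the default max parallelism is roughly parallelism + parallelism / 2, with a lower bound of 128 and an upper bound of 32768. The sketch below reproduces that rule. From my reading of the Flink source, it assumes the intermediate value is rounded up to a power of two. DefaultMaxParallelism is a hypothetical class name; check the result against your Flink version.

```java
public class DefaultMaxParallelism {

    static final int LOWER_BOUND = 128;   // 1 << 7
    static final int UPPER_BOUND = 32768; // 1 << 15

    // Smallest power of two >= x, for 1 <= x <= 2^30
    public static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Assumed mirror of KeyGroupRangeAssignment.computeDefaultMaxParallelism
    public static int defaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {2, 5, 85, 86, 100, 1000, 30000}) {
            System.out.println("parallelism=" + p + " -> default maxParallelism=" + defaultMaxParallelism(p));
        }
    }
}
```

Under this rule the default is no longer 128 from parallelism 86 upward (86 + 43 = 129 rounds up to 256). The hard-coded maxParallelism = 128 in the enumeration snippet would then need to change.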
