Repost with additions: the Flink keyBy uneven-distribution problem
I used a random number, random.nextInt(8), as the key. After building the KeyedStream I sink it straight to storage, but only four of the sink operator's slots are in use while the other four sit idle; the data skew is severe. What could be the cause?
My first thought was that the random numbers themselves might not be uniform, so I tested that, and it turned out they are distributed quite evenly:
/**
 * Test: are the random values actually uniformly distributed?
 *
 * Output: {0=12531, 1=12448, 2=12538, 3=12466, 4=12530, 5=12563, 6=12462, 7=12462}
 *
 * As you can see, the distribution is quite even.
 */
public void randomIsRandom() {
    Map<Integer, Long> map = new HashMap<>();
    Random random = new Random();
    for (int i = 0; i < 100000; i++) {
        int value = random.nextInt(8);
        Long count = map.get(value);
        if (count == null) {
            count = 1L;
        } else {
            count++;
        }
        map.put(value, count);
    }
    System.out.println(map);
}
Later I came across the same mistake in the blog post "flink keyby 分布不均匀问题" (Flink keyBy uneven distribution).
Flink reads data from Kafka, keys it with keyBy, and then uses .timeWindow(Time.seconds(1)) to process the data once per second. To spread the records evenly across the different subtasks of the same window operator according to the parallelism, a random integer was used as the key:
dataStream.map(new MapFunction<String, Tuple2<Integer, String>>() {
        @Override
        public Tuple2<Integer, String> map(String value) throws Exception {
            int key = RandomUtils.nextInt(parallelism);
            return new Tuple2<>(key, value);
        }
    })
    .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple2 -> tuple2.f0)
    .timeWindow(Time.seconds(1));
With a parallelism of 2, all of the data was sent to one subtask while the other subtask received nothing.
With parallelism > 2, individual subtasks likewise received far more data than others, i.e. the same data-skew problem.
This is related to Flink's Key Groups.
For background on key groups, see: "Flink状态的缩放(rescale)与键组(Key Group)设计" (Flink state rescaling and the Key Group design).
When Flink partitions by key, it runs the key's hashCode() through an additional MurmurHash. The goal is to scatter real-world keys as much as possible and reduce collisions. But for a small set of manually generated numeric keys like ours, several keys compute to the same subtask id, so some subtasks are assigned no data at all.
The computation formula:
// maxParallelism defaults to 128; parallelism is the user-defined parallelism
subtaskIndex = (MathUtils.murmurHash(key.hashCode()) % maxParallelism) * parallelism / maxParallelism;
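To see what this formula does with the defaults, the arithmetic for the parallelism = 2 case works out as follows (integer division throughout):

// maxParallelism = 128, parallelism = 2
// keyGroupId  0..63  -> keyGroupId * 2 / 128 = 0 -> subtask 0
// keyGroupId 64..127 -> keyGroupId * 2 / 128 = 1 -> subtask 1
// A key's subtask therefore depends only on which half of the 128 key groups
// MathUtils.murmurHash(key.hashCode()) % 128 happens to land in.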
The actual code is in the KeyGroupRangeAssignment class:
/**
 * Assigns the given key to a parallel operator index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @param parallelism the current parallelism of the operator
 * @return the index of the parallel operator to which the given key should be routed.
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int assignToKeyGroup(Object key, int maxParallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param keyHash the hash of the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

/**
 * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
 * parallelism.
 *
 * <p>IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
 * to go beyond this boundary, this method must perform arithmetic on long values.
 *
 * @param maxParallelism Maximal parallelism that the job was initially created with.
 *                       0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
 * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
 * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism.
 * @return The index of the operator to which elements from the given key-group should be routed under the given
 *         parallelism and maxParallelism.
 */
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
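With these methods in hand, here is a minimal sketch that reproduces the parallelism-2 skew directly (it assumes flink-runtime is on the classpath; the class name SkewCheck is my own):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class SkewCheck {
    public static void main(String[] args) {
        int parallelism = 2;
        int maxParallelism = 128; // Flink's default
        // Route each candidate key through the same code path that keyBy uses.
        for (int key = 0; key < parallelism; key++) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}

As the key listing below confirms, keys 0 and 1 (the only values RandomUtils.nextInt(2) can produce) both land on subtask 1, which is exactly why one subtask received all the data.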
For this kind of skew caused by manually chosen keys, we need to enumerate a few concrete values to use in place of the random numbers, so that the resulting subtaskIndex values are distributed evenly.
Generate the required keys:
public static void main(String[] args) {
    HashMap<Integer, LinkedHashSet<Integer>> result = new HashMap<>();
    int parallelism = 2;      // the chosen parallelism
    int maxParallelism = 128; // the default
    int maxRandomKey = parallelism * 10;
    for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
        // same computation as Flink: murmur-hash the key's hashCode(),
        // map it to a key group, then to an operator (subtask) index
        int subtaskIndex = (MathUtils.murmurHash(Integer.valueOf(randomKey).hashCode()) % maxParallelism) * parallelism / maxParallelism;
        LinkedHashSet<Integer> randomKeys = result.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
        randomKeys.add(randomKey);
    }
    result.forEach((k, v) -> System.out.println("subtaskIndex: " + k + ", randomKeys: " + v));
}
subtaskIndex: 0, randomKeys: [4, 6, 8, 9, 12, 13, 14]
subtaskIndex: 1, randomKeys: [0, 1, 2, 3, 5, 7, 10, 11, 15, 16, 17, 18, 19]
Pick one value from each subtaskIndex, e.g. 4 for subtaskIndex 0 and 5 for subtaskIndex 1, and then rework the key used in the original keyBy:
Integer[] rebalanceKeys = new Integer[]{4, 5};
dataStream.map(new MapFunction<String, Tuple2<Integer, String>>() {
        @Override
        public Tuple2<Integer, String> map(String value) throws Exception {
            // the index into rebalanceKeys corresponds to the target subtaskIndex
            int key = rebalanceKeys[RandomUtils.nextInt(parallelism)];
            return new Tuple2<>(key, value);
        }
    })
    .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple2 -> tuple2.f0)
    .timeWindow(Time.seconds(1));
A further refinement is to extract the rebalanceKeys generation into a reusable utility method; I'm sure you can manage that.
…
…
If you don't have time to try it yourself, you can refer to the example below: the createRebalanceKeys(int parallelism) method is exactly that extracted utility.
Here is a complete example:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Test {

    /**
     * Task description:
     * Split the incoming words into groups according to the given parallelism,
     * with one window every 5 seconds per group.
     */
    public static void main(String[] args) throws Exception {
        int parallelism = 5;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .setParallelism(parallelism)
                .enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        DataStream<List<String>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter(parallelism))
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                })
                // group by the values taken from rebalanceKeys
                .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple -> tuple.f0)
                // build the window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<Integer, String>, List<String>, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window,
                                      Iterable<Tuple2<Integer, String>> input, Collector<List<String>> out) throws Exception {
                        List<String> lists = new ArrayList<>();
                        // value.f0 is the key from above, value.f1 is the word
                        input.forEach(value -> lists.add(value.f1));
                        out.collect(lists);
                    }
                })
                // TODO process each window's data here, e.g. batch-write to HBase
                // .addSink(...)
                ;

        env.execute("job name");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<Integer, String>> {
        private int parallelism;
        private Integer[] rebalanceKeys;

        public Splitter(int parallelism) {
            this.parallelism = parallelism;
            // the key array used for load balancing
            this.rebalanceKeys = createRebalanceKeys(parallelism);
        }

        @Override
        public void flatMap(String sentence, Collector<Tuple2<Integer, String>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                // Hash the incoming word and take it modulo the parallelism to pick an
                // entry from rebalanceKeys, which becomes the key used by keyBy below.
                // Using word.hashCode() directly as the keyBy key would very likely
                // produce an uneven grouping.
                int rebalanceKeyIndex = Math.abs(word.hashCode() % parallelism);
                Integer key = rebalanceKeys[rebalanceKeyIndex];
                out.collect(new Tuple2<Integer, String>(key, word));
            }
        }
    }

    /**
     * Builds the balanced key array.
     *
     * @param parallelism the operator parallelism
     * @return one key per subtask index
     */
    public static Integer[] createRebalanceKeys(int parallelism) {
        HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new HashMap<>();
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        // try enough candidate keys to cover every subtask index
        int maxRandomKey = parallelism * 10;
        for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
            int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism);
            LinkedHashSet<Integer> randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
            randomKeys.add(randomKey);
        }
        Integer[] result = new Integer[parallelism];
        for (int i = 0; i < parallelism; i++) {
            LinkedHashSet<Integer> ranges = groupRanges.get(i);
            if (ranges == null || ranges.isEmpty()) {
                throw new RuntimeException("create rebalance keys error");
            }
            result[i] = ranges.stream().findFirst().get();
        }
        return result;
    }
}
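As a quick sanity check, here is a sketch (the class name RebalanceKeysCheck is my own) verifying that each generated key routes back to the subtask whose index it occupies in the array, which is what the construction above guarantees:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class RebalanceKeysCheck {
    public static void main(String[] args) {
        int parallelism = 5;
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        Integer[] keys = Test.createRebalanceKeys(parallelism);
        for (int i = 0; i < parallelism; i++) {
            // By construction, keys[i] must be routed to subtask i.
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(keys[i], maxParallelism, parallelism);
            System.out.println("keys[" + i + "] = " + keys[i] + " -> subtask " + subtask + " (expected " + i + ")");
        }
    }
}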
Copyright note: this is a reposted article and the copyright belongs to the original author.
Original link: https://blog.csdn.net/qq_21383435/article/details/124121669