Flink uses rebalance when two operators have different parallelism

xoshrz7s  posted on 2023-03-16  in  Apache

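I am trying out Flink 1.12.0.
I noticed that Flink automatically adds a rebalance between operators that use different parallelism. Here is an example, in which events from the source operator are sent to the downstream map/sink operators in a round-robin fashion:
env.addSource(...).setParallelism(1).map(...).addSink(..).setParallelism(3)
I would like to know where in the Flink source code this behavior is added. I looked through the code but could not find it.
Thanks.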

piv4azn7  #1

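There are several ways to rebalance or rescale within a pipeline to handle mismatched parallelism between two operators. The DataStream<T> base class itself defines methods for this: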

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to instances of the next operation in a round-robin fashion.
 *
 * @return The DataStream with rebalance partitioning set.
 */
public DataStream<T> rebalance() {
    return setConnectionType(new RebalancePartitioner<T>());
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are distributed
 * evenly to a subset of instances of the next operation in a round-robin fashion.
 *
 * <p>The subset of downstream operations to which the upstream operation sends elements depends
 * on the degree of parallelism of both the upstream and downstream operation. For example, if
 * the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then
 * one upstream operation would distribute elements to two downstream operations while the other
 * upstream operation would distribute to the other two downstream operations. If, on the other
 * hand, the downstream operation has parallelism 2 while the upstream operation has parallelism
 * 4 then two upstream operations will distribute to one downstream operation while the other
 * two upstream operations will distribute to the other downstream operations.
 *
 * <p>In cases where the different parallelisms are not multiples of each other one or several
 * downstream operations will have a differing number of inputs from upstream operations.
 *
 * @return The DataStream with rescale partitioning set.
 */
@PublicEvolving
public DataStream<T> rescale() {
    return setConnectionType(new RescalePartitioner<T>());
}
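
To make the difference between the two Javadoc descriptions concrete, here is a minimal plain-Java sketch (no Flink dependency) of which downstream channel each record would go to. The class and method names are hypothetical, the starting channel of 0 is an assumption made so the output is deterministic, and the rescale part assumes the downstream parallelism is a multiple of the upstream parallelism. It illustrates the distribution pattern only and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two distribution patterns; not Flink code.
class RoundRobinSketch {

    // Rebalance: one upstream subtask cycles over ALL downstream channels.
    // Assumption: the cycle starts at channel 0.
    static List<Integer> rebalanceTargets(int downstream, int records) {
        List<Integer> targets = new ArrayList<>();
        int next = 0;
        for (int i = 0; i < records; i++) {
            targets.add(next);
            next = (next + 1) % downstream;
        }
        return targets;
    }

    // Rescale: one upstream subtask cycles only over its own slice of
    // downstream channels. Assumes downstream is a multiple of upstream.
    static List<Integer> rescaleTargets(
            int upstreamIndex, int upstream, int downstream, int records) {
        int slice = downstream / upstream;  // channels per upstream subtask
        int first = upstreamIndex * slice;  // first channel of this slice
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            targets.add(first + i % slice);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Parallelism 1 -> 3, as in the question: records cycle over 0, 1, 2.
        System.out.println(rebalanceTargets(3, 5));
        // Parallelism 2 -> 4, as in the Javadoc: subtask 1 only uses 2 and 3.
        System.out.println(rescaleTargets(1, 2, 4, 4));
    }
}
```

With rebalance, every upstream subtask may talk to every downstream subtask; with rescale, each upstream subtask stays within its own subset, which is what the Javadoc example with parallelism 2 and 4 describes.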

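These two functions use RebalancePartitioner and RescalePartitioner, respectively. If you dig a little deeper, though, you will find that RebalancePartitioner is also referenced in StreamGraph. StreamGraph is responsible for building the DAG that Flink executes, and its createActualEdge() call contains the following: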

// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
if (partitioner == null
        && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner =
        dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

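This evaluates the nodes (operators) in the graph and compares their respective parallelism: when no partitioner was specified and the parallelism differs, a rebalance partitioner is used.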
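
The decision in that branch can be restated as a small standalone sketch. Everything here (the class, the enum, the `choose` method) is a hypothetical stand-in written for illustration, not part of Flink, and it leaves out the `dynamic` case that selects ForwardForUnspecifiedPartitioner in the quoted snippet.

```java
// Hypothetical restatement of the default partitioner choice; not Flink code.
class DefaultPartitionerSketch {

    enum Partitioner { FORWARD, REBALANCE, RESCALE }

    // An explicitly set partitioner is kept. Otherwise equal parallelism
    // yields FORWARD and unequal parallelism yields REBALANCE.
    static Partitioner choose(Partitioner explicit, int upstream, int downstream) {
        if (explicit != null) {
            return explicit;
        }
        return upstream == downstream ? Partitioner.FORWARD : Partitioner.REBALANCE;
    }

    public static void main(String[] args) {
        // Source with parallelism 1 feeding an operator with parallelism 3.
        System.out.println(choose(null, 1, 3));
        // Same parallelism on both sides, nothing specified.
        System.out.println(choose(null, 3, 3));
        // rescale() was called explicitly, so the default is not applied.
        System.out.println(choose(Partitioner.RESCALE, 1, 3));
    }
}
```

So if the implicit rebalance is not what you want, calling a partitioning method such as rescale() on the stream before the downstream operator sets the partitioner explicitly, and the default branch is skipped.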
