apacheflink:两个(或更多)任务管理器之间的共享状态

to94eoyn  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(948)

假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:

KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);

一个任务管理器将使用一个kafka主题的数据,另一个任务管理器将使用另一个kafka主题的数据。
我将作业发送给作业管理器以执行它。flink分配两个任务管理器来处理flatmap(因为任务管理器只有一个任务槽)。
flatmap在事件之间进行简单连接(使用两个键控状态):

public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...);
                BState.clear();
            }
        }

        @Override
        public void flatMap2(A event, Collector<String> out) throws Exception {
            A firstEvent = AState.value();

            if (firstEvent == null)
                BState.update(event);
            else {
                out.collect(...);
                AState.clear();
            }
        }
    }

如果我理解正确的话,在connect方法之后,流就变成了一个。现在,实现的flatmap需要共享状态,因为操作符必须控制是否到达了correspective事件来应用join,但是它是以两个并行的方式执行的,所以使用两个任务管理器。这意味着每次必须更新一个状态时,任务管理器都应该保存在另一个任务管理器的状态中(在connect方法之后共享),或者它可能需要简单地读取该状态。那么任务管理者是如何沟通的呢?由于任务管理器可能在不同的集群节点上运行,它是否会影响性能?
编辑:我在flink的博客上找到了下面的文章,似乎两个任务管理器可以通过tcp连接进行通信,这对我来说很有意义,因为在某些情况下,我们需要在事件之间共享状态。如果这是错误的,你能向我解释一下flink是如何处理以下场景的吗?
假设总是有两个任务管理器,物理上位于两个集群节点上。每个任务管理器总是只有一个插槽。我运行上述作业并将并行度设置为2(例如,在将作业发送到作业管理器时使用-p参数)。现在,flink将从我的工作中创建两个子任务,它们在结构上是相同的,并将它们发送给任务管理器。两个任务管理器将执行“相同”的作业,但使用不同的事件。作业使用两个kafka主题中的事件:a和b。这意味着第一个和第二个任务管理器将同时使用主题a和主题b中的事件,但是使用不同的事件,否则会有重复的事件。作业是相同的,即它执行上面的richcoflatmapmfunction,然后每个任务管理器都将在本地处理其已使用的事件集和个人本地状态。现在问题来了:假设第一个任务管理器使用了一个键为“1”的事件。此事件到达richcoflatmappfunction内部,并存储在state内部,因为操作符仍在等待另一个具有相同键的事件来生成连接。如果另一个键为“1”的事件从第二个任务管理器中使用,并且它们不共享状态或通信,那么就不可能进行连接。我的推理有什么错?

gkl3eglg

gkl3eglg1#

两个任务管理器不需要为了状态共享而进行通信——flink中没有状态共享。
下面显示的这三个执行图中的任何一个都是可能的,这取决于您如何安排源的细节。在每个图的左侧,我们可以看到a和b的源操作符,在右侧,两个平行的双输入操作符示例通过richcoflatmap实现连接。

keyby不是操作符,而是指定如何连接源和两个richcoflatmap示例。它将其安排为一个哈希连接,对源流进行重新分区。
使用这三种场景中的哪一种并不重要,因为在这三种情况下,keyby将具有相同的效果,将某些键的所有事件转向join1,将其他键的所有事件转向join2。
换句话说,对于任何给定的键,该键的所有事件都将在同一个任务槽中处理。你能想到 ValueState<A> 作为分布式(分片)键/值存储,其中值的类型为a。每个任务管理器都具有该键/值存储的一部分的状态(对于不相交的键子集),并处理这些键的所有事件(并且仅处理这些键)。
例如:在 flatMap1 ,何时 BState.value() 用来自的元素调用 streamA ,flink运行时将访问 BState 对于当前在上下文中的键,表示与来自的事件的键关联的值 streamA 正在处理。在当前任务中,此状态将始终是本地的。同样地, flatMap2 将始终使用来自 streamB .
这种设计避免了任务管理器之间的任何耦合,这有利于可伸缩性和性能。

相关问题