flink动态接收器数

mkh04yzy  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(432)

我正在使用apache flink和kafkaconsumer来阅读kafka主题中的一些值。我还有一个从读取文件中获得的流。
根据收到的值,我想写不同Kafka主题的流。
基本上,我有一个与许多孩子有联系的领导的关系网。对于每个孩子,领导者需要在一个特定于孩子的Kafka主题中编写流read,这样孩子就可以阅读它。当孩子开始时,它会在从领导者那里读到的Kafka主题中注册自己。问题是我不知道我有多少个孩子。
例如,我阅读 1 从Kafka主题开始,我只想在一个名为 Topic1 . 我读过 1-2 ,我想写两个Kafka主题( Topic1 以及 Topic2 ).
我不知道这是否可能,因为为了写这个主题,我使用Kafka制作人和 addSink 方法和我的理解(并从我的尝试),似乎Flink需要知道的数目汇先验。
但是,难道没有办法获得这样的行为吗?

qf9go6mv

qf9go6mv1#

如果我能很好地理解你的问题,我想你可以用一个接收器来解决它,因为你可以根据正在处理的记录来选择Kafka主题。似乎源代码中的一个元素可能被写入多个主题,在这种情况下,您需要一个flatmapfunction将每个源记录复制n次(每个输出主题一个)。我建议将输出作为一对(aka Tuple2 )与(主题,记录)。

DataStream<Tuple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() {
    public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) {
        for (String topic : topics) {
            out.collect(Tuple2.of(topic, value));
        }
    }
});

然后,您可以使用前面通过创建具有keyedserializationschema的flinkkafkaproducer计算的主题,在其中实现 getTargetTopic 返回对的第一个元素。

stream.addSink(new FlinkKafkaProducer10<>(
        "default-topic",
        new KeyedSerializationSchema<>() {
            public String getTargetTopic(Tuple2<String, MyValue> element) {
                return element.f0;
            }
            ...
        },
        kafkaProperties)
);

相关问题