在具有不同路径的单个接收器中发射“侧输出”和“进程输出”

vulvrdjw  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(660)

如何使用单个接收器发出“侧输出”和“进程输出”。在这里,在这种情况下,两个输出都需要发射到单个接收器,并且基于标记文件夹路径不同

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};    
SingleOutputStreamOperator<String> mainDataStream = source.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        try {
             builder.parse(new InputSource(new StringReader(value)));
             out.collect(value);
        } catch (SAXException | IOException e) {
             ctx.output(outputTag, value);
        }
    }
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

还有其他更好的解决办法吗?只是担心表现

nwnhqdif

nwnhqdif1#

如果要使用单个接收器,可以将属性添加到输出格式中,并使用该属性标识单个接收器中的数据源。
您还可以构造具有不同参数的两个接收器,以从不同的源接收数据。在我看来,不考虑你使用的数据库,这种多线程的方式有更好的性能。

piztneat

piztneat2#

flink的bucketingsink可以使用bucketer来确定将使用基目录中的哪个子目录。因此,您可以使用它来根据正在写入的记录中的属性设置子目录。
至于使用单个接收器,由于函数的主输出和副输出都是字符串对象(相同类型),因此可以 mainDataStream.union(sideOutputStream) 在输出结果之前,将两个流合并在一起。

相关问题