如何使用单个接收器发出“侧输出”和“进程输出”。在这里,在这种情况下,两个输出都需要发射到单个接收器,并且基于标记文件夹路径不同
如
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<String> mainDataStream = source.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
try {
builder.parse(new InputSource(new StringReader(value)));
out.collect(value);
} catch (SAXException | IOException e) {
ctx.output(outputTag, value);
}
}
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
还有其他更好的解决办法吗?只是担心表现
2条答案
按热度按时间nwnhqdif1#
如果要使用单个接收器,可以将属性添加到输出格式中,并使用该属性标识单个接收器中的数据源。
您还可以构造具有不同参数的两个接收器,以从不同的源接收数据。在我看来,不考虑你使用的数据库,这种多线程的方式有更好的性能。
piztneat2#
flink的bucketingsink可以使用bucketer来确定将使用基目录中的哪个子目录。因此,您可以使用它来根据正在写入的记录中的属性设置子目录。
至于使用单个接收器,由于函数的主输出和副输出都是字符串对象(相同类型),因此可以
mainDataStream.union(sideOutputStream)
在输出结果之前,将两个流合并在一起。