flink输出选择器有奇怪的行为

xtupzzrd  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(418)

我有一条有两个分叉的小溪,因此有两条分流的小溪。
代码如下:

static final class MyOutputSelector1 implements OutputSelector<Long> {

    @Override
    public Iterable<String> select(Long value) {
        List<String> outputs = new ArrayList<>();
        if (value < 5) {
            outputs.add("valid1");
        }
        else {
            outputs.add("error1");
        }
        return outputs;
    }
}

static final class MyOutputSelector2 implements OutputSelector<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<String> select(Long value) {
        List<String> outputs = new ArrayList<String>();
        if (value == 2) {
            outputs.add("valid2");
        }
        else {
            outputs.add("error2");
        }
        return outputs;
    }
}

@Test
public void outputSelectorTest() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SplitStream<Long> split1 = env.generateSequence(1, 11).split(new MyOutputSelector1());
    DataStream<Long> stream11 = split1.select("valid1");
    stream11.print();

    SplitStream<Long> split2 = stream11.split(new MyOutputSelector2());
    DataStream<Long> stream21 = split2.select("valid2");
    stream21.print();
    DataStream<Long> stream22 = split2.select("error2");
    stream22.printToErr();

    env.execute();
}

这是我执行代码时得到的输入:
程序输出
我的源代码是1到11之间的整数列表。我希望stream11只包含小于5的整数。当我打印出来的时候看起来还可以。我希望stream21包含2,这似乎是两个“2”被打印的情况。但是,我希望stream22包含除2之外的所有小于5的整数,但是打印1到11之间的所有整数。
为什么会这样?我以为第一个选择器只会在流中保留从1到4的整数,但是从5到11的整数会在最后一次拆分后重新出现。。。
总而言之,以下是我得到的和我期望的:
图表
可能有一种机制我不明白。有什么解决办法吗?我应该改用过滤器吗?
谢谢。

nxowjjhe

nxowjjhe1#

你好像发现了窃听器。我可以用flink1.1.3和当前的主分支(flink1.2-snapshot)重现这个问题。
我提交了一份jira文件:flink-5031来追踪这个错误。

相关问题