Does Storm Trident join only within a batch?

bqujaahr  posted on 2021-06-25  in  Storm

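I want to implement join semantics, so I tried the join method in a Trident topology. I found that the join is performed batch against batch. If the two streams being joined each hold millions of tuples, does the whole join have to happen inside a single batch?
In genderSpout each batch holds 3 tuples, so the spout emits 2 batches. In ageSpout each batch holds up to 5 tuples, so the spout emits only 1 batch.
I use JoinType to do a left outer join.
The output of the test code is: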

1 man 15
2 woman 18
1 man 19
3 woman NULL
4 man NULL
1 woman NULL

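From the output, I can see that the first four results come from joining the first batch of genderSpout with the first batch of ageSpout. The last two results come from joining the second batch of genderSpout with an empty batch from ageSpout. As far as join semantics go, this result is incorrect, because the expected result of genderSpout left join ageSpout is: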

1 man 15
1 man 19
2 woman 18
3 woman NULL
4 man 20
1 woman 15
1 woman 19
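To make the difference between the two results concrete, here is a small plain-Java simulation. It is not Storm code and does not claim to mirror Trident's internals; the class `BatchJoinDemo` and its methods are made up for illustration. It joins batch N of the left side with batch N of the right side, which is the behaviour described above, and compares that with a join over the full data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only (no Storm dependency).
class BatchJoinDemo {
    static final List<String[]> GENDER = Arrays.asList(
        row("1", "man"), row("2", "woman"), row("3", "woman"),
        row("4", "man"), row("1", "woman"));
    static final List<String[]> AGE = Arrays.asList(
        row("1", "15"), row("4", "20"), row("2", "18"), row("1", "19"));

    static String[] row(String id, String value) {
        return new String[] {id, value};
    }

    // Left outer join of two in-memory lists on column 0 (the id).
    static List<String> leftOuterJoin(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            boolean matched = false;
            for (String[] r : right) {
                if (l[0].equals(r[0])) {
                    out.add(l[0] + " " + l[1] + " " + r[1]);
                    matched = true;
                }
            }
            if (!matched) {
                out.add(l[0] + " " + l[1] + " NULL");
            }
        }
        return out;
    }

    // Cut a list into consecutive batches of at most `size` tuples.
    static List<List<String[]>> split(List<String[]> tuples, int size) {
        List<List<String[]>> out = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i += size) {
            out.add(tuples.subList(i, Math.min(i + size, tuples.size())));
        }
        return out;
    }

    // Join batch i of the left side with batch i of the right side only.
    // Once the right side has run out of batches, an empty batch is used.
    static List<String> joinPerBatch(List<String[]> left, int leftSize,
                                     List<String[]> right, int rightSize) {
        List<List<String[]>> leftBatches = split(left, leftSize);
        List<List<String[]>> rightBatches = split(right, rightSize);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < leftBatches.size(); i++) {
            List<String[]> r = new ArrayList<>();
            if (i < rightBatches.size()) {
                r = rightBatches.get(i);
            }
            out.addAll(leftOuterJoin(leftBatches.get(i), r));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("per batch: " + joinPerBatch(GENDER, 3, AGE, 5));
        System.out.println("full data: " + leftOuterJoin(GENDER, AGE));
    }
}
```

With batch sizes 3 and 5, the per-batch variant yields the same six rows as the test output (the order of rows inside a batch may differ from what Trident prints), while the join over the full data yields the seven expected rows.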

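So my question is: if both sides of the join (the spouts) hold millions of tuples, do I have to put them all into one batch to get the correct result?
Or is my approach wrong? If so, how can I get a result with correct outer join semantics?
The test code is as follows: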

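// Package names below assume Storm 1.x or later (org.apache.storm);
// older releases use backtype.storm.* and storm.trident.* instead.
import java.util.ArrayList;
import java.util.List;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.JoinType;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public static void main(String[] args) throws Exception {
    // Left side: 5 tuples, 3 per batch, so 2 batches are emitted.
    Fields genderFields = new Fields("id", "gender");
    FixedBatchSpout genderSpout = new FixedBatchSpout(genderFields, 3,
        new Values("1", "man"),
        new Values("2", "woman"),
        new Values("3", "woman"),
        new Values("4", "man"),
        new Values("1", "woman"));
    genderSpout.setCycle(false);

    // Right side: 4 tuples, up to 5 per batch, so 1 batch is emitted.
    Fields ageFields = new Fields("id2", "age");
    FixedBatchSpout ageSpout = new FixedBatchSpout(ageFields, 5,
        new Values("1", "15"),
        new Values("4", "20"),
        new Values("2", "18"),
        new Values("1", "19"));
    ageSpout.setCycle(false);

    List<Stream> allStreams = new ArrayList<Stream>();
    List<Fields> allFields = new ArrayList<Fields>();
    List<Fields> joinFields = new ArrayList<Fields>();
    List<JoinType> joinTypes = new ArrayList<JoinType>();

    TridentTopology topology = new TridentTopology();

    Stream genderStream = topology.newStream("genderIn", genderSpout);
    Stream ageStream = topology.newStream("ageIn", ageSpout);

    allStreams.add(genderStream);
    allStreams.add(ageStream);

    allFields.add(genderFields);
    allFields.add(ageFields);

    // Join key of each stream, in the same order as allStreams.
    joinFields.add(new Fields("id"));
    joinFields.add(new Fields("id2"));

    // INNER for the left stream and OUTER for the right stream:
    // intended as a left outer join (the age side may be missing).
    joinTypes.add(JoinType.INNER);
    joinTypes.add(JoinType.OUTER);

    topology.join(allStreams, joinFields, new Fields("id", "gender", "age"), joinTypes);

    LocalCluster cluster = new LocalCluster();

    Config config = new Config();
    config.setDebug(false);
    config.setMaxSpoutPending(3);

    cluster.submitTopology("trident-join-test", config, topology.build());

    Thread.sleep(3000);
    cluster.shutdown();
}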

afdcj2ne  #1

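I asked the same question earlier on the storm-user group (https://groups.google.com/forum/?fromgroups=#!forum/storm-user): https://groups.google.com/forum/?fromgroups=#!topic/storm-user/7fxavgf2\u
Jason Jackson's answer was: TridentTopology.join does not join across batches. You can use stateQuery and partitionPersist to join streams across batches.
Hope this helps.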
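The idea behind the state-based approach can be sketched in plain Java. This is only a simulation under my own assumptions, not the Trident API: the class `StateJoinDemo`, its `persist` method (playing the role that partitionPersist would play) and its `query` method (playing the role of stateQuery) are hypothetical names. One side is written into a store that outlives a single batch, and every tuple of the other side looks its key up in that store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a state-backed join (no Storm dependency).
class StateJoinDemo {
    // Stand-in for a Trident state: it survives from one batch to the next.
    private final Map<String, List<String>> agesById = new HashMap<>();

    // Store every age tuple of a batch under its id.
    void persist(List<String[]> ageBatch) {
        for (String[] t : ageBatch) {
            agesById.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t[1]);
        }
    }

    // Look up each gender tuple in the store; emit NULL when no age is known.
    List<String> query(List<String[]> genderBatch) {
        List<String> out = new ArrayList<>();
        for (String[] g : genderBatch) {
            List<String> ages = agesById.get(g[0]);
            if (ages == null) {
                out.add(g[0] + " " + g[1] + " NULL");
            } else {
                for (String age : ages) {
                    out.add(g[0] + " " + g[1] + " " + age);
                }
            }
        }
        return out;
    }

    static String[] row(String id, String value) {
        return new String[] {id, value};
    }

    // Replays the batches from the question: ages arrive in one batch,
    // genders in two batches of 3 and 2 tuples.
    static List<String> run() {
        StateJoinDemo state = new StateJoinDemo();
        List<String> out = new ArrayList<>();

        // Step 1: the only age batch is persisted, then gender batch 1 is joined.
        state.persist(Arrays.asList(
            row("1", "15"), row("4", "20"), row("2", "18"), row("1", "19")));
        out.addAll(state.query(Arrays.asList(
            row("1", "man"), row("2", "woman"), row("3", "woman"))));

        // Step 2: the age side is empty, but the stored ages are still visible.
        state.persist(new ArrayList<String[]>());
        out.addAll(state.query(Arrays.asList(
            row("4", "man"), row("1", "woman"))));
        return out;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this replay the second gender batch still finds the ages stored during the first step, so the seven rows expected in the question come out. Note the limitation of the sketch: a lookup only sees what has already been persisted, so a tuple that arrives before its match on the other side would still be emitted with NULL unless both sides are stored or late matches are handled separately.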
