如何在storm trident中按窗口中的元组分组?

v09wglhw  于 2021-06-24  发布在  Storm
关注(0)|答案(0)|浏览(163)

我需要在storm trident中对tumblingwindow中的元组(groupby基于另外两个字段加上它们的时间窗口)执行groupby,然后对它们应用聚合函数。以下代码聚合窗口中的所有元组:

topology.newStream("fixed_spout", fixedSpout).each(new Fields("flows"), new  ExtractflowValues2("IPV4_SRC_ADDR"),new Fields("sa"))
    .each(new Fields("flows"),new ExtractflowValues2("L4_DST_PORT"),new Fields("dp"))
    .each(new Fields("flows"),new ExtractflowValues2("IPV4_DST_ADDR"),new Fields("da"))
    .tumblingWindow(BaseWindowedBolt.Duration.seconds(5), wsf, new Fields(groupbyFileds), new countDistinct("da"), new Fields("count"));

在上面的代码中,我首先从元组中提取3个字段(sa、da和dp),然后将元组放入持续时间为5秒的窗口中,并计算每个窗口中不同“da”的数量。然而,我真正需要的是将元组放入持续时间为5秒的窗口中,并根据它们的“sa”和“dp”字段对这些元组进行分组,然后计算它们不同的“da”的数量。我怎样才能做到呢?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题