如何Map三叉戟持久状态元组?

kjthegm6  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(382)

我在学习三叉戟框架。三叉戟有几种方法 Stream 用于在批处理中聚合元组,包括允许使用 Aggregator 接口。但是不幸的是,一个内置的对应项额外地保持了map状态,就像其他的9个重载一样 persistentAggregate() ,仅与 Aggregator 作为论据,是不存在的。
因此,如何通过结合较低级别的trident和storm抽象和工具来实现所需的功能?因为几乎没有javadoc文档,所以探索api是相当困难的。
换句话说, persistentAggregate() 方法允许通过更新某些持久状态来结束流处理:

stream of tuples ---> persistent state

我想更新持久状态并发出不同的元组:

stream of tuples ------> stream of different tuples
                  with
            persistent state
``` `Stream.aggregate(Fields, Aggregator, Fields)` 不提供容错:

stream of tuples ------> stream of different tuples
with
simple in-memory state

a64a0gku

a64a0gku1#

可以使用tridentstate#newvaluesstream()方法从状态创建新流。这将允许您检索聚合值流。
为了便于说明,我们可以通过添加此方法和调试过滤器来改进trident文档中的示例:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"),
    new Values("the man went to the store and bought some candy"),
    new Values("four score and seven years ago"),
    new Values("how many apples can you eat"));
spout.setCycle(true);

TridentTopology topology = new TridentTopology();        
topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
    .newValuesStream().each(new Fields("count"), new Debug());

运行此拓扑将输出(到控制台)聚合计数。
希望有帮助

相关问题