我在学习三叉戟框架。三叉戟有几种方法 Stream
用于在批处理中聚合元组,包括允许使用 Aggregator
接口。但是不幸的是,一个内置的对应项额外地保持了map状态,就像其他的9个重载一样 persistentAggregate()
,仅与 Aggregator
作为论据,是不存在的。
因此,如何通过结合较低级别的trident和storm抽象和工具来实现所需的功能?因为几乎没有javadoc文档,所以探索api是相当困难的。
换句话说, persistentAggregate()
方法允许通过更新某些持久状态来结束流处理:
stream of tuples ---> persistent state
我想更新持久状态并发出不同的元组:
stream of tuples ------> stream of different tuples
with
persistent state
``` `Stream.aggregate(Fields, Aggregator, Fields)` 不提供容错:
stream of tuples ------> stream of different tuples
with
simple in-memory state
1条答案
按热度按时间a64a0gku1#
可以使用tridentstate#newvaluesstream()方法从状态创建新流。这将允许您检索聚合值流。
为了便于说明,我们可以通过添加此方法和调试过滤器来改进trident文档中的示例:
运行此拓扑将输出(到控制台)聚合计数。
希望有帮助