例如,我有一批大小为第5批的元组,其中包含用户的印象:
Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]
Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]
这是我保存计数状态的例子:
TridentState ClientState = impressionStream
.groupBy(new Fields("clientId"))
.persistentAggregate(getCassandraStateFactory("users", "DataComputation",
"UserImpressionCounter"), new Count(), new Fields("count));
Stream ClientStream = ClientState.newValuesStream();
我有明确的数据库和运行我的拓扑结构。在按clientid对流进行分组之后,我使用persistentaggregate函数和count aggregator保存状态。对于第一批,是newvaluesstream方法之后的结果: [clientId1, 4]
, [clientId2, 1]
. 第二批: [clientId1, 7]
, [clientId2, 3]
一如预期。
clientstream在几个分支中使用,在其中一个分支中,我需要处理元组,以便使用大小为1的批处理,因为我需要关于每个元组计数的信息。大小为1的批显然是垃圾,所以我必须找出计数器的前一个状态,然后再更新它,并用tuple发出此信息,因为已经更新了计数器,例如,对于第二批 [clientId1, 7, 4]
.
有人知道怎么做吗?
1条答案
按热度按时间vatpfxk51#
我已通过添加新聚合器并加入持久聚合解决了此问题:
汇总器: