How to get the previous state of a counter before it is updated

vnjpjtjt, posted on 2021-06-24 in Storm

For example, I have batches of size 5 whose tuples contain user impressions:

    Batch 1:
    [UUID1, clientId1]
    [UUID2, clientId1]
    [UUID2, clientId1]
    [UUID2, clientId1]
    [UUID3, clientId2]

    Batch 2:
    [UUID4, clientId1]
    [UUID5, clientId1]
    [UUID5, clientId1]
    [UUID6, clientId2]
    [UUID6, clientId2]

This is how I persist the count state:

    // Count impressions per client and persist the running total.
    TridentState ClientState = impressionStream
        .groupBy(new Fields("clientId"))
        .persistentAggregate(
            getCassandraStateFactory("users", "DataComputation", "UserImpressionCounter"),
            new Count(), new Fields("count"));
    // Emits [clientId, count] every time a counter is updated.
    Stream ClientStream = ClientState.newValuesStream();

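I start with a clean database and run my topology. After grouping the stream by clientId, I persist the state with persistentAggregate and the Count aggregator. For the first batch, the result after newValuesStream is [clientId1, 4], [clientId2, 1]. For the second batch it is [clientId1, 7], [clientId2, 3], as expected.
ClientStream is used in several branches. In one of them I need to process the tuples as if the batch size were 1, because I need the counter value for every single tuple. A batch size of 1 is obviously a bad idea, so instead I have to find out the state of the counter before it was updated and emit that value along with the tuple, since by then the counter has already been updated. For the second batch, for example, I want [clientId1, 7, 4].
Does anyone know how to do this?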

vatpfxk5 #1

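I solved this by adding a second, per-batch aggregator and joining its output with the stream from the persistent aggregate: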

    // Running total per client, persisted across batches.
    TridentState ClientState = impressionStream
        .groupBy(new Fields("clientId"))
        .persistentAggregate(
            getCassandraStateFactory("users", "DataComputation", "UserImpressionCounter"),
            new Count(), new Fields("count"));

    // Number of tuples per client within the current batch only.
    Stream ClientBatchAggregationStream = impressionStream
        .groupBy(new Fields("clientId"))
        .aggregate(new SumCountAggregator(), new Fields("batchCount"));

    // Join both on clientId to get [clientId, count, batchCount].
    Stream GroupingPeriodCounterStateStream = topology
        .join(ClientState.newValuesStream(), new Fields("clientId"),
              ClientBatchAggregationStream, new Fields("clientId"),
              new Fields("clientId", "count", "batchCount"));

The aggregator:

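    // Imports assume Storm 1.0+ package names; older releases use storm.trident.* and backtype.storm.*.
    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> {

        // Mutable per-group counter for the current batch.
        static class CountState {
            long count = 0;
        }

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            // Called once per group at the start of each batch.
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            // One more tuple seen for this group in this batch.
            state.count += 1;
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            // Emit the per-batch count as the single output field.
            collector.emit(new Values(state.count));
        }
    }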
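As I read the approach above, once count and batchCount sit in the same tuple, the value the counter had before the batch is simply count - batchCount. Below is a minimal sketch of that arithmetic in plain Java, with no Storm dependency. The class name PreviousCountSketch and the method processBatch are made up for illustration, and the in-memory map only stands in for the Cassandra-backed state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the topology: not Storm code, just the arithmetic.
class PreviousCountSketch {

    // Running total per client; plays the role of the persisted counter state.
    private final Map<String, Long> totals = new LinkedHashMap<>();

    // Takes the clientIds of one batch and returns rows "[clientId, count, previousCount]".
    List<String> processBatch(List<String> clientIds) {
        // Per-batch count, the same number SumCountAggregator would emit as batchCount.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            batchCounts.merge(clientId, 1L, Long::sum);
        }

        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Long> entry : batchCounts.entrySet()) {
            long batchCount = entry.getValue();
            // Update the running total, as the persistent Count aggregation does.
            long count = totals.merge(entry.getKey(), batchCount, Long::sum);
            // The state before this batch was applied.
            long previousCount = count - batchCount;
            rows.add("[" + entry.getKey() + ", " + count + ", " + previousCount + "]");
        }
        return rows;
    }

    public static void main(String[] args) {
        PreviousCountSketch sketch = new PreviousCountSketch();
        // Batch 1 and Batch 2 from the question, reduced to their clientId field.
        System.out.println(sketch.processBatch(Arrays.asList(
            "clientId1", "clientId1", "clientId1", "clientId1", "clientId2")));
        System.out.println(sketch.processBatch(Arrays.asList(
            "clientId1", "clientId1", "clientId1", "clientId2", "clientId2")));
    }
}
```

For the second batch this gives [clientId1, 7, 4], which is the tuple the question asks for. In the real topology the same subtraction could presumably go into a Trident function applied to the joined stream with each.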