dynamicpivot使用storm

hpcdzsge  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(295)

我在bigdatadb中有列名为col1、col2、col3、val1、val2的行(在我的例子中是cassandra)
在sql方法中,我可以按col1、col2或col2、col1或任何其他可能的方式进行分组。这样我可以很容易地形成树层次结构。
但是现在我们使用cassandra来存储不支持groupby的数据。所以我们想使用storm进行分组和聚合。我们编写了一些示例代码来进行聚合和分组,但是我们无法确定是否可以实现。
数据是这样的

  1. col1,col2,col3,val1,val2
  2. ------------------------
  3. a1,b1,c1,10,20
  4. a1,b1,c2,11,13
  5. a1,b2,c1,9,15
  6. a1,b2,c3,13,88
  7. a2,b1,c1,30,44
  8. a2,b3,c2,22,33
  9. a4,b4,c4,99,66

就像在excel pivot中一样,我想构建层次结构root->child1->child2->child3-val1,val2,如果我的层次结构是col1->col2->col3,那么它可能是这样的

  1. a1 {43,136}
  2. --b1 {21,33}
  3. --c1 10,20
  4. --c2 11,13
  5. --b2 {22,103}
  6. --c1 9,15
  7. --c3 13,88
  8. a2 {52,77}
  9. --b1 {30,44}
  10. --c1 30,44
  11. --b3 {22,33}
  12. --c2 22,33
  13. a4 {99,66}
  14. --b4 {99,66}
  15. --c4 99,66

我想给用户提供重新排列层次结构元素的功能,比如col3->col1->col2(或者其他一些动态元素),在这种情况下,数据如下所示

  1. c1 {49,79}
  2. --a1 {19,35}
  3. --b1 10,20
  4. --b2 9,15
  5. --a2 {30,44}
  6. --b1 30,44
  7. c2 {11,13}
  8. --a1 {11,13}
  9. --b1 11,13
  10. --a2 {22,33}
  11. --b3 22,33
  12. c3 {13,88}
  13. --a1 {13,88}
  14. --b2 13,88
  15. c4 {99,66}
  16. --a4 {99,66}
  17. --b4 99,66

我的三叉戟代码中有几行是这样的,这并不像预期的那样工作。

  1. topology.newStream("aggregation", spout)
  2. .groupBy(new Fields("col1","col2","col3","val1","val2"))
  3. .aggregate(new Fields("val1","val2"), new Sum(), new Fields("val1sum","val2sum"))
  4. .each(new Fields("col1","col2","col3","val1sum","val2sum"), new Utils.PrintFilter());

对于上面的转换,我想使用storm,不管有没有tridentapi支持。有人能指导我如何实现它吗?任何方案的想法都非常感谢。

ycggw6v2

ycggw6v21#

groupby中应该只包括维度(col1、col2和col3),而不包括度量值(val1、val2)。当需要聚合多个度量值时,需要使用chainedagg()构造。以下是针对您的用例更改的拓扑代码:

  1. topology.newStream("aggregation", spout)
  2. .groupBy(new Fields("col1","col2"))
  3. .chainedAgg()
  4. .aggregate(new Fields("val1"), new Sum(), new Fields("val1sum"))
  5. .aggregate(new Fields("val2"), new Sum(), new Fields("val2sum"))
  6. .chainEnd()
  7. .each(new Fields("col1","col2","val1sum", "val2sum"), new Utils.PrintFilter());

它产生如下输出,正如您所期望的!
分区ID=0,[a1,b1,21,33]
分区ID=0,[a1,b2,22,103]
分区ID=0,[a4,b4,99,66]
分区ID=0,[a2,b1,30,44]
分区ID=0,[a2,b3,22,33]
干杯!
mk公司

展开查看全部

相关问题