计算总量并在flink中定期发射

xqnpmsa8  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(410)

我有一系列关于资源的事件,如下所示:

  1. id, type, count
  2. 1, view, 1
  3. 1, download, 3
  4. 2, view, 1
  5. 3, view, 1
  6. 1, download, 2
  7. 3, view, 1

我试图为每个资源生成统计信息(总计),因此如果我得到如上所述的流,结果应该是:

  1. id, views, downloads
  2. 1, 1, 5
  3. 2, 1, 0
  4. 3, 2, 0

现在我编写了一个processfunction,它可以这样计算总数:

  1. public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
  2. private ValueState<ResourceTotals> totalsState;
  3. @Override
  4. public void open(Configuration config) throws Exception {
  5. ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
  6. totalsDescriptor.setQueryable("resource-totals");
  7. totalsState = getRuntimeContext().getState(totalsDescriptor);
  8. }
  9. @Override
  10. public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
  11. ResourceTotals totals = totalsState.value();
  12. if (totals == null) {
  13. totals = new ResourceTotals();
  14. totals.id = event.id;
  15. }
  16. switch (event.type) {
  17. case "view":
  18. totals.views += event.count;
  19. break;
  20. case "download":
  21. totals.downloads += event.count;
  22. }
  23. totalsState.update(totals);
  24. out.collect(totals);
  25. }
  26. }

从代码中可以明显看出,它将为每个事件发出一个新的resourcetoals,但我希望每分钟为每个资源发出一次total,而不是更频繁。
我试着用一个全局窗口和一个触发器(continuousprocessingtimetrigger)来做实验,但是没有成功。我遇到的问题是:
如何表达我想要的最后一个事件的窗口?
如何不存储在这个全局窗口中生成的所有resourcetoals?
任何帮助都将不胜感激。

gg0vcinb

gg0vcinb1#

您可以使用计时器每分钟发出一次totalsstate中的值。因为我在数据流中看不到任何时间戳,所以我想您应该使用一个处理时间计时器。
另一种方法是将processfunction替换为一个时间窗口以及一个保留最后一个事件的reducefunction。
在这两种情况下,您都可以考虑通过id和type字段对流进行键控,这将稍微简化您的状态管理。
更新时间:
是的,计时器是状态的一部分,由flink检查并恢复。

相关问题