Flink streaming word count aggregation

tgabmvqs posted on 2021-06-21 in Flink

I'm running into a class of problems that don't arise in batch processing but seem non-trivial in streaming. Consider the classic word count example:

    lines
      .flatMap(_.split("\\W+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)

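This emits a result for every word in the stream, that is, an updated running count each time a word arrives. For example: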

    input: "foo bar baz foo"
    output: (foo, 1) (bar, 1) (baz, 1) (foo, 2)
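To make the behavior above concrete, here is a plain-Java simulation (no Flink dependency) of what a keyed running sum does: it keeps one counter per key and emits the updated count every time an element arrives, so a repeated word produces a second, larger result instead of one final total. The class and method names are hypothetical; this is a sketch of the semantics, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningSumSimulation {

    // Simulates keyBy(word).sum(1): per-key state plus one emitted update per input element.
    public static List<String> runningSum(String line) {
        Map<String, Integer> state = new HashMap<>();   // one counter per key (word)
        List<String> emitted = new ArrayList<>();
        for (String word : line.split("\\W+")) {
            if (word.isEmpty()) {
                continue;                               // split can yield a leading empty token
            }
            int count = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + ", " + count + ")"); // emitted immediately, never "at end of line"
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runningSum("foo bar baz foo"));
        // prints [(foo, 1), (bar, 1), (baz, 1), (foo, 2)]
    }
}
```

Because the operator has no notion of where a line ends, it can only report the latest count per key; that is exactly the gap the question is about.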

What I want is to process each line as a whole and only then print the result, i.e. use a window per line:

    input: "foo bar baz foo"
    output: (foo, 2) (bar, 1) (baz, 1)
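The target semantics on their own are simple to state: if a whole line is one stream record, its per-line count depends on nothing but that record. A plain-Java sketch of that per-record function (hypothetical names; a `LinkedHashMap` is used only so the printed result keeps first-occurrence order, matching the desired output):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PerLineCount {

    // Counts the words of a single line; no state from other lines is involved.
    public static Map<String, Integer> countLine(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-occurrence order
        for (String word : line.split("\\W+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countLine("foo bar baz foo")); // prints {foo=2, bar=1, baz=1}
    }
}
```

Because this function reads only its own record, it needs no window; windowing only comes into play once the words of a line travel as separate records, as in the keyed pipeline above.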

Clearly, neither time-based nor count-based windows apply here. What is the right way to solve this?

Answer #1 (1cklez4t):

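Even in batch mode you cannot process words and lines in parallel this way, because nested groupBy (or keyBy) is not supported in Flink. However, if what you want is a streaming version of the following batch word count: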

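    import org.apache.flink.api.scala._
    lines
      .flatMap { line =>
        val lineId = java.util.UUID.randomUUID().toString // one id per line
        line.split("\\W+").map(word => (lineId, word, 1))
      }
      .groupBy(0)
      .reduceGroup { aggregateWords }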

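where aggregateWords iterates over the words of one particular key (i.e. one line) and counts them, then you can implement it as follows: for each line, emit its words followed by a special end-of-line record, and then apply a GlobalWindow with a custom trigger that fires once it receives that special record.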
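The marker-plus-trigger idea can be simulated in plain Java without Flink: each key (line id) gets its own buffer, standing in for that key's GlobalWindow contents; a word record only updates the buffer ("continue"), while the marker record emits the buffer ("fire") and then discards it ("purge"). All names here (`MarkerTriggerSimulation`, `MARKER`, `onRecord`, `tag`) are hypothetical, and the sketch assumes, as the Flink version does, that a line's marker arrives after all of that line's words.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MarkerTriggerSimulation {

    static final String MARKER = "\n"; // can never come out of split("\\W+")

    // One buffer per line id: stands in for the per-key GlobalWindow state.
    private final Map<String, Map<String, Integer>> windows = new HashMap<>();
    private final List<Map<String, Integer>> emitted = new ArrayList<>();

    // Called for every (lineId, word) record; records of different lines may interleave.
    public void onRecord(String lineId, String word) {
        Map<String, Integer> window = windows.computeIfAbsent(lineId, k -> new HashMap<>());
        if (!word.equals(MARKER)) {
            window.merge(word, 1, Integer::sum); // CONTINUE: just accumulate
        } else {
            emitted.add(window);                 // FIRE: emit this line's counts
            windows.remove(lineId);              // PURGE: drop the finished window
        }
    }

    // Mimics the flatMap step: every word of the line, then the end-of-line marker.
    public static List<String[]> tag(String lineId, String line) {
        List<String[]> records = new ArrayList<>();
        for (String w : line.split("\\W+")) {
            if (!w.isEmpty()) {
                records.add(new String[] {lineId, w});
            }
        }
        records.add(new String[] {lineId, MARKER});
        return records;
    }

    public List<Map<String, Integer>> emitted() { return emitted; }

    public int openWindows() { return windows.size(); }

    public static void main(String[] args) {
        MarkerTriggerSimulation sim = new MarkerTriggerSimulation();
        List<String[]> a = tag("line-1", "foo bar baz foo");
        List<String[]> b = tag("line-2", "yes no no yes");
        // Interleave the two lines to show that keying by line id keeps them apart.
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            if (i < a.size()) sim.onRecord(a.get(i)[0], a.get(i)[1]);
            if (i < b.size()) sim.onRecord(b.get(i)[0], b.get(i)[1]);
        }
        System.out.println(sim.emitted());     // one map of counts per line
        System.out.println(sim.openWindows()); // 0: every window was purged after firing
    }
}
```

The purge step matters for this design: every line gets a fresh key, so a window that only fires and is never cleared would keep its state forever.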
A streaming version of the batch job above could look like this:

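    import java.util.HashMap;
    import java.util.UUID;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    public class PerLineWordCount {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("foo bar baz foo", "yes no no yes", "hi hello hi hello")
                // Tag each word with a unique per-line id, then emit an end-of-line marker.
                .flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple3<String, String, Integer>> out) {
                        String lineId = UUID.randomUUID().toString();
                        for (String w : line.split("\\W+")) {
                            if (!w.isEmpty()) {
                                out.collect(Tuple3.of(lineId, w, 1));
                            }
                        }
                        // "\n" can never come out of split("\\W+"), so it is a safe marker.
                        out.collect(Tuple3.of(lineId, "\n", 1));
                    }
                })
                // One key per line; the marker follows the words on the same channel, so it arrives last.
                .keyBy(t -> t.f0)
                .window(GlobalWindows.create())
                .trigger(new Trigger<Tuple3<String, String, Integer>, GlobalWindow>() {
                    @Override
                    public TriggerResult onElement(Tuple3<String, String, Integer> element, long timestamp,
                                                   GlobalWindow window, TriggerContext ctx) {
                        // Fire on the marker and purge, so finished lines do not leak window state.
                        return element.f1.equals("\n") ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
                    }
                    @Override
                    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                    }
                    @Override
                    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                    }
                    @Override
                    public void clear(GlobalWindow window, TriggerContext ctx) {
                        // No timers or trigger state were registered, so there is nothing to clean up.
                    }
                })
                // aggregate() replaces fold(), which is deprecated and removed in newer Flink versions.
                .aggregate(new AggregateFunction<Tuple3<String, String, Integer>,
                        HashMap<String, Integer>, HashMap<String, Integer>>() {
                    @Override
                    public HashMap<String, Integer> createAccumulator() {
                        return new HashMap<>();
                    }
                    @Override
                    public HashMap<String, Integer> add(Tuple3<String, String, Integer> t, HashMap<String, Integer> acc) {
                        if (!t.f1.equals("\n")) { // skip the end-of-line marker
                            acc.merge(t.f1, t.f2, Integer::sum);
                        }
                        return acc;
                    }
                    @Override
                    public HashMap<String, Integer> getResult(HashMap<String, Integer> acc) {
                        return acc;
                    }
                    @Override
                    public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) {
                        b.forEach((k, v) -> a.merge(k, v, Integer::sum));
                        return a;
                    }
                })
                .print();
            env.execute("Test");
        }
    }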

Output:

    {bar=1, foo=2, baz=1}
    {no=2, yes=2}
    {hi=2, hello=2}
