用于滚动聚合的globalwindow的替代方案

ht4b089n  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(349)

我想知道flink是否适合以下用例。假设我有一个测量流(设备id,值),例如。
(1, 10.2), (2, 3.4), (3, 9.1), (1, 7.0), (3, 6.3), (5, 17.8)
我每分钟都要报告到目前为止看到的任何设备id的最新值。
根据数据:

  1. data: (1, 10.2), (2, 3.4), (3, 9.1), (1, 7.0), (3, 6.3), (5, 17.8)
  2. time: 0 ----------------- 1min -------------- 2min ------------------ 3min

我想要一个结果:
1: { (1, 10.2), (2, 3.4) }
2: { (1, 7.0), (2, 3.4), (3, 9.1) }
3: { (1, 7.0), (2, 3.4), (3, 6.3), (5, 17.8) }
我提出了包括

  1. .windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1)).apply( ... )

但是在一个大的数据集上它看起来并不好(内存方面)。还有别的办法吗?

qcuzuvrc

qcuzuvrc1#

您可能需要考虑以下内容作为起点:

  1. public class StreamingJob {
  2. private static final TimeUnit windowTimeUnit = TimeUnit.SECONDS;
  3. private static final long windowLength = 10;
  4. private static long getNearestRightBoundaryFor(Long timestamp, Long duration, TimeUnit unit){
  5. Long durationEpoch = unit.toMillis(duration);
  6. Long quotient = timestamp / durationEpoch;
  7. return (quotient + 1) * durationEpoch - 1;
  8. }
  9. public static void main(String[] args) throws Exception {
  10. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  12. env.fromElements(
  13. Tuple3.of(1000L, 1L, 3.8f), Tuple3.of(2003L, 2L, 82.3f), Tuple3.of(3006L, 1L, 4.2f), // 0 - 09
  14. Tuple3.of(11120L, 2L, 10f), Tuple3.of(12140L, 2L, 7.15f), Tuple3.of(13150L, 3L, 3.33f), // 10 - 19
  15. Tuple3.of(21200L, 2L, 1.09f), Tuple3.of(22270L, 1L, 2.22f), Tuple3.of(23280L, 2L, 3.8f), // 20 - 29
  16. Tuple3.of(31310L, 3L, 3.12f), Tuple3.of(32330L, 2L, 9.2f), Tuple3.of(33390L, 1L, 4.0f) // 30 - 39
  17. )
  18. .assignTimestampsAndWatermarks(
  19. new AssignerWithPunctuatedWatermarks<Tuple3<Long,Long,Float>>() {
  20. @Nullable
  21. @Override
  22. public Watermark checkAndGetNextWatermark(Tuple3<Long, Long, Float> lastElement, long extractedTimestamp) {
  23. return new Watermark(extractedTimestamp);
  24. }
  25. @Override
  26. public long extractTimestamp(Tuple3<Long, Long, Float> element, long previousElementTimestamp) {
  27. return element.f0;
  28. }
  29. })
  30. .keyBy(new KeySelector<Tuple3<Long,Long,Float>, Long>() {
  31. @Override
  32. public Long getKey(Tuple3<Long, Long, Float> value) throws Exception {
  33. return value.f1;
  34. }
  35. })
  36. .process(new KeyedProcessFunction<Long, Tuple3<Long,Long,Float>, Tuple4<Long, Long, Long, Float>>() {
  37. private ValueState<Tuple3<Long, Long, Float>> state;
  38. @Override
  39. public void open(Configuration parameters) {
  40. ValueStateDescriptor<Tuple3<Long, Long, Float>> descriptor = new ValueStateDescriptor<>(
  41. "state",
  42. TypeInformation.of(new TypeHint<Tuple3<Long, Long, Float>>() {
  43. }));
  44. state = getRuntimeContext().getState(descriptor);
  45. }
  46. @Override
  47. public void processElement(Tuple3<Long, Long, Float> value, Context ctx, Collector<Tuple4<Long, Long, Long, Float>> out) throws Exception {
  48. Tuple3<Long, Long, Float> currentValue = state.value();
  49. if (currentValue == null) {
  50. Long ts = getNearestRightBoundaryFor(value.f0, windowLength, windowTimeUnit);
  51. ctx.timerService().registerEventTimeTimer(ts);
  52. state.update(value);
  53. }
  54. else if (value.f0 > currentValue.f0) { // ignore out-of-order events
  55. state.update(value);
  56. }
  57. }
  58. @Override
  59. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<Long, Long, Long, Float>> out) throws IOException {
  60. Tuple3<Long, Long, Float> currentValue = state.value();
  61. out.collect(new Tuple4(timestamp, currentValue.f0, currentValue.f1, currentValue.f2));
  62. Long newTs = timestamp + windowTimeUnit.toMillis(windowLength);
  63. if (ctx.timerService().currentWatermark() < Long.MAX_VALUE) {
  64. ctx.timerService().registerEventTimeTimer(newTs);
  65. }
  66. }
  67. })
  68. .print();
  69. env.execute("Flink FTW!");
  70. }
  71. }

需要指出的是:
我不建议用windows来做这个。使用globalwindows,管理过期状态变得很复杂。
我使用了带标点水印的赋值函数,而不是升序时间戳抽取函数。我这样做有三个原因:(1)一旦切换到并行运行,可能很难确保事件按顺序到达(2) AscendingTimeStampExtractor定期生成水印(默认情况下,每200毫秒实时一次),在本例中,应用程序在生成第一个水印之前已经消耗了所有输入(3) processelement方法中的一个简单检查就是处理无序事件所需的全部。但是如果事件确实有序,那么在生产中使用ascendingtimestampextractor或BoundedAutoforErnessTimestampExtractor可能会更好。
输出如下所示:

  1. (9999,11120,2,10.0)
  2. (19999,21200,2,1.09)
  3. (19999,13150,3,3.33)
  4. (29999,23280,2,3.8)
  5. (29999,31310,3,3.12)
  6. (39999,32330,2,9.2)
  7. (39999,31310,3,3.12)
  8. (9999,3006,1,4.2)
  9. (19999,3006,1,4.2)
  10. (29999,22270,1,2.22)
  11. (39999,33390,1,4.0)

(11120,2,10.0)在9999触发的原因是,正是这个时间戳为11120的事件的到来使水印前进到9999之后,导致计时器触发。在调用ontimer时,onelement已经被调用了。
ctx.timerservice().currentwatermark()<long.max\u值的检入计时器是这样,这个有限的示例不会永远运行。如果流式处理作业到达其输入的结尾,则注入时间戳为long.max\u值的最终水印,以导致任何剩余计时器的最后一次触发。在这种情况下,我们不应该创建另一个计时器。

展开查看全部

相关问题