apache-flink:count窗口超时

2vuwiymt  于 2021-06-24  发布在  Flink
关注(0)|答案(3)|浏览(636)

下面是一个简单的代码示例来说明我的问题:

  1. case class Record( key: String, value: Int )
  2. object Job extends App
  3. {
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment
  5. val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  6. val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
  7. val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  8. val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  9. val merged = step1.union( step2, step3 )
  10. val keyed = merged.keyBy(0)
  11. val windowed = keyed.countWindow( 3 )
  12. val summed = windowed.sum( 1 )
  13. summed.print()
  14. env.execute("test")
  15. }

这将产生以下结果:

  1. Record(01,6)
  2. Record(02,12)
  3. Record(04,24)
  4. Record(05,30)

正如预期的那样,不会为键“03”生成任何结果,因为计数窗口需要3个元素,而流中只有两个元素。
我想要的是某种带有超时的count窗口,这样,在某个超时之后,如果没有达到count窗口所期望的元素数,则使用现有元素生成部分结果。
对于这种行为,在我的示例中,当达到超时时将生成一条记录(03,15)。

92dk7w1h

92dk7w1h1#

我认为您可以使用 ProcessFunction 实现这个用例：函数中维护一个 count 属性和一个 windowEnd 属性，借助它们来决定何时收集并发出数据。

  1. public class TimeCountWindowProcessFunction extends ProcessFunction {
  2. protected long windowStart;
  3. protected long windowEnd;
  4. protected long count;
  5. private ValueState<CountPojo> state;
  6. public TimeCountWindowProcessFunction(long windowSize, long count) {
  7. this.windowSize = windowSize;
  8. this.count = count;
  9. }
  10. @Override
  11. public void open(Configuration parameters) {
  12. TypeInformation<CountPojo> typeInformation = TypeInformation.of(new TypeHint<CountPojo>() {
  13. });
  14. ValueStateDescriptor<CountPojo> descriptor = new ValueStateDescriptor("test", typeInformation);
  15. state = getRuntimeContext().getState(descriptor);
  16. }
  17. @Override
  18. public void processElement(CountPojo input, Context ctx, Collector<CountPojo> out)
  19. throws Exception {
  20. long timestamp = ctx.timestamp();
  21. windowStart = timestamp - (timestamp % windowSize);
  22. windowEnd = windowStart + windowSize;
  23. // retrieve the current count
  24. CountPojo current = (CountPojo) state.value();
  25. if (current == null) {
  26. current = new CountPojo();
  27. current.count = 1;
  28. ctx.timerService().registerEventTimeTimer(windowEnd);
  29. } else {
  30. current.count += 1;
  31. }
  32. if(current.count >= count) {
  33. out.collect(current);
  34. }
  35. // set the state's timestamp to the record's assigned event time timestamp
  36. current.setLastModified(ctx.timestamp());
  37. // write the state back
  38. state.update(current);
  39. }
  40. @Override
  41. public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountPojo> out)
  42. throws Exception {
  43. if (windowEnd == timestamp) {
  44. out.collect(state.value());
  45. }
  46. state.clear();
  47. }
  48. }

我希望这对你有帮助。

展开查看全部
e4yzc0pl

e4yzc0pl2#

你也可以用一个自定义的窗口 Trigger 来实现：它在达到计数或超时到期时触发（以先发生者为准），有效地混合了内置的 CountTrigger 和 EventTimeTrigger。

ubby3x7f

ubby3x7f3#

我遵循了大卫和尼拉夫的方法,下面是结果。
1) 使用自定义触发器:
在这里我颠倒了我最初的逻辑。我没有使用“计数窗口”，而是使用一个“时间窗口”，其持续时间与超时相对应，并配合一个自定义触发器：一旦收到预期数量的元素就立即触发，否则等到窗口结束时触发。

  1. case class Record( key: String, value: Int )
  2. object Job extends App
  3. {
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment
  5. val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  6. val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
  7. val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  8. val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  9. val merged = step1.union( step2, step3 )
  10. val keyed = merged.keyBy(0)
  11. val windowed = keyed.timeWindow( Time.milliseconds( 50 ) )
  12. val triggered = windowed.trigger( new CountTriggerWithTimeout( 3, env.getStreamTimeCharacteristic ) )
  13. val summed = triggered.sum( 1 )
  14. summed.print()
  15. env.execute("test")
  16. }

下面是触发代码:

  1. import org.apache.flink.annotation.PublicEvolving
  2. import org.apache.flink.api.common.functions.ReduceFunction
  3. import org.apache.flink.api.common.functions.RuntimeContext
  4. import org.apache.flink.api.common.state.ReducingState
  5. import org.apache.flink.api.common.state.ReducingStateDescriptor
  6. import org.apache.flink.api.common.typeutils.base.LongSerializer
  7. import org.apache.flink.streaming.api.TimeCharacteristic
  8. import org.apache.flink.streaming.api.windowing.triggers._
  9. import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
  10. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  11. /**
  12. * A trigger that fires when the count of elements in a pane reaches the given count or a
  13. * timeout is reached whatever happens first.
  14. */
  15. class CountTriggerWithTimeout[W <: TimeWindow](maxCount: Long, timeCharacteristic: TimeCharacteristic) extends Trigger[Object,W]
  16. {
  17. private val countState: ReducingStateDescriptor[java.lang.Long] = new ReducingStateDescriptor[java.lang.Long]( "count", new Sum(), LongSerializer.INSTANCE)
  18. override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult =
  19. {
  20. val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
  21. count.add( 1L )
  22. if ( count.get >= maxCount || timestamp >= window.getEnd ) TriggerResult.FIRE_AND_PURGE else TriggerResult.CONTINUE
  23. }
  24. override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
  25. {
  26. if (timeCharacteristic == TimeCharacteristic.EventTime) TriggerResult.CONTINUE else
  27. {
  28. if ( time >= window.getEnd ) TriggerResult.CONTINUE else TriggerResult.FIRE_AND_PURGE
  29. }
  30. }
  31. override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
  32. {
  33. if (timeCharacteristic == TimeCharacteristic.ProcessingTime) TriggerResult.CONTINUE else
  34. {
  35. if ( time >= window.getEnd ) TriggerResult.CONTINUE else TriggerResult.FIRE_AND_PURGE
  36. }
  37. }
  38. override def clear(window: W, ctx: TriggerContext): Unit =
  39. {
  40. ctx.getPartitionedState( countState ).clear
  41. }
  42. class Sum extends ReduceFunction[java.lang.Long]
  43. {
  44. def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
  45. }
  46. }

2) 使用过程函数:

  1. case class Record( key: String, value: Int )
  2. object Job extends App
  3. {
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment
  5. env.setStreamTimeCharacteristic( TimeCharacteristic.IngestionTime )
  6. val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  7. val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
  8. val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  9. val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  10. val merged = step1.union( step2, step3 )
  11. val keyed = merged.keyBy(0)
  12. val processed = keyed.process( new TimeCountWindowProcessFunction( 3, 100 ) )
  13. processed.print()
  14. env.execute("test")
  15. }

当所有逻辑(即,窗口、触发和求和)进入函数时:

  1. import org.apache.flink.streaming.api.functions._
  2. import org.apache.flink.util._
  3. import org.apache.flink.api.common.state._
  4. case class Status( count: Int, key: String, value: Long )
  5. class TimeCountWindowProcessFunction( count: Long, windowSize: Long ) extends ProcessFunction[Record,Record]
  6. {
  7. lazy val state: ValueState[Status] = getRuntimeContext
  8. .getState(new ValueStateDescriptor[Status]("state", classOf[Status]))
  9. override def processElement( input: Record, ctx: ProcessFunction[Record,Record]#Context, out: Collector[Record] ): Unit =
  10. {
  11. val updated: Status = Option( state.value ) match {
  12. case None => {
  13. ctx.timerService().registerEventTimeTimer( ctx.timestamp + windowSize )
  14. Status( 1, input.key, input.value )
  15. }
  16. case Some( current ) => Status( current.count + 1, input.key, input.value + current.value )
  17. }
  18. if ( updated.count == count )
  19. {
  20. out.collect( Record( input.key, updated.value ) )
  21. state.clear
  22. }
  23. else
  24. {
  25. state.update( updated )
  26. }
  27. }
  28. override def onTimer( timestamp: Long, ctx: ProcessFunction[Record,Record]#OnTimerContext, out: Collector[Record] ): Unit =
  29. {
  30. Option( state.value ) match {
  31. case None => // ignore
  32. case Some( status ) => {
  33. out.collect( Record( status.key, status.value ) )
  34. state.clear
  35. }
  36. }
  37. }
  38. }
展开查看全部

相关问题