Apache Flink: count window with timeout

2vuwiymt  posted on 2021-06-24  in Flink

Here is a simple code example that illustrates my problem:

import org.apache.flink.streaming.api.scala._

case class Record( key: String, value: Int )

object Job extends App
{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
  val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  val merged = step1.union( step2, step3 )
  val keyed = merged.keyBy(0)
  val windowed = keyed.countWindow( 3 )
  val summed = windowed.sum( 1 )
  summed.print()
  env.execute("test")
}

This produces the following output:

Record(01,6)
Record(02,12)
Record(04,24)
Record(05,30)

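As expected, no result is produced for key "03", because the count window needs 3 elements and the stream contains only two elements for that key.
What I would like is some kind of count window with a timeout, so that if the window has not received the expected number of elements after a certain timeout, a partial result is produced from the elements it already holds.
With that behavior, in my example a Record(03,15) would be produced once the timeout is reached.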
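To make the requested semantics concrete, here is a minimal plain-Java sketch that models them without Flink. The class names `CountOrTimeoutBuffer` and `CountOrTimeoutDemo`, the 50 ms timeout and the explicit `now` clock are assumptions made for illustration only. In a real job the order of the output lines is not guaranteed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink) of a per-key count window with a timeout.
class CountOrTimeoutBuffer {
    private final int maxCount;
    private final long timeoutMillis;
    // Per key: {running sum, element count, deadline}.
    private final Map<String, long[]> open = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();

    CountOrTimeoutBuffer(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds one element; emits the full sum as soon as maxCount elements have arrived.
    void add(String key, int value, long now) {
        long[] s = open.computeIfAbsent(key, k -> new long[] {0, 0, now + timeoutMillis});
        s[0] += value;
        s[1] += 1;
        if (s[1] >= maxCount) {
            emitted.add("Record(" + key + "," + s[0] + ")");
            open.remove(key);
        }
    }

    // Advances the clock; emits a partial sum for every key whose deadline has passed.
    void advanceTo(long now) {
        open.entrySet().removeIf(e -> {
            if (e.getValue()[2] > now) {
                return false;
            }
            emitted.add("Record(" + e.getKey() + "," + e.getValue()[0] + ")");
            return true;
        });
    }

    List<String> emitted() {
        return emitted;
    }
}

class CountOrTimeoutDemo {
    // Replays the data from the question: key 03 only ever receives two elements.
    static List<String> run() {
        CountOrTimeoutBuffer buffer = new CountOrTimeoutBuffer(3, 50);
        for (int v = 1; v <= 5; v++) {
            String key = "0" + v;
            if (v % 3 != 0) {
                buffer.add(key, v, 0);   // step1 (the filter drops key 03)
            }
            buffer.add(key, v * 2, 0);   // step2
            buffer.add(key, v * 3, 0);   // step3
        }
        buffer.advanceTo(50);            // the timeout expires
        return buffer.emitted();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the four complete keys are emitted as soon as their third element arrives, and Record(03,15) appears only after the clock passes the deadline.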

92dk7w1h  #1

I think you can implement this use case with a ProcessFunction
that has a count property and a windowEnd property. With these you can decide when to collect the data.

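import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// CountPojo is not shown in this answer; it is assumed to expose a public
// `count` field and a setLastModified(long) setter.
// Apply this function to a keyed stream, because it uses keyed state and timers.
public class TimeCountWindowProcessFunction extends ProcessFunction<CountPojo, CountPojo> {

    private final long windowSize;
    private final long count;
    private transient ValueState<CountPojo> state;
    // End of the current window for this key, kept in state so that it is per key.
    private transient ValueState<Long> windowEndState;

    public TimeCountWindowProcessFunction(long windowSize, long count) {
        this.windowSize = windowSize;
        this.count = count;
    }

    @Override
    public void open(Configuration parameters) {
        TypeInformation<CountPojo> typeInformation = TypeInformation.of(new TypeHint<CountPojo>() {});
        state = getRuntimeContext().getState(new ValueStateDescriptor<CountPojo>("test", typeInformation));
        windowEndState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("windowEnd", Types.LONG));
    }

    @Override
    public void processElement(CountPojo input, Context ctx, Collector<CountPojo> out)
            throws Exception {

        // retrieve the current count
        CountPojo current = state.value();

        if (current == null) {
            // first element for this key: open a window and register the timeout
            long timestamp = ctx.timestamp();
            long windowEnd = timestamp - (timestamp % windowSize) + windowSize;
            current = new CountPojo();
            current.count = 1;
            windowEndState.update(windowEnd);
            ctx.timerService().registerEventTimeTimer(windowEnd);
        } else {
            current.count += 1;
        }

        // set the state's timestamp to the record's assigned event time timestamp
        current.setLastModified(ctx.timestamp());

        if (current.count >= count) {
            // count reached before the timeout: emit, cancel the timer and reset
            out.collect(current);
            ctx.timerService().deleteEventTimeTimer(windowEndState.value());
            state.clear();
            windowEndState.clear();
        } else {
            // write the state back
            state.update(current);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountPojo> out)
            throws Exception {

        Long windowEnd = windowEndState.value();
        if (windowEnd != null && windowEnd == timestamp) {
            // timeout reached before the count: emit the partial result
            CountPojo current = state.value();
            if (current != null) {
                out.collect(current);
            }
            state.clear();
            windowEndState.clear();
        }
    }
}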
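The window arithmetic used in processElement above can be checked on its own. The following plain-Java sketch (the class name `WindowAlignment` and the sample timestamps are assumptions for illustration) shows how a timestamp is aligned to a fixed-size window, assuming non-negative timestamps.

```java
// Plain-Java check of the window alignment used in processElement.
class WindowAlignment {
    // Start of the fixed-size window that contains the timestamp (non-negative timestamps assumed).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Exclusive end of that window; this is where the timer is registered.
    static long windowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(1234L, 100L)); // 1200
        System.out.println(windowEnd(1234L, 100L));   // 1300
        System.out.println(windowEnd(1300L, 100L));   // 1400, a timestamp on the boundary opens the next window
    }
}
```

One consequence of this alignment is that the wait is not a fixed duration after the first element: an element with timestamp 1299 gets a timer at 1300, while one with timestamp 1200 waits until the same 1300.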

I hope this helps.

e4yzc0pl  #2

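You could also implement this with a custom window Trigger that fires when either the count is reached or the timeout expires, effectively blending the built-in CountTrigger and EventTimeTrigger.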
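As a rough illustration of that blend, here is a plain-Java model of the two decisions such a trigger has to make. It is not the Flink Trigger API: the class `CountOrTimeoutTriggerModel`, its `Result` enum and the method names are hypothetical stand-ins that only mirror the idea of an element callback and a timer callback.

```java
// Plain-Java model (not the Flink Trigger API) of a trigger that combines a count with a timeout.
class CountOrTimeoutTriggerModel {
    enum Result { CONTINUE, FIRE_AND_PURGE }

    private final long maxCount;
    private final long windowMaxTimestamp; // last timestamp that still belongs to the window
    private long count = 0;

    CountOrTimeoutTriggerModel(long maxCount, long windowMaxTimestamp) {
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // Count part: fire once maxCount elements have been seen, then start counting again.
    Result onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    // Timeout part: fire when the timer set for the end of the window goes off.
    Result onTimer(long time) {
        if (time == windowMaxTimestamp) {
            count = 0;
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        CountOrTimeoutTriggerModel trigger = new CountOrTimeoutTriggerModel(3, 49);
        System.out.println(trigger.onElement()); // CONTINUE
        System.out.println(trigger.onElement()); // CONTINUE
        System.out.println(trigger.onTimer(49)); // FIRE_AND_PURGE
    }
}
```

Resetting the counter after a count-based fire matters when a key can receive more than maxCount elements within one time window; without the reset every later element would fire again.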

ubby3x7f  #3

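I followed both David's and Nirav's approaches, and here are the results.
1) Using a custom trigger:
Here I reversed my initial logic. Instead of a "count window", I use a "time window" whose duration corresponds to the timeout, followed by a trigger that fires once all of the elements have been processed.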

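import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Record( key: String, value: Int )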

object Job extends App
{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
  val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  val merged = step1.union( step2, step3 )
  val keyed = merged.keyBy(0)
  val windowed = keyed.timeWindow( Time.milliseconds( 50 ) )
  val triggered = windowed.trigger( new CountTriggerWithTimeout( 3, env.getStreamTimeCharacteristic ) )
  val summed = triggered.sum( 1 )
  summed.print()
  env.execute("test")
}

Here is the trigger code:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingState
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.triggers._
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * A trigger that fires when the count of elements in a pane reaches the given count or a
 * timeout is reached, whichever happens first.
 */
class CountTriggerWithTimeout[W <: TimeWindow](maxCount: Long, timeCharacteristic: TimeCharacteristic) extends Trigger[Object,W]
{
  private val countState: ReducingStateDescriptor[java.lang.Long] = new ReducingStateDescriptor[java.lang.Long]( "count", new Sum(), LongSerializer.INSTANCE)

  override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult =
  {
      // register the timeout at the end of the window, as the built-in time triggers do
      if (timeCharacteristic == TimeCharacteristic.ProcessingTime) ctx.registerProcessingTimeTimer( window.maxTimestamp )
      else ctx.registerEventTimeTimer( window.maxTimestamp )

      val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
      count.add( 1L )
      if ( count.get >= maxCount )
      {
          // reset the counter, otherwise every later element in this window would fire again
          count.clear()
          TriggerResult.FIRE_AND_PURGE
      }
      else TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
  {
      if (timeCharacteristic == TimeCharacteristic.EventTime) TriggerResult.CONTINUE else
      {
          // timeout: the timer registered for the end of the window has fired
          if ( time == window.maxTimestamp ) TriggerResult.FIRE_AND_PURGE else TriggerResult.CONTINUE
      }
  }

  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
  {
      if (timeCharacteristic == TimeCharacteristic.ProcessingTime) TriggerResult.CONTINUE else
      {
          // timeout: the timer registered for the end of the window has fired
          if ( time == window.maxTimestamp ) TriggerResult.FIRE_AND_PURGE else TriggerResult.CONTINUE
      }
  }

  override def clear(window: W, ctx: TriggerContext): Unit =
  {
      if (timeCharacteristic == TimeCharacteristic.ProcessingTime) ctx.deleteProcessingTimeTimer( window.maxTimestamp )
      else ctx.deleteEventTimeTimer( window.maxTimestamp )
      ctx.getPartitionedState( countState ).clear()
  }

  class Sum extends ReduceFunction[java.lang.Long]
  {
      def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
  }
}

2) Using a process function:

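import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic

case class Record( key: String, value: Int )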

object Job extends App
{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic( TimeCharacteristic.IngestionTime )
  val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
  val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  val merged = step1.union( step2, step3 )
  val keyed = merged.keyBy(0)
  val processed = keyed.process( new TimeCountWindowProcessFunction( 3, 100 ) )
  processed.print()
  env.execute("test")
}

With all of the logic (that is, windowing, triggering and summing) going into the function:

import org.apache.flink.streaming.api.functions._
import org.apache.flink.util._
import org.apache.flink.api.common.state._

// `timeout` remembers the timer registered for this key so that it can be cancelled
case class Status( count: Int, key: String, value: Int, timeout: Long )

class TimeCountWindowProcessFunction( count: Long, windowSize: Long ) extends ProcessFunction[Record,Record]
{
    lazy val state: ValueState[Status] = getRuntimeContext
      .getState(new ValueStateDescriptor[Status]("state", classOf[Status]))

    override def processElement( input: Record, ctx: ProcessFunction[Record,Record]#Context, out: Collector[Record] ): Unit =
    {
        val updated: Status = Option( state.value ) match {
            case None => {
                // first element for this key: start the timeout
                val timeout: Long = ctx.timestamp + windowSize
                ctx.timerService().registerEventTimeTimer( timeout )
                Status( 1, input.key, input.value, timeout )
            }
            case Some( current ) => current.copy( count = current.count + 1, value = current.value + input.value )
        }
        if ( updated.count >= count )
        {
            // count reached before the timeout: emit the full sum and cancel the pending timer
            out.collect( Record( input.key, updated.value ) )
            ctx.timerService().deleteEventTimeTimer( updated.timeout )
            state.clear()
        }
        else
        {
            state.update( updated )
        }
    }

    override def onTimer( timestamp: Long, ctx: ProcessFunction[Record,Record]#OnTimerContext, out: Collector[Record] ): Unit =
    {
        // timeout reached: emit whatever has been accumulated for this key
        Option( state.value ) match {
            case None => // ignore
            case Some( status ) => {
                out.collect( Record( status.key, status.value ) )
                state.clear()
            }
        }
    }
}
