获取用于处理后期事件的前一个窗口值

qmb5sa22  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(393)

我正在寻找一种方法来设置窗口,以允许迟到,以及让我计算值的基础上,以前的值计算的会话。
我的sessions值总体上是一个唯一的标识符,不应该有冲突,但从技术上讲,sessions可以随时进入。在大多数会话中,大多数事件都是在5分钟内处理的,允许迟到1天应该可以满足任何迟到的事件。

stream
    .keyBy { jsonEvent => jsonEvent.findValue("session").toString }
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)))
    .allowedLateness(Time.days(1))
    .process { new SessionProcessor }
    .addSink { new HttpSink }

对于每个会话,我将查找字段的最大值,并检查是否有几个事件没有发生(如果发生了,它们将使最大值字段为零)。我决定创建一个 ProcessWindowFunction 做这个。

Class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {

   override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
      //Parse and calculate data
      maxValue = if(badEvent1) 0 else maxValue
      maxValue = if(badEvent2) 0 else maxValue          
      out.collect((string1,string2,string3, maxValue))
   }
}

在考虑到后期事件之前,这可以很好地工作。当一个迟到的事件发生时, maxValue 重新计算并输出到 HttpSink 再一次。我正在寻找一种方法,这样我就可以计算出以前的增量 maxValue 而且很晚 maxValue .
我正在寻找一种方法来确定:
如果对函数的调用来自一个延迟事件(我不想重复计算会话总数)
新数据是什么,或者如果有办法,可以存储以前的计算值。
任何帮助都将不胜感激。
编辑:用于valuestate的新代码
Kafka消费者.scala

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema
import org.apache.flink.streaming.api.scala._
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object KafkaConsumer {
   def main(args: Array[String]) {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
      val properties = getServerProperties
      val consumer = new FlinkKafkaConsumer010[ObjectNode]("test-topic", new JSONDeserializationSchema, properties)
      consumer.setStartFromLatest()
      val stream = env.addSource(consumer)

      stream
        .keyBy { jsonEvent => jsonEvent.findValue("data").findValue("query").findValue("session").toString }
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.days(1))
        .process {
          new SessionProcessor
        }
        .print
      env.execute("Kafka APN Consumer")
    }
  }

会话处理器.scala

import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {

  final val previousValue = new ValueStateDescriptor("previousValue", classOf[Long])

  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {

    val previousVal: ValueState[Long] = context.windowState.getState(previousValue)
    val pVal: Long = previousVal.value match {
      case i: Long => i
    }
    var session = ""
    var user = ""
    var department = ""
    var lVal: Long = 0

    elements.foreach( value => {
      var jVal: String = "0"
      if (value.findValue("data").findValue("query").has("value")) {
        jVal = value.findValue("data").findValue("query").findValue("value").toString replaceAll("\"", "")
      }
      session = value.findValue("data").findValue("query").findValue("session").toString replaceAll("\"", "")
      user = value.findValue("data").findValue("query").findValue("user").toString replaceAll("\"", "")
      department = value.findValue("data").findValue("query").findValue("department").toString replaceAll("\"", "")
      lVal = if (jVal.toLong > lVal) jVal.toLong else lVal
    })

    val increaseTime = lVal - pVal
    previousVal.update(increaseTime)
    out.collect((session, user, department, increaseTime))
  }
}
ghhkc1vu

ghhkc1vu1#

下面是一个类似的例子。希望这是合理的自我解释,应该很容易适应你的需要。
这里的基本思想是 context.windowState() ,它是通过传递给processwindowfunction的上下文提供的每个窗口状态。这个windowstate实际上只对多次启动的窗口有用,因为每个新窗口示例都有一个新初始化的(空的)windowstate存储。对于在所有窗口中共享的状态(但仍设置了关键帧),请使用 context.globalState() .

private static class DifferentialWindowFunction
  extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {

  private final static ValueStateDescriptor<Long> previousFiringState =
    new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);

  private final static ReducingStateDescriptor<Long> firingCounterState =
    new ReducingStateDescriptor<>("firing-counter", new Sum(), LongSerializer.INSTANCE);

  @Override
  public void process(
      String key, 
      Context context, 
      Iterable<Long> values, 
      Collector<Tuple2<Long, Long>> out) {

    ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
    ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);

    Long output = Iterables.getOnlyElement(values);
    if (firingCounter.get() == null) {
      // first firing
      out.collect(Tuple2.of(0L, output));
    } else {
      // subsequent firing
      out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));    
    } 
    firingCounter.add(1L);
    previousFiring.update(output);
  }

  @Override
  public void clear(Context context) {
    ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
    ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);

    previousFiring.clear();
    firingCounter.clear();
  }
}

相关问题