当制作人(Kafka)不生产新线时,如何让Flink把最后一条线冲洗下去

f0brbegy  于 2021-06-21  发布在  Flink
关注(0)|答案(3)|浏览(248)

当我的flink程序处于事件时间模式时,sink不会得到最后一行(比如说a行)。如果我把新线(b线)给Flink,我会得到a线,但我还是不能得到b线。

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)

    val stream: DataStream[String] = env.addSource(consumer).setParallelism(1)

    stream.map { m =>
      val result = JSON.parseFull(m).asInstanceOf[Some[Map[String, Any]]].get
      val msg = result("message").asInstanceOf[String]
      val num = parseMessage(msg)
      val key = s"${num.zoneId} ${num.subZoneId}"
      (key, num, num.onlineNum)
    }.filter { data =>
      data._2.subZoneId == 301 && data._2.zoneId == 5002
    }.assignTimestampsAndWatermarks(new MyTimestampExtractor()).keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
        .allowedLateness(Time.minutes(1))
      .maxBy(2).addSink { v =>
      System.out.println(s"${v._2.time} ${v._1}: ${v._2.onlineNum} ")
    }
class MyTimestampExtractor() extends AscendingTimestampExtractor[(String, OnlineNum, Int)](){
  val byMinute = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:SS")
  override def extractAscendingTimestamp(element: (String, OnlineNum, Int)): Long = {
    val dateTimeString =  element._2.date + " " + element._2.time
    val c1 = byMinute.parse(dateTimeString).getTime
    if ( element._2.time.contains("22:59") && element._2.subZoneId == 301){
      //System.out.println(s"${element._2.time} ${element._1}: ${element._2.onlineNum} ")
      // System.out.println(s"${element._2.time} ${c1 - getCurrentWatermark.getTimestamp}")
    }

    // System.out.println(s"${element._2.time} ${c1} ${c1 - getCurrentWatermark.getTimestamp}")
    return c1
  }
}

数据样本:

01:01:14 5002 301: 29 
01:01:36 5002 301: 27 
01:02:05 5002 301: 27 
01:02:31 5002 301: 29 
01:03:02 5002 301: 29 
01:03:50 5002 301: 29 
01:04:52 5002 301: 29 
01:07:24 5002 301: 26 
01:09:28 5002 301: 21 
01:11:04 5002 301: 22 
01:12:11 5002 301: 24 
01:13:54 5002 301: 23 
01:15:13 5002 301: 22 
01:16:04 5002 301: 19 (I can not get this line )

然后我把新线路推到Flink(通过Kafka)

01:17:28 5002 301: 15

我会得到的 01:16:04 5002 301: 19 ,但是 01:17:28 5002 301: 15 可能在Flink举行。

wlsrxk51

wlsrxk511#

这是因为它是事件时间,而事件的时间戳用于测量windows的时间流。
在这种情况下,当窗口中只有一个事件时,flink不知道应该忽略该窗口。因此,当您添加下一个事件时,上一个窗口将关闭并发出元素(在您的案例19中),但是下一个窗口将再次创建(在您的案例15中)。
在这种情况下,最好的办法可能是添加自定义 ProcessingTimeTrigger 它基本上允许你在一段时间后发射窗口,不管事件是否在流动。您可以在文档中找到有关触发器的信息。

oxcyiej7

oxcyiej72#

config tableenv让它提前发射:

TableConfig config = bbTableEnv.getConfig();
    config.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
    config.getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");
mdfafbf1

mdfafbf13#

请问最后的解决办法是什么?我也遇到了类似的情况,可以通过使用新的水印(system.currttimemillis())来解决,但它似乎不符合水印的目的。这不是一个常见的问题,还是应用程序开发人员故意忽略它,而社区却忽略了它?
当我使用flink streaming sql group by tumble(rowtime)使用kafka消息时,为什么不准时呢?

相关问题