水印在flink cep中远远落后

g6baxovj  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(411)

我用flink-cep来检测Kafka事件的模式。为简单起见,事件只有一种类型。我试图检测连续事件流中字段值的变化。代码如下所示

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))

val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

kafka主题有104个分区,事件平均分布在各个分区上。当我提交这份工作时, parallelism 设置为104。
在webui中,有两个任务:第一个是 Source->filter->map->timestamp/watermark ; 第二个是 CepOperator->sink . 每个任务有104个并行性。
子任务的工作量是不均衡的,它应该来自 keyBy . 子任务之间的水印是不同的,但它们开始停留在一个值上,很长一段时间没有变化。从日志中,我可以看到cep不断评估事件,匹配的结果被推送到下游接收器。
事件发生率为10k/s,第一个任务的背压保持不变 high 第二个呢 ok .
请帮助解释在cep中发生了什么以及如何解决这个问题
谢谢

pkwftd7m

pkwftd7m1#

我仔细考虑了你的问题,正在修改我的答案。
听起来cep正在继续产生匹配,它们被推到了下沉,但是cep+sink任务产生了很高的反压力。有助于确定背压的原因。
如果可以从所有分区读取事件,而水印只是勉强前进,那么听起来背压严重到足以阻止事件被接收。
我怀疑
cep引擎中的组合爆炸,和/或
足够的火柴让Flume跟不上
可能的原因。
获得更多洞察力的一些想法:
(1) 尝试使用探查器来确定cep操作符是否是瓶颈,并确定它正在做什么。
(2) 禁用cep操作符和sink之间的操作符链接以隔离cep——这只是一个调试步骤。这将使您更好地了解cep和sink各自在做什么(通过度量和背压监控)。
(3) 在较小的设置中测试这个,并展开cep日志记录。

相关问题