我用flink-cep来检测Kafka事件的模式。为简单起见,事件只有一种类型。我试图检测连续事件流中字段值的变化。代码如下所示
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
.filter(...)
.map(...)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
)
.keyBy(...)(TypeInformation.of(classOf[...]))
val pattern: Pattern[Event, _] =
Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
.next("middle")
.oneOrMore()
.optional()
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.next("end").times(1)
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
!startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.within(Time.seconds(30))
kafka主题有104个分区,事件平均分布在各个分区上。当我提交这份工作时, parallelism
设置为104。
在webui中,有两个任务:第一个是 Source->filter->map->timestamp/watermark
; 第二个是 CepOperator->sink
. 每个任务有104个并行性。
子任务的工作量是不均衡的,它应该来自 keyBy
. 子任务之间的水印是不同的,但它们开始停留在一个值上,很长一段时间没有变化。从日志中,我可以看到cep不断评估事件,匹配的结果被推送到下游接收器。
事件发生率为10k/s,第一个任务的背压保持不变 high
第二个呢 ok
.
请帮助解释在cep中发生了什么以及如何解决这个问题
谢谢
1条答案
按热度按时间pkwftd7m1#
我仔细考虑了你的问题,正在修改我的答案。
听起来cep正在继续产生匹配,它们被推到了下沉,但是cep+sink任务产生了很高的反压力。有助于确定背压的原因。
如果可以从所有分区读取事件,而水印只是勉强前进,那么听起来背压严重到足以阻止事件被接收。
我怀疑
cep引擎中的组合爆炸,和/或
足够的火柴让Flume跟不上
可能的原因。
获得更多洞察力的一些想法:
(1) 尝试使用探查器来确定cep操作符是否是瓶颈,并确定它正在做什么。
(2) 禁用cep操作符和sink之间的操作符链接以隔离cep——这只是一个调试步骤。这将使您更好地了解cep和sink各自在做什么(通过度量和背压监控)。
(3) 在较小的设置中测试这个,并展开cep日志记录。