Flink:当联合多个Kafka源并将它们合并在一起时,哪种水印策略是合适的?

cgfeq70w  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(130)

我是flink的新手,我有五个无限的Kafka源,它们的数据模式不同。我想减少消息,然后用相同的键外连接所有的Kafka源。所以我使用union将它们合并组合在一起,然后使用 ProcessWindowFunction 将它们转换为一个大对象,然后发送到下游。我有两个问题。
1.我想我找到了数据丢失的根本原因。总是有一个Kafka源水印晚于其他。所以它的消息将被丢弃。
1.对于重复的消息,我认为这与windown大小和联合运算符有关。这不是我的逻辑问题。

class CommonObj {

            var id: Long = 0
            var entityType: String? = null
            var timestamp: Long = System.currentTimeMillis()
            val eventMetas: MutableList<EventMeta> = mutableListOf()

            var kafkaStreamValue1: KafkaStreamObj1? = null
            var kafkaStreamValue2: KafkaStreamObj2? = null
            var kafkaStreamValue3: KafkaStreamObj3? = null
            var kafkaStreamValue4: KafkaStreamObj4? = null

            fun buildSinkObj(): SinkObj = ....
        }

字符串
这是一个Kafka的源代码。其他的Kafka源代码逻辑非常相似。

val watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps<KafkaStreamObj1>()
            .withIdleness(Duration.ofMinutes(1))

        val sourceStream1 = env.fromSource(
            getKafkaStream1(params),
            watermarkStrategy,
            "Kafka Source 1"
        )

        val kafkaSource1 = sourceStream1
            .filter { it != null }
            .map {
                EventObj<KafkaStreamObj1>(
                    it.id.toString() + it.entity, //this is key
                    it,                           //obj
                    it.sequence,                  //timestamp
                    mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
                )
            }
            .returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
            .keyBy {
                it.key }
            .window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
            .reduce { v1, v2 ->
                if (v1.obj.sequence > v2.obj.sequence) {
                    v1.eventMetaList.addAll(v2.eventMetaList)
                    v1
                } else {
                    v2.eventMetaList.addAll(v1.eventMetaList)
                    v2
                }
            }
            .map {
                val commonObj = CommonObj()
                commonObj.id = it.obj.id
                commonObj.entityType = it.obj.entity
                commonObj.timestamp = System.currentTimeMillis()
                commonObj.eventMetas.addAll(it.eventMetaList)
                commonObj.kafkaStreamValue1 = it.obj.entity
                commonObj
            }
            .returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
        return kafkaSource1


这个联盟的代码

kafkaStream1.union(kafkaStream2,kafkaStream3,kafkaStream4,kafkaStream5)
            .keyBy(keySelector)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
            .process(EventProcessFunction(params))
            .sinkTo(kafkaSink())


EventProcessFunction将所有Kafka源消息合并到CommonObj中。它将通过键和时间戳减少重复。
我删除了一些敏感信息。

更新2023-12-11
我做了一些优化。但总有一个Kafka源有背压,它的水印比联盟水印晚。所以晚的消息会被丢弃。据我所知,联盟将使用所有联盟源中较小的水印。虽然设置为1分钟空闲,但晚的源有流量,正在被消耗。为什么联盟水印被提前,而不是较小的水印?

2023年12月14日更新
谢谢kkrugler的建议。事实上,我不在乎事件是否迟到。如果它迟到了,就像往常一样处理它。所以我会尝试使用处理时间windown。
目前我已经更改为使用处理时间窗口和无水印。当我使用TumblingProcessingTimeWindows时,所有Kafka源都存在高背压。并且检查点会在10 m后超时。如何调整背压和检查点问题?
我试着调整窗口大小10- 30秒。但问题仍在继续。

7xllpg7q

7xllpg7q1#

要回答有关水印的主要问题,您应该通过(较新的)StreamExecutionEnvironment.fromSource()方法将水印策略应用于每个Kafka源,然后将流合并组合起来。

相关问题