What is the best way to generate events with the current timestamp in Flink?

szqfcxe2 posted on 2022-12-09 in Apache

I am trying to understand the best way to attach current-time timestamps in Flink when producing a new record to Kafka.
Does Flink automatically fill the produced event with metadata containing the current timestamp? Is that the best practice for consumers, or should we put the current time inside the event itself?
If I really want to put the current time into a processed event, how should I do it in Java? I am running Flink on Kubernetes, so I don't know whether a simple current_time() call would be the ideal way of doing it, because the task managers may be on different nodes, and I am not sure whether the clocks on each of them will be in sync.


watbbzwu1#

When initializing a KafkaSink you have to provide a KafkaRecordSerializationSchema; in its serialize method you can set the timestamp associated with each element when building the org.apache.kafka.clients.producer.ProducerRecord. The timestamp the serialize method receives depends on your pipeline configuration. You can find more information about assigning timestamps and how Flink handles time here: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/
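As an illustration, here is a minimal sketch of such a schema that decides which timestamp ends up in the Kafka record (the topic name, the String element type, and the wall-clock fallback are assumptions made for this example, not part of the original answer):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: serializes String elements to a hypothetical "events" topic.
public class TimestampedSerializationSchema
        implements KafkaRecordSerializationSchema<String> {

    private static final String TOPIC = "events"; // placeholder topic name

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        // `timestamp` is whatever Flink attached to the record upstream
        // (e.g. via a TimestampAssigner); it may be null if nothing was assigned.
        // Here we fall back to the producing task manager's wall clock.
        long recordTimestamp = (timestamp != null) ? timestamp : System.currentTimeMillis();
        return new ProducerRecord<>(
                TOPIC,
                null,                 // partition: let Kafka decide
                recordTimestamp,      // timestamp written into the Kafka record
                null,                 // key: none in this sketch
                element.getBytes(StandardCharsets.UTF_8));
    }
}
```

The schema would then be passed to the sink via KafkaSink.builder().setRecordSerializer(new TimestampedSerializationSchema()).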
If you do not set the timestamp yourself, Kafka will automatically assign one to each record when it receives it (the ingestion time, which is basically the processing time plus a slight delay).
In any case, achieving perfectly ordered processing-time timestamps in a distributed application runs into exactly the problem you describe: different nodes will have different clocks, even if all of them are synchronized via NTP. This is a hard problem in distributed systems that takes significant effort to solve, if it can be solved at all.
A pragmatic approach that may be good enough is to have all records belonging to the same key timestamped by the same node; that way the timestamps will be perfectly ordered most of the time. Be aware that a rebalance, or a clock correction (which NTP performs periodically), will occasionally break this per-key ordering for some records. If you have a KeyedStream and you assign the timestamp in a keyed map (see the sketch below), or let Kafka do it, you will get these mostly-ordered timestamps per key.
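A rough sketch of that per-key approach, assuming a hypothetical Event POJO with getUserId() and setTimestamp(long) accessors (none of these names come from the original answer):

```java
import org.apache.flink.streaming.api.datastream.DataStream;

// `events` is an existing DataStream<Event>; Event is a hypothetical POJO.
static DataStream<Event> stampWithLocalClock(DataStream<Event> events) {
    return events
            .keyBy(Event::getUserId)
            .map(e -> {
                // Wall clock of the task that owns this key: all records of the
                // same key are stamped on the same node, so per-key order mostly holds.
                e.setTimestamp(System.currentTimeMillis());
                return e;
            })
            .returns(Event.class); // help Flink's type extraction for the lambda
}
```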


rt4zxlrg2#

Does Flink automatically fill the produced event with metadata containing the current timestamp? Is that the best practice for consumers, or should we put the current time inside the event?
Yes, the timestamp is set to whatever value the TimestampAssigner returns for that record. As a result, Flink transformations can preserve the timestamp of the original record.
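For reference, that TimestampAssigner is usually supplied through a WatermarkStrategy. A minimal sketch, assuming a hypothetical Event type with a getCreationTime() accessor:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Use each record's own creation time as its event-time timestamp.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, previousTimestamp) -> event.getCreationTime());

// Applied to a stream, e.g.:
// DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(strategy);
```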
I am running Flink on Kubernetes, so I don't know whether a simple current_time() call would be the ideal way of doing it, because the task managers may be on different nodes, and I am not sure whether the clocks on each of them are in sync.
I can assure you that they will not be in sync, which is exactly why, to simplify things in distributed systems, we do not really rely on the wall clock but on event time instead.
