I am trying to understand the best way to achieve current-time timestamps with Flink when producing a new record to Kafka.
Does flink automatically fill the produced event with metadata containing the timestamp of the current time? Is that the best practice for the consumers or should we put the current time inside the event?
If I really want to put the current time into a processed event, how should I do it in Java? I am running Flink in Kubernetes, so I don't know if a simple current_time() call would be the ideal way of doing it, because task managers may be on different nodes, and I am not sure if the clocks on each of them are going to be in sync.
2 Answers
When initializing a `KafkaSink` you have to provide a `KafkaRecordSerializationSchema`; in its `serialize` method you can set the timestamp associated with each element when building the `org.apache.kafka.clients.producer.ProducerRecord`. The timestamp the `serialize` method receives will depend on your pipeline configuration. You can find more information about assigning timestamps and how Flink handles time here: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/

If you are not setting it, Kafka will automatically assign a timestamp to each record when receiving it (the ingestion time, which is basically the processing time plus a slight delay).
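A minimal sketch of such a schema, assuming a topic named `events` and `String` elements (both are illustrative choices, not from the original answer):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical serialization schema that stamps each ProducerRecord explicitly.
public class TimestampedSerializationSchema
        implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        // If the pipeline already assigned a timestamp, keep it; otherwise
        // fall back to this task manager's wall clock.
        long ts = (timestamp != null) ? timestamp : System.currentTimeMillis();
        return new ProducerRecord<>(
                "events",                                  // assumed topic name
                null,                                      // let Kafka pick the partition
                ts,                                        // explicit record timestamp
                null,                                      // no key
                element.getBytes(StandardCharsets.UTF_8)); // value bytes
    }
}
```

If you pass `null` as the timestamp instead, Kafka assigns one itself, as described above.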
In any case, achieving perfectly ordered processing time timestamps in a distributed application will face the problem you describe. Different nodes will have different clocks, even if all are synchronized using NTP. It is a big problem in distributed systems that requires significant effort to solve (if even possible).
A pragmatic approach that may be good enough is to have all records that belong to the same key timestamped by the same node; that way the timestamps will be perfectly ordered per key most of the time. Be aware that a rebalance, or a clock correction (which NTP performs periodically), will occasionally break this per-key ordering for some records. If you have a `KeyedStream` and you assign the timestamp in a keyed `map`, or let Kafka do it, you will get these mostly-ordered timestamps per key.
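A sketch of that keyed `map`, where the `Event` POJO with `getKey`/`setTimestamp` is a hypothetical stand-in for your own record type:

```java
import org.apache.flink.streaming.api.datastream.DataStream;

class TimestampPerKey {
    // Because all records with the same key are processed by one subtask,
    // the wall-clock reads for a given key come from a single node's clock
    // and are therefore mostly monotonic per key.
    static DataStream<Event> stampPerKey(DataStream<Event> events) {
        return events
                .keyBy(Event::getKey)
                .map(e -> {
                    e.setTimestamp(System.currentTimeMillis()); // one clock per key
                    return e;
                });
    }
}
```

Note that this only mitigates, not eliminates, the clock-skew issue: a rebalance can move a key to a subtask on a node with a different clock.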
> Does flink automatically fill the produced event with metadata containing the timestamp of the current time? Is that the best practice for the consumers or should we put the current time inside the event?

Yes, the timestamp is set to whatever value the `TimestampAssigner` returns for that record. So Flink transformations can preserve the timestamps of the original records.

> I am running flink in kubernetes, so I don't know if a simple current_time() call would be the ideal way of doing it, because task managers may be in different nodes, and I am not sure if the clock in each of them are going to be in sync.

I can assure you that they will not be in sync, which is exactly why, to simplify things in distributed systems, we don't really rely on wall clocks but on event time.
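Relying on event time usually means carrying the timestamp inside the event itself and telling Flink how to extract it. A sketch, where `Event` with a `getEventTime()` accessor is hypothetical and the 5-second out-of-orderness bound is an arbitrary example value:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

class EventTimeSetup {
    // The timestamp travels inside the event, so every node agrees on it
    // regardless of its local clock.
    static DataStream<Event> withEventTime(DataStream<Event> events) {
        WatermarkStrategy<Event> strategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTs) -> event.getEventTime());
        return events.assignTimestampsAndWatermarks(strategy);
    }
}
```

The out-of-orderness bound trades latency for completeness: windows wait that long for stragglers before firing.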