我有一个flinkkafkaconsumer定义如下 FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)
我正在用 setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
.
现在我想用函数指定一个周期水印 assignTimestampsAndWatermarks
,但我不知道应该传递给该函数什么,因为在文档中,该函数的示例接收类型为的元素 MyType
用一个 getCreationTime()
我的消费者是string类型的。
在这种情况下,是否可以分配事件时间?
编辑:我想用作事件时间的时间是每个寄存器存储在Kafka中的时间。
1条答案
按热度按时间emeijp431#
关于
EventTime
至少在定义上是与事件的创建时间而不是接收时间严格相关的。因此,如果您从kafka消费的事件具有某种时间戳(例如,如果您将json作为字符串消费,然后对其进行解析),那么您可以在assignTimestampsAndWatermarks
功能。如果你是普通人
String
对象,那么最好的方法就是使用自定义KafkaDeserializationSchema
提取每个事件的kafka时间戳并使用它。从技术上讲,您甚至可以使用计数器来人为地增加每条记录的时间戳(例如,将其增加1),但这似乎没有意义
EventTime
处理。