flink:将水印分配给flinkkafconsumer

gcuhipw9  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(430)

我有一个flinkkafkaconsumer定义如下 FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties) 我正在用 setStreamTimeCharacteristic(TimeCharacteristic.EventTime) .
现在我想用函数指定一个周期水印 assignTimestampsAndWatermarks ,但我不知道应该传递给该函数什么,因为在文档中,该函数的示例接收类型为的元素 MyType 用一个 getCreationTime() 我的消费者是string类型的。
在这种情况下,是否可以分配事件时间?
编辑:我想用作事件时间的时间是每个寄存器存储在Kafka中的时间。

emeijp43

emeijp431#

关于 EventTime 至少在定义上是与事件的创建时间而不是接收时间严格相关的。因此,如果您从kafka消费的事件具有某种时间戳(例如,如果您将json作为字符串消费,然后对其进行解析),那么您可以在 assignTimestampsAndWatermarks 功能。
如果你是普通人 String 对象,那么最好的方法就是使用自定义 KafkaDeserializationSchema 提取每个事件的kafka时间戳并使用它。
从技术上讲,您甚至可以使用计数器来人为地增加每条记录的时间戳(例如,将其增加1),但这似乎没有意义 EventTime 处理。

相关问题