我正在尝试编写一个pyflink应用程序来测量延迟和吞吐量。我的数据来自kafka主题的json对象,并加载到 DataStream
使用 SimpleStringSchema
-类进行反序列化。根据这篇文章的答案(如何在kafka和flink环境中测试性能?),我让kafka制作人在事件中添加时间戳,但现在我很难理解如何访问这些时间戳。我知道上面提到的文章提供了一个解决这个问题的方法,但是我很难将这个示例转换成python,因为文档/示例很少。
另一篇文章(apacheflink:如何在摄取时间模式下获取事件的时间戳?)建议我应该定义一个 ProcessFunction
相反。然而,这里我也不确定语法。我可能不得不这样做(摘自:https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py)
class MyProcessFunction():
def process_element(self, value, ctx):
result = value.get_time_stamp()
yield result
正确的方法是什么 value.get_time_stamp()
在这里?或者有没有更简单的方法来解决我不知道的问题?
谢谢!
1条答案
按热度按时间lndjwyie1#
设置由kafka主题支持的表时,可以为kafka时间戳声明一个虚拟列,如
event_time
本例中的列:请参阅flink的kafka表连接器的文档,以获取有关在kafka头中使用元数据的更多信息。