在pyflink中访问kafka时间戳

2sbarzqh  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(635)

我正在尝试编写一个pyflink应用程序来测量延迟和吞吐量。我的数据来自kafka主题的json对象,并加载到 DataStream 使用 SimpleStringSchema -类进行反序列化。根据这篇文章的答案(如何在kafka和flink环境中测试性能?),我让kafka制作人在事件中添加时间戳,但现在我很难理解如何访问这些时间戳。我知道上面提到的文章提供了一个解决这个问题的方法,但是我很难将这个示例转换成python,因为文档/示例很少。
另一篇文章(apacheflink:如何在摄取时间模式下获取事件的时间戳?)建议我应该定义一个 ProcessFunction 相反。然而,这里我也不确定语法。我可能不得不这样做(摘自:https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py)

class MyProcessFunction():

    def process_element(self, value, ctx):
        result = value.get_time_stamp()
        yield result

正确的方法是什么 value.get_time_stamp() 在这里?或者有没有更简单的方法来解决我不知道的问题?
谢谢!

lndjwyie

lndjwyie1#

设置由kafka主题支持的表时,可以为kafka时间戳声明一个虚拟列,如 event_time 本例中的列:

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

请参阅flink的kafka表连接器的文档,以获取有关在kafka头中使用元数据的更多信息。

相关问题