如何使用Kafka和浮士德检查在给定的时间段内是否发送了新记录

6qfn3psc  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(422)

我正在使用一个测试设置,包括汇合平台(docker)和am处理包含以下信息的记录:传感器id、时间戳、值。使用robinhood的《浮士德》(类似于kafka streams,但在python中),我试图做到以下几点:
只要有一个传感器的新记录,就应该有一个“计时器”,如果在给定的时间内没有收到该传感器id的新记录,则应该有一个错误,指示该传感器/机器可能出现故障。
我试过使用 time.sleep() 但它只会休眠10秒,然后处理下一个记录。
有没有可能用我正在使用的设置做这样的事情?

3vpjnl9f

3vpjnl9f1#

您可以使用ksql的窗口滚动:
创建传感器信息流;

CREATE STREAM sensorinformation \
  (sensorid VARCHAR, \
   sensortimestamp BIGINT, \
   value VARCHAR) \
 WITH (KAFKA_TOPIC='sensorinformationtopic', \
       VALUE_FORMAT='DELIMITED', \
       KEY='sensorid', \
       TIMESTAMP='sensortimestamp');

最后创建一个表,其中包含在10秒的时间窗口内只出现一次的故障传感器:

CREATE TABLE faulty_sensors AS \
  SELECT sensorid, \
         count(*) \
  FROM sensorinformation \
  WINDOW TUMBLING (SIZE 10 SECONDS) \
  GROUP BY sensorid \
  HAVING count(*) = 1;

相关问题