Flink Kinesis Analytics SQL query to narrow down sensors that are not sending data

cetgtptt, posted on 2022-12-09 in Apache

Context: We use Kinesis analytics to process our sensor data and find anomalies in the sensor data.
Goal: We need to identify the sensors that have not sent any data in the past X minutes.

The following methods have been tried with Kinesis analytics SQL, but no luck:

  • Stagger Window technique works for the first 3 test cases, but doesn't work for test case 4.
CREATE OR REPLACE PUMP "STREAM_PUMP_ALERT_DOSCONNECTION" AS
    INSERT INTO "INTERMEDIATE_SQL_STREAM"
        SELECT STREAM
            "deviceID" as "device_key",
            count("deviceID") as "device_count",
            ROWTIME as "time"
        FROM "INTERMEDIATE_SQL_STREAM_FOR_ROOM"
        WINDOWED BY STAGGER (
            PARTITION BY "deviceID", ROWTIME RANGE INTERVAL '1' MINUTE);
  • The left join and group by queries below don't work for all of the test cases.
    Query 1:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
    INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
        SELECT STREAM
        ROWTIME as "resultrowtime",
        Input2."device_key" as "device_key"
    FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
    OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
    LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
    ON
        Input1."device_key" = Input2."device_key"
        AND Input1.ROWTIME <> Input2.ROWTIME;

Query 2:

CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
    SELECT STREAM
    ROWTIME as "resultrowtime",
    Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
    Input1."device_key" = Input2."device_key"
    AND TSDIFF(Input1, Input2) > 0;

Query 3:

CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
    SELECT STREAM
    ROWTIME as "resultrowtime",
    Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
    Input1."device_key" = Input2."device_key"
    AND Input1.ROWTIME = Input2.ROWTIME;
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP2" AS
    INSERT INTO "DIS_CONN_DEST_SQL_STREAM_ALERT"
        SELECT STREAM "device_key", "count"
        FROM (
            SELECT STREAM
                "device_key",
                COUNT(*) as "count"
            FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM2
            GROUP BY FLOOR(INTERMEDIATE_SQL_STREAM_FOR_ROOM2.ROWTIME TO MINUTE), "device_key"
            )
        WHERE "count" = 1;

Here are test cases for your reference:

Test case 1:

  • Device 1 and Device 2 send data continuously to the Kinesis Analytics.
  • After X minutes, Device 2 continues to send data, but Device 1 stops sending data.
    Output for test case 1:

We want the Kinesis Analytics to pop out Device 1, so that we know which device is not sending data.

Test case 2 (Interval - 10 minutes)

  • Device 1 sends data at 09:00
  • Device 2 sends data at 09:02
  • Device 2 again sends the data at 09:11, but Device 1 doesn’t send any data.
    Output for test case 2:

We want the Kinesis Analytics to pop out Device 1, so that we know which device is not sending data.

Test case 3 (Interval - 10 minutes)

  • Device 1 and Device 2 send data continuously to Kinesis Analytics.
  • Both devices (1 & 2) don't send any data for the next 15 minutes.
    Output for test case 3:

We want the Kinesis Analytics to pop out Device 1 & Device 2, so that we know which devices are not sending data.

Test case 4: (Interval - 10 mins)

  • Device 1 sends data at 09:00
  • Device 2 sends data at 09:02
  • Device 1 again sends data at 09:04
  • Device 2 again sends data at 09:06
  • Then no data
    Output for test case 4:

We want Kinesis Analytics to pop out Device 1 at 09:14 and Device 2 at 09:16, so that each disconnected device (i.e. a device that is not sending data) is reported exactly one interval after its last message.

Note: AWS Support directed us to simple queries that don't answer the question. Looks like they can help with the exact query only if we upgrade our support plan.

nlejzf6q #1

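I'm not familiar with all of the ways that AWS has extended or modified Apache Flink, but open source Flink doesn't provide an easy way to detect that all of the sources have stopped sending data. One solution is to use something like a process function with processing-time timers to detect the absence of data.
The documentation has an example along these lines: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/#example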
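
To make the timer idea concrete, here is a minimal sketch in plain Python. It is not Flink code and not part of the original answer. It only simulates the per-device logic that a keyed process function with timers would carry out: remember the last time each device was seen, schedule a timer one interval later, and ignore any timer that a newer event has made stale. The names `find_silent_devices`, `at`, and `end_of_stream` are hypothetical, the date is arbitrary, and the event timestamps stand in for the clock, whereas processing-time timers in Flink follow wall-clock time.

```python
from datetime import datetime, timedelta
import heapq


def at(hhmm):
    """Hypothetical helper: build a timestamp on an arbitrary fixed date."""
    return datetime.strptime("2022-12-09 " + hhmm, "%Y-%m-%d %H:%M")


def find_silent_devices(events, timeout, end_of_stream):
    """Simulate per-key inactivity timers.

    events: (timestamp, device_id) pairs sorted by timestamp.
    Returns (alert_time, device_id) pairs in the order the timers fire.
    """
    last_seen = {}  # per-device state: time of the most recent event
    timers = []     # min-heap of (fire_time, device_id)
    alerts = []

    def fire_due_timers(now):
        while timers and timers[0][0] <= now:
            fire_time, device = heapq.heappop(timers)
            seen = last_seen.get(device)
            # A timer is stale if the device sent data after it was set.
            if seen is not None and seen + timeout == fire_time:
                alerts.append((fire_time, device))
                del last_seen[device]  # report each silence only once

    for ts, device in events:
        fire_due_timers(ts)            # advance the simulated clock
        last_seen[device] = ts         # update state for this device
        heapq.heappush(timers, (ts + timeout, device))

    fire_due_timers(end_of_stream)     # let the remaining timers fire
    return alerts


# Test case 4 from the question, with a 10-minute interval.
events = [
    (at("09:00"), "device1"),
    (at("09:02"), "device2"),
    (at("09:04"), "device1"),
    (at("09:06"), "device2"),
]
for alert_time, device in find_silent_devices(
        events, timedelta(minutes=10), at("09:30")):
    print(alert_time.strftime("%H:%M"), device)
# prints:
# 09:14 device1
# 09:16 device2
```

In this sketch each device is tracked independently, so a device that falls silent is reported one interval after its own last message, whether or not other devices are still sending. That is the behaviour test case 4 asks for.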
