Flink SQL deduplication before a window TVF

hi3rlvi2  posted on 2022-12-09  in  Apache

We are trying to run a Flink SQL query that deduplicates rows and then windows and aggregates the deduplicated result, but we run into the following error at query planning time:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[order_id], order=[ROWTIME])
We managed to get a simple example query reproducing this issue:

CREATE TABLE Orders (
  order_id STRING, 
  user_id STRING, 
  product STRING, 
  num BIGINT, 
  user_action_time TIMESTAMP(3), 
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');

WITH Deduplicated AS (
  SELECT 
    order_id, 
    user_id, 
    product, 
    num, 
    user_action_time 
  FROM 
    (
      SELECT 
        *, 
        ROW_NUMBER() OVER (
          PARTITION BY order_id 
          ORDER BY 
            user_action_time ASC
        ) AS row_num 
      FROM 
        Orders
    ) 
  WHERE 
    row_num = 1
) 
SELECT 
  user_id, 
  SUM(num) as num_sum 
FROM 
  TABLE(
    TUMBLE(
      TABLE Deduplicated, 
      DESCRIPTOR(user_action_time), 
      INTERVAL '5' MINUTES
    )
  ) 
GROUP BY 
  user_id, 
  window_start, 
  window_end

If the same query is run using PROCTIME instead of ROWTIME, it runs successfully.
We are using Flink 1.15.0.
Is this expected behavior?
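
For comparison, the processing-time variant mentioned above might look roughly like the sketch below. The question does not show the exact PROCTIME query, so the table name OrdersProc and the computed column proc_time are assumptions made for illustration; only the overall shape (deduplicate first, then TUMBLE) follows the original query.

```sql
-- Hypothetical table: same datagen source, but with a processing-time
-- attribute (proc_time) instead of an event-time column and watermark.
CREATE TABLE OrdersProc (
  order_id STRING,
  user_id STRING,
  product STRING,
  num BIGINT,
  proc_time AS PROCTIME()
) WITH ('connector' = 'datagen');

WITH Deduplicated AS (
  SELECT order_id, user_id, product, num, proc_time
  FROM (
    SELECT
      *,
      -- Keep the first row seen per order_id, ordered by processing time.
      ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY proc_time ASC
      ) AS row_num
    FROM OrdersProc
  )
  WHERE row_num = 1
)
SELECT
  user_id,
  SUM(num) AS num_sum
FROM TABLE(
  TUMBLE(TABLE Deduplicated, DESCRIPTOR(proc_time), INTERVAL '5' MINUTES)
)
GROUP BY user_id, window_start, window_end;
```

Note that this changes the semantics: the windows and the "first" row are now determined by arrival time rather than by user_action_time.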

5lhxktic  #1

Can you try this? I'm not sure what the problem is.

WITH Deduplicated AS (
  SELECT 
    order_id, 
    user_id, 
    product, 
    num, 
    user_action_time 
  FROM 
    (
      SELECT 
        *, 
        ROW_NUMBER() OVER (
          PARTITION BY order_id 
          ORDER BY 
            user_action_time ASC
        ) AS row_num 
      FROM 
        Orders
    ) 
  WHERE 
    row_num = 1
) 
SELECT 
  user_id, 
  SUM(num) as num_sum 
FROM Deduplicated
GROUP BY user_id, TUMBLE(user_action_time, INTERVAL '5' MINUTES);

x4shl7ld  #2

This is currently not possible. You can track https://issues.apache.org/jira/browse/FLINK-27539 (it is not yet clear whether this is a bug or a new feature for Flink).
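
One way to inspect what the deduplication step emits is to explain only that subquery with the CHANGELOG_MODE detail. This is a sketch that assumes the Orders table from the question is already registered; the exact plan text depends on the Flink version.

```sql
-- Explain only the event-time deduplication from the question and ask the
-- planner to annotate each node with the changelog mode it produces.
EXPLAIN CHANGELOG_MODE
SELECT order_id, user_id, product, num, user_action_time
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY user_action_time ASC
    ) AS row_num
  FROM Orders
)
WHERE row_num = 1;
```

If the Deduplicate node in the resulting plan lists anything beyond inserts in its changelog mode, that would be consistent with the error in the question, where the window aggregate refuses to consume update and delete changes.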
