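We are using Flink SQL to build a windowed group aggregation and store the results in MongoDB. We have defined a table with a primary key to insert the data into MongoDB.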
CREATE VIEW USER_TABLE
AS
SELECT
window_start WINDOW_START,
window_end WINDOW_END,
USER_ID,
SUM(PURCHASE_AMOUNT) PURCHASE_AMOUNT,
COUNT(*) PURCHASE_COUNT
FROM TABLE(
HOP(
DATA => TABLE USER_SRC,
TIMECOL => DESCRIPTOR(PURCHASE_TIMESTAMP),
SLIDE => INTERVAL '1' DAY,
SIZE => INTERVAL '5' DAYS))
GROUP BY window_start, window_end, GROUPING SETS ((USER_ID));
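The results are stored in MongoDB as expected.
Suppose a user purchased some items a few months ago but has not bought anything in the most recent month. That user's data needs to be deleted from MongoDB.
How can we achieve this?
In other words, how do we remove old state at the sink for a grouped window aggregation?
Update: example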
Window 1:
Key 1 -> Occurs 200 times.
Key 2 -> Occurs 100 times.
-----------
End of Window 1 in MongoDb sink
Key 1 = 200 (inserted)
Key 2 = 100 (inserted)
-------
Window 2:
Key 1 -> Doesn't occur. zero times.
Key 2 -> Occurs 150 times.
-----------
End of Window 2 in MongoDb sink
Key 1 = 200 (Unchanged) **(Here we want the key 1 to be deleted or updated to zero. How to achieve this...)**
Key 2 = 150 (Updated)
-------
Window 3:
Key 1 -> Occurs 20 times
Key 2 -> Occurs 50 times.
-----------
End of Window 3 in MongoDb sink
Key 1 = 20 (Updated)
Key 2 = 50 (Updated)
-------
1 Answer
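In your example you have only created a view. Views are not materialized, so no results are stored in Flink's state. You can read more about this in Flink's Dynamic Tables documentation at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/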
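State is only kept once you actually run a query against the view, such as
SELECT * from USER_TABLE
or INSERT INTO x SELECT * from USER_TABLE
When you do this with a window TVF (as you are doing), the state is cleaned up automatically after the window closes.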
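To make the distinction between the view and a running job concrete, here is a minimal sketch of what the INSERT INTO variant might look like with a MongoDB sink. The table name USER_SINK, the column types, the choice of USER_ID as primary key, and the URI, database and collection values are all assumptions for illustration; the question does not show the real sink definition.

```sql
-- Hypothetical sink table. Name, column types, primary key and all
-- connection values below are placeholders, not taken from the question.
CREATE TABLE USER_SINK (
  WINDOW_START    TIMESTAMP(3),
  WINDOW_END      TIMESTAMP(3),
  USER_ID         STRING,   -- assumed type
  PURCHASE_AMOUNT DOUBLE,   -- assumed type
  PURCHASE_COUNT  BIGINT,
  PRIMARY KEY (USER_ID) NOT ENFORCED  -- assumed key; the sink upserts on it
) WITH (
  'connector'  = 'mongodb',
  'uri'        = 'mongodb://localhost:27017',
  'database'   = 'shop',
  'collection' = 'user_purchases'
);

-- Submitting this statement is what starts a job and creates window state.
-- The CREATE VIEW statement on its own does not.
INSERT INTO USER_SINK
SELECT * FROM USER_TABLE;
```

Under these assumptions, a window only emits rows for keys that had at least one event in it. A key with no events in a window therefore produces no row at all, so the sink receives nothing for it and the previously written document stays unchanged, which matches the Key 1 behaviour in the example above.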