从流中接收的数据应该在每日和每月级别进行聚合。考虑以下版本(但包括返回类型为数组的分支):https://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/kstream.html如何使用transform方法来执行这两种操作?基本上,每日聚合应该首先在执行每月迄今的聚合之前完成。
y0u0uwnf1#
如果您使用的是kafka流,那么还可以查看基于kafka流的ksql。它使您能够简单地使用sql来声明流处理,包括聚合:
CREATE TABLE DAILY_SALES_AGG AS \SELECT STORE, PRODUCT, SUM(NETT_SALES) AS DAILY_SALES_TOTAL \FROM SALES_STREAM WINDOW TUMBLING (SIZE 1 DAY) \GROUP BY STORE, PRODUCT; CREATE TABLE MONTHLY_SALES_AGG AS \SELECT STORE, PRODUCT, SUM(NETT_SALES) AS MONTHLY_SALES_TOTAL \FROM SALES_STREAM WINDOW TUMBLING (SIZE 28 DAYS) \GROUP BY STORE, PRODUCT;
CREATE TABLE DAILY_SALES_AGG AS \
SELECT STORE, PRODUCT, SUM(NETT_SALES) AS DAILY_SALES_TOTAL \
FROM SALES_STREAM WINDOW TUMBLING (SIZE 1 DAY) \
GROUP BY STORE, PRODUCT;
CREATE TABLE MONTHLY_SALES_AGG AS \
SELECT STORE, PRODUCT, SUM(NETT_SALES) AS MONTHLY_SALES_TOTAL \
FROM SALES_STREAM WINDOW TUMBLING (SIZE 28 DAYS) \
有关更多信息,请参阅ksql语法参考,包括聚合函数。有关ksql的更多信息,请参见:https://www.confluent.io/product/ksql/ksql文档ksql语法参考ksql快速入门教程ksql视频教程免责声明:我为confluent工作,他负责开发开源ksql项目
1条答案
按热度按时间y0u0uwnf1#
如果您使用的是kafka流,那么还可以查看基于kafka流的ksql。它使您能够简单地使用sql来声明流处理,包括聚合:
有关更多信息,请参阅ksql语法参考,包括聚合函数。
有关ksql的更多信息,请参见:
https://www.confluent.io/product/ksql/
ksql文档
ksql语法参考
ksql快速入门教程
ksql视频教程
免责声明:我为confluent工作,他负责开发开源ksql项目