ksql无法识别min函数的原因

nfs0ujit  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(372)

我正在使用confluent编写一个查询,以获取Kafka主题的5分钟窗口中的第一个时间戳。下面是一个问题(我知道这不是一个很好的方法):

CREATE STREAM start_metric_value AS
select metric_value 
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;

但我有个错误:
predicate 的代码生成失败:找不到任何名为“min”的函数。表达式:(metric\u datetime\u utc=min(timestamptostring(metric\u datetime\u utc,'yyyy-mm-dd hh:mm:ss')),架构: ROWKEY 字符串键, ID 字符串, METRIC_NAME 字符串, METRIC_VALUE 字符串, METRIC_DATETIME_UTC 比基特, METRIC_INDEX 字符串, IANA_TIMEZONE 字符串, PROCESSED_DATETIME_UTC 比基特, DATA_TYPE 字符串, ASSET_TYPE 字符串, ROWTIME 比基特, ROWKEY 字符串原因:找不到任何名为“min”的函数
有人知道怎么解决这个问题吗

js5cn81o

js5cn81o1#

不是100%清楚你想要达到的目标。请参阅上面关于添加更多细节以帮助人们了解您想要实现的目标的问题的评论。
也就是说,我可以说。。。。
这个 Min 功能不被认可有两个原因:
你正在传递 TIMESTAMPTOSTRINGMIN ,但是 MIN 不需要字符串。
不能在中使用聚合函数 WHERE 条款。
您看到的错误消息看起来像个bug。如果它仍然存在于最新版本的ksqldb中,您可能希望在ksqldb github项目中提出一个问题。
即使纠正这两件您正在查询的事情仍然会失败,因为ksqldb中的窗口化需要聚合,所以您需要一个 GROUP BY .
例如,如果您想捕获 metric_datetime_utcmetric_value 对于每个5分钟的窗口,您可以使用:

CREATE TABLE start_metric_value AS
  SELECT
    metric_value,
    MIN(metric_datetime_utc) as minTs
  FROM dataaggregaion 
  WINDOW TUMBLING (SIZE 5 MINUTE)
  GROUP BY metric_value;

这将创建一个带窗口的表,即一个由键组成的表 metric_value 以及 WINDOWSTART 时间。 minTs 将存储所看到的最小日期时间。
让我们通过查询运行一些数据来了解发生了什么:
输入:

rowtime | metric_value  | metric_datetime_utc
--------|---------------|--------------------
 1      |  A            | 3
 2      |  A            | 4
 3      |  A            | 2
 4      |  B            | 5
 300000 |  A            | 6

输出到 START_METRIC_VALUE 主题可能是(注意:度量值和windowstart将存储在kafka记录的键中,而mints将存储在值中):

metric_value | windowStart | minTs 
-------------|-------------|------
 A           | 0           | 3
 A           | 0           | 3
 A           | 0           | 2
 B           | 0           | 5
 A           | 300000      | 6

实际输出到主题的内容将取决于您的 cache.max.bytes.buffering . 将此设置为 0 ,关闭缓冲,将看到上述输出。但是,启用缓冲后,一些中间结果可能不会输出到kafka,尽管每个窗口的最终结果将保持不变。您还可以使用即将推出的抑制功能控制输出到kafka的内容
上面的解决方案给出了每个度量值的最小时间戳。如果希望每个窗口都有一个全局最小日期时间,那么可以 GROUP BY 一个常数。注意,这会将所有事件路由到单个ksqldb节点,因此不能作为解决方案很好地扩展。如果缩放是一个问题,则有解决方案,例如首先计算最小值 metric_value 然后对其进行后处理,找到全局最小值。

CREATE TABLE start_metric_value AS
  SELECT
    1 as Key,
    MIN(metric_datetime_utc) as minTs
  FROM dataaggregaion 
  WINDOW TUMBLING (SIZE 5 MINUTE)
  GROUP BY 1;

注意:ksqldb版本0.10的语法是正确的。您可能需要调整其他版本。

相关问题