我正在使用confluent编写一个查询,以获取Kafka主题的5分钟窗口中的第一个时间戳。下面是一个问题(我知道这不是一个很好的方法):
CREATE STREAM start_metric_value AS
select metric_value
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;
但我有个错误:
predicate 的代码生成失败:找不到任何名为“min”的函数。表达式:(metric\u datetime\u utc=min(timestamptostring(metric\u datetime\u utc,'yyyy-mm-dd hh:mm:ss')),架构: ROWKEY
字符串键, ID
字符串, METRIC_NAME
字符串, METRIC_VALUE
字符串, METRIC_DATETIME_UTC
比基特, METRIC_INDEX
字符串, IANA_TIMEZONE
字符串, PROCESSED_DATETIME_UTC
比基特, DATA_TYPE
字符串, ASSET_TYPE
字符串, ROWTIME
比基特, ROWKEY
字符串原因:找不到任何名为“min”的函数
有人知道怎么解决这个问题吗
1条答案
按热度按时间js5cn81o1#
不是100%清楚你想要达到的目标。请参阅上面关于添加更多细节以帮助人们了解您想要实现的目标的问题的评论。
也就是说,我可以说。。。。
这个
Min
功能不被认可有两个原因:你正在传递
TIMESTAMPTOSTRING
至MIN
,但是MIN
不需要字符串。不能在中使用聚合函数
WHERE
条款。您看到的错误消息看起来像个bug。如果它仍然存在于最新版本的ksqldb中,您可能希望在ksqldb github项目中提出一个问题。
即使纠正这两件您正在查询的事情仍然会失败,因为ksqldb中的窗口化需要聚合,所以您需要一个
GROUP BY
.例如,如果您想捕获
metric_datetime_utc
每metric_value
对于每个5分钟的窗口,您可以使用:这将创建一个带窗口的表,即一个由键组成的表
metric_value
以及WINDOWSTART
时间。minTs
将存储所看到的最小日期时间。让我们通过查询运行一些数据来了解发生了什么:
输入:
输出到
START_METRIC_VALUE
主题可能是(注意:度量值和windowstart将存储在kafka记录的键中,而mints将存储在值中):实际输出到主题的内容将取决于您的
cache.max.bytes.buffering
. 将此设置为0
,关闭缓冲,将看到上述输出。但是,启用缓冲后,一些中间结果可能不会输出到kafka,尽管每个窗口的最终结果将保持不变。您还可以使用即将推出的抑制功能控制输出到kafka的内容上面的解决方案给出了每个度量值的最小时间戳。如果希望每个窗口都有一个全局最小日期时间,那么可以
GROUP BY
一个常数。注意,这会将所有事件路由到单个ksqldb节点,因此不能作为解决方案很好地扩展。如果缩放是一个问题,则有解决方案,例如首先计算最小值metric_value
然后对其进行后处理,找到全局最小值。注意:ksqldb版本0.10的语法是正确的。您可能需要调整其他版本。