How do I create a KSQL table from a topic using a composite key?

Posted by mw3dktmi on 2021-06-06 in Kafka

Suppose I have a topic of temperature forecast data that looks like this:

2018-10-25,Melbourne,21
2018-10-26,Melbourne,17
2018-10-27,Melbourne,21
2018-10-25,Sydney,22
2018-10-26,Sydney,20
2018-10-27,Sydney,23
2018-10-26,Melbourne,18
2018-10-27,Melbourne,22
2018-10-26,Sydney,21
2018-10-27,Sydney,24

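Each entry contains a date, a city, and a forecast temperature, and represents an update to the forecast for that city on that date. I can describe it as a KSQL stream like this: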

CREATE STREAM forecasts_csv ( \
  date VARCHAR, \
  city VARCHAR, \
  temperature INTEGER \
) WITH (kafka_topic='forecasts-csv', value_format='DELIMITED');

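Now I want a table that represents the current (i.e. latest) forecast temperature for each city, as well as the minimum and maximum of that forecast over time. An example of the desired output: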

{ date='2018-10-27', city='Melbourne', latest=22, min=21, max=22 }

How can I achieve this?
I have managed to get the aggregates (min/max) as follows:

CREATE STREAM forecasts_keyed \
WITH (partitions=4, value_format='JSON') \
AS SELECT date + '/' + city AS forecast_key, * \
FROM forecasts_csv \
PARTITION BY forecast_key;

CREATE TABLE forecasts_minmax \
WITH (partitions=4, value_format='JSON') \
AS SELECT forecast_key, date, city, \
          min(temperature) as min, max(temperature) as max \
FROM forecasts_keyed \
GROUP by forecast_key, date, city;

This gives me output messages like the following:

{"FORECAST_KEY":"2018-10-27/Melbourne","DATE":"2018-10-27","CITY":"Melbourne","MIN":21,"MAX":22}

But I can't figure out how to combine this with the "latest" reading.

j0pj023g #1

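You need to implement a UDAF, let's call it LATEST, that retains the latest value for a given column and key. This is fairly straightforward, and the KSQL documentation explains how to add a custom UDAF: https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#udafs
Assuming you have the LATEST UDAF available, you can write the following query: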

CREATE TABLE foo AS
SELECT
  date,
  city,
  MIN(temperature) AS minValue,
  MAX(temperature) AS maxValue,
  LATEST(temperature) AS latestValue
FROM forecasts_csv
GROUP BY date, city;
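
To make the idea behind LATEST concrete, here is a minimal, self-contained Java sketch of the aggregation logic. It does not use the real KSQL UDAF API: the SimpleUdaf interface, the LatestInt class, and the summarize helper are all hypothetical names made up for illustration, loosely modeled on an initialize/aggregate/merge contract. Treat it as a way to check the logic against the sample data, not as something you can drop into a KSQL extension directory as-is.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LatestForecastSketch {

    // Hypothetical stand-in for a UDAF contract (NOT the real KSQL interface).
    interface SimpleUdaf<V, A> {
        A initialize();                      // starting aggregate for a new key
        A aggregate(V value, A aggregate);   // fold one more record into the aggregate
        A merge(A first, A second);          // combine two partial aggregates
    }

    // "Latest" aggregate: the most recently seen non-null value wins.
    static final class LatestInt implements SimpleUdaf<Integer, Integer> {
        @Override
        public Integer initialize() {
            return null; // nothing seen yet
        }

        @Override
        public Integer aggregate(Integer value, Integer aggregate) {
            // Assumption: a null reading keeps the previous value.
            return value != null ? value : aggregate;
        }

        @Override
        public Integer merge(Integer first, Integer second) {
            // Assumption: the second partial aggregate is the newer one.
            return second != null ? second : first;
        }
    }

    // One row of the result, keyed by date/city.
    static final class Summary {
        Integer latest;
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
    }

    // Emulates GROUP BY date, city with MIN, MAX and LATEST over CSV lines in arrival order.
    static Map<String, Summary> summarize(String[] csvLines) {
        SimpleUdaf<Integer, Integer> latest = new LatestInt();
        Map<String, Summary> table = new LinkedHashMap<>();
        for (String line : csvLines) {
            String[] fields = line.split(",");
            String key = fields[0] + "/" + fields[1]; // composite key: date/city
            int temperature = Integer.parseInt(fields[2].trim());
            Summary s = table.get(key);
            if (s == null) {
                s = new Summary();
                s.latest = latest.initialize();
                table.put(key, s);
            }
            s.latest = latest.aggregate(temperature, s.latest);
            s.min = Math.min(s.min, temperature);
            s.max = Math.max(s.max, temperature);
        }
        return table;
    }

    public static void main(String[] args) {
        String[] lines = {
            "2018-10-25,Melbourne,21", "2018-10-26,Melbourne,17", "2018-10-27,Melbourne,21",
            "2018-10-25,Sydney,22", "2018-10-26,Sydney,20", "2018-10-27,Sydney,23",
            "2018-10-26,Melbourne,18", "2018-10-27,Melbourne,22",
            "2018-10-26,Sydney,21", "2018-10-27,Sydney,24"
        };
        summarize(lines).forEach((key, s) -> System.out.println(
            String.format("%s latest=%d min=%d max=%d", key, s.latest, s.min, s.max)));
    }
}
```

With the sample data from the question, the 2018-10-27/Melbourne entry ends up with latest=22, min=21, max=22, which matches the desired output. Note that "latest" here means latest in arrival order, so it is only well defined if all updates for a given date/city land in the same partition. Newer ksqlDB releases also ship a built-in LATEST_BY_OFFSET aggregate, so it may be worth checking whether your version has it before writing a custom UDAF.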
