我已经定义了一条流
CREATE STREAM QUOTE (quoteId VARCHAR,
counterPartyId VARCHAR)
WITH (KAFKA_TOPIC='quotes',
VALUE_FORMAT='JSON',
KEY='quoteId');
我想汇总到目前为止我得到了多少引用,以及那个事件的最后一个引用
CREATE TABLE KQUOTE AS
SELECT Max(CAST(quoteId as INT)) as quoteId,COUNT(*) AS COUNT
FROM QUOTE
GROUP BY 1;
将此表转到流,因为我想知道聚合结果的历史记录(似乎我必须使用底层主题来创建流。无法直接从表“kquote”)创建流。
CREATE stream KQuoteStream (quoteId VARCHAR,
count INT)
WITH (KAFKA_TOPIC='KQUOTE',
VALUE_FORMAT='JSON',
KEY='quoteId');
我希望上面使用rawkey quoteid,但事实并非如此。正如我们在下面看到的,rawkey总是1(因为我们在创建表kquote时按常量1分组)。
ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21
尝试按quoteid重新划分流,以将rawkey更改为quoteid
CREATE stream KQuoteStreamByQuoteId
as
SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;
ramkey仍然是常量1
ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21
顺便说一句:所有的主题都和1有相同的分区,使事情更简单。有人知道吗?谢谢!
1条答案
按热度按时间yqkkidmi1#
这绝对是一个有趣的错误,你已经浮出水面!
这里的诀窍是要明白
WITH(KEY='quoteId')
实际上并没有做任何事情,这是对ksqldb的一个提示,即key字段恰好也存在于值中quoteId
. 然后,当你PARTITION BY quoteId
,它认为您正在按rowkey进行分区,所以它什么也不做!我同意这种行为是相当不直观的,这就是为什么我们计划删除WITH(KEY=...)
支持更直观的功能(待定)。同时,解决方法应该是在创建
KQuoteStream
所以ksql不会优化重分区。