我有一个KTable(“person_table”),它在生成新的JSON数据时正确更新。
当我执行一个查询时,
select * from person_table;
字符串
更新的数据显示正确。
例如,如果我的主题(“person”)最初具有以下数据行
{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}
型
并且随后发生对其中一个记录的更新,即,向主题发送新的更新事件,则主题正确地显示为:
{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}
{"id": 2, "name": "mary anne"}
型
该表正确显示:
select * from person_table;
-----------------
id name
-----------------
1 John
2 Mary Anne
型
问题在于使用消费者“kafka-console-consumer”。这个消费者总是显示主题的全部内容,我希望它显示与KTable相同的信息,也就是说,消费者只显示控制台中最新的数据。
有谁能指出我错在哪里吗?
运行消费者的命令如下:
docker-compose exec kafka kafka-console-consumer --topic person_with_id --from-beginning --bootstrap-server kafka:9092
型
创建Ktable的命令如下:
create table person_table with(kafka_topic='person_with_id')
as
select id, latest_by_offset(name) as name
from person_key_stream
group by id
emit changes;
型
创建流的命令如下:
create stream person_stream(id bigint, name varchar)
with(kafka_topic='person', value_format='JSON');
create stream person_key_stream
as select * from person_stream partition by id;
型
1条答案
按热度按时间bnl4lu3b1#
无法从Kafka内置工具中查询KTable。使用任何消费者都会扫描整个主题。
相反,您需要使用ksql REST API来查询其HTTP服务器
这在Kafka文档的“交互式浏览器”部分的Kafka文档中有更详细的解释