KSQL以及如何使用kafka-console-consumer从KTable获取数据

oipij1gg  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(121)

我有一个KTable(“person_table”),它在生成新的JSON数据时正确更新。
当我执行一个查询时,

select * from person_table;

字符串
更新的数据显示正确。
例如,如果我的主题(“person”)最初具有以下数据行

{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}


并且随后发生对其中一个记录的更新,即,向主题发送新的更新事件,则主题正确地显示为:

{"id": 1, "name": "john"}
{"id": 2, "name": "mary"}
{"id": 2, "name": "mary anne"}


该表正确显示:

select * from person_table;

-----------------
id name
-----------------
1 John
2 Mary Anne


问题在于使用消费者“kafka-console-consumer”。这个消费者总是显示主题的全部内容,我希望它显示与KTable相同的信息,也就是说,消费者只显示控制台中最新的数据。
有谁能指出我错在哪里吗?
运行消费者的命令如下:

docker-compose exec kafka kafka-console-consumer --topic person_with_id --from-beginning --bootstrap-server kafka:9092


创建Ktable的命令如下:

create table person_table with(kafka_topic='person_with_id') 
as 
select id, latest_by_offset(name) as name
from person_key_stream 
group by id 
emit changes;


创建流的命令如下:

create stream person_stream(id bigint, name varchar)
  with(kafka_topic='person', value_format='JSON');

create stream person_key_stream
  as select * from person_stream partition by id;

bnl4lu3b

bnl4lu3b1#

无法从Kafka内置工具中查询KTable。使用任何消费者都会扫描整个主题。
相反,您需要使用ksql REST API来查询其HTTP服务器
这在Kafka文档的“交互式浏览器”部分的Kafka文档中有更详细的解释

相关问题