ksqldb::从Map属性的现有流创建子流

ghhkc1vu  于 2021-06-27  发布在  Java
关注(0)|答案(0)|浏览(220)

我目前是这个领域的新手,在ksqldb工作。我想从json数据中创建子流,其中包含map属性。
我的json格式的数据来自现有的主题。

ksql> print Topic_name;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON_SR or KAFKA_STRING
rowtime: 2021/01/04 06:46:26.337 Z, key: <null>, value: {"fields":{"field1":"1","field2":"0","field3":"1","field4":"2"},"name":"test","tags":{"tag1":"abc","tag2":"def","tag3":"ghi","tag4":"xyz","tag5":"jkl"},"timestamp":34018905608}

使用现有主题为上述json创建的流

ksql>create stream test(fields map<string,string>, name varchar, tags map<string,string>, timestamp bigint) with(kafka_topic='Topic_name',value_format='JSON_SR');

我想从上面的流中创建子流,在其中一个字段上使用where子句,Map属性中的数据是动态的。目前我正在使用下面的查询来创建子流。

ksql>create stream test_substream as select * from test where TAGS['tag1']='abc' emit changes;

ksql> describe test_substream;

Name: TEST_SUBSTREAM
 Field      Type

 FIELDS       MAP<STRING, VARCHAR(STRING)>
 NAME         VARCHAR(STRING)
 TAGS         MAP<STRING, VARCHAR(STRING)>
 TIMESTAMP    BIGINT

但是当我描述子流时,我可以看到所有字段的数据都与原始流相似,但是当我检查与子流关联的主题(test\u substream topic)时,它显示的是数组,而不是map<varchar,varchar>datatype属性。因此,select query没有返回子流的任何数据,但是我可以看到预期的数据以数组格式出现在test\ u子流主题中。

The topic TEST_SUBSTREAM structure is as follows,
Value column(s)
Column name          Column type

FIELDS              ARRAY<VARCHAR>

NAME                VARCHAR

TAGS                ARRAY<VARCHAR>

TIMESTAMP           BIGINT

我期望数据出现在子流test\u子流中,该子流在原始流test中作为map属性接收。请帮忙。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题