Variable-length lists in KSQL

lokaqttq posted on 2021-06-08 in Kafka

In KSQL you can use EXTRACTJSONFIELD for nested structures, but I can't work out how to handle variable-length lists. For example:

    {"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}

I can handle quux as a varchar in the base stream:

    create stream mystream (id bigint, quux varchar)
    with (kafka_topic='mytopic', value_format='json');

but I'd like to turn it into a table like this:

    quuxid  x  y
    1       1  2
    1       3  4
    1       5  6
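The transformation being asked for is a flatten: one output row per element of quux, with the parent id repeated on each row. Outside KSQL, a minimal Python sketch of that target shape looks like this, assuming the message is available as a JSON string; `flatten_quux` is a hypothetical helper, not anything KSQL provides.

```python
import json


def flatten_quux(message: str) -> list[tuple[int, int, int]]:
    """Turn one message into (quuxid, x, y) rows, one per element of quux.

    Hypothetical helper for illustration only; it is not part of KSQL.
    """
    record = json.loads(message)
    # The parent id is repeated on every row; the row count follows len(quux).
    return [(record["id"], item["x"], item["y"]) for item in record["quux"]]


msg = '{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}'
for row in flatten_quux(msg):
    print(*row)
```

The number of rows produced depends on the length of quux in each message, which is exactly the part a fixed column list in a SQL query cannot express.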

How do I handle variable-length lists in KSQL?

Answer by piok6c0g:

This isn't currently possible in KSQL.
You can, however, access array data by index, as shown below. First, populate some test data:

    echo '{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}' | \
    kafkacat -b localhost:9092 -t quux

Check the message in KSQL:

    ksql> print 'quux' from beginning;
    Format:JSON
    {"ROWTIME":1528791985250,"ROWKEY":"null","id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}

Create the stream:

    create stream mystream (id bigint, quux varchar) \
    with (kafka_topic='quux', value_format='json');
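Declaring quux as varchar means the stream carries the whole array as a single piece of JSON text, which is what the `select * from mystream` output shows. A rough Python analogue, assuming the nested value is re-serialized as compact JSON (no spaces) as in that output; `quux_as_varchar` is a hypothetical name:

```python
import json


def quux_as_varchar(message: str) -> str:
    """Mimic reading the nested quux field into a VARCHAR column.

    Assumption: the nested value is re-serialized as compact JSON text,
    matching the query output shown in the answer.
    """
    record = json.loads(message)
    # separators=(",", ":") drops the spaces json.dumps adds by default.
    return json.dumps(record["quux"], separators=(",", ":"))


msg = '{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}'
print(quux_as_varchar(msg))
```

The result is one string value per message rather than a list, so the individual elements are not addressable as separate values.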

Query the stream:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'
    ksql>
    ksql> select * from mystream;
    1528791985250 | null | 1 | [{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]

Create a stream that declares quux as an array:

    ksql> CREATE STREAM mystream2 (id bigint, quux array<varchar>) with (kafka_topic='quux', value_format='json');
     Message
    ----------------
     Stream created
    ----------------
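With quux declared as array<varchar>, each element of the JSON array becomes its own string, which is why indexing into it returns text such as {"x":1,"y":2}. A Python sketch of that mapping, under the same compact-serialization assumption as the varchar case; `quux_as_array_of_varchar` is a hypothetical name:

```python
import json


def quux_as_array_of_varchar(message: str) -> list[str]:
    """Mimic an array<varchar> column: one JSON string per array element.

    Assumption: each object element is re-serialized as compact JSON text.
    """
    record = json.loads(message)
    return [json.dumps(item, separators=(",", ":")) for item in record["quux"]]


msg = '{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}'
elements = quux_as_array_of_varchar(msg)
print(len(elements), elements[0])
```

The list keeps its length, but each element is still opaque JSON text, so pulling out x and y needs a further JSON extraction step per element.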

Access individual items by index:

    ksql> SELECT quux[0], EXTRACTJSONFIELD(quux[0],'$.x') AS X, EXTRACTJSONFIELD(quux[0],'$.y') AS Y from mystream2;
    {"x":1,"y":2} | 1 | 2
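The query picks a fixed array position and then applies a JSON path to that element. The Python sketch below mirrors those two steps. `extract_json_field` is a hypothetical, heavily simplified stand-in for EXTRACTJSONFIELD: it only understands `$.field` paths and, like EXTRACTJSONFIELD, returns the value as a string.

```python
import json


def extract_json_field(json_text: str, path: str):
    """Hypothetical, simplified stand-in for EXTRACTJSONFIELD.

    Only supports paths of the form '$.field'. Returns the value as a
    string, or None if the field is missing or null.
    """
    if not path.startswith("$."):
        raise ValueError("only '$.field' paths are supported in this sketch")
    value = json.loads(json_text).get(path[2:])
    if value is None:
        return None
    # Non-string values are rendered back to JSON text, e.g. 1 -> "1".
    return value if isinstance(value, str) else json.dumps(value)


# Elements as an array<varchar> column would hold them.
quux = ['{"x":1,"y":2}', '{"x":3,"y":4}', '{"x":5,"y":6}']

# Same shape as: SELECT quux[0], EXTRACTJSONFIELD(quux[0],'$.x'), EXTRACTJSONFIELD(quux[0],'$.y')
first = quux[0]
print(first, "|", extract_json_field(first, "$.x"), "|", extract_json_field(first, "$.y"))
```

Every output column needs its own hard-coded index, so this approach only works when you know in advance how many elements to read; it cannot produce one row per element for lists of varying length.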

What you're really after, though, is an EXPLODE function, which doesn't exist in KSQL yet.
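For comparison, this is roughly what an EXPLODE-style table function would do: emit one output row per array element, however long the array is, and nothing at all for an empty array. This is a plain Python generator, not KSQL; `explode` and the sample records are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple


def explode(records: Iterable[Dict[str, Any]], field: str) -> Iterator[Tuple[Any, Any]]:
    """Yield (id, element) once per element of record[field].

    Hypothetical sketch of table-function semantics: the number of output
    rows per input record equals the length of its array. Empty or missing
    arrays yield no rows.
    """
    for record in records:
        for element in record.get(field) or []:
            yield record["id"], element


# Records whose quux lists have different lengths, including an empty one.
records = [
    {"id": 1, "quux": [{"x": 1, "y": 2}, {"x": 3, "y": 4}, {"x": 5, "y": 6}]},
    {"id": 2, "quux": [{"x": 7, "y": 8}]},
    {"id": 3, "quux": []},
]

rows = [(rid, el["x"], el["y"]) for rid, el in explode(records, "quux")]
for row in rows:
    print(*row)
```

The key difference from index access is that the output row count is driven by the data rather than by the query text.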
Related GitHub issues:
https://github.com/confluentinc/ksql/issues/1324
https://github.com/confluentinc/ksql/issues/527
https://github.com/confluentinc/ksql/issues/1413 (I just noticed you've already filed this one, thanks :)

