Creating a KSQL stream: how to extract values from complex JSON

Posted by cbeh67ev on 2021-06-06 in Kafka

I'm trying to create a stream in Apache Kafka KSQL from a topic that contains (somewhat complex) JSON:

{
  "agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
  "created_at": "2018-02-17 16:00:00.000Z",
  "id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
  "event_type": "data",
  "total_charged_amount": {
    "tax_free_amount": null,
    "tax_amounts": [],
    "tax_included_amount": {
      "amount": 0.0241,
      "currency": "EUR"
    }
  },
  "used_service_units": [
    {
      "amount": 2412739,
      "currency": null,
      "unit_of_measure": "bytes"
    }
  ]
}

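Creating a stream with just a few simple fields, such as event_type and created_at, is easy:

CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');

But now I need to access used_service_units: I want to extract the "amount" from the JSON above.
How do I do that? This was my first attempt: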

CREATE STREAM usage (event_type varchar, created_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');

It fails with:

line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...

If I instead create the stream like this:

CREATE STREAM usage (event_type varchar, created_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');

and then select from the stream with queries like these:

SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;

None of these queries work. The second one, for example, gives me:

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;

Code generation failed for SelectValueMapper
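A plausible reading of why '$.amount' finds nothing when it is applied to the whole column: a JSON path is evaluated relative to the value it is given, and the top level of used_service_units is an array, not an object with an "amount" field. The Python sketch below models only that path semantics, using a hypothetical, simplified evaluator (json_path_get, supporting just "$", ".field" and "[n]"). It is not KSQL's implementation, and it does not reproduce the "Code generation failed" error, which presumably comes from applying [0] to a column declared as varchar.

```python
import json
import re

# Hypothetical, simplified JSON-path evaluator: supports "$", ".field" and "[n]" only.
# It models the idea behind EXTRACTJSONFIELD; it is NOT KSQL's implementation.
TOKEN = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def json_path_get(json_text, path):
    """Return the value at `path` inside `json_text`, or None if it does not exist."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    value = json.loads(json_text)
    rest = path[1:]
    pos = 0
    while pos < len(rest):
        match = TOKEN.match(rest, pos)
        if match is None:
            raise ValueError(f"unsupported path syntax: {path!r}")
        field, index = match.groups()
        if field is not None:
            # ".field" only applies to JSON objects
            if not isinstance(value, dict) or field not in value:
                return None
            value = value[field]
        else:
            # "[n]" only applies to JSON arrays
            i = int(index)
            if not isinstance(value, list) or i >= len(value):
                return None
            value = value[i]
        pos = match.end()
    return value


# The used_service_units value from the sample message, as JSON text.
units_text = '[{"amount": 2412739, "currency": null, "unit_of_measure": "bytes"}]'

# "$.amount" asks for a field of an object, but the top level is an array.
print(json_path_get(units_text, "$.amount"))     # None

# Applied to the first element on its own, the same path finds the value.
first_element = json.dumps(json.loads(units_text)[0])
print(json_path_get(first_element, "$.amount"))  # 2412739
```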
Answer #1 (hjqgdpho):

It seems one solution to this problem is to declare the column's data type as an array, i.e.:

CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');

Now I can do the following:

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage
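The working query can be modelled step by step. Declared as array<varchar>, each element of used_service_units presumably reaches EXTRACTJSONFIELD as the JSON text of one object (the fact that the query works implies as much), so [0] picks the first element and '$.amount' is then evaluated against that object. EXTRACTJSONFIELD returns a string, so the amount comes back as text and needs a cast (for example CAST(... AS BIGINT)) before doing arithmetic on it. The Python sketch below mirrors those steps for a single message; the function names are hypothetical, and, as in the query, only the first element is read.

```python
import json

# A sample message from the usage_events topic (the question's JSON, trimmed).
MESSAGE = """
{
  "event_type": "data",
  "created_at": "2018-02-17 16:00:00.000Z",
  "used_service_units": [
    {"amount": 2412739, "currency": null, "unit_of_measure": "bytes"}
  ]
}
"""


def as_varchar_array(message_text, column):
    """Model an array<varchar> column: each element kept as its own JSON text.

    Assumption: this mirrors how the element strings reach EXTRACTJSONFIELD.
    """
    return [json.dumps(element) for element in json.loads(message_text)[column]]


def extract_top_level_field(element_text, field):
    """Model EXTRACTJSONFIELD(element, '$.<field>'): the result is returned as a string."""
    value = json.loads(element_text).get(field)
    if value is None:
        return None
    # Strings come back as-is; numbers, booleans and nested values as JSON text.
    return value if isinstance(value, str) else json.dumps(value)


units = as_varchar_array(MESSAGE, "used_service_units")
amount_text = extract_top_level_field(units[0], "amount")  # like used_service_units[0]
print(repr(amount_text))  # '2412739'
print(int(amount_text))   # 2412739, after an explicit cast
```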
