如何使用Stream创建嵌套的JSON模式

lnxxn5zx  于 2022-10-07  发布在  Apache
关注(0)|答案(1)|浏览(193)

Kafka中的INPUT_STREAM是使用以下KESQL语句创建的:

CREATE STREAM INPUT_STREAM (year STRUCT<month STRUCT<day STRUCT<hour INTEGER, minute INTEGER>>>) WITH (KAFKA_TOPIC = 'INPUT_TOPIC', VALUE_FORMAT = 'JSON');

它使用字段yearmonthday以及hourminute定义了四个级别的嵌套json模式,如下所示:

{
  "year": {
    "month": {
      "day": {
        "hour": string,
        "minute": string
        }
      }
    }
}

我想创建第二个OUTPUT_STREAM,该OUTPUT_STREAM将从INPUT_STREAM读取消息,并将其字段名称重新Map到一些自定义名称。我想获取hourminute值,并将它们放在字段onetwo下面的嵌套json中,如下所示:

{
  "one": {
    "two": {
      "hour": string,
      "minute": string
      }
    }
}

接下来,我将Kesql语句组合在一起以创建OUTPUT_STREAM

CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR ONE->TWO->HOUR FROM INPUT_STREAM EMIT CHANGES;

该语句失败,并出现错误。这条语句中有语法错误吗?是否可以像我在这里所做的那样指定目标字段名称

...AS SELECT YEAR->MONTH->DAY->HOUR ONE->TWO->HOUR FROM...

我尝试使用STRUCT而不是ONE->TWO->HOUR

CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR ONE STRUCT<TWO STRUCT<HOUR VARCHAR>> FROM INPUT_STREAM EMIT CHANGES;

它也出错了,不能工作

kkbh8khc

kkbh8khc1#

在ksqlDB中创建结构

SELECT
  STRUCT(
    COLUMN_NEW_NAME_1 := OLD_COLUMN_NAME_1,
    COLUMN_NEW_NAME_2 := OLD_COLUMN_NAME_2
  ) as STRUCT_COLUMN_NAME
FROM OLD_TABLE_OR_STREAM

所以你的问题的答案是,是的,你有语法错误。

SELECT 
  STRUCT(
    TWO := STRUCT(
      HOUR := YEAR->MONTH->DAY->HOUR,
      MINUTE := YEAR->MONTH->DAY->MINUTE
    )
  ) as ONE
FROM INPUT_STREAM;

相关问题