Kafka中的INPUT_STREAM
是使用以下KESQL语句创建的:
CREATE STREAM INPUT_STREAM (year STRUCT<month STRUCT<day STRUCT<hour INTEGER, minute INTEGER>>>) WITH (KAFKA_TOPIC = 'INPUT_TOPIC', VALUE_FORMAT = 'JSON');
它使用字段year
、month
和day
以及hour
和minute
定义了四个级别的嵌套json模式,如下所示:
{
"year": {
"month": {
"day": {
"hour": string,
"minute": string
}
}
}
}
我想创建第二个OUTPUT_STREAM
,该OUTPUT_STREAM
将从INPUT_STREAM
读取消息,并将其字段名称重新Map到一些自定义名称。我想获取hour
和minute
值,并将它们放在字段one
和two
下面的嵌套json中,如下所示:
{
"one": {
"two": {
"hour": string,
"minute": string
}
}
}
接下来,我将Kesql语句组合在一起以创建OUTPUT_STREAM
CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR ONE->TWO->HOUR FROM INPUT_STREAM EMIT CHANGES;
该语句失败,并出现错误。这条语句中有语法错误吗?是否可以像我在这里所做的那样指定目标字段名称
...AS SELECT YEAR->MONTH->DAY->HOUR ONE->TWO->HOUR FROM...
?
我尝试使用STRUCT
而不是ONE->TWO->HOUR
:
CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR ONE STRUCT<TWO STRUCT<HOUR VARCHAR>> FROM INPUT_STREAM EMIT CHANGES;
它也出错了,不能工作
1条答案
按热度按时间kkbh8khc1#
在ksqlDB中创建结构
所以你的问题的答案是,是的,你有语法错误。