有一个主题,包含普通的json消息,它试图通过从json中提取几列,并将另一列作为varchar和消息的值来创建一个新的流。
下面是主题中的示例消息
{
"db": "mydb",
"collection": "collection",
"op": "update"
}
创建一个类似-
CREATE STREAM test1 (
db VARCHAR,
collection VARCHAR
VAL STRING
) WITH (
KAFKA_TOPIC = 'topic_json',
VALUE_FORMAT = 'JSON'
);
这个流的输出将只包含db和collection列,如何添加另一列作为消息的值- "{\"db\":\"mydb\",\"collection\":\"collection\",\"op\":\"update\"}"
暂无答案!
目前还没有任何答案,快来回答吧!