如何读取用于创建流的嵌套avro字段?

5fjcxozz  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(374)

我有以下关于Kafka主题的avro信息。

{
"table": {
    "string": "Schema.xDEAL"
},
"op_type": {
    "string": "Insert"
},
"op_ts": {
    "string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
    "string": "2018-03-16 10:03:37.778000"
},
"pos": {
    "string": "00000000000000010722"
},
"before": null,
"after": {
    "row": {
        "DEA_PID_DEAL": {
            "string": "AAAAAAAA"
        },
        "DEA_NME_DEAL": {
            "string": "MY OGG DEAL"
        },
        "DEA_NME_ALIAS_NAME": {
            "string": "MY OGG DEAL"
        },
        "DEA_NUM_DEAL_CNTL": {
            "string": "4swb6zs4"
        }           
    }
}

}
当我运行以下查询时。它用空值创建流。

CREATE STREAM tls_deal (DEA_PID_DEAL VARCHAR, DEA_NME_DEAL varchar, DEA_NME_ALIAS_NAME VARCHAR, DEA_NUM_DEAL_CNTL VARCHAR) WITH (kafka_topic='deal-ogg-topic',value_format='AVRO', key = 'DEA_PID_DEAL');

但当我把avro消息改为following时,它就起作用了。

{
"table": {
    "string": "Schema.xDEAL"
},
"op_type": {
    "string": "Insert"
},
"op_ts": {
    "string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
    "string": "2018-03-16 10:03:37.778000"
},
"pos": {
    "string": "00000000000000010722"
},
"DEA_PID_DEAL": {
    "string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
    "string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
    "string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
    "string": "4swb6zs4"
}

}
现在,如果我运行上面的查询,数据将被填充。
我的问题是,如果我需要从嵌套字段填充流,我如何处理这个问题?
我无法在ksql文档页中找到解决方案。
提前谢谢。我很感激你的帮助

fsi0uk1n

fsi0uk1n1#

ksql目前(2018年3月22日/v0.5版)不支持嵌套avro。您可以使用单个消息转换来展平来自kafka connect的数据。例如,debezium与 UnwrapFromEnvelope .

lx0bsm1f

lx0bsm1f2#

正如robin所说,这目前不受支持(2018年3月22日/v0.5)。但是,它是一个跟踪的特性请求。您可能希望在ksql repo中增加投票或跟踪github问题:
https://github.com/confluentinc/ksql/issues/638

相关问题