I have the following JSON:
{
  "customer_id": "CUST002",
  "address_code": "1",
  "addresses": [
    {
      "Address_Line1": "street-1",
      "Address_Line2": "st mathews",
      "Address_Line3": "liberal rk",
      "City": "NJ"
    },
    {
      "Address_Line1": "A1",
      "Address_Line2": "A2",
      "Address_Line3": "A3",
      "City": "C2"
    }
  ]
}
I have created a stream for this JSON:
CREATE STREAM TEST_SOURCE1_ARRAY (
  addresses ARRAY<STRUCT<
    Address_Line1 VARCHAR,
    Address_Line2 VARCHAR,
    Address_Line3 VARCHAR,
    City VARCHAR,
    state VARCHAR,
    zip VARCHAR
  >>,
  customer_id VARCHAR,
  address_code VARCHAR
) WITH (
  KAFKA_TOPIC = 'source_1_data_new',
  VALUE_FORMAT = 'JSON_SR',
  PARTITIONS = 1
);
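The query I run against it is:
SELECT
  SRC_JSON.customer_id,
  CASE
    WHEN SRC_JSON.address_code = '1' THEN
      -- EXPLODE is a table function: it emits one output row per array element
      ARRAY[STRUCT(
        StreetName := EXPLODE(SRC_JSON.addresses)->Address_Line1,
        Province := EXPLODE(SRC_JSON.addresses)->Address_Line2,
        City := EXPLODE(SRC_JSON.addresses)->City
      )]
  END AS addresses
FROM TEST_SOURCE1_ARRAY AS SRC_JSON
EMIT CHANGES;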
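When I run the query above, the records are duplicated: instead of returning one row per customer, it returns a separate row for every object in the array: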
{
  "CUSTOMER_ID": "CUST002",
  "Addr_1": "street-1",
  "Addr_2": "st mathews",
  "City": "NJ"
},
{
  "CUSTOMER_ID": "CUST002",
  "Addr_1": "A1",
  "Addr_2": "A2",
  "City": "C2"
}
I would like the result to be a single row per customer:
{
  "CUSTOMER_ID": "CUST002",
  "ADDRESSES": [
    {
      "Address_Line1": "street-1",
      "Address_Line2": "st mathews",
      "City": "NJ"
    },
    {
      "Address_Line1": "A1",
      "Address_Line2": "A2",
      "City": "C2"
    }
  ]
}
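One possible direction, offered as a sketch rather than a verified answer: EXPLODE multiplies rows by design, whereas ksqlDB's TRANSFORM lambda function maps each element of an array to a new value and returns an array of the same length, so the row count stays at one per input record. This assumes a ksqlDB version with lambda function support (added around 0.17, to my knowledge; check your server version) and the TEST_SOURCE1_ARRAY stream defined above. The lambda variable name `a` is an illustrative choice, not from the original.

```sql
-- Sketch only: assumes a ksqlDB version that supports lambda functions
-- (TRANSFORM) and the TEST_SOURCE1_ARRAY stream declared above.
SELECT
  SRC_JSON.customer_id,
  CASE
    WHEN SRC_JSON.address_code = '1' THEN
      -- TRANSFORM applies the lambda to every array element and returns
      -- a new array of the same length, so no extra rows are produced.
      TRANSFORM(
        SRC_JSON.addresses,
        a => STRUCT(
          Address_Line1 := a->Address_Line1,
          Address_Line2 := a->Address_Line2,
          City := a->City
        )
      )
  END AS addresses
FROM TEST_SOURCE1_ARRAY AS SRC_JSON
EMIT CHANGES;
```

Because the CASE has no ELSE branch, ADDRESSES would be NULL for records whose address_code is not '1'. Also note that ksqlDB upper-cases unquoted identifiers, so the struct fields would likely come out as ADDRESS_LINE1 and so on; quoting them with backticks is the usual way to preserve the mixed case shown in the desired output.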