ksql streams ksql db:使用struct数组(返回数组中每个对象的重复记录

fykwrbwg  于 2021-07-15  发布在  Kafka
关注(0)|答案(0)|浏览(267)

我有如下的json

  1. {
  2. "customer_id": "CUST002",
  3. "address_code": "1",
  4. "addresses": [
  5. {
  6. "Address_Line1": "street-1",
  7. "Address_Line2": "st mathews",
  8. "Address_Line3": "liberal rk",
  9. "City": "NJ"
  10. },
  11. {
  12. "Address_Line1": "A1",
  13. "Address_Line2": "A2",
  14. "Address_Line3": "A3",
  15. "City": "C2"
  16. }
  17. ]
  18. }

我已经为json创建了流,它是:

  1. CREATE STREAM TEST_SOURCE1_ARRAY(
  2. addresses ARRAY < STRUCT < Address_Line1 VARCHAR,
  3. Address_Line2 VARCHAR, Address_Line3 VARCHAR,
  4. City VARCHAR, state VARCHAR, zip VARCHAR >>,
  5. customer_id VARCHAR, address_code VARCHAR
  6. ) WITH (
  7. KAFKA_TOPIC = 'source_1_data_new',
  8. VALUE_FORMAT = 'JSON_SR', partitions = 1
  9. );
  10. SELECT
  11. SRC_JSON.customer_id,
  12. CASE
  13. WHEN SRC_JSON.address_code='1' THEN
  14. ARRAY[STRUCT(
  15. StreetName := EXPLODE(SRC_JSON.addresses)->Addr_1,
  16. Province := EXPLODE(SRC_JSON.addresses)->Addr_2,
  17. City := EXPLODE(SRC_JSON.addresses)->City
  18. )
  19. from FROM source_data AS SRC_JSON EMIT CHANGES;

当我执行上述查询时,记录被复制,即不是为一个客户返回一行,而是为数组中的每个对象复制记录。

  1. {
  2. "CUSTOMER_ID": "CUST002",
  3. "Addr_1": "street-1",
  4. "Addr_2": "st mathews",
  5. "City": "NJ"
  6. },
  7. {
  8. "CUSTOMER_ID": "CUST002",
  9. "Addr_1": "A1",
  10. "Addr_2": "A2",
  11. "City": "C2"
  12. }

我希望结果是

  1. {
  2. "CUSTOMER_ID": "CUST002",
  3. "ADDRESSES": [
  4. {
  5. "Address_Line1": "street-1",
  6. "Address_Line2": "st mathews",
  7. "City": "NJ"
  8. },
  9. {
  10. "Address_Line1": "A1",
  11. "Address_Line2": "A2",
  12. "City": "C2"
  13. }
  14. ]
  15. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题