如何使用 Flink SQL 对数组执行 UNNEST 并与另一张表连接,然后将结果聚合为 JSON 数组

d7v8vwbk  于 2023-08-01  发布在  Apache
关注(0)|答案(1)|浏览(250)

我有一张表 table1(数据源是 Kafka),其中的事件如下:

  1. {
  2. "id": "id-1",
  3. "segs": ["seg-id-1", "seg-id-2"]
  4. }

字符串
我还有一张表 table2(数据源同样是 Kafka),其中的事件如下:

  1. {
  2. "id": "seg-id-1",
  3. "name": "segs-name-1"
  4. }


我的 SQL 如下:

  -- Expands every table1 row into one row per element of its `segs` array
  -- (json_string_array_to_array is the custom UDF that turns the JSON string
  -- into an array), then looks up the segment name in table2.
  -- LEFT JOIN keeps segment ids that have no match in table2; their
  -- segs_name is NULL.
  CREATE VIEW IF NOT EXISTS FINAL_UNESTED_WITH_SEGMENTS AS (
    SELECT
      r.id,                              -- qualified: table2 also has an `id` column
      unnested_segments_id AS segs_id,
      sc.name AS segs_name
    FROM table1 r
    CROSS JOIN UNNEST(json_string_array_to_array(segs)) AS SegmentContentTable (unnested_segments_id)
    LEFT JOIN table2 sc ON (unnested_segments_id = sc.id)
  );


这里json_string_array_to_array是一个自定义UDF,它将JSON字符串转换为数组。
有了这个视图之后,我就可以在它之上运行如下的 INSERT ... SELECT 语句:

  -- Collapses the per-segment rows of the view back into one row per table1 id.
  -- Column names follow the view definition (segs_id, segs_name).
  INSERT INTO final_table
  SELECT json_agg_custom('seg_id', segs_id, 'seg_name', segs_name) AS segs
  FROM FINAL_UNESTED_WITH_SEGMENTS
  GROUP BY id;


这里最终的 sink 是一个 Kafka 主题。在该 Kafka 主题上,我看到出现了两条不同的消息:

  1. {
  2. "id": "id-1",
  3. "segs": [{"id":"seg-id-1", "name":"segs-name-1"}]
  4. }
  5. {
  6. "id": "id-1",
  7. "segs": [{"id":"seg-id-2", "name":"segs-name-2"}]
  8. }


而我想看到的是下面这样的结果,该如何实现呢?

  1. {
  2. "id": "id-1",
  3. "segs": [
  4. {
  5. "id": "seg-id-1",
  6. "name": "segs-name-1"
  7. },
  8. {
  9. "id": "seg-id-2",
  10. "name": "segs-name-2"
  11. }
  12. ]
  13. }

zsohkypk

zsohkypk1#

自定义udf json_agg_custom定义为

  1. package com.sailpoint.udf;
  2. import com.fasterxml.jackson.core.JsonFactory;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.fasterxml.jackson.databind.SerializationFeature;
  5. import com.fasterxml.jackson.databind.node.ObjectNode;
  6. import org.apache.flink.table.annotation.DataTypeHint;
  7. import org.apache.flink.table.annotation.FunctionHint;
  8. import org.apache.flink.table.api.TableException;
  9. import org.apache.flink.table.api.dataview.ListView;
  10. import org.apache.flink.table.functions.AggregateFunction;
  11. import org.apache.flink.types.Row;
  12. import java.util.ArrayList;
  13. import java.util.Arrays;
  14. import java.util.List;
  15. import java.util.Objects;
  16. public class ObjectArrayAggregationsFunction extends AggregateFunction<List<String>, ObjectArrayAggregationsFunction.Accumulator> {
  17. private static final JsonFactory JSON_FACTORY = new JsonFactory();
  18. private static final ObjectMapper MAPPER =
  19. new ObjectMapper(JSON_FACTORY)
  20. .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
  21. @Override
  22. public Accumulator createAccumulator() {
  23. return new Accumulator();
  24. }
  25. public void resetAccumulator(Accumulator acc) {
  26. acc.listView.clear();
  27. }
  28. public void accumulate(Accumulator accumulator, String... strings) throws Exception {
  29. System.out.println("accu strings : " + Arrays.toString(strings));
  30. String[] clone = strings.clone();
  31. accumulator.listView.add(String.join(",", clone));
  32. }
  33. @Override
  34. public List<String> getValue(Accumulator accumulator) {
  35. List<String> listout = new ArrayList<>();
  36. try {
  37. for (final String item : accumulator.listView.get()) {
  38. String[] split = item.split(",");
  39. ObjectNode objectNode = MAPPER.createObjectNode();
  40. for (int i = 0; i < split.length; i += 2) {
  41. String key = split[i];
  42. String value = split[i + 1];
  43. objectNode.put(key, value);
  44. }
  45. listout.add(MAPPER.writeValueAsString(objectNode));
  46. }
  47. System.out.println("listout " + listout);
  48. return listout;
  49. } catch (Exception e) {
  50. throw new TableException("The accumulator state could not be serialized.", e);
  51. }
  52. }
  53. public void retract(Accumulator acc, String... strings) throws Exception {
  54. String[] clone = strings.clone();
  55. acc.listView.remove(String.join(",", clone));
  56. }
  57. public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
  58. for (final Accumulator other : others) {
  59. acc.listView.addAll(other.listView.getList());
  60. }
  61. }
  62. public static class Accumulator {
  63. public ListView<String> listView = new ListView<>();
  64. @Override
  65. public boolean equals(Object o) {
  66. if (this == o) return true;
  67. if (o == null || getClass() != o.getClass()) return false;
  68. Accumulator that = (Accumulator) o;
  69. return Objects.equals(listView, that.listView);
  70. }
  71. @Override
  72. public int hashCode() {
  73. return Objects.hash(listView);
  74. }
  75. }
  76. }
  77. package com.sailpoint.udf;
  78. import com.fasterxml.jackson.core.JsonFactory;
  79. import com.fasterxml.jackson.databind.ObjectMapper;
  80. import com.fasterxml.jackson.databind.SerializationFeature;
  81. import com.fasterxml.jackson.databind.node.ObjectNode;
  82. import org.apache.flink.table.annotation.DataTypeHint;
  83. import org.apache.flink.table.annotation.FunctionHint;
  84. import org.apache.flink.table.api.TableException;
  85. import org.apache.flink.table.api.dataview.ListView;
  86. import org.apache.flink.table.functions.AggregateFunction;
  87. import org.apache.flink.types.Row;
  88. import java.util.ArrayList;
  89. import java.util.Arrays;
  90. import java.util.List;
  91. import java.util.Objects;
  92. public class ObjectArrayAggregationsFunction extends AggregateFunction<List<String>, ObjectArrayAggregationsFunction.Accumulator> {
  93. private static final JsonFactory JSON_FACTORY = new JsonFactory();
  94. private static final ObjectMapper MAPPER =
  95. new ObjectMapper(JSON_FACTORY)
  96. .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
  97. @Override
  98. public Accumulator createAccumulator() {
  99. return new Accumulator();
  100. }
  101. public void resetAccumulator(Accumulator acc) {
  102. acc.listView.clear();
  103. }
  104. public void accumulate(Accumulator accumulator, String... strings) throws Exception {
  105. System.out.println("accu strings : " + Arrays.toString(strings));
  106. String[] clone = strings.clone();
  107. accumulator.listView.add(String.join(",", clone));
  108. }
  109. @Override
  110. public List<String> getValue(Accumulator accumulator) {
  111. List<String> listout = new ArrayList<>();
  112. try {
  113. for (final String item : accumulator.listView.get()) {
  114. String[] split = item.split(",");
  115. ObjectNode objectNode = MAPPER.createObjectNode();
  116. for (int i = 0; i < split.length; i += 2) {
  117. String key = split[i];
  118. String value = split[i + 1];
  119. objectNode.put(key, value);
  120. }
  121. listout.add(MAPPER.writeValueAsString(objectNode));
  122. }
  123. System.out.println("listout " + listout);
  124. return listout;
  125. } catch (Exception e) {
  126. throw new TableException("The accumulator state could not be serialized.", e);
  127. }
  128. }
  129. public void retract(Accumulator acc, String... strings) throws Exception {
  130. String[] clone = strings.clone();
  131. acc.listView.remove(String.join(",", clone));
  132. }
  133. public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
  134. for (final Accumulator other : others) {
  135. acc.listView.addAll(other.listView.getList());
  136. }
  137. }
  138. public static class Accumulator {
  139. public ListView<String> listView = new ListView<>();
  140. @Override
  141. public boolean equals(Object o) {
  142. if (this == o) return true;
  143. if (o == null || getClass() != o.getClass()) return false;
  144. Accumulator that = (Accumulator) o;
  145. return Objects.equals(listView, that.listView);
  146. }
  147. @Override
  148. public int hashCode() {
  149. return Objects.hash(listView);
  150. }
  151. }
  152. }
按上述方式定义该 UDF 之后,我就让它正常工作了。

字符串

展开查看全部

相关问题