Converting a JSON schema to Flink format

Posted by 7hiiyaii on 2023-10-14 in Apache

How can I convert a JSON schema into a Flink SQL statement?
For example, given the following JSON schema with nested object properties:

    {
      "type": "object",
      "properties": {
        "name": {
          "type": "string"
        },
        "id": {
          "type": "string"
        },
        "i": {
          "type": "integer"
        },
        "p": {
          "type": "object",
          "properties": {
            "name": {
              "type": "string"
            },
            "id": {
              "type": "string"
            },
            "i": {
              "type": "integer"
            },
            "p1": {
              "type": "object",
              "properties": {
                "name": {
                  "type": "string"
                },
                "id": {
                  "type": "string"
                },
                "i": {
                  "type": "integer"
                }
              }
            }
          }
        }
      }
    }

I would like to convert it into the following Flink SQL statement, which uses the ROW type:

    CREATE TABLE test_nested (
      name STRING,
      id STRING,
      i INTEGER,
      p ROW(name STRING, id STRING, i INTEGER, p1 ROW(name STRING, id STRING, i INTEGER))
    ) WITH (
      'connector'='print'
    );

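I tried iterating over the properties field, but I don't know what to do whenever I hit an object type. My guess is that I need to recurse, but I can't work out the exact steps.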

    private static String traverse(JsonNode rootNode) {
        JsonNode properties = rootNode.get("properties");
        Iterator<Map.Entry<String, JsonNode>> fields = properties.fields();
        // columnNameType will contain the "columnName dataType", for example
        // "name STRING"
        // "id STRING"
        // "i INTEGER"
        // "p ROW(name STRING, id STRING, i INTEGER, p1 ROW(name STRING, id STRING, i INTEGER))"
        List<String> columnNameType = new ArrayList<>();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> next = fields.next();
            String key = next.getKey();
            JsonNode value = next.getValue();
            String type = value.get("type").asText();
            if (type.equals("object")) {
                // recurse???
            } else {
                columnNameType.add(key + " " + type);
            }
        }
        // this should return a formatted string like
        /*
        name STRING,
        id STRING,
        i INTEGER,
        p ROW(name STRING, id STRING, i INTEGER, p1 ROW(name STRING, id STRING, i INTEGER))
        */
        return String.join(",\n", columnNameType);
    }
Answer #1, by wkftcu5l

I think the following can be a good starting point. Note that I have also simplified the JSON structure:

    {
      "name": "string",
      "i": "integer",
      "p": {
        "prop1": "string",
        "prop2": "integer",
        "prop3": {
          "prop4": "string"
        }
      }
    }

The traversal recurses into every nested object:

    import com.fasterxml.jackson.databind.JsonNode;
    import java.util.ArrayList;
    import java.util.List;

    // topLevel only controls the separator: one column per line at the top,
    // a single line inside a ROW(...)
    private static String traverse(JsonNode jsonNode, boolean topLevel) {
        List<String> fields = new ArrayList<>();
        jsonNode.fields().forEachRemaining(entry -> {
            StringBuilder sb = new StringBuilder();
            // quote the column name with backticks
            sb
                .append('`')
                .append(entry.getKey())
                .append("` ");
            if (entry.getValue().isObject()) {
                // nested object: recurse and wrap the result in ROW(...)
                sb
                    .append("ROW(")
                    .append(traverse(entry.getValue(), false))
                    .append(")");
            } else {
                // leaf: the value is the type name, e.g. "string" -> STRING
                sb.append(entry.getValue().asText().toUpperCase());
            }
            fields.add(sb.toString());
        });
        return String.join(topLevel ? ",\n" : ", ", fields);
    }

You can call it like this:

    traverse(rootNode, true)

It produces the following output:

    `name` STRING,
    `i` INTEGER,
    `p` ROW(`prop1` STRING, `prop2` INTEGER, `prop3` ROW(`prop4` STRING))
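
If you would rather keep the original JSON Schema layout (with "type" and "properties") instead of the simplified structure, the same recursion should still apply: recurse into "properties" whenever "type" is "object". Below is a minimal sketch of that idea, not a definitive implementation. To keep it free of dependencies it assumes the schema has already been parsed into nested `Map<String, Object>` values; the class name `SchemaToFlink`, the helpers `leaf`, `objectNode` and `baseProps`, and the mappings of "number" to DOUBLE and "boolean" to BOOLEAN are my own assumptions and not part of the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchemaToFlink {

    // Renders the "properties" of a schema object node as Flink column definitions.
    // topLevel only selects the separator (one column per line vs. a single line).
    @SuppressWarnings("unchecked")
    static String columns(Map<String, Object> schema, boolean topLevel) {
        Map<String, Object> properties = (Map<String, Object>) schema.get("properties");
        List<String> cols = new ArrayList<>();
        if (properties != null) {
            for (Map.Entry<String, Object> e : properties.entrySet()) {
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                cols.add("`" + e.getKey() + "` " + flinkType(child));
            }
        }
        return String.join(topLevel ? ",\n  " : ", ", cols);
    }

    // Maps one schema node to a Flink SQL type; "object" recurses into a ROW.
    // The "number" and "boolean" mappings are assumptions made for this sketch.
    static String flinkType(Map<String, Object> node) {
        String type = String.valueOf(node.get("type"));
        switch (type) {
            case "object":  return "ROW(" + columns(node, false) + ")";
            case "string":  return "STRING";
            case "integer": return "INTEGER";
            case "number":  return "DOUBLE";
            case "boolean": return "BOOLEAN";
            default:
                throw new IllegalArgumentException("Unsupported JSON Schema type: " + type);
        }
    }

    // Wraps the column list in the CREATE TABLE statement from the question.
    static String createTable(String table, Map<String, Object> schema) {
        return "CREATE TABLE " + table + " (\n  " + columns(schema, true)
                + "\n) WITH (\n  'connector'='print'\n);";
    }

    // Hypothetical helper: builds {"type": type}.
    static Map<String, Object> leaf(String type) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("type", type);
        return m;
    }

    // Hypothetical helper: builds {"type": "object", "properties": props}.
    static Map<String, Object> objectNode(Map<String, Object> props) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("type", "object");
        m.put("properties", props);
        return m;
    }

    // Hypothetical helper: the name/id/i properties repeated at every level.
    // LinkedHashMap keeps insertion order, so columns come out in schema order.
    static Map<String, Object> baseProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("name", leaf("string"));
        props.put("id", leaf("string"));
        props.put("i", leaf("integer"));
        return props;
    }

    public static void main(String[] args) {
        // Rebuild the schema from the question: root -> p -> p1.
        Map<String, Object> pProps = baseProps();
        pProps.put("p1", objectNode(baseProps()));
        Map<String, Object> rootProps = baseProps();
        rootProps.put("p", objectNode(pProps));
        System.out.println(createTable("test_nested", objectNode(rootProps)));
    }
}
```

With Jackson, `JsonNode` exposes the same information through `get("type")` and `get("properties")`, so the recursion should carry over directly. Unlike the statement in the question, this sketch quotes column names with backticks, as the answer above does.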