Converting a JSON schema to Flink format

Posted by 7hiiyaii on 2023-10-14 in Apache

How can I convert a JSON schema into a Flink SQL statement?
For example, given the following JSON schema with nested object properties:

{
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "id": {
      "type": "string"
    },
    "i": {
      "type": "integer"
    },
    "p": {
      "type": "object",
      "properties": {
        "name": {
          "type": "string"
        },
        "id": {
          "type": "string"
        },
        "i": {
          "type": "integer"
        },
        "p1": {
          "type": "object",
          "properties": {
            "name": {
              "type": "string"
            },
            "id": {
              "type": "string"
            },
            "i": {
              "type": "integer"
            }
          }
        }
      }
    }
  }
}

I would like to convert it into the following Flink SQL statement, which uses ROW types:

CREATE TABLE test_nested (
    name STRING,
    id STRING,
    i INTEGER,
    p ROW(name STRING, id STRING, i INTEGER, p1 ROW(name STRING, id STRING, i INTEGER))
) WITH (
    'connector'='print'
);

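I tried iterating over the properties field, but I don't know what to do when I encounter a property of type object. My guess is that I need to recurse, but I'm not sure of the exact steps.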

private static String traverse(JsonNode rootNode) {
    JsonNode properties = rootNode.get("properties");
    Iterator<Map.Entry<String, JsonNode>> fields = properties.fields();
    // columnNameType will contain the "columnName dataType", for example
    // "name STRING"
    // "id STRING"
    // "i INTEGER"
    // "p ROW(name STRING, id STRING, i INTEGER, p1 ROW(name STRING, id STRING, i INTEGER))"
    List<String> columnNameType = new ArrayList<>();
    while (fields.hasNext()) {
        Map.Entry<String, JsonNode> next = fields.next();
        String key = next.getKey();
        JsonNode value = next.getValue();
        String type = value.get("type").asText();

        if(type.equals("object")) {
            // recurse???
        } else {
            columnNameType.add(key+" "+type);
        }
    }
    // this should return a formatted string like
    /*
        name STRING,
        id STRING,
        i INTEGER,
        p ROW(name STRING, id STRING, i INTEGER, p1 ROW(name STRING, id STRING, i INTEGER))
    */
    return String.join(",\n",  columnNameType);
}

wkftcu5l #1

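I think the following could be a good starting point. Note that I have also simplified the JSON structure, so each property maps directly to its type name or to a nested object: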

{
  "name": "string",
  "i": "integer",
  "p" : {
    "prop1": "string",
    "prop2": "integer",
    "prop3": {
      "prop4": "string"
    }
  }
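}

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;

// Renders each field of the simplified JSON object as "`name` TYPE".
// Nested objects become ROW(...) via recursion; only top-level columns
// are separated by newlines.
private static String traverse(JsonNode jsonNode, boolean topLevel) {
  List<String> fields = new ArrayList<>();

  jsonNode.fields().forEachRemaining(entry -> {
    StringBuilder sb = new StringBuilder();
    // Quote the column name with backticks.
    sb.append('`').append(entry.getKey()).append("` ");

    if (entry.getValue().isObject()) {
      // Nested object: recurse and wrap the result in ROW(...).
      sb.append("ROW(").append(traverse(entry.getValue(), false)).append(")");
    } else {
      // Leaf: the value is the type name, e.g. "string" -> STRING.
      sb.append(entry.getValue().asText().toUpperCase());
    }
    fields.add(sb.toString());
  });

  return String.join(topLevel ? ",\n" : ", ", fields);
}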

You can call it like this:

traverse(rootNode, true)

This produces the following output:

`name` STRING,
`i` INTEGER,
`p` ROW(`prop1` STRING, `prop2` INTEGER, `prop3` ROW(`prop4` STRING))
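
The simplified structure drops the "type" and "properties" wrappers of a real JSON schema. As a rough sketch of how the same recursion might be applied to the original schema shape from the question, the example below recurses into "properties" whenever a property has type "object". To keep it runnable without a JSON library, it assumes the schema has already been parsed into nested Map objects. The class name SchemaToFlink, the helper methods, and the mapping of "number" to DOUBLE and "boolean" to BOOLEAN are my own assumptions, not something from the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaToFlink {

    // Assumed JSON Schema -> Flink SQL type mapping; extend as needed.
    public static String flinkType(String jsonType) {
        switch (jsonType) {
            case "string":  return "STRING";
            case "integer": return "INTEGER";
            case "number":  return "DOUBLE";
            case "boolean": return "BOOLEAN";
            default:
                throw new IllegalArgumentException("Unsupported type: " + jsonType);
        }
    }

    // Renders the "properties" of an object schema as "`name` TYPE" entries.
    // Properties of type "object" are rendered recursively as ROW(...).
    @SuppressWarnings("unchecked")
    public static String renderColumns(Map<String, Object> schema, boolean topLevel) {
        Map<String, Object> properties = (Map<String, Object>) schema.get("properties");
        List<String> rendered = new ArrayList<>();
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Map<String, Object> property = (Map<String, Object>) entry.getValue();
            String type = (String) property.get("type");
            String sqlType = "object".equals(type)
                    ? "ROW(" + renderColumns(property, false) + ")"
                    : flinkType(type);
            rendered.add("`" + entry.getKey() + "` " + sqlType);
        }
        return String.join(topLevel ? ",\n" : ", ", rendered);
    }

    // Hypothetical helper: a leaf schema such as {"type": "string"}.
    public static Map<String, Object> scalar(String type) {
        Map<String, Object> node = new LinkedHashMap<>();
        node.put("type", type);
        return node;
    }

    // Hypothetical helper: an object schema built from (name, schema) pairs.
    public static Map<String, Object> object(Object... nameThenSchema) {
        Map<String, Object> properties = new LinkedHashMap<>();
        for (int i = 0; i < nameThenSchema.length; i += 2) {
            properties.put((String) nameThenSchema[i], nameThenSchema[i + 1]);
        }
        Map<String, Object> node = new LinkedHashMap<>();
        node.put("type", "object");
        node.put("properties", properties);
        return node;
    }

    // The nested schema from the question, built by hand.
    public static Map<String, Object> exampleSchema() {
        Map<String, Object> p1 = object(
                "name", scalar("string"), "id", scalar("string"), "i", scalar("integer"));
        Map<String, Object> p = object(
                "name", scalar("string"), "id", scalar("string"), "i", scalar("integer"),
                "p1", p1);
        return object(
                "name", scalar("string"), "id", scalar("string"), "i", scalar("integer"),
                "p", p);
    }

    public static void main(String[] args) {
        // Wrap the rendered columns in the CREATE TABLE statement from the question.
        System.out.println("CREATE TABLE test_nested (\n"
                + renderColumns(exampleSchema(), true)
                + "\n) WITH (\n    'connector'='print'\n);");
    }
}
```

With Jackson, the same idea should carry over by reading "properties" and "type" from each JsonNode instead of from a Map. Schema features such as arrays, "required", or "$ref" are not handled here and would need their own cases.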
