Creating a Hive table on a Parquet file containing JSON data

jgwigjjp · posted 2021-05-29 in Hadoop

What I'm trying to achieve:

1. Take data from a large source JSON file (employee-sample.json).
2. A simple Spark application reads it as a text file and stores it as Parquet (simple-loader.java). I don't know what is inside the JSON file, so I can't supply a schema up front; I want schema-on-read rather than schema-on-write. This produces a Parquet file with a single column named "value" that contains the JSON string.
3. Create a Hive external table on the Parquet file; when I run "SELECT * FROM table", I see one column holding the JSON data.

What I really need is a Hive table that reads the JSON in the "value" column, applies a schema, and emits proper columns, so that I can build whatever tables I need on top of the raw data. I have created Hive tables directly on JSON files and extracted the columns before, but extracting columns from Parquet while applying a JSON schema has me stumped.

employee-sample.json:

    {"name":"Dave", "age" : 30 , "DOB":"1987-01-01"}
    {"name":"Steve", "age" : 31 , "DOB":"1986-01-01"}
    {"name":"Kumar", "age" : 32 , "DOB":"1985-01-01"}

Simple Spark code that converts the JSON to Parquet:

simple-loader.java:

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName(JsonToParquet.class.getName())
                .master("local[*]")
                .getOrCreate();
        // Read each line as a raw string; no schema is applied at this point.
        Dataset<String> eventsDataSet = sparkSession.read().textFile("D:\\dev\\employee-sample.json");
        eventsDataSet.createOrReplaceTempView("rawView");
        // Write a single string column named "value" to Parquet.
        sparkSession.sqlContext().sql("select string(value) as value from rawView")
                .write()
                .parquet("D:\\dev\\" + UUID.randomUUID().toString());
        sparkSession.close();
    }
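
Note: read().textFile(...) already yields a Dataset with a single string column named "value", which is why the Parquet output ends up with that one column; the temp view and the string() cast are effectively a pass-through. A minimal equivalent sketch, assuming the same sparkSession (the output path is illustrative):

    // Equivalent shortcut: the lines are already a string column named "value".
    sparkSession.read().textFile("D:\\dev\\employee-sample.json")
            .write()
            .parquet("D:\\dev\\employee-raw-parquet"); // illustrative path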

Hive table on the Parquet file:

    CREATE EXTERNAL TABLE EVENTS_RAW (
        VALUE STRING)
    STORED AS PARQUET
    LOCATION 'hdfs://XXXXXX:8020/employee/data_raw';

I tried configuring a JSON SerDe (ROW FORMAT SERDE 'com.proofpoint.hive.serde.jsonserde'), but that only works when the underlying data is stored as JSON text, not as Parquet.

Expected format:

    CREATE EXTERNAL TABLE EVENTS_DATA (
        NAME STRING,
        AGE STRING,
        DOB STRING)
    ??????????????????????????????

q5iwbnjs1#

An example of creating a Hive external table:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class HiveTableHelper {

        // "logger" and the SparkFormat enum are this project's own utilities
        // (not shown here); SparkFormat.name() must yield a valid STORED AS
        // format such as PARQUET.

        public static final String CREATE_EXTERNAL = "CREATE EXTERNAL TABLE %s" +
                " (%s) " +
                " PARTITIONED BY(%s) " +
                " STORED AS %s" +
                " LOCATION '%s'";

        /**
         * Creates an external table and recovers its partitions.
         */
        public void createExternalTable(SparkSession sparkSession, StructType schema, String tableName,
                                        SparkFormat format, List<StructField> partitions, String tablePath) {
            String createQuery = createTableString(schema, tableName, format, partitions, tablePath);
            logger.info("Going to create external table with the following query:\n " + createQuery);
            sparkSession.sql(createQuery);
            logger.debug("Finished creating external table with the following query:\n " + createQuery);
            recoverPartitions(sparkSession, tableName);
        }

        public String createTableString(StructType schema, String tableName, SparkFormat format,
                                        List<StructField> partitions, String tablePath) {
            Set<String> partitionNames = partitions.stream().map(StructField::name).collect(Collectors.toSet());
            String columns = Arrays.stream(schema.fields())
                    // Partition columns must not be repeated in the regular column list.
                    .filter(field -> !partitionNames.contains(field.name()))
                    .map(HiveTableHelper::fieldToStringBuilder)
                    .collect(Collectors.joining(", "));
            String partitionsString = partitions.stream()
                    .map(HiveTableHelper::fieldToStringBuilder)
                    .collect(Collectors.joining(", "));
            return String.format(CREATE_EXTERNAL, tableName, columns, partitionsString, format.name(), tablePath);
        }

        /**
         * Recovers the table's partitions and refreshes the cached metadata.
         */
        public void recoverPartitions(SparkSession sparkSession, String table) {
            String query = "ALTER TABLE " + table + " RECOVER PARTITIONS";
            logger.debug("Start: " + query);
            sparkSession.sql(query);
            sparkSession.catalog().refreshTable(table);
            logger.debug("Finish: " + query);
        }

        // Renders one column as "name type" for the DDL, e.g. "age bigint".
        public static StringBuilder fieldToStringBuilder(StructField field) {
            StringBuilder sb = new StringBuilder();
            sb.append(field.name()).append(" ").append(field.dataType().simpleString());
            return sb;
        }
    }
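
For completeness, a hedged usage sketch of the helper above. The SparkFormat constant, table name, schema, and partition column are illustrative; EVENTS_RAW-style naming and the HDFS location follow the question:

    // Illustrative only: assumes SparkFormat has a PARQUET constant
    // (the enum is not shown in this answer).
    StructType schema = new StructType()
            .add("name", DataTypes.StringType)
            .add("age", DataTypes.StringType)
            .add("dob", DataTypes.StringType)
            .add("load_date", DataTypes.StringType);
    List<StructField> partitions = Collections.singletonList(
            new StructField("load_date", DataTypes.StringType, true, Metadata.empty()));
    new HiveTableHelper().createExternalTable(sparkSession, schema, "EVENTS_DATA",
            SparkFormat.PARQUET, partitions, "hdfs://XXXXXX:8020/employee/data");

As for projecting the JSON out of the existing raw table: Hive's built-in get_json_object UDF (also available in Spark SQL) parses fields out of a JSON string at query time, so a view over EVENTS_RAW gives schema-on-read without rewriting the data. A minimal sketch (the view name is illustrative):

    // Schema-on-read: parse the JSON fields of the "value" column at query time.
    sparkSession.sql(
            "CREATE VIEW EVENTS_DATA AS " +
            "SELECT get_json_object(value, '$.name') AS name, " +
            "       get_json_object(value, '$.age')  AS age, " +
            "       get_json_object(value, '$.DOB')  AS dob " +
            "FROM EVENTS_RAW");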
