我正在使用spark会话将Dataframe保存到配置单元表。代码如下。
df.write.mode(SaveMode.Append).format("orc").insertInto("table")
数据来自Kafka。这可能是一天中的大量数据。spark dataframe是否内部保存配置单元压缩?。如果没有什么是最好的方法,做定期压缩不影响表插入。
atmip9wb1#
公共类hivecompaction{
private static SparkConf sparkConf; private static JavaSparkContext sc; private static SparkSession sqlContext = springutil.getBean("testSparkSession"); private static HashMap<Object, Object> partitionColumns; public static void compact(String table, Dataset<Row> dataToCompact) { logger.info("Started Compaction for - " + table); if (!partitionColumns.containsKey(table)) { compact_table_without_partition(table, dataToCompact); } else { compact_table_with_partition(table, dataToCompact, partitionColumns); } logger.info("Data Overwritten in HIVE table : " + table + " successfully"); } private static void compact_table_with_partition(String table, Dataset<Row> dataToCompact, Map<Object, Object> partitionData) { String[] partitions = ((String) partitionData.get(table)).split(","); List<Map<Object, Object>> partitionMap = getPartitionsToCompact(dataToCompact, Arrays.asList(partitions)); for (Map mapper : partitionMap) { // sqlContext.sql("REFRESH TABLE staging.dummy_table"); String query = "select * from " + table + " where " + frameQuery(" and ", mapper); Dataset<Row> originalTable = sqlContext.sql(query.toString()); if (originalTable.count() == 0) { dataToCompact.write().mode("append").format("parquet").insertInto(table); } else { String location = getHdfsFileLocation(table); String uuid = getUUID(); updateTable(table, dataToCompact, originalTable, uuid); String destinationPath = framePath(location, frameQuery("/", mapper), uuid); sqlContext.sql("Alter table " + table + " partition(" + frameQuery(",", mapper) + ") set location '" + destinationPath + "'"); } } } private static void compact_table_without_partition(String table, Dataset<Row> dataToCompact) { String query = "select * from " + table; Dataset<Row> originalTable = sqlContext.sql(query.toString()); if (originalTable.count() == 0) { dataToCompact.write().mode("append").format("parquet").insertInto(table); } else { String location = getHdfsFileLocation(table); String uuid = getUUID(); String destinationPath = framePath(location, null, uuid); updateTable(table, dataToCompact, originalTable, uuid); sqlContext.sql("Alter table " + table + " set location '" + destinationPath + "'"); } } private static void updateTable(String table, Dataset<Row> dataToCompact, Dataset<Row> originalTable, String uuid) { Seq<String> joinColumnSeq = getPrimaryKeyColumns(); Dataset<Row> unModifiedRecords = originalTable.join(dataToCompact, joinColumnSeq, "leftanti"); Dataset<Row> dataToInsert1 = dataToCompact.withColumn("uuid", functions.lit(uuid)); Dataset<Row> dataToInsert2 = unModifiedRecords.withColumn("uuid", functions.lit(uuid)); dataToInsert1.write().mode("append").format("parquet").insertInto(table + "_compacted"); dataToInsert2.write().mode("append").format("parquet").insertInto(table + "_compacted"); } private static String getHdfsFileLocation(String table) { Dataset<Row> tableDescription = sqlContext.sql("describe formatted " + table + "_compacted"); List<Row> rows = tableDescription.collectAsList(); String location = null; for (Row r : rows) { if (r.get(0).equals("Location")) { location = r.getString(1); break; } } return location; } private static String frameQuery(String delimiter, Map mapper) { StringBuilder modifiedQuery = new StringBuilder(); int i = 1; for (Object key : mapper.keySet()) { modifiedQuery.append(key + "="); modifiedQuery.append(mapper.get(key)); if (mapper.size() > i) modifiedQuery.append(delimiter); i++; } return modifiedQuery.toString(); } private static String framePath(String location, String framedpartition, String uuid) { StringBuilder loc = new StringBuilder(location); loc.append("/"); if (StringUtils.isNotEmpty(framedpartition)) { loc.append(framedpartition); loc.append("/"); } loc.append("uuid="); loc.append(uuid); logger.info(loc.toString()); return loc.toString(); } public static Seq<String> getColumnSeq(List<String> joinColumns) { List<String> cols = new ArrayList<>(joinColumns.size()); for (int i = 0; i < joinColumns.size(); i++) { cols.add(joinColumns.get(i).toLowerCase()); } return JavaConverters.asScalaBufferConverter(cols).asScala().readOnly(); } private static String getUUID() { StringBuilder uri = new StringBuilder(); Random rand = new Random(); int randNum = rand.nextInt(200); String uuid = DateTimeFormatter.ofPattern("yyyyMMddHHmmSSS").format(LocalDateTime.now()).toString() + (String.valueOf(randNum)); return uuid; } private static List<Map<Object, Object>> getPartitionsToCompact(Dataset<Row> filteredRecords, List<String> partitions) { Column[] columns = new Column[partitions.size()]; int index = 0; for (String c : partitions) { columns[index] = new Column(c); index++; } Dataset<Row> partitionsToCompact = filteredRecords.select(columns) .distinct(); /** * TOD : add filter condition for selecting * known paritions */ JavaRDD<Map<Object, Object>> querywithPartitions = partitionsToCompact.toJavaRDD().map(row -> { return convertRowToMap(row); }); return querywithPartitions.collect(); } private static Map<Object, Object> convertRowToMap(Row row) { StructField[] fields = row.schema().fields(); List<StructField> structFields = Arrays.asList(fields); Map<Object, Object> a = structFields.stream() .collect(Collectors.toMap(e -> ((StructField) e).name(), e -> row.getAs(e.name()))); return a; } private static Seq<String> getPrimaryKeyColumns() { ArrayList<String> primaryKeyColumns = new ArrayList<String>(); Seq<String> joinColumnSeq = getColumnSeq(primaryKeyColumns); return joinColumnSeq; } /* * public static void initSpark(String jobname) { sparkConf = new * SparkConf().setAppName(jobname); sparkConf.setMaster("local[3]"); * sparkConf.set("spark.driver.allowMultipleContexts", "true"); sc = new * JavaSparkContext(); sqlContext = new SQLContext(sc); } */ public static HashMap<Object, Object> getParitionColumns() { HashMap<Object, Object> paritionColumns = new HashMap<Object, Object>(); paritionColumns.put((Object) "staging.dummy_table", "trade_date,dwh_business_date,region_cd"); return paritionColumns; } public static void initialize(String table) { // initSpark("Hive Table Compaction -" + table); partitionColumns = getParitionColumns(); }
}用法:string table=“staging.dummy\u table”;
HiveCompaction.initialize(table); Dataset<Row> dataToCompact = sparkSession.sql("select * from staging.dummy_table"); HiveCompaction.compact(table, dataToCompact); sparkSession.sql("select * from staging.dummy_table_compacted").show(); System.out.println("Compaction successful");
iih3973s2#
在你的例子中你应该加上 partitionBy 因为数据量很大
partitionBy
df.write..mode(SaveMode.Append).format("orc").partitionBy("age")
或者您也可以按照下面的方式进行归档,我的做法是首先在spark job本身中注册一个temp表,然后利用hivecontext的sql方法在hive中使用temp表中的数据创建一个新表。例如,如果我有一个dataframe df和hivecontext hc,一般过程是:
df.registerTempTable("my_temp_table") hc.sql("Insert into overwrite table_name PARTITION SELECT a,b, PARTITION_col from my_temp_table")
2条答案
按热度按时间atmip9wb1#
公共类hivecompaction{
}用法:string table=“staging.dummy\u table”;
iih3973s2#
在你的例子中你应该加上
partitionBy
因为数据量很大或者您也可以按照下面的方式进行归档,我的做法是首先在spark job本身中注册一个temp表,然后利用hivecontext的sql方法在hive中使用temp表中的数据创建一个新表。例如,如果我有一个dataframe df和hivecontext hc,一般过程是: