so和web上的大多数问题/答案都讨论使用hive将一堆小的orc文件合并成一个较大的文件,但是,我的orc文件是日志文件,按天分开,我需要将它们分开。我只想每天“汇总”orc文件(hdfs中的目录)。我很可能需要用java编写解决方案,并且遇到了orcfilemergeoperator,这可能是我需要使用的,但现在说还为时过早。解决这个问题的最佳方法是什么?
bt1cpqcv1#
你不需要重新发明轮子。 ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE 可用于将较小的orc文件合并为较大的文件,因为 Hive 0.14.0. 合并发生在条带级别,这样可以避免对数据进行解压缩和解码。它工作得很快。我建议创建一个按天分区的外部表(分区是目录),然后合并它们 PARTITION (day_column) 作为分区规范。看这里:语言手册+兽人
ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE
Hive 0.14.0.
PARTITION (day_column)
mznpcxlj2#
这里有很好的答案,但这些都不允许我运行cron作业,这样我就可以每天进行汇总。我们每天都有日志文件写入hdfs,我不想每天进来时都在hive中运行查询。我最后做的事情对我来说似乎更直截了当。我编写了一个java程序,它使用orc库扫描目录中的所有文件,并创建这些文件的列表。然后打开一个新的writer,它是“组合”文件(以“.”开头,因此它对配置单元隐藏,否则配置单元将失败)。然后程序打开列表中的每个文件,读取内容并将其写入组合文件。读取所有文件后,它将删除这些文件。我还添加了在需要时一次运行一个目录的功能。注意:您需要一个模式文件。journald日志可以输出为json“journalctl-ojson”,然后您可以使用apacheorc工具生成一个模式文件,也可以手动生成一个模式文件。从兽人的自动发电机是好的,但手动总是更好。注意:要按原样使用这段代码,您需要一个有效的keytab并在类路径中添加-dkeytab=。
import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.security.UserGroupInformation; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import com.cloudera.org.joda.time.LocalDate; public class OrcFileRollUp { private final static String SCHEMA = "journald.schema"; private final static String UTF_8 = "UTF-8"; private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs"; private static final String keytabLocation = System.getProperty("keytab"); private static final String kerberosUser = "<userName>"; private static Writer writer; public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); conf.set("hadoop.security.authentication", "Kerberos"); InetAddress myHost = InetAddress.getLocalHost(); String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName()); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation); int currentDay = LocalDate.now().getDayOfMonth(); int currentMonth = LocalDate.now().getMonthOfYear(); int currentYear = LocalDate.now().getYear(); Path path = new Path(HDFS_BASE_LOGS_DIR); FileSystem fileSystem = path.getFileSystem(conf); System.out.println("The URI is: " + fileSystem.getUri()); //Get Hosts: List<String> allHostsPath = getHosts(path, fileSystem); TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA) .replaceAll("\n", "")); //Open each file for reading and write contents for(int i = 0; i < allHostsPath.size(); i++) { String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working"; //filename: .2018_04_24.orc.working //Create list of files from directory and today's date OR pass a directory in via the command line in format //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/ String directory = ""; Path outFilePath; Path argsPath; List<String> orcFiles; if(args.length == 0) { directory = currentYear + "/" + currentMonth + "/" + currentDay; outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile); try { orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem); } catch (Exception e) { continue; } } else { outFilePath = new Path(args[0] + "/" + outFile); argsPath = new Path(args[0]); try { orcFiles = getAllFilePath(argsPath, fileSystem); } catch (Exception e) { continue; } } //Create List of files in the directory FileSystem fs = outFilePath.getFileSystem(conf); //Writer MUST be below ^^ or the combination file will be deleted as well. if(fs.exists(outFilePath)) { System.out.println(outFilePath + " exists, delete before continuing."); } else { writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf) .setSchema(schema)); } for(int j = 0; j < orcFiles.size(); j++ ) { Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf)); VectorizedRowBatch batch = reader.getSchema().createRowBatch(); RecordReader rows = reader.rows(); while (rows.nextBatch(batch)) { if (batch != null) { writer.addRowBatch(batch); } } rows.close(); fs.delete(new Path(orcFiles.get(j)), false); } //Close File writer.close(); //Remove leading "." from ORC file to make visible to Hive outFile = fileSystem.getFileStatus(outFilePath) .getPath() .getName(); if (outFile.startsWith(".")) { outFile = outFile.substring(1); int lastIndexOf = outFile.lastIndexOf(".working"); outFile = outFile.substring(0, lastIndexOf); } Path parent = outFilePath.getParent(); fileSystem.rename(outFilePath, new Path(parent, outFile)); if(args.length != 0) break; } } private static String getSchema(String resource) throws IOException { try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) { return IOUtils.toString(input, UTF_8); } } public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException { List<String> hostsList = new ArrayList<String>(); FileStatus[] fileStatus = fs.listStatus(filePath); for (FileStatus fileStat : fileStatus) { hostsList.add(fileStat.getPath().toString()); } return hostsList; } private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException { List<String> fileList = new ArrayList<String>(); FileStatus[] fileStatus = fs.listStatus(filePath); for (FileStatus fileStat : fileStatus) { if (fileStat.isDirectory()) { fileList.addAll(getAllFilePath(fileStat.getPath(), fs)); } else { fileList.add(fileStat.getPath() .toString()); } } for(int i = 0; i< fileList.size(); i++) { if(!fileList.get(i).endsWith(".orc")) fileList.remove(i); } return fileList; } }
2条答案
按热度按时间bt1cpqcv1#
你不需要重新发明轮子。
ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE
可用于将较小的orc文件合并为较大的文件,因为Hive 0.14.0.
合并发生在条带级别,这样可以避免对数据进行解压缩和解码。它工作得很快。我建议创建一个按天分区的外部表(分区是目录),然后合并它们PARTITION (day_column)
作为分区规范。看这里:语言手册+兽人
mznpcxlj2#
这里有很好的答案,但这些都不允许我运行cron作业,这样我就可以每天进行汇总。我们每天都有日志文件写入hdfs,我不想每天进来时都在hive中运行查询。
我最后做的事情对我来说似乎更直截了当。我编写了一个java程序,它使用orc库扫描目录中的所有文件,并创建这些文件的列表。然后打开一个新的writer,它是“组合”文件(以“.”开头,因此它对配置单元隐藏,否则配置单元将失败)。然后程序打开列表中的每个文件,读取内容并将其写入组合文件。读取所有文件后,它将删除这些文件。我还添加了在需要时一次运行一个目录的功能。
注意:您需要一个模式文件。journald日志可以输出为json“journalctl-ojson”,然后您可以使用apacheorc工具生成一个模式文件,也可以手动生成一个模式文件。从兽人的自动发电机是好的,但手动总是更好。
注意:要按原样使用这段代码,您需要一个有效的keytab并在类路径中添加-dkeytab=。