I need to implement a custom file format on top of Hadoop's existing file I/O formats. How do I go about this? Specifically, I need a way to include the ORCFile library in the source code (this is a custom Pig implementation), use the ORCFile OutputFormat to write data, and then use the ORCFile InputFormat to read that data back.
3 Answers

qvtsj1bj1#
Just wrote some sample code for this; hope it helps. Sample Mapper code:
public static class MyMapper<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Mapper<K, OrcStruct, Text, IntWritable> {

  private StructObjectInspector oip;
  private final OrcSerde serde = new OrcSerde();

  public void configure(JobConf job) {
    // Describe the ORC schema to the SerDe: columns a, b, c, where c is a nested struct
    Properties table = new Properties();
    table.setProperty("columns", "a,b,c");
    table.setProperty("columns.types", "int,string,struct<d:int,e:string>");
    serde.initialize(job, table);
    try {
      oip = (StructObjectInspector) serde.getObjectInspector();
    } catch (SerDeException e) {
      e.printStackTrace();
    }
  }

  public void map(K key, OrcStruct val, OutputCollector<Text, IntWritable> output,
      Reporter reporter) throws IOException {
    System.out.println(val);
    // B_ID, C_ID, D_ID, E_ID are int constants (defined elsewhere) holding column positions
    List<? extends StructField> fields = oip.getAllStructFieldRefs();
    StringObjectInspector bInspector =
        (StringObjectInspector) fields.get(B_ID).getFieldObjectInspector();
    String b = "garbage";
    try {
      b = bInspector.getPrimitiveJavaObject(
          oip.getStructFieldData(serde.deserialize(val), fields.get(B_ID)));
    } catch (SerDeException e1) {
      e1.printStackTrace();
    }
    // Column c is itself a struct<d:int,e:string>; pull it out and inspect its fields
    OrcStruct struct = null;
    try {
      struct = (OrcStruct) oip.getStructFieldData(serde.deserialize(val), fields.get(C_ID));
    } catch (SerDeException e1) {
      e1.printStackTrace();
    }
    StructObjectInspector cInspector =
        (StructObjectInspector) fields.get(C_ID).getFieldObjectInspector();
    int d = ((IntWritable) cInspector.getStructFieldData(struct, fields.get(D_ID))).get();
    String e = cInspector.getStructFieldData(struct, fields.get(E_ID)).toString();
    output.collect(new Text(b), new IntWritable(1));
    output.collect(new Text(e), new IntWritable(1));
  }
}
Driver code:
JobConf job = new JobConf(new Configuration(), OrcReader.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.set("mapreduce.framework.name", "local");
job.set("fs.default.name", "file:///");
job.set("log4j.logger.org.apache.hadoop", "INFO");

// Push down the projection columns so ORC only materializes b and c
job.set("hive.io.file.readcolumn.ids", "1,2");
job.set("hive.io.file.read.all.columns", "false");
job.set("hive.io.file.readcolumn.names", "b,c");

FileInputFormat.setInputPaths(job, new Path("./src/main/resources/000000_0.orc"));
FileOutputFormat.setOutputPath(job, new Path("./target/out1"));

job.setMapperClass(OrcReader.MyMapper.class);
job.setCombinerClass(OrcReader.MyReducer.class);
job.setReducerClass(OrcReader.MyReducer.class);
job.setInputFormat(OrcInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

JobClient.runJob(job);
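Note that the sample above only reads ORC; its output goes through TextOutputFormat. For the write side the question asks about, a rough sketch along these lines should work with Hive's OrcNewOutputFormat. That class lives in the new mapreduce API, so the reducer below uses the new-API Reducer rather than the old mapred classes above; the struct<word:string,count:int> schema and class names are illustrative assumptions, not part of the original answer:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

// Hedged sketch: writing ORC from MapReduce via OrcNewOutputFormat (new mapreduce API)
public class OrcWriterReducer extends Reducer<Text, IntWritable, NullWritable, Writable> {

  private final OrcSerde serde = new OrcSerde();
  private ObjectInspector inspector;

  @Override
  protected void setup(Context context) {
    // Build a standard inspector for rows shaped like struct<word:string,count:int>
    TypeInfo rowType = TypeInfoUtils.getTypeInfoFromTypeString("struct<word:string,count:int>");
    inspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowType);
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    // OrcSerde.serialize wraps the row in the Writable that OrcNewOutputFormat expects
    context.write(NullWritable.get(),
        serde.serialize(Arrays.asList(key.toString(), sum), inspector));
  }
}

In the matching new-API driver you would then set job.setOutputFormatClass(OrcNewOutputFormat.class) together with NullWritable output keys and Writable output values.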
vyu0f0g12#
You need to create a subclass of the InputFormat class (or of FileInputFormat, depending on the nature of your files). Just search Google for Hadoop InputFormat and you will find plenty of articles and tutorials on how to create your own InputFormat class.
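As a concrete starting point, here is a minimal sketch of such a subclass using the old mapred API (to match the sample in the first answer). MyInputFormat and MyRecordReader are placeholder names, and the reader simply delegates to LineRecordReader where your custom record parsing would go:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Placeholder names; the LineRecordReader delegation stands in for real custom decoding
public class MyInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new MyRecordReader((FileSplit) split, job);
  }

  static class MyRecordReader implements RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate; // reuses line-oriented reading for the sketch

    MyRecordReader(FileSplit split, JobConf job) throws IOException {
      delegate = new LineRecordReader(job, split);
    }

    public boolean next(LongWritable key, Text value) throws IOException {
      // Custom record decoding would go here instead of plain line reading
      return delegate.next(key, value);
    }

    public LongWritable createKey() { return delegate.createKey(); }
    public Text createValue() { return delegate.createValue(); }
    public long getPos() throws IOException { return delegate.getPos(); }
    public float getProgress() throws IOException { return delegate.getProgress(); }
    public void close() throws IOException { delegate.close(); }
  }
}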
u91tlkcl3#
You can use the HCatalog library to read and write ORC files in MapReduce.
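A minimal sketch of the read side, assuming hive-hcatalog-core is on the classpath; the database/table names ("default"/"my_orc_table") and the column position below are placeholders. Because HCatalog resolves the table's storage format from the metastore, the same code reads an ORC-backed table without referencing OrcInputFormat directly:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

public class HCatOrcRead {

  public static class ReadMapper
      extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
    @Override
    protected void map(WritableComparable key, HCatRecord value, Context ctx)
        throws IOException, InterruptedException {
      // HCatRecord columns are addressed by position; column 1 is assumed to be a string
      ctx.write(new Text(value.get(1).toString()), new IntWritable(1));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "read-orc-via-hcatalog");
    job.setJarByClass(HCatOrcRead.class);
    // Point the job at a Hive table instead of raw paths; HCatalog supplies the format
    HCatInputFormat.setInput(job, "default", "my_orc_table");
    job.setInputFormatClass(HCatInputFormat.class);
    job.setMapperClass(ReadMapper.class);
    job.setNumReduceTasks(0); // map-only; results go straight to text files
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}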