How do I use a file input/output format in MapReduce?

k3bvogb1 posted on 2021-06-03 in Hadoop

I need to implement a custom I/O format on top of the file input/output formats. How should I go about it?
Specifically, I need a way to include the ORC file library in my source code (this is a custom Pig implementation), write data with the ORC output format, and then read it back with the ORC input format.


qvtsj1bj #1

I just put together some sample code for this. Hope it helps. Sample Mapper code:

public static class MyMapper<K extends WritableComparable, V extends Writable>
        extends MapReduceBase implements Mapper<K, OrcStruct, Text, IntWritable> {

    // Field positions in the schema declared in configure(): a=0, b=1, c=2,
    // and d=0, e=1 inside the nested struct c (indexes assumed from that schema)
    private static final int B_ID = 1;
    private static final int C_ID = 2;
    private static final int D_ID = 0;
    private static final int E_ID = 1;

    private StructObjectInspector oip;
    private final OrcSerde serde = new OrcSerde();

    public void configure(JobConf job) {
        // Describe the row layout so the SerDe can build an ObjectInspector for it
        Properties table = new Properties();
        table.setProperty("columns", "a,b,c");
        table.setProperty("columns.types", "int,string,struct<d:int,e:string>");

        serde.initialize(job, table);

        try {
            oip = (StructObjectInspector) serde.getObjectInspector();
        } catch (SerDeException e) {
            e.printStackTrace();
        }
    }

    public void map(K key, OrcStruct val,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
        System.out.println(val);
        List<? extends StructField> fields = oip.getAllStructFieldRefs();

        // Column b is a plain string
        StringObjectInspector bInspector =
                (StringObjectInspector) fields.get(B_ID).getFieldObjectInspector();
        String b = "garbage";
        try {
            b = bInspector.getPrimitiveJavaObject(
                    oip.getStructFieldData(serde.deserialize(val), fields.get(B_ID)));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }

        // Column c is a nested struct<d:int,e:string>; use its own inspector and
        // its own field refs (not the top-level ones) to pull out d and e
        OrcStruct struct = null;
        try {
            struct = (OrcStruct) oip.getStructFieldData(serde.deserialize(val), fields.get(C_ID));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }
        StructObjectInspector cInspector =
                (StructObjectInspector) fields.get(C_ID).getFieldObjectInspector();
        List<? extends StructField> cFields = cInspector.getAllStructFieldRefs();
        int d = ((IntWritable) cInspector.getStructFieldData(struct, cFields.get(D_ID))).get();
        String e = cInspector.getStructFieldData(struct, cFields.get(E_ID)).toString();

        output.collect(new Text(b), new IntWritable(1));
        output.collect(new Text(e), new IntWritable(1));
    }
}

Launcher code:

JobConf job = new JobConf(new Configuration(), OrcReader.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.set("mapreduce.framework.name", "local");
job.set("fs.default.name", "file:///");
job.set("log4j.logger.org.apache.hadoop", "INFO");

// Push down projection columns
job.set("hive.io.file.readcolumn.ids", "1,2");
job.set("hive.io.file.read.all.columns", "false");
job.set("hive.io.file.readcolumn.names", "b,c");

FileInputFormat.setInputPaths(job, new Path("./src/main/resources/000000_0.orc"));
FileOutputFormat.setOutputPath(job, new Path("./target/out1"));

job.setMapperClass(OrcReader.MyMapper.class);
job.setCombinerClass(OrcReader.MyReducer.class);
job.setReducerClass(OrcReader.MyReducer.class);

job.setInputFormat(OrcInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

JobClient.runJob(job);
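
The sample above only covers reading. For the write side the question also asks about, here is a rough sketch along the same lines: serialize each row with Hive's OrcSerde and emit it against OrcOutputFormat (org.apache.hadoop.hive.ql.io.orc, old mapred API). The OrcWriterReducer class and its word/count schema below are illustrative, not taken from the job above:

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class OrcWriterReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, NullWritable, Writable> {

    private final OrcSerde serde = new OrcSerde();
    private ObjectInspector oip;

    public void configure(JobConf job) {
        // Inspector for a two-column row: word:string, count:int (illustrative schema)
        oip = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("word", "count"),
                Arrays.<ObjectInspector>asList(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector));
    }

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<NullWritable, Writable> output, Reporter reporter)
                    throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // OrcSerde.serialize wraps the row and its inspector in a Writable
        // that OrcOutputFormat knows how to write out as ORC
        Writable row = serde.serialize(Arrays.<Object>asList(key.toString(), sum), oip);
        output.collect(NullWritable.get(), row);
    }
}

// In the launcher, roughly:
//   job.setReducerClass(OrcWriterReducer.class);
//   job.setOutputFormat(OrcOutputFormat.class);   // org.apache.hadoop.hive.ql.io.orc
//   job.setOutputKeyClass(NullWritable.class);
//   job.setOutputValueClass(Writable.class);

Note that, unlike MyReducer in the launcher above, such a reducer cannot double as the combiner, because its output types (NullWritable, Writable) no longer match the map output types (Text, IntWritable).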

vyu0f0g1 #2

You need to create a subclass of InputFormat (or of FileInputFormat, depending on the nature of your files).
Just search for "Hadoop InputFormat" and you will find plenty of articles and tutorials on writing your own InputFormat class; a bare-bones skeleton is sketched below.
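
For example, a minimal subclass using the old org.apache.hadoop.mapred API (the class name is made up, and LineRecordReader is only a stand-in for whatever reader actually understands your file layout):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        // Return false when the format cannot be split at arbitrary byte offsets
        return false;
    }

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Replace with a RecordReader that parses your own format
        return new LineRecordReader(job, (FileSplit) split);
    }
}

// Then register it in the JobConf:
//   job.setInputFormat(MyFileInputFormat.class);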


u91tlkcl #3

You can use the HCatalog library to read and write ORC files in MapReduce.
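
A rough sketch of the read path with HCatalog's MapReduce adapter (new org.apache.hadoop.mapreduce API). The database/table name default.my_orc_table and the output path are placeholders, and the table is assumed to already exist in the metastore and be stored as ORC; in older HCatalog releases the same classes live under org.apache.hcatalog.* instead of org.apache.hive.hcatalog.*:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

public class HCatOrcRead {

    // HCatInputFormat hands each table row to the mapper as an HCatRecord
    public static class ReadMapper
            extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
        @Override
        protected void map(WritableComparable key, HCatRecord value, Context ctx)
                throws IOException, InterruptedException {
            // Field positions follow the Hive table schema; 1 is illustrative
            ctx.write(new Text(String.valueOf(value.get(1))), new IntWritable(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "hcat-orc-read");
        job.setJarByClass(HCatOrcRead.class);

        // Read from an ORC-backed Hive table registered in the metastore
        HCatInputFormat.setInput(job, "default", "my_orc_table");
        job.setInputFormatClass(HCatInputFormat.class);

        job.setMapperClass(ReadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hcat-orc-read"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}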
