预期 org.apache.hadoop.hive.ql.io.orc.OrcStruct,收到 org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow

bq3bfh9z  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(412)

读取orcfile并将数据写入orcfile时,出现以下错误:

expected org.apache.hadoop.hive.ql.io.orc.OrcStruct, 
received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow

是不是 job.setMapOutputValueClass(...) 中设置的类不对?

this is my program:
package com.baifendian.basicPlatform.hive.ql.io.orc;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class OrcInput {
    final static String inputSchema = "struct<c1:string,c2:string>";

    final static String outputSchema = "struct<c1:string,c2:string>";

    static StructObjectInspector inputOI;
    static SettableStructObjectInspector outputOI;
    static OrcSerde orcsd;

    public static class OrcReaderMap extends
            Mapper<NullWritable, OrcStruct, NullWritable, Writable> {

        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            TypeInfo tfin = TypeInfoUtils
                    .getTypeInfoFromTypeString(inputSchema);
            TypeInfo tfout = TypeInfoUtils
                    .getTypeInfoFromTypeString(outputSchema);

            inputOI = (StructObjectInspector) OrcStruct
                    .createObjectInspector(tfin);
            outputOI = (SettableStructObjectInspector) OrcStruct
                    .createObjectInspector(tfout);

            orcsd = new OrcSerde();
            List<? extends StructField> fldlst = outputOI
                    .getAllStructFieldRefs();
            StringBuffer sbCols = new StringBuffer();
            StringBuffer sbTyps = new StringBuffer();
            for (StructField sf : fldlst) {
                if (sbCols.length() > 0) {
                    sbCols.append(",");
                }
                sbCols.append(sf.getFieldName());
                if (sbTyps.length() > 0) {
                    sbTyps.append(",");
                }
                sbTyps.append(sf.getFieldObjectInspector().getTypeName());
            }
            Properties props = new Properties();
            props.put(IOConstants.COLUMNS, sbCols.toString());
            props.put(IOConstants.COLUMNS_TYPES, sbTyps.toString());
            orcsd.initialize(context.getConfiguration(), props);
        }

        public void map(NullWritable meaningless, OrcStruct orc, Context context)
                throws IOException, InterruptedException {
            List<Object> ilst = inputOI.getStructFieldsDataAsList(orc);
            Text f1 = (Text) ilst.get(0);
            Text f2 = (Text) ilst.get(1);
            // output orc format
            OrcStruct objOut = (OrcStruct) outputOI.create();
            List<? extends StructField> flst = outputOI.getAllStructFieldRefs();
            outputOI.setStructFieldData(objOut, flst.get(0), f1);
            outputOI.setStructFieldData(objOut, flst.get(1), f2);
            context.write(NullWritable.get(), orcsd.serialize(objOut, outputOI));
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "OrcReader");
        job.setJarByClass(OrcInput.class);
        job.setInputFormatClass(OrcNewInputFormat.class);
        job.setOutputFormatClass(OrcNewOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcinput"));
        FileOutputFormat.setOutputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcoutput"));
        job.setMapOutputKeyClass(NullWritable.class); 
        job.setMapOutputValueClass(OrcStruct.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setMapperClass(OrcInput.OrcReaderMap.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

相关问题