读取orcfile并将数据写入orcfile时,出现以下错误:
expected org.apache.hadoop.hive.ql.io.orc.OrcStruct,
received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow
是不是 job.setMapOutputValueClass(...) 设置的类不对?
this is my program:
package com.baifendian.basicPlatform.hive.ql.io.orc;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class OrcInput {
final static String inputSchema = "struct<c1:string,c2:string>";
final static String outputSchema = "struct<c1:string,c2:string>";
static StructObjectInspector inputOI;
static SettableStructObjectInspector outputOI;
static OrcSerde orcsd;
public static class OrcReaderMap extends
Mapper<NullWritable, OrcStruct, NullWritable, Writable> {
public void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
TypeInfo tfin = TypeInfoUtils
.getTypeInfoFromTypeString(inputSchema);
TypeInfo tfout = TypeInfoUtils
.getTypeInfoFromTypeString(outputSchema);
inputOI = (StructObjectInspector) OrcStruct
.createObjectInspector(tfin);
outputOI = (SettableStructObjectInspector) OrcStruct
.createObjectInspector(tfout);
orcsd = new OrcSerde();
List<? extends StructField> fldlst = outputOI
.getAllStructFieldRefs();
StringBuffer sbCols = new StringBuffer();
StringBuffer sbTyps = new StringBuffer();
for (StructField sf : fldlst) {
if (sbCols.length() > 0) {
sbCols.append(",");
}
sbCols.append(sf.getFieldName());
if (sbTyps.length() > 0) {
sbTyps.append(",");
}
sbTyps.append(sf.getFieldObjectInspector().getTypeName());
}
Properties props = new Properties();
props.put(IOConstants.COLUMNS, sbCols.toString());
props.put(IOConstants.COLUMNS_TYPES, sbTyps.toString());
orcsd.initialize(context.getConfiguration(), props);
}
public void map(NullWritable meaningless, OrcStruct orc, Context context)
throws IOException, InterruptedException {
List<Object> ilst = inputOI.getStructFieldsDataAsList(orc);
Text f1 = (Text) ilst.get(0);
Text f2 = (Text) ilst.get(1);
// output orc format
OrcStruct objOut = (OrcStruct) outputOI.create();
List<? extends StructField> flst = outputOI.getAllStructFieldRefs();
outputOI.setStructFieldData(objOut, flst.get(0), f1);
outputOI.setStructFieldData(objOut, flst.get(1), f2);
context.write(NullWritable.get(), orcsd.serialize(objOut, outputOI));
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "OrcReader");
job.setJarByClass(OrcInput.class);
job.setInputFormatClass(OrcNewInputFormat.class);
job.setOutputFormatClass(OrcNewOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcinput"));
FileOutputFormat.setOutputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcoutput"));
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(OrcStruct.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcStruct.class);
job.setMapperClass(OrcInput.OrcReaderMap.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1条答案
按热度按时间i1icjdpr1#
请添加 job.setNumReduceTasks(0);,把作业设为只有 map 阶段,这样 mapper 的输出会直接交给 OrcNewOutputFormat,不再经过 map 输出类型检查。