Avro and MRUnit give InstantiationException

px9o7tmv · posted 2021-06-04 · in Hadoop

I am using:

hadoop-client 2.2.0
mrunit 1.0.0
avro 1.7.6
avro mrunit 1.7.6

... and the whole thing is built and tested with Maven.
I was getting a NullPointerException until I followed the MRUnit instructions for using Avro serialization.
Now I am getting an InstantiationException:

  Running mypackage.MyTest
  log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  2014-03-23 20:49:21.463 java[27994:1003] Unable to load realm info from SCDynamicStore
  Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.945 sec <<< FAILURE!
  process(mypackage.MyTest) Time elapsed: 0.909 sec <<< ERROR!
  java.lang.RuntimeException: java.lang.InstantiationException
      at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
      at org.apache.hadoop.io.serializer.SerializationFactory.add(SerializationFactory.java:72)
      at org.apache.hadoop.io.serializer.SerializationFactory.<init>(SerializationFactory.java:63)
      at org.apache.hadoop.mrunit.internal.io.Serialization.<init>(Serialization.java:37)
      at org.apache.hadoop.mrunit.TestDriver.getSerialization(TestDriver.java:464)
      at org.apache.hadoop.mrunit.TestDriver.copy(TestDriver.java:608)
      at org.apache.hadoop.mrunit.TestDriver.copyPair(TestDriver.java:612)
      at org.apache.hadoop.mrunit.MapDriverBase.addInput(MapDriverBase.java:118)
      at org.apache.hadoop.mrunit.MapDriverBase.withInput(MapDriverBase.java:207)
      at mypackage.MyTest.process(MyTest.java:92)
      ...
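
Reading the trace: MRUnit's TestDriver builds a Hadoop SerializationFactory from the io.serializations property and instantiates every class listed there reflectively, so an InstantiationException at this point generally means one of those entries names a class that cannot be instantiated, for example an abstract class such as Hadoop's own org.apache.hadoop.io.serializer.avro.AvroSerialization rather than the concrete org.apache.avro.hadoop.io.AvroSerialization from avro-mapred. A minimal probe along those lines, as a sketch (the SerializationProbe class is mine, not from the question):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.ReflectionUtils;

  // Hypothetical diagnostic (not from the question): instantiate every class listed under
  // io.serializations the same way Hadoop's SerializationFactory does, to see which entry fails.
  public class SerializationProbe {

      public static void probe(Configuration conf) throws ClassNotFoundException {
          for (String name : conf.getStrings("io.serializations", new String[0])) {
              Class<?> cls = conf.getClassByName(name);
              // ReflectionUtils.newInstance is what wraps the InstantiationException in the
              // RuntimeException shown in the trace above.
              Object serialization = ReflectionUtils.newInstance(cls, conf);
              System.out.println("OK: " + serialization.getClass().getName());
          }
      }

      public static void main(String[] args) throws Exception {
          // Pass in the same Configuration the test builds (mapDriver.getConfiguration()).
          probe(new Configuration());
      }
  }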

The Avro model is the following:

  {
    "namespace": "model",
    "type": "record",
    "name": "Blob",
    "fields": [
      { "name": "value", "type": "string" }
    ]
  }
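
With the avro-maven-plugin, a record like this compiles to a SpecificRecord class (model.Blob), which is where the Blob.newBuilder() and Blob.SCHEMA$ used further down come from. A minimal usage sketch, assuming code generated by avro-maven-plugin 1.7.6 with default settings (the BlobExample class is mine):

  import model.Blob;
  import org.apache.avro.Schema;

  // Assumed to be generated from the schema above by the avro-maven-plugin (package "model").
  public class BlobExample {
      public static void main(String[] args) {
          Blob blob = Blob.newBuilder()
                  .setValue("abc")
                  .build();
          Schema schema = Blob.SCHEMA$;           // the writer/reader schema for this record
          CharSequence value = blob.getValue();   // Avro string fields map to CharSequence by default
          System.out.println(schema.toString(true) + " -> " + value);
      }
  }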

The mapper looks like this:

  public class MyMapper
          extends Mapper<AvroKey<Blob>, NullWritable, LongWritable, NullWritable>
  {
      @Override
      public void map(AvroKey<Blob> key, NullWritable value, Context context)
              throws IOException, InterruptedException {
          context.write(new LongWritable(0), NullWritable.get());
      }
  }

The failing test (my only test at the moment) looks like this:

  @Test
  public void process() throws IOException {
      mapper = new MyMapper();
      job = Job.getInstance();
      mapDriver = MapDriver.newMapDriver(mapper);
      Configuration configuration = mapDriver.getConfiguration();
      // Copy over the default io.serializations. If you don't do this then you will
      // not be able to deserialize the inputs to the mapper
      String[] serializations = configuration.getStrings("io.serializations");
      serializations = Arrays.copyOf(serializations, serializations.length + 1);
      serializations[serializations.length - 1] = AvroSerialization.class.getName();
      configuration.setStrings("io.serializations", serializations);
      // Configure AvroSerialization by specifying the key writer and value writer schemas
      configuration.setStrings("avro.serialization.key.writer.schema", Schema.create(Schema.Type.LONG).toString(true));
      configuration.setStrings("avro.serialization.value.writer.schema", Schema.create(Schema.Type.NULL).toString(true));
      job.setMapperClass(MyMapper.class);
      job.setInputFormatClass(AvroKeyInputFormat.class);
      AvroJob.setInputKeySchema(job, Blob.SCHEMA$);
      job.setOutputKeyClass(LongWritable.class);
      input = Blob.newBuilder()
          .setValue("abc")
          .build();
      mapDriver
          .withInput(new AvroKey<Blob>(input), NullWritable.get())
          .withOutput(new LongWritable(0), NullWritable.get())
          .runTest();
  }

I am very new to both Avro and MRUnit, so I am still working on fully understanding how they work together. In the unit-test output I see the log4j warnings and I am not sure they are not part of the problem (although I doubt it).


fhg3lkii 1#

Try this. Although the error comes out of ReflectionUtils, the outer frames are about serialization, and you are not implementing Writable, so I think the problem is that Avro serialization is not set up correctly:

  MapDriver driver = MapDriver.newMapDriver(your mapper);
  Configuration conf = driver.getConfiguration();
  AvroSerialization.addToConfiguration(conf);
  AvroSerialization.setKeyWriterSchema(conf, your schema);
  AvroSerialization.setKeyReaderSchema(conf, your schema);
  Job job = new Job(conf);
  job.set... your job settings;
  AvroJob.set... your avro job settings;
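
Applied to the test in the question, that suggestion would look roughly like the sketch below. This is my own adaptation rather than the answerer's code; it assumes the generated model.Blob class, the new-API org.apache.hadoop.mrunit.mapreduce.MapDriver, and org.apache.avro.hadoop.io.AvroSerialization from avro-mapred, and it registers Blob.SCHEMA$ as both key writer and key reader schema because the mapper's input key is an AvroKey<Blob>:

  import java.io.IOException;

  import model.Blob;
  import org.apache.avro.hadoop.io.AvroSerialization;
  import org.apache.avro.mapred.AvroKey;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mrunit.mapreduce.MapDriver;
  import org.junit.Test;

  public class MyTest {

      @Test
      public void process() throws IOException {
          MapDriver<AvroKey<Blob>, NullWritable, LongWritable, NullWritable> mapDriver =
                  MapDriver.newMapDriver(new MyMapper());

          Configuration conf = mapDriver.getConfiguration();
          // Registers the concrete avro-mapred AvroSerialization under io.serializations.
          AvroSerialization.addToConfiguration(conf);
          // The mapper's input key is an AvroKey<Blob>, so MRUnit needs Blob's schema
          // both to write the input pair it copies and to read it back.
          AvroSerialization.setKeyWriterSchema(conf, Blob.SCHEMA$);
          AvroSerialization.setKeyReaderSchema(conf, Blob.SCHEMA$);

          Blob input = Blob.newBuilder().setValue("abc").build();

          mapDriver
                  .withInput(new AvroKey<Blob>(input), NullWritable.get())
                  .withOutput(new LongWritable(0), NullWritable.get())
                  .runTest();
      }
  }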
