java.lang.NullPointerException: serialization error with a custom key

wxclj1h5  posted on 2021-06-03  in  Hadoop

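I am trying to test a simple MapReduce project with MRUnit. I set the input for the MapDriver and then call mapDriver.runTest() (I also tried mapDriver.run(), which produces the same error).
I wrote a custom key that overrides the write(DataOutput out), readFields(DataInput in), and compareTo(...) methods. When debugging, I can see the key being serialized with write(DataOutput out). However, once the key's readFields(DataInput in) method finishes (it correctly retrieves the values previously written by write(DataOutput out)), the error below is thrown.
I searched here for similar posts and tried overriding the hashCode() and equals() methods, to no avail. Does MRUnit require any other methods to be overridden when a custom key is used? The most similar post is "MRUnit with Avro NullPointerException in Serialization". However, I am not using Avro and, as far as I know, I am using the default serialization. Cheers!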

java.lang.NullPointerException
at org.apache.hadoop.mrunit.Serialization.copy(Serialization.java:61)
at org.apache.hadoop.mrunit.Serialization.copy(Serialization.java:81)
at org.apache.hadoop.mrunit.mapreduce.mock.MockContextWrapper$4.answer(MockContextWrapper.java:78)
at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:31)
at org.mockito.internal.MockHandler.handle(MockHandler.java:97)
at org.mockito.internal.creation.MethodInterceptorFilter.intercept(MethodInterceptorFilter.java:47)
at org.apache.hadoop.mapreduce.Mapper$Context$$EnhancerByMockitoWithCGLIB$$f555e120.write(<generated>)
at model.RMSEEvaluation$Mapper.map(RMSEEvaluation.java:57)
at model.RMSEEvaluation$Mapper.map(RMSEEvaluation.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mrunit.mapreduce.MapDriver.run(MapDriver.java:221)
at org.apache.hadoop.mrunit.MapDriverBase.runTest(MapDriverBase.java:150)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:137)
at test.TestRMSEEvaluation.testSetValues(TestRMSEEvaluation.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at junit.framework.TestCase.runTest(TestCase.java:168)
at junit.framework.TestCase.runBare(TestCase.java:134)
at junit.framework.TestResult$1.protect(TestResult.java:110)
at junit.framework.TestResult.runProtected(TestResult.java:128)
at junit.framework.TestResult.run(TestResult.java:113)
at junit.framework.TestCase.run(TestCase.java:124)
at junit.framework.TestSuite.runTest(TestSuite.java:243)
at junit.framework.TestSuite.run(TestSuite.java:238)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:83)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

koaltpgm #1

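I found a fix for this error. It occurred because the serialization types were not set in the Configuration of the MapDriver mapDriver. I had to set the serializations explicitly with the following code: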

Configuration conf = new Configuration();
conf.set("io.serializations","org.apache.hadoop.io.serializer.JavaSerialization," 
            + "org.apache.hadoop.io.serializer.WritableSerialization");
mapDriver.setConfiguration(conf);

Hope this helps anyone with a similar problem!
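
One plausible reading of why this fix works, offered as a simplified model rather than as Hadoop's actual code: each entry in io.serializations is asked whether it accepts the key class. WritableSerialization accepts only classes that implement Writable, and JavaSerialization only classes that implement java.io.Serializable. If no entry accepts the class, the lookup has nothing to return, and a caller that does not check for that can fail with a NullPointerException like the one in the trace. All class names below (WritableMarker, SerializationModel, SerializationLookup, UndeclaredKey, DeclaredKey) are hypothetical stand-ins so the sketch runs without Hadoop on the classpath.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.io.Writable (marker only).
interface WritableMarker { }

// Simplified model of one serialization: it accepts a key class only if
// that class is assignable to the type the serialization requires.
final class SerializationModel {
    final String name;
    final Class<?> requiredType;

    SerializationModel(String name, Class<?> requiredType) {
        this.name = name;
        this.requiredType = requiredType;
    }

    boolean accept(Class<?> keyClass) {
        return requiredType.isAssignableFrom(keyClass);
    }
}

final class SerializationLookup {
    static final SerializationModel WRITABLE =
        new SerializationModel("WritableSerialization", WritableMarker.class);
    static final SerializationModel JAVA =
        new SerializationModel("JavaSerialization", Serializable.class);

    // Returns the name of the first registered serialization that accepts
    // keyClass, or null when none does.
    static String find(List<SerializationModel> registered, Class<?> keyClass) {
        for (SerializationModel s : registered) {
            if (s.accept(keyClass)) {
                return s.name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Assumption: the default list is modeled as WritableSerialization only.
        List<SerializationModel> defaults = Arrays.asList(WRITABLE);
        // Order taken from the io.serializations value in the answer above.
        List<SerializationModel> explicit = Arrays.asList(JAVA, WRITABLE);

        System.out.println(find(defaults, UndeclaredKey.class));
        System.out.println(find(explicit, UndeclaredKey.class));
        System.out.println(find(defaults, DeclaredKey.class));
    }
}

// A key that is Serializable but never declares the Writable-style interface.
class UndeclaredKey implements Serializable { }

// A key that declares the Writable-style interface.
class DeclaredKey implements WritableMarker { }
```

If this model matches your situation, it is worth checking that the custom key actually declares implements WritableComparable<...> instead of only defining methods with the same names. In that case the default serialization should pick it up without registering JavaSerialization.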

bxgwgixi #2

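First of all, it is worth testing whether serialization/deserialization really works as expected.
Without knowing how your test is written, the following simple test runs fine with MRUnit 0.9.0-incubating and JUnit 4.10: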

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

public class TestCustom {

    private MapDriver<CustomRecord, Text, CustomRecord, Text> mapDriver;

    private Mapper<CustomRecord, Text, CustomRecord, Text> map = 
        new Mapper<CustomRecord, Text, CustomRecord, Text>();

    private Reducer<CustomRecord, Text, CustomRecord, Text> reduce = 
        new Reducer<CustomRecord, Text, CustomRecord, Text>();

    private ReduceDriver<CustomRecord, Text, CustomRecord, Text> reduceDriver
        = ReduceDriver.newReduceDriver(reduce);

    private MapReduceDriver<CustomRecord, Text, CustomRecord, 
      Text, CustomRecord, Text> mapReduceDriver;

    private Configuration conf = new Configuration();

    //test data
    private Pair<CustomRecord, Text> data;

    //shuffled and sorted data
    private static List<Pair<CustomRecord, List<Text>>> shuffledData; 

    @Before
    public void init() {

        mapDriver = MapDriver.newMapDriver(map);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(map, reduce);
        mapDriver.withConfiguration(conf);
        initData();
    }

    private void initData() {

        CustomRecord key = new CustomRecord("first", 1);
        Text value = new Text("key1");
        data = new Pair<CustomRecord, Text>(key, value);
    }

    @Test
    public void testMapper() throws IOException {

        mapDriver.withInput(data);
        //expected output result
        mapDriver.withOutput(data); 
        mapDriver.runTest(true);

        //shuffle and sort
        List<Pair<CustomRecord, Text>> pairs = 
          new ArrayList<Pair<CustomRecord, Text>>();
        pairs.add(data);

        shuffledData = mapReduceDriver.shuffle(pairs);

    }

    @Test
    public void testReducer() throws IOException {

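        // feed input to one single reduce call
        // NOTE: relies on shuffledData having been filled by testMapper();
        // JUnit does not guarantee the order in which test methods run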
        Pair<CustomRecord, List<Text>> pair = shuffledData.get(0);
        reduceDriver.withInput(pair.getFirst(), pair.getSecond());

        //reducer's output
        List<Pair<CustomRecord, Text>> result = reduceDriver.run();

        Assert.assertEquals("Key mismatch!", 
          data.getFirst(), result.get(0).getFirst());
        Assert.assertEquals("Value mismatch!", 
          data.getSecond(), result.get(0).getSecond());
    }
}

It tests an identity mapper and reducer with a custom Writable key (CustomRecord).
Note that the key implements WritableComparable and overrides hashCode and equals.
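
The CustomRecord class itself is not shown above, so here is a minimal sketch of what such a key might look like, together with the round-trip check suggested at the start of this answer. The field names (name, count) are guesses based on new CustomRecord("first", 1), and WritableLike is a hypothetical local stand-in with the same two methods as org.apache.hadoop.io.Writable, used only so the sketch runs without Hadoop. In a real job the class would declare implements WritableComparable<CustomRecord> instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical stand-in mirroring the two methods of Hadoop's Writable.
interface WritableLike {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Assumed shape of the key: one String field and one int field.
class CustomRecord implements WritableLike, Comparable<CustomRecord> {
    private String name = "";
    private int count;

    // No-arg constructor, so an empty instance can be created and then
    // populated through readFields().
    CustomRecord() { }

    CustomRecord(String name, int count) {
        this.name = name;
        this.count = count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields in exactly the order write() emitted them.
        name = in.readUTF();
        count = in.readInt();
    }

    @Override
    public int compareTo(CustomRecord other) {
        int byName = name.compareTo(other.name);
        return byName != 0 ? byName : Integer.compare(count, other.count);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CustomRecord)) return false;
        CustomRecord that = (CustomRecord) o;
        return count == that.count && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, count);
    }
}

class RoundTripCheck {
    // Serializes a record to bytes and reads it back into a fresh instance.
    static CustomRecord roundTrip(CustomRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            CustomRecord copy = new CustomRecord();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A quick check is then RoundTripCheck.roundTrip(record).equals(record) for a few sample records. If that comparison fails, the problem lies in write/readFields or equals and not in MRUnit.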
