hadoop+jackson解析:objectmapper读取对象然后中断

zkure5ic  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(623)

我正在用jackson在hadoop中实现一个json记录阅读器。现在我正在用junit+mrunit进行本地测试。json文件每个都包含一个对象,在一些头之后,它有一个字段,其值是一个条目数组,我希望每个条目都被理解为一个记录(因此我需要跳过那些头)。
我可以通过将fsdatainputstream提升到读取点来实现这一点。在本地测试中,我执行以下操作:

  1. fs = FileSystem.get(new Configuration());
  2. in = fs.open(new Path(filename));
  3. long offset = getOffset(in, "HEADER_START_HERE");
  4. in.seek(offset);

其中getoffset是一个函数,它将inputstream指向字段值开始的位置,如果我们看一下 in.getPos() 价值观。
我正在读第一张唱片:

  1. ObjectMapper mapper = new ObjectMapper();
  2. JsonNode actualObj = mapper.readValue (in, JsonNode.class);

第一张唱片还不错。我可以用 mapper.writeValueAsString(actualObj) 它读得很好,它是有效的。
在这之前没问题。
所以我尝试迭代对象,方法是:

  1. ObjectMapper mapper = new ObjectMapper();
  2. JsonNode actualObj = null;
  3. do {
  4. actualObj = mapper.readValue (in, JsonNode.class);
  5. if( actualObj != null) {
  6. LOG.info("ELEMENT:\n" + mapper.writeValueAsString(actualObj) );
  7. }
  8. } while (actualObj != null) ;

它读到了第一个,但后来坏了:

  1. java.lang.NullPointerException: null
  2. at org.apache.hadoop.fs.BufferedFSInputStream.getPos(BufferedFSInputStream.java:54)
  3. at org.apache.hadoop.fs.FSDataInputStream.getPos(FSDataInputStream.java:57)
  4. at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:243)
  5. at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:273)
  6. at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:225)
  7. at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:193)
  8. at java.io.DataInputStream.read(DataInputStream.java:132)
  9. at org.codehaus.jackson.impl.ByteSourceBootstrapper.ensureLoaded(ByteSourceBootstrapper.java:340)
  10. at org.codehaus.jackson.impl.ByteSourceBootstrapper.detectEncoding(ByteSourceBootstrapper.java:116)
  11. at org.codehaus.jackson.impl.ByteSourceBootstrapper.constructParser(ByteSourceBootstrapper.java:197)
  12. at org.codehaus.jackson.JsonFactory._createJsonParser(JsonFactory.java:503)
  13. at org.codehaus.jackson.JsonFactory.createJsonParser(JsonFactory.java:365)
  14. at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1158)

为什么会发生这种异常?
这和在当地读书有关系吗?
在重复使用一个程序时是否需要某种重置或其他什么 ObjectMapper 或者它的底层流?

z8dt9xmd

z8dt9xmd1#

我设法解决了这个问题。如果有帮助:
首先,我使用的是jackson1.x的最新版本。好像有一次 JsonParserInputStream ,它控制了它。所以,使用 readValue() ,一旦它被读取(内部调用 _readMapAndClose() 自动关闭流。您可以设置一个设置来告诉 JsonParser 不关闭底层流。你可以把它传给你的朋友 JsonFactory 在你创建你的 JsonParser :

  1. JsonFactory f = new MappingJsonFactory();
  2. f.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);

请注意,您负责关闭流(在我的情况下是fsdatainputstream)。所以,答案是:
为什么会发生这种异常?
因为解析器管理流,并在readvalue()之后关闭它。
这和在当地读书有关系吗?

重用objectmapper或其底层流时是否需要某种重置或其他操作?
不需要。当使用流式api与类似objectmapper的方法混合使用时,您需要注意的是,有时Map器/解析器可能会控制底层流。请参阅jsonparser的javadoc,并检查每个读取方法的文档以满足您的需要。

相关问题