Parquet writer: org.apache.parquet.io.ParquetEncodingException: writing empty page

vof42yt1 posted on 2021-05-31 in Hadoop

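I'm using Apache Parquet's Hadoop ParquetRecordWriter with MapReduce and hit ParquetEncodingException: writing empty page. I found that this is thrown in ColumnWriterBase when valueCount is 0, but I don't understand why this property is 0, what it has to do with encoding, or how this state can arise. Any ideas? Thanks for any tips.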

Error: org.apache.parquet.io.ParquetEncodingException: writing empty page
    at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:309)
    at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
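
Reading the trace together with the valueCount observation: ColumnWriteStoreBase.flush() asks a column to write a page, and ColumnWriterBase.writePage() refuses because that column has nothing buffered. One way to picture how that can happen is the heavily simplified toy model below. It assumes the store counts ended records separately from the entries (values or nulls) each column receives; ToyColumnStore and its methods are hypothetical and are not parquet-mr code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of the check in the stack trace;
// it is NOT parquet-mr's ColumnWriteStoreBase / ColumnWriterBase code.
class ToyColumnStore {
    // Advanced once per ended record.
    private long recordCount = 0;
    // Entries (values or explicit nulls) waiting in each column's current page.
    private final Map<String, Integer> buffered = new LinkedHashMap<>();
    // Number of records already flushed, per column.
    private final Map<String, Long> rowsWritten = new LinkedHashMap<>();

    ToyColumnStore(String... columns) {
        for (String column : columns) {
            buffered.put(column, 0);
            rowsWritten.put(column, 0L);
        }
    }

    // One entry for the current record in the given column.
    void write(String column) {
        buffered.merge(column, 1, Integer::sum);
    }

    void endRecord() {
        recordCount++;
    }

    void flush() {
        for (String column : buffered.keySet()) {
            // The store believes these records still need a page for this column...
            long pendingRows = recordCount - rowsWritten.get(column);
            if (pendingRows > 0) {
                // ...but nothing was buffered for them: the "empty page" case.
                if (buffered.get(column) == 0) {
                    throw new IllegalStateException("writing empty page for column " + column);
                }
                rowsWritten.put(column, recordCount);
                buffered.put(column, 0);
            }
        }
    }
}
```

In this picture, valueCount being 0 just means a column never received an entry for records the store had already counted, which would point at the sequence of RecordConsumer calls rather than at the encoding itself.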

Version: org.apache.parquet:parquet-hadoop:1.11.0
I'm using my own WriteSupport class:

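import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MyDataWriteSupport extends WriteSupport<MyData> {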

  private RecordConsumer recordConsumer;

  public MyDataWriteSupport() {}

  @Override
  public WriteContext init(Configuration configuration) {
    Map<String, String> metaData = new HashMap<>();
    return new WriteContext(getSchema(), metaData);
  }

  public static MessageType getSchema() {
    return MessageTypeParser.parseMessageType(
        " message MyData { "
            + "optional binary key (UTF8);"
            + "optional int64 length;"
            + "repeated int32 datarray;"
            + "repeated group myobj {\n"
            + "   optional int32 id;"
            + "   optional binary title (UTF8);"
            + "}"
            + " }");
  }

  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  @Override
  public void write(MyData record) {
    recordConsumer.startMessage();
    writeData(record);
    recordConsumer.endMessage();
  }

  private void writeData(MyData record) {
    recordConsumer.startMessage();

    addStringValue(recordConsumer, 0, "key", record.getKey());
    addLongValue(recordConsumer, 1, "length", record.getLength());
    addIntegerArrayValues(recordConsumer, 2, "datarray", record.getDataArray());

    if (!record.getMyObjects().isEmpty()) {
      recordConsumer.startField("myobj", 3);
      record
          .getMyObjects()
          .forEach(
              obj -> {
                recordConsumer.startGroup();
                addIntValue(recordConsumer, 0, "id", obj.id);
                addStringValue(recordConsumer, 1, "title", obj.title);
                recordConsumer.endGroup();
              });
      recordConsumer.endField("myobj", 3);
    }
    recordConsumer.endMessage();
  }

  private void addIntValue(RecordConsumer recordConsumer, int index, String fieldName, int value) {
    recordConsumer.startField(fieldName, index);
    recordConsumer.addInteger(value);
    recordConsumer.endField(fieldName, index);
  }

  private static void addIntegerArrayValues(
      RecordConsumer recordConsumer, int index, String fieldName, int[] is) {
    if (is.length > 0) {
      recordConsumer.startField(fieldName, index);
      Arrays.stream(is).forEach(labelIndex -> recordConsumer.addInteger(labelIndex));
      recordConsumer.endField(fieldName, index);
    }
  }

  private static void addLongValue(
      RecordConsumer recordConsumer, int index, String fieldName, long value) {
    recordConsumer.startField(fieldName, index);
    recordConsumer.addLong(value);
    recordConsumer.endField(fieldName, index);
  }

  private static void addStringValue(
      RecordConsumer recordConsumer, int index, String fieldName, String value) {
    recordConsumer.startField(fieldName, index);
    recordConsumer.addBinary(Binary.fromString(value));
    recordConsumer.endField(fieldName, index);
  }
}
Answer #1 (qvtsj1bj):

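I think the problem is in the start/end calls. One problem is that startMessage() and endMessage() are each called twice: once in write(MyData) and again in writeData(MyData). I'd suggest wrapping the RecordConsumer you use in a ValidatingRecordConsumer. That way, if something goes wrong while serializing a record, you may get a more meaningful exception.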
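
To see why the doubled calls are a problem, here is a toy checker that only enforces start/end pairing: a record cannot start while another scope is open, fields sit inside a message or group, and groups sit inside a field. ToyNestingChecker is hypothetical; it is not org.apache.parquet.io.ValidatingRecordConsumer and implements no Parquet interface, and the real class may report this particular mistake differently or later.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy checker: it only tracks start/end pairing. It is NOT
// org.apache.parquet.io.ValidatingRecordConsumer and implements no Parquet interface.
class ToyNestingChecker {
    // Stack of currently open scopes, innermost first.
    private final Deque<String> open = new ArrayDeque<>();

    void startMessage() {
        // A record cannot begin while another scope is still open.
        if (!open.isEmpty()) {
            throw new IllegalStateException("startMessage() while " + open.peek() + " is open");
        }
        open.push("message");
    }

    void endMessage() {
        close("message");
    }

    void startField(String name) {
        // Fields live inside a message or a group, never at top level or inside another field.
        String top = open.peek();
        if (top == null || top.startsWith("field ")) {
            throw new IllegalStateException("startField(" + name + ") not inside a message or group");
        }
        open.push("field " + name);
    }

    void endField(String name) {
        close("field " + name);
    }

    void startGroup() {
        // Groups are values, so they must sit directly inside a field.
        requireField("startGroup()");
        open.push("group");
    }

    void endGroup() {
        close("group");
    }

    // Primitive values (addInteger, addBinary, ...) must also sit inside a field.
    void addValue() {
        requireField("value");
    }

    // True once every opened scope has been closed again.
    boolean isBalanced() {
        return open.isEmpty();
    }

    private void requireField(String what) {
        String top = open.peek();
        if (top == null || !top.startsWith("field ")) {
            throw new IllegalStateException(what + " not inside a field");
        }
    }

    private void close(String expected) {
        if (!expected.equals(open.peek())) {
            throw new IllegalStateException(
                "cannot close " + expected + "; innermost open scope is " + open.peek());
        }
        open.pop();
    }
}
```

Replaying the question's write() against this checker fails on the second startMessage(), the one issued from writeData(). Under these assumptions the fix is to open and close each record exactly once, for example by dropping the startMessage()/endMessage() pair from writeData(). For the real wrapper, prepareForWrite() could store something like new ValidatingRecordConsumer(recordConsumer, getSchema()) instead of the raw consumer.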
