I am trying to serialize CSV data into Parquet using an Avro schema (avro-backed) and then read it back into a Hive table.
A single record is serialized successfully with the following snippet (sample code that serializes one record):
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

public class AvroParquetConverter {

    public static void main(String[] args) throws IOException {
        Schema avroSchema = new Schema.Parser().parse(new File("schema.avsc"));
        GenericRecord myrecord = new GenericData.Record(avroSchema);
        String outputFilename = "/home/jai/sample1000-snappy.parquet";
        Path outputPath = new Path(outputFilename);

        // Convert the Avro schema to a Parquet schema and build a Snappy-compressed writer.
        MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
        AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);
        CompressionCodecName compressionCodecSnappy = CompressionCodecName.SNAPPY;
        int blockSize = 256 * 1024 * 1024;
        int pageSize = 64 * 1024;
        ParquetWriter parquetWriterSnappy = new ParquetWriter(outputPath,
                writeSupport, compressionCodecSnappy, blockSize, pageSize);

        BigDecimal bd = new BigDecimal(20);
        GenericRecord myrecordTemp = new GenericData.Record(avroSchema);
        myrecord.put("name", "Abhijeet1");
        myrecord.put("pid", 20);
        myrecord.put("favorite_number", 22);

        // Encode the decimal as the two's-complement bytes of its unscaled value.
        String bd1 = "13.5";
        BigDecimal bdecimal = new BigDecimal(bd1);
        bdecimal = bdecimal.setScale(15, 6); // setScale returns a new BigDecimal (6 = rounding mode)
        BigInteger bi = bdecimal.unscaledValue();
        byte[] barray = bi.toByteArray();
        ByteBuffer byteBuffer = ByteBuffer.allocate(barray.length);
        byteBuffer.put(barray);
        byteBuffer.rewind();
        myrecord.put("price", byteBuffer);

        parquetWriterSnappy.write(myrecord);
        parquetWriterSnappy.close();
    }
}
The decimal-to-ByteBuffer conversion can also be done with the following statement:
ByteBuffer.wrap(bdecimal.unscaledValue().toByteArray());
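For comparison, here is a minimal sketch of the same conversion done through Avro's built-in Conversions.DecimalConversion (available in Avro 1.8.x). The price field and its decimal(15,6) logical type are taken from the schema below; the class and helper names are only illustrative:
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalEncoding {

    // Illustrative helper: encode a BigDecimal for the "price" field declared
    // as bytes + decimal(15,6) in schema.avsc.
    static ByteBuffer encodePrice(BigDecimal value, Schema priceSchema) {
        // The value's scale must match the logical type's scale exactly,
        // otherwise DecimalConversion rejects it.
        BigDecimal scaled = value.setScale(6, BigDecimal.ROUND_HALF_UP);
        return new Conversions.DecimalConversion()
                .toBytes(scaled, priceSchema, LogicalTypes.decimal(15, 6));
    }
}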
Here is the Avro schema file:
{
  "namespace": "avropoc",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string", "default": "null"},
    {"name": "favorite_number", "type": "int", "default": 0},
    {"name": "pid", "type": "int", "default": 0},
    {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 15, "scale": 6}, "default": 0}
  ]
}
I also tried the following modification to the schema:
{"name": "price", "type": "bytes","logicalType":"decimal","precision":15,"scale":6, "default" : 0 }
I am creating the Hive table as follows:
create external table avroparquet1
( name string, favorite_number int,
pid int, price DECIMAL(15,6))
STORED AS PARQUET;
But when I run a query on the decimal field price, I get the following error:
Failed with exception java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
This looks like a Parquet/Avro/Hive issue with deserializing decimals, which in Avro's case have to be written as a ByteBuffer.
I have tried this with Avro 1.8.0, Parquet 1.8.1, and Hive 1.1.0.
Any help would be appreciated.
1 Answer
The actual schema that Hive generates for a DECIMAL(22,7), checked by inspecting an actual Parquet file with some code derived from a LinkedIn utility, looks like this...

Parquet syntax:
optional fixed_len_byte_array(10) my_dec_22_7;

Avro syntax:
{ "name":"my_dec_22_7", "type":["null",{"type":"fixed", "name":"my_dec_22_7", "size":10}], "default":null }
...where 10 appears to be the number of bytes required to dump a BigInteger holding a 22-digit number into a byte[]. Have a look at the AvroSerdeUtils source code and the way it dumps a HiveDecimal, for instance. That said, I honestly don't know how to read/write decimals in Parquet files; double and bigint are much easier to deal with because they are backed by IEEE standard types (as well as Avro standard types and Parquet standard types).
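The 10-byte figure above can be sanity-checked with a small, self-contained snippet (added here as an illustration, not part of the original answer): the largest unscaled value a DECIMAL(22,7) can hold is a 22-digit number, and its two's-complement encoding is exactly 10 bytes.
import java.math.BigInteger;

public class DecimalByteCheck {
    public static void main(String[] args) {
        // Largest unscaled value of a DECIMAL(22,7): 22 nines.
        BigInteger maxUnscaled = BigInteger.TEN.pow(22).subtract(BigInteger.ONE);
        // toByteArray() yields the two's-complement representation, which is the
        // encoding the Parquet decimal spec uses for fixed_len_byte_array columns.
        System.out.println(maxUnscaled.toByteArray().length); // prints 10
    }
}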