我想从hdfs读取一个avro文件,并将其作为parquet重新连接到hdfs。问题是有些字段有typ字节和逻辑类型decimal。我想wirte作为浮点数或最佳拟合的数字格式,以使用与配置单元Parquet文件或可以很容易地使用Spark功能,如总和的列。
avro模式示例:
{
"name" : "Field1",
"type" : [ "null", {
"type" : "bytes",
"scale" : 0,
"precision" : 64,
"connect.version" : 1,
"connect.parameters" : {
"scale" : "0"
},
"connect.name" : "org.apache.kafka.connect.data.Decimal",
"logicalType" : "decimal"
} ],
"default" : null
},
读取avro和写入parquet的代码:
SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local[*]")
.set("dfs.client.use.datanode.hostname", "true")
.set("spark.hadoop.fs.default.name", "hdfs://sandbox-hdp.hortonworks.com:8020")
.set("spark.hadoop.fs.defaultFS", "hdfs://sandbox-hdp.hortonworks.com:8020")
.set("spark.hadoop.fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName())
.set("spark.hadoop.fs.hdfs.server", org.apache.hadoop.hdfs.server.namenode.NameNode.class.getName())
.set("spark.hadoop.conf", org.apache.hadoop.hdfs.HdfsConfiguration.class.getName()));
SparkSession spark = SparkSession
.builder()
.sparkContext(context)
.enableHiveSupport()
.getOrCreate();
Dataset<Row> df = spark.read().format("avro")
.load("/tmp/partition=0/tmp.avro");
df.show();
df.printSchema();
df.write().mode(SaveMode.Overwrite)
.parquet("/tmp/parquet/test.parquet");
我读了很多关于对话的文章,但没有什么真正有效的。有人有一个简单的方法来转换avro类型字节和逻辑类型十进制的数字格式,如浮点数?
暂无答案!
目前还没有任何答案,快来回答吧!