如何将avro中的java bytes列(logicaltype为decimal)转换为float/number格式并写入parquet

cclgggtu  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(451)

我想从hdfs读取一个avro文件,并将其作为parquet重新连接到hdfs。问题是有些字段有typ字节和逻辑类型decimal。我想wirte作为浮点数或最佳拟合的数字格式,以使用与配置单元Parquet文件或可以很容易地使用Spark功能,如总和的列。
avro模式示例:

  1. {
  2. "name" : "Field1",
  3. "type" : [ "null", {
  4. "type" : "bytes",
  5. "scale" : 0,
  6. "precision" : 64,
  7. "connect.version" : 1,
  8. "connect.parameters" : {
  9. "scale" : "0"
  10. },
  11. "connect.name" : "org.apache.kafka.connect.data.Decimal",
  12. "logicalType" : "decimal"
  13. } ],
  14. "default" : null
  15. },

读取avro和写入parquet的代码:

  1. SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local[*]")
  2. .set("dfs.client.use.datanode.hostname", "true")
  3. .set("spark.hadoop.fs.default.name", "hdfs://sandbox-hdp.hortonworks.com:8020")
  4. .set("spark.hadoop.fs.defaultFS", "hdfs://sandbox-hdp.hortonworks.com:8020")
  5. .set("spark.hadoop.fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName())
  6. .set("spark.hadoop.fs.hdfs.server", org.apache.hadoop.hdfs.server.namenode.NameNode.class.getName())
  7. .set("spark.hadoop.conf", org.apache.hadoop.hdfs.HdfsConfiguration.class.getName()));
  8. SparkSession spark = SparkSession
  9. .builder()
  10. .sparkContext(context)
  11. .enableHiveSupport()
  12. .getOrCreate();
  13. Dataset<Row> df = spark.read().format("avro")
  14. .load("/tmp/partition=0/tmp.avro");
  15. df.show();
  16. df.printSchema();
  17. df.write().mode(SaveMode.Overwrite)
  18. .parquet("/tmp/parquet/test.parquet");

我读了很多关于对话的文章,但没有什么真正有效的。有人有一个简单的方法来转换avro类型字节和逻辑类型十进制的数字格式,如浮点数?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题