How to generate Parquet files (including Date and Decimal types) readable by Apache Spark using plain Java, and upload them to S3 [Windows] (no HDFS)

fnatzsnv · posted 2023-11-21 · in Apache

I recently had a requirement to generate Parquet files that could be read by Apache Spark using only Java (with no additional software installs such as Apache Drill, Hive, Spark, etc.). The files also needed to be saved to S3, so I will share the details of how to do both.
There was no simple guide on how to accomplish this, and since I am not a Java programmer, the concepts of Maven, Hadoop, etc. were all foreign to me, so it took me almost two weeks to get this working. Below I share my personal guide on how I achieved it.


093gszye1#

I will be using NetBeans as my IDE.
Some information about Parquet in Java (for novices like me):

  • In order to serialize your data into Parquet, you must choose one of the popular Java data serialization frameworks: Avro, Protocol Buffers or Thrift (I will be using Avro 1.8.0, as you can see from our parquet-avro dependency).
  • You will need to use an IDE that supports Maven, because the dependencies above pull in many dependencies of their own and Maven downloads them for you automatically (like NuGet for Visual Studio).

Prerequisites:

You must have Hadoop set up on the Windows machine that will run the Java code. The good news is that you do not need to install the entire Hadoop distribution; you only need two files:

  • hadoop.dll
  • winutils.exe

These files can be downloaded here. For this example you will need version 2.8.1 (because of parquet-avro 1.9.0).
1. Copy these files to C:\hadoop-2.8.1\bin on the target machine.
2. Add a new system variable (not a user variable) named HADOOP_HOME with the value C:\hadoop-2.8.1.
3. Modify the system Path variable (not the user variable) and append the following to the end: %HADOOP_HOME%\bin
4. Restart the machine for the changes to take effect.
If this configuration is not done correctly, you will get the following error at runtime: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
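
If you cannot (or prefer not to) change system-wide settings, the same effect can usually be achieved from code by setting the hadoop.home.dir system property before any Hadoop class is loaded; a minimal sketch, assuming the C:\hadoop-2.8.1 layout from step 1:

//Alternative to the HADOOP_HOME environment variable: Hadoop's Shell utility also
//checks the hadoop.home.dir system property when locating winutils.exe.
//Must run before the first Hadoop class is used (e.g. as the first line of main()).
System.setProperty("hadoop.home.dir", "C:\\hadoop-2.8.1");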

Getting started with the code:

  • First create a new empty Maven project and add parquet-avro 1.9.0 and hadoop-aws 2.8.2 as dependencies (Maven coordinates org.apache.parquet:parquet-avro:1.9.0 and org.apache.hadoop:hadoop-aws:2.8.2).

  • Create your main class, where you will write some code.

  • The first thing you need to do is generate a Schema. As far as I could tell, you cannot build a schema programmatically at runtime one field at a time: the Schema.Parser class' parse() method only accepts a file or a String as an argument, and once the schema is created it will not let you modify it. To get around this, I generate my schema JSON at runtime and parse it (but see the SchemaBuilder note further below). Here is an example Schema:

String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
    + "\"type\": \"record\"," //Must be set as record
    + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
    + "\"fields\": ["
    + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
    + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
    + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
    + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
    + " ]}";
Schema.Parser parser = new Schema.Parser().setValidate(true);
Schema avroSchema = parser.parse(schema);

Details of the Avro schema format can be found here: https://avro.apache.org/docs/1.8.0/spec.html
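
Side note: Avro 1.8 does ship a programmatic alternative in org.apache.avro.SchemaBuilder and org.apache.avro.LogicalTypes, so hand-building the JSON is not strictly required. The following is only a rough, untested sketch of roughly the same schema written that way (the builder method names are from memory, and optionalString() puts null first in the union, unlike the JSON above):

import java.util.Arrays;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

//Attach the decimal/date logical types to their underlying fixed/int base types
Schema decimalFixed = LogicalTypes.decimal(32, 4)
        .addToSchema(SchemaBuilder.fixed("mydecimaltype1").size(16));
Schema dateInt = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
Schema nullType = Schema.create(Schema.Type.NULL);

Schema builtSchema = SchemaBuilder.record("myrecordname")
        .namespace("org.myorganization.mynamespace")
        .fields()
        .requiredInt("myInteger")
        .optionalString("myString") //union of null and string
        .name("myDecimal").type(Schema.createUnion(Arrays.asList(decimalFixed, nullType))).noDefault()
        .name("myDate").type(Schema.createUnion(Arrays.asList(dateInt, nullType))).noDefault()
        .endRecord();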

  • Next we can start generating records (the Avro primitive types are straightforward):
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("myInteger", 1);
record.put("myString", "string value 1");

  • To generate the decimal logical type, the fixed primitive type must be used as the actual storage type. The current Parquet format only supports fixed-length byte arrays (aka fixed_len_byte_array), so we have to use fixed in our case as well (as shown in the schema). In Java we use BigDecimal to actually work with decimals. I have determined that a Decimal(32,4) will never take more than 16 bytes, regardless of the value, so we will use a fixed byte array size of 16 in the serialization below (and in the schema above):
BigDecimal myDecimalValue = new BigDecimal("99.9999");

//First we need to make sure the BigDecimal matches our schema scale:
myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);

//Next we get the decimal value as one BigInteger (like there was no decimal point)
BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();

//Finally we serialize the integer
byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();

//We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));

byte[] myDecimalBuffer = new byte[16];
if (myDecimalBuffer.length >= decimalBytes.length) {            
    //Because we set our fixed byte array size as 16 bytes, we need to
    //pad-left our original value's bytes with zeros
    int myDecimalBufferIndex = myDecimalBuffer.length - 1;
    for(int i = decimalBytes.length - 1; i >= 0; i--){
        myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
        myDecimalBufferIndex--;
    }
    //Save result
    fixed.bytes(myDecimalBuffer);
} else {
    throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
}

//We can finally write our decimal to our record
record.put("myDecimal", fixed);

  • For Date values, Avro specifies that we need to store the number of days since the epoch as an integer. (If you also need the time component, i.e. an actual DateTime type, you need to use the Timestamp Avro type, which I will not cover in this post.) The easiest way I found to get the number of days since epoch is the joda-time library. If you added the hadoop-aws dependency you should already have this library; if not, you will need to add it yourself:
//Get epoch value
MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);

DateTime currentDate = new DateTime(); //Can take Java Date in constructor
Days days = Days.daysBetween(epoch, currentDate);

//We can write number of days since epoch into the record
record.put("myDate", days.getDays());

  • We can finally start writing our Parquet file like this:
try {
   Configuration conf = new Configuration();
   conf.set("fs.s3a.access.key", "ACCESSKEY");
   conf.set("fs.s3a.secret.key", "SECRETKEY");
   //Below are some other helpful settings
   //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
   //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
   //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
   //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors

   Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");

   //Use path below to save to local file system instead
   //Path path = new Path("data.parquet");

   try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
           .withSchema(avroSchema)
           .withCompressionCodec(CompressionCodecName.GZIP)
           .withConf(conf)
           .withPageSize(4 * 1024 * 1024) //For compression
           .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
           .build()) {
       //We only have one record to write in our example
       writer.write(record);
   }
} catch (Exception ex) { ex.printStackTrace(System.out); }

  • Here is the data after loading it into Apache Spark (2.2.0):
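
The original post showed this as a screenshot; for completeness, here is a rough sketch of how the file could be read back from Java, assuming a Spark 2.2+ dependency with hadoop-aws on the classpath and the same s3a credentials configured (not a verified, copy-paste-ready setup):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("read-parquet-example")
        .master("local[*]") //local mode, just for a quick check
        .getOrCreate();

//S3 credentials can be supplied through the underlying Hadoop configuration, e.g.:
//spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "ACCESSKEY");

Dataset<Row> df = spark.read().parquet("s3a://your-bucket-name/examplefolder/data.parquet");
df.printSchema(); //the decimal and date logical types map to DecimalType and DateType
df.show();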

For convenience, the full source code:

package com.mycompany.stackoverflow;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.MutableDateTime;

public class Main {
    public static void main(String[] args) {
        System.out.println("Start");

        String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
                + "\"type\": \"record\"," //Must be set as record
                + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
                + "\"fields\": ["
                + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
                + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
                + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
                + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
                + " ]}";

        Schema.Parser parser = new Schema.Parser().setValidate(true);
        Schema avroSchema = parser.parse(schema);

        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put("myInteger", 1);
        record.put("myString", "string value 1");

        BigDecimal myDecimalValue = new BigDecimal("99.9999");

        //First we need to make sure the huge decimal matches our schema scale:
        myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);

        //Next we get the decimal value as one BigInteger (like there was no decimal point)
        BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();

        //Finally we serialize the integer
        byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();

        //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
        GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));

        byte[] myDecimalBuffer = new byte[16];
        if (myDecimalBuffer.length >= decimalBytes.length) {            
            //Because we set our fixed byte array size as 16 bytes, we need to
            //pad-left our original value's bytes with zeros
            int myDecimalBufferIndex = myDecimalBuffer.length - 1;
            for(int i = decimalBytes.length - 1; i >= 0; i--){
                myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
                myDecimalBufferIndex--;
            }

            //Save result
            fixed.bytes(myDecimalBuffer);
        } else {
            throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
        }

        //We can finally write our decimal to our record
        record.put("myDecimal", fixed);

        //Get epoch value
        MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);

        DateTime currentDate = new DateTime(); //Can take Java Date in constructor
        Days days = Days.daysBetween(epoch, currentDate);

        //We can write number of days since epoch into the record
        record.put("myDate", days.getDays());

        try {
           Configuration conf = new Configuration();
           conf.set("fs.s3a.access.key", "ACCESSKEY");
           conf.set("fs.s3a.secret.key", "SECRETKEY");
           //Below are some other helpful settings
           //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
           //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
           //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
           //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.

           Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");

           //Use path below to save to local file system instead
           //Path path = new Path("data.parquet");

           try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
                   .withSchema(avroSchema)
                   .withCompressionCodec(CompressionCodecName.GZIP)
                   .withConf(conf)
                   .withPageSize(4 * 1024 * 1024) //For compression
                   .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
                   .build()) {

               //We only have one record to write in our example
               writer.write(record);
           }
        } catch (Exception ex) { 
            ex.printStackTrace(System.out);
        }
    }
}


gorkyyrv2#

I spent some time looking into this approach and found an alternative using DuckDB, which creates a table in memory and exports it to a Parquet file.
This is what did the job for me:

implementation 'org.duckdb:duckdb_jdbc:0.9.2'

Code example (kept as simple as possible):

try (Connection conn = DriverManager.getConnection("jdbc:duckdb:");
     Statement stmt = conn.createStatement()) {
    stmt.execute("CREATE TABLE items (item VARCHAR, value DECIMAL(10, 2), count INTEGER)");
    stmt.execute("INSERT INTO items VALUES ('jeans', 20.0, 1), ('hammer', 42.2, 2)");
    stmt.execute("COPY items TO 'output2.parquet' (FORMAT PARQUET);");
}


Copying the file to S3 afterwards should be easy enough, but there is built-in support in DuckDB as well (see the sketch below).
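
For reference, here is a rough sketch of what that built-in S3 route could look like using DuckDB's httpfs extension (bucket name, region and credentials are placeholders; I have not tested this exact snippet):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

try (Connection conn = DriverManager.getConnection("jdbc:duckdb:");
     Statement stmt = conn.createStatement()) {
    //httpfs adds s3:// support to COPY
    stmt.execute("INSTALL httpfs;");
    stmt.execute("LOAD httpfs;");
    stmt.execute("SET s3_region='us-east-1';");
    stmt.execute("SET s3_access_key_id='ACCESSKEY';");
    stmt.execute("SET s3_secret_access_key='SECRETKEY';");
    stmt.execute("CREATE TABLE items (item VARCHAR, value DECIMAL(10, 2), count INTEGER)");
    stmt.execute("INSERT INTO items VALUES ('jeans', 20.0, 1), ('hammer', 42.2, 2)");
    stmt.execute("COPY items TO 's3://your-bucket-name/output2.parquet' (FORMAT PARQUET);");
}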
