如何在java中用分区将avro对象写入parquet?如何将数据附加到同一Parquet地板?

b0zn9rqh  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(332)

我正在使用confluent的kafkaavroderserializer反序列化通过kafka发送的avro对象。我想把收到的数据写入Parquet文件。我希望能够将数据附加到同一个Parquet地板上,并创建带有分区的Parquet地板。
我设法用avroparquetwriter创建了一个Parquet地板-但我没有找到如何添加分区或附加到同一个文件:
在使用avro之前,我使用spark来编写parquet——spark编写带有分区的parquet,使用append模式很简单——我是否应该尝试从avro对象创建rdd并使用spark来创建parquet?

myss37ts

myss37ts1#

我想把Parquet写给hdfs
就我个人而言,我不会用spark来做这个。
我宁愿使用hdfsKafka连接器。这是一个可以让你开始的配置文件。

name=hdfs-sink

# List of topics to read

topics=test_hdfs

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector

# increase to be the sum of the partitions for all connected topics

tasks.max=1 

# the folder where core-site.xml and hdfs-site.xml exist

hadoop.conf.dir=/etc/hadoop

# the namenode url, defined as fs.defaultFS in the core-site.xml

hdfs.url=hdfs://hdfs-namenode.example.com:9000

# number of messages per file

flush.size=10 

# The format to write the message values

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

# Setup Avro parser

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry.example.com:8081
value.converter.schemas.enable=true
schema.compatibility=BACKWARD

如果您希望hdfs分区基于一个字段而不是文字上的“kafka partition”编号,那么请参阅 FieldPartitioner . 如果你想自动集成Hive,请参阅文件,以及。
假设您确实想使用spark,但是,您可以尝试absaoss/abris在avroDataframe中读取,然后您应该能够执行以下操作 df.write.format("parquet").path("/some/path") (不是精确的代码,因为我没有试过)

相关问题