我从dynamo db读取数据并将其存储在Spark
Dataset
中,如下所示:
// Building a dataset
Dataset citations = sparkSession.read()
.option("tableName", "Covid19Citation")
.option("region", "eu-west-1")
.format("dynamodb")
.load();
我想要的是根据行数分割这个数据集。
例如,如果数据集有超过500行,我想拆分它,并将每个数据集保存为单独的csv文件。因此,我想保存的每个数据集最多应该有500行。如果数据库中有1600行,输出应该是四个xml文件:
第一个xml文件包含500行
第二个xml文件也包含500行
第三个xml文件也包含500行,最后是
第四个xml文件,包含100行。
这是我目前为止尝试过的,但它不起作用:
List<Dataset> datasets = new ArrayList<>();
while (citations.count() > 0) {
Dataset splitted = citations.limit(400);
datasets.add(splitted);
citations = citations.except(splitted);
}
System.out.println("datasets : " + datasets.size());
for (Dataset d : datasets) {
code
d.coalesce(1)
.write()
.format("com.databricks.spark.xml")
.option("rootTag", "citations")
.option("rowTag", "citation")
.mode("overwrite")
.save("s3a://someoutputfolder/");
}
任何帮助都将不胜感激。
谢谢
1条答案
按热度按时间vfh0ocws1#
您可以利用:
row_number
和mod
:将数据集拆分为500个部分repartition
:为每个分区生成一个文件partitionBy
:为每个分区写入一个xml这里是scala / parquet中的一个例子(但是你也可以使用
xml
)