我有一个Dataframe df
```
col1, col2, date
a1, b1, 2020-04-01
a2, b2, 2020-04-02
a3, b3, 2020-04-03
我想把每个日期写进s3中各自的位置。我收集 `date` 作为一个 `List[String]` 然后循环遍历每个值以过滤df并写出。
val dateStr = df.select(col(date)).distinct.collect().toList.map(x => x(0).toString)
dateStr.foreach { d =>
val dateModified = d.replaceAll("-","/")
inputDf
.filter(inputDf(incrementIdentifierCol) === d)
.write.parquet(s"s3://bucket/$dateModified")
}
有办法并行化吗 `dateStr` 在Dataframe上过滤并写入而不是逐个进行?
我知道我能做到
df.partitionBy("date").write.parquet("s3://bucket/")
但我不想把地点 `s3://bucket/date=2020-04-01` . 我希望是这样 `s3://bucket/2020/04/01` 所以我收集并运行foreach。
1条答案
按热度按时间apeeds0o1#
在循环中执行写操作不是最好的选择,因为spark将为每个写操作创建一个单独的阶段。这意味着,在写入时发生故障的情况下,很难确定加载了哪些数据以及没有加载哪些数据,特别是当加载的是增量数据时。所以我建议在一个写作阶段内完成所有的事情。
partitionBy
方法可以使用多个分区名称作为参数。在要使用的Dataframe中再添加三列(年、月、日)withColumn
方法。硬编码的年、月、日值可以替换为
date
列。