spark并行化要写入的字符串列表

afdcj2ne  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(335)

我有一个Dataframe df ```
col1, col2, date
a1, b1, 2020-04-01
a2, b2, 2020-04-02
a3, b3, 2020-04-03

我想把每个日期写进s3中各自的位置。我收集 `date` 作为一个 `List[String]` 然后循环遍历每个值以过滤df并写出。

val dateStr = df.select(col(date)).distinct.collect().toList.map(x => x(0).toString)
dateStr.foreach { d =>
val dateModified = d.replaceAll("-","/")
inputDf
.filter(inputDf(incrementIdentifierCol) === d)
.write.parquet(s"s3://bucket/$dateModified")
}

有办法并行化吗 `dateStr` 在Dataframe上过滤并写入而不是逐个进行?
我知道我能做到

df.partitionBy("date").write.parquet("s3://bucket/")

但我不想把地点 `s3://bucket/date=2020-04-01` . 我希望是这样 `s3://bucket/2020/04/01` 所以我收集并运行foreach。
apeeds0o

apeeds0o1#

在循环中执行写操作不是最好的选择,因为spark将为每个写操作创建一个单独的阶段。这意味着,在写入时发生故障的情况下,很难确定加载了哪些数据以及没有加载哪些数据,特别是当加载的是增量数据时。所以我建议在一个写作阶段内完成所有的事情。 partitionBy 方法可以使用多个分区名称作为参数。在要使用的Dataframe中再添加三列(年、月、日) withColumn 方法。

df
    .withColumn("year", lit("2020"))
    .withColumn("month", lit("06"))
    .withColumn("day", lit("19"))
    .write
    .partitionBy("year", "month", "day")
    .parquet("s3://bucket/" )

硬编码的年、月、日值可以替换为 date 列。

相关问题