spark在s3上的分区内创建分区

kr98yfug  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(503)

我有以下制表符分隔的示例数据集:

col1  period  col3  col4  col5  col6  col7  col8  col9  col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22
ASSDF 202001  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202002  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202003  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202004  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
...
...
ASSDF 202312  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99

我正在这个数据上运行一些转换,最终的数据在spark数据集中 "DS1" . 之后,我用“period”分区将数据集写入s3。因为我也想在s3文件中使用period,所以我正在从period列创建另一列“datasetperiod”。
我的scala函数保存tsv数据集。

def saveTsvDataset(dataframe: DataFrame, outputFullPath: String, numPartitions: Integer, partitionCols: String*): Unit = {
    dataframe
      .repartition(numPartitions)
      .write
      .partitionBy(partitionCols:_*)
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .csv(outputFullPath)
  }

在s3上保存数据集的scala代码。为s3上的分区添加新的列datasetperiod。

saveTsvDataset(
      DS1.withColumn("datasetPeriod",$"period")
      , "s3://s3_path"
      , 100
      , "period"
    )

现在,我的问题是,我有一个从202001到202312的周期,当我在s3上写“datasetperiod”上的分区时,有时它会在分区内为任何随机周期创建分区。所以这在任何时期都是随机发生的。我从来没有见过这种情况发生过多次。它创建了一个类似路径的 "s3://s3_path/datasetPeriod=202008/datasetPeriod=202008" .

blpfk2vs

blpfk2vs1#

你已经有一个 period 列。所以不需要再创建一个新的副本 datasetPeriod 列。
将Dataframe写入 s3://../parentFolder 使用 .partitionBy("period") 它创建如下文件夹:

df.write.partitionBy("period").csv("s3://../parentFolder/")
s3://.../parentFolder/period=202001/
s3://.../parentFolder/period=202002/
s3://.../parentFolder/period=202003/
...
s3://.../parentFolder/period=202312/

在读回数据时,只需提到 parentFolder 只会自动读取 period 作为一列。

val df = spark.read.csv("s3://../parentFolder/")
//df.schema will give you `period` as one of the column
df.printSchema
root
 |-- col1: string (nullable = true)
 |-- .... //other columns go here
 |-- period: string (nullable = true)

也就是说,无论您得到的分区列中有多个分区,都是由于在使用partitionby写入数据时使用了错误的路径。

相关问题