如何在pysparkDataframe读取方法中包含分区列

u0sqgete  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(429)

我写的avro文件的基础上Parquet文件。我阅读了以下文件:

读取数据

dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
    .load("/Users/rashmik/flight-time.parquet")

写入数据

我以avro格式编写了以下文件:

dfParquetRePartitioned.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "datasink/avro") \
    .partitionBy("OP_CARRIER") \
    .option("maxRecordsPerFile", 100000) \
    .save()

正如所料,我得到了按 OP_CARRIER .

从特定分区读取avro分区数据

在另一个作业中,我需要从上述作业的输出读取数据,即从 datasink/avro 目录。我使用下面的代码来读取 datasink/avro ```
dfAvro = spark.read.format("avro")
.option("mode","FAILFAST")
.load("datasink/avro/OP_CARRIER=AA")

它成功地读取了数据,但正如预期的那样 `OP_CARRIER` 列在中不可用 `dfAvro` Dataframe,因为它是第一个作业的分区列。现在我的要求是包括 `OP_CARRIER` 字段也在第二个Dataframe中,即 `dfAvro` . 有人能帮我吗?
我参考的是spark文档中的文档,但我无法找到相关信息。任何指针都会非常有用。
a7qyws3x

a7qyws3x1#

使用不同的别名复制相同的列值。

dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) \
.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()

这会给你想要的。但化名不同。或者你也可以在阅读的时候做。如果位置是动态的,那么可以很容易地附加列。

path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=") 
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load(path).withColumn(newcol[0], lit(newcol[1]))

如果值是静态的,那么在数据读取期间添加它就更容易了。

相关问题