python—通过先读取Parquet文件,将新列附加到Dataframe

mxg2im7a  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(427)

假设有一个包含多列的Dataframe,看起来是这样的(我省略了不必要的列):

+----------------------------------------+
|path                                    |
+----------------------------------------+
|/tmp/some_folder/2020-04-02/blabla1.parq|
|/tmp/some_folder/2020-05-14/bla2bla.parq|
+----------------------------------------+

其中path是hdfs中的某个Parquet文件,它只有一行,结构如下:

+-----------+
|value      |
+-----------+
|some value |
+-----------+

如何读取这些文件并向初始Dataframe中添加列(“值”)呢?因此,我想要一个这样的结构:

+----------------------------------------+----------+
|path                                    |value     |
+----------------------------------------+----------+
|/tmp/some_folder/2020-04-02/blabla1.parq|some value|
|/tmp/some_folder/2020-05-14/bla2bla.parq|bla blah  |
+----------------------------------------+----------+

例如,我可以将“path”列转换为一个列表,通过迭代将其读入datframes并与初始dataframe连接。还有其他解决办法吗?最好是更快的性能。

q3aa0525

q3aa05251#

您可以使用 input_file_name() 以便 path 将被添加到Dataframe。 Example: ```
from pyspark.sql.functions import *
from pyspark.sql.types import *

paths=df.select("path").rdd.map(lambda x:x[0]).collect()

schema will the fields

sch=StructType([StructField("path",StringType()),StructField("value",StringType())])
final_df=spark.createDataFrame([],schema)

for path in paths:
final_df=spark.read.parquet(path).withColumn("path",input_file_name())

dataframe will have path and value to it

final_df.show()

1cklez4t

1cklez4t2#

我通过一次读取多个Parquet文件解决了这个问题: spark.read.parquet(f"/tmp/some_folder/{2020-04-02/blabla1.parq, 2020-05-14/bla2bla.parq}") 然后使用input\u file\u name()添加“path”列。

相关问题