假设有一个包含多列的Dataframe,看起来是这样的(我省略了不必要的列):
+----------------------------------------+
|path |
+----------------------------------------+
|/tmp/some_folder/2020-04-02/blabla1.parq|
|/tmp/some_folder/2020-05-14/bla2bla.parq|
+----------------------------------------+
其中path是hdfs中的某个Parquet文件,它只有一行,结构如下:
+-----------+
|value |
+-----------+
|some value |
+-----------+
如何读取这些文件并向初始Dataframe中添加列(“值”)呢?因此,我想要一个这样的结构:
+----------------------------------------+----------+
|path |value |
+----------------------------------------+----------+
|/tmp/some_folder/2020-04-02/blabla1.parq|some value|
|/tmp/some_folder/2020-05-14/bla2bla.parq|bla blah |
+----------------------------------------+----------+
例如,我可以将“path”列转换为一个列表,通过迭代将其读入datframes并与初始dataframe连接。还有其他解决办法吗?最好是更快的性能。
2条答案
按热度按时间q3aa05251#
您可以使用
input_file_name()
以便path
将被添加到Dataframe。Example:
```from pyspark.sql.functions import *
from pyspark.sql.types import *
paths=df.select("path").rdd.map(lambda x:x[0]).collect()
schema will the fields
sch=StructType([StructField("path",StringType()),StructField("value",StringType())])
final_df=spark.createDataFrame([],schema)
for path in paths:
final_df=spark.read.parquet(path).withColumn("path",input_file_name())
dataframe will have path and value to it
final_df.show()
1cklez4t2#
我通过一次读取多个Parquet文件解决了这个问题:
spark.read.parquet(f"/tmp/some_folder/{2020-04-02/blabla1.parq, 2020-05-14/bla2bla.parq}")
然后使用input\u file\u name()添加“path”列。