PySpark DataFrame: maximum timestamp at each folder level

tyky79it  posted on 2021-07-13  in  Spark

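I want to recursively find the maximum datetime value for each sub-folder and, finally, the maximum timestamp for the top-level parent folder. Spark SQL is slow when I run this, so I would like to implement the logic with a UDF or DataFrame methods in PySpark.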

+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

The output should show the Timestamp values as below (null values replaced with the maximum timestamp at each level).
Output:

+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+

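I tried the SQL below, which gives the expected result, but it is too slow when the DataFrame holds millions of records. I also tried caching the DataFrame, but that did not help; it is probably an expensive operation. FYI: I removed the time from the date format because it was not displaying properly. The time format does not matter here, but the sub-folders and folders should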

df.show()
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

>>> self_join_rec =  sqlc.sql("SELECT   \
...     a.File_Folder,  a.Folder_File_Ind, Max(b.Timestamp) Timestamp  \
...     FROM src_table a \
...     JOIN src_table b on b.File_Folder LIKE Concat(a.File_Folder, '%')   \
...     GROUP BY \
...     a.File_Folder,  a.Folder_File_Ind \
...     ORDER BY a.File_Folder,a.Folder_File_Ind"
... )
>>> self_join_rec.show()
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+
kdfy810k  #1

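Add a column base_folder that contains only the folder part of the path, without the file name; it will be used for the join.
Group by base_folder and compute the maximum timestamp.
Join the result back to the original DataFrame on base_folder to get the maximum timestamp for the rows where it is null.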

from pyspark.sql import functions as F

# create new column base_folder

df = df.withColumn(
    "base_folder",
    F.when(
        F.col("Folder_File_Ind") == "file",
        F.regexp_extract("File_Folder", "(.*)/.*", 1)
    ).otherwise(F.col("File_Folder"))
)

df.cache()

# calculate max timestamp per group of folders

max_df = (
    df.groupby("base_folder")
    .agg(F.max("Timestamp").alias("max_timestamp"))
    .filter("max_timestamp is not null")
)

# join df with max_df

df1 = df.alias("df").join(
    max_df.alias("max"),
    F.col("max.base_folder").startswith(F.col("df.base_folder")),
    "left"
).groupby("File_Folder").agg(
    F.first("Folder_File_Ind").alias("Folder_File_Ind"),
    F.first("folder_level_ind").alias("folder_level_ind"),
    F.coalesce(
        F.first("Timestamp"),
        F.max("max_timestamp")
    ).alias("Timestamp")
)

df1.show()

# +-----------+---------------+----------------+-----------------+
# |File_Folder|Folder_File_Ind|folder_level_ind|        Timestamp|
# +-----------+---------------+----------------+-----------------+
# |         /A| parent-Folder |               1| 02-FEB-2021 9 PM|
# | /A/C/4.txt|           file|               3|02-FEB-2021 11 AM|
# | /A/B/2.txt|           file|               3| 02-FEB-2021 9 PM|
# |       /A/C|    sub-folder |               2|02-FEB-2021 11 AM|
# | /A/C/3.txt|           file|               3|02-FEB-2021 11 AM|
# |       /A/B|     sub-folder|               2| 02-FEB-2021 9 PM|
# | /A/B/1.txt|           file|               3| 02-FEB-2021 9 PM|
# +-----------+---------------+----------------+-----------------+
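
One caveat about F.max("Timestamp"): if the column holds strings, the maximum is taken in lexicographic order, not chronological order. With MMddyyyy strings like those in the question, the two orders agree only while all values fall in the same year. The plain-Python sketch below illustrates the difference; the two sample values are made up for illustration and do not come from the question's data. In Spark, the corresponding fix would presumably be to convert the column to a date type first (for example with F.to_date("Timestamp", "MMddyyyy")) and aggregate on that.

```python
from datetime import datetime

# Hypothetical sample values in the question's MMddyyyy layout.
stamps = ["12312020", "01012021"]

# Plain string comparison: "1..." sorts after "0...", so the 2020 value wins.
latest_as_string = max(stamps)

# Parse first, then compare: the 2021 value is chronologically the latest.
latest_as_date = max(stamps, key=lambda s: datetime.strptime(s, "%m%d%Y"))

print(latest_as_string)  # 12312020
print(latest_as_date)    # 01012021
```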

Or use a SQL query with CTEs:

sql_query = """
WITH folders_data AS (
    SELECT  *,
            CASE WHEN Folder_File_Ind = 'file' THEN regexp_extract(File_Folder, '(.*)/.*', 1)
                 ELSE File_Folder
            END AS base_folder
    FROM    src_table
), 
max_per_folder AS (
    SELECT  base_folder,
            MAX(Timestamp) AS max_timestamp
    FROM    folders_data
    GROUP BY  base_folder
    HAVING  MAX(Timestamp) IS NOT NULL
)

SELECT  File_Folder,
        FIRST(Folder_File_Ind)      AS Folder_File_Ind,
        FIRST(folder_level_ind)     AS folder_level_ind,
        COALESCE(FIRST(Timestamp), MAX(max_timestamp))  AS Timestamp
FROM    folders_data AS t1
LEFT JOIN max_per_folder t2
ON      t2.base_folder LIKE CONCAT(t1.base_folder, '%')
GROUP BY File_Folder
"""

spark.sql(sql_query).show()
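
Note that both variants still join on a string prefix (startswith / LIKE CONCAT(..., '%')). That is a non-equi join, which Spark generally cannot run as a hash or sort-merge join, so it may stay expensive on millions of rows. A raw prefix also matches sibling names: /A/B is a prefix of /A/BC/9.txt. A possible alternative, sketched here in plain Python rather than Spark, is to expand every file path into its exact ancestor folders and take the maximum per ancestor, which needs only equality grouping. The helper names ancestors and max_per_path are hypothetical, and the /A/BC/9.txt row is added purely for illustration.

```python
from datetime import datetime

def ancestors(path):
    """Return every proper ancestor folder, e.g. /A/B/1.txt -> ['/A', '/A/B']."""
    parts = path.strip("/").split("/")
    return ["/" + "/".join(parts[:i]) for i in range(1, len(parts))]

def max_per_path(rows):
    """rows: (path, MMddyyyy string or None). Returns {path: latest MMddyyyy string}."""
    parse = lambda s: datetime.strptime(s, "%m%d%Y")
    best = {}
    for path, stamp in rows:
        if stamp is None:
            continue  # folder rows carry no timestamp of their own
        # A file contributes its timestamp to itself and to each ancestor folder.
        for key in [path] + ancestors(path):
            if key not in best or parse(stamp) > parse(best[key]):
                best[key] = stamp
    return best

rows = [
    ("/A", None), ("/A/B", None),
    ("/A/B/1.txt", "02022021"), ("/A/B/2.txt", "02032021"),
    ("/A/C", None),
    ("/A/C/3.txt", "02042021"), ("/A/C/4.txt", "02052021"),
    ("/A/BC/9.txt", "02092021"),  # hypothetical sibling sharing the "/A/B" prefix
]
result = max_per_path(rows)
print(result["/A/B"])  # 02032021 (the /A/BC file is not counted under /A/B)
print(result["/A"])    # 02092021
```

In Spark, the same idea would roughly translate to building an array of ancestor paths per file row, exploding it, and then doing an ordinary groupBy plus an equi-join back to the folder rows. This is an untested outline, not a measured improvement.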
