我想递归地找到每个子文件夹的最大日期时间值,最后找到最上面的父文件夹的最大时间戳。sparksql在我运行时比较慢。所以我想用pyspark中的udf或Dataframe方法来实现这个逻辑。
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
| /A| parent-Folder| 1| null|
| /A/B| sub-folder| 2| null|
| /A/B/1.txt| file| 3| 02022021|
| /A/B/2.txt| file| 4| 02032021|
| /A/C| sub-folder| 2| null|
| /A/C/3.txt| file| 3| 02042021|
| /A/C/4.txt| file| 3| 02052021|
+-----------+---------------+----------------+---------+
输出应该像timestamp值一样显示(空值替换为每个级别的最大时间戳)
输出
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
| /A| parent-Folder| 02052021|
| /A/B| sub-folder| 02032021|
| /A/B/1.txt| file| 02022021|
| /A/B/2.txt| file| 02032021|
| /A/C| sub-folder| 02052021|
| /A/C/3.txt| file| 02042021|
| /A/C/4.txt| file| 02052021|
+-----------+---------------+---------+
我在sql下面尝试了sql,它给出了预期的结果它太慢时,数百万条记录在Dataframe)和我试图与Dataframe缓存,但它没有帮助。可能是一个昂贵的手术仅供参考。我从日期格式中删除了时间,因为它显示不正确。时间格式在这里不重要),但子文件夹和文件夹应该
df.show()
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
| /A| parent-Folder| 1| null|
| /A/B| sub-folder| 2| null|
| /A/B/1.txt| file| 3| 02022021|
| /A/B/2.txt| file| 4| 02032021|
| /A/C| sub-folder| 2| null|
| /A/C/3.txt| file| 3| 02042021|
| /A/C/4.txt| file| 3| 02052021|
+-----------+---------------+----------------+---------+
>>> self_join_rec = sqlc.sql("SELECT \
... a.File_Folder, a.Folder_File_Ind, Max(b.Timestamp) Timestamp \
... FROM src_table a \
... JOIN src_table b on b.File_Folder LIKE Concat(a.File_Folder, '%') \
... GROUP BY \
... a.File_Folder, a.Folder_File_Ind \
... ORDER BY a.File_Folder,a.Folder_File_Ind"
... )
>>> self_join_rec.show()
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
| /A| parent-Folder| 02052021|
| /A/B| sub-folder| 02032021|
| /A/B/1.txt| file| 02022021|
| /A/B/2.txt| file| 02032021|
| /A/C| sub-folder| 02052021|
| /A/C/3.txt| file| 02042021|
| /A/C/4.txt| file| 02052021|
+-----------+---------------+---------+
1条答案
按热度按时间kdfy810k1#
添加列
base_folder
只包含不包含文件的文件夹部分,将用于连接分组依据
base_folder
并计算最大时间戳使用连接原始Dataframe
base_folder
获取空行的最大时间戳或将sql查询与CTE结合使用: