Max timestamp at each folder level in a PySpark DataFrame

Asked by tyky79it on 2021-07-13, in Spark
1 answer · 431 views

I want to recursively find the maximum datetime value for each sub-folder and, ultimately, the maximum timestamp for the top-level parent folder. Spark SQL is slow when I run it, so I would like to implement this logic with a UDF or DataFrame operations in PySpark.

+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

The output should look like the table below, with the null values replaced by the maximum timestamp at each level.
Output:

+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+
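
For reference, a minimal sketch of how this sample data can be set up for testing, assuming the timestamps are kept as plain MMddyyyy strings; it also registers the src_table view that the SQL queries below read from:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# rows copied from the input table above; Timestamp kept as a plain string
data = [
    ("/A",         "parent-Folder", 1, None),
    ("/A/B",       "sub-folder",    2, None),
    ("/A/B/1.txt", "file",          3, "02022021"),
    ("/A/B/2.txt", "file",          4, "02032021"),
    ("/A/C",       "sub-folder",    2, None),
    ("/A/C/3.txt", "file",          3, "02042021"),
    ("/A/C/4.txt", "file",          3, "02052021"),
]
df = spark.createDataFrame(
    data, ["File_Folder", "Folder_File_Ind", "folder_level_ind", "Timestamp"]
)
df.createOrReplaceTempView("src_table")   # the view queried by the SQL below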

I tried the SQL below and it gives the expected result, but it is too slow when the DataFrame has millions of records, and caching the DataFrame did not help; the self-join is probably an expensive operation. FYI: I removed the time portion from the date format because it was not displaying properly. The exact time format is not important here, but each sub-folder and parent folder should end up with the maximum timestamp of the files underneath it.

df.show()
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

>>> self_join_rec = sqlc.sql("SELECT \
...     a.File_Folder, a.Folder_File_Ind, Max(b.Timestamp) Timestamp \
...     FROM src_table a \
...     JOIN src_table b ON b.File_Folder LIKE Concat(a.File_Folder, '%') \
...     GROUP BY \
...     a.File_Folder, a.Folder_File_Ind \
...     ORDER BY a.File_Folder, a.Folder_File_Ind"
... )
>>> self_join_rec.show()
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+
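
The LIKE Concat(a.File_Folder, '%') predicate is presumably what makes this expensive: it is not an equi-join condition, so Spark cannot use a hash or sort-merge join and has to fall back to a nested-loop style comparison of the two sides, which grows quadratically with the number of rows. The physical plan can be inspected with:

self_join_rec.explain()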

kdfy810k 1#

Add a base_folder column that contains only the folder part of the path (without the file name); it will be used for the join.
Group by base_folder and compute the maximum timestamp per folder.
Join the original DataFrame with that aggregate on base_folder to fill the null rows with the maximum timestamp.

from pyspark.sql import functions as F

# create new column base_folder
df = df.withColumn(
    "base_folder",
    F.when(
        F.col("Folder_File_Ind") == "file",
        F.regexp_extract("File_Folder", "(.*)/.*", 1)
    ).otherwise(F.col("File_Folder"))
)
df.cache()

# calculate max timestamp per group of folders
max_df = df.groupby("base_folder") \
    .agg(F.max("Timestamp").alias("max_timestamp")) \
    .filter("max_timestamp is not null")

# join df with max_df
df1 = df.alias("df").join(
    max_df.alias("max"),
    F.col("max.base_folder").startswith(F.col("df.base_folder")),
    "left"
).groupby("File_Folder").agg(
    F.first("Folder_File_Ind").alias("Folder_File_Ind"),
    F.first("folder_level_ind").alias("folder_level_ind"),
    F.coalesce(
        F.first("Timestamp"),
        F.max("max_timestamp")
    ).alias("Timestamp")
)

df1.show()
# +-----------+---------------+----------------+-----------------+
# |File_Folder|Folder_File_Ind|folder_level_ind|        Timestamp|
# +-----------+---------------+----------------+-----------------+
# |         /A|  parent-Folder|               1| 02-FEB-2021 9 PM|
# | /A/C/4.txt|           file|               3|02-FEB-2021 11 AM|
# | /A/B/2.txt|           file|               3| 02-FEB-2021 9 PM|
# |       /A/C|     sub-folder|               2|02-FEB-2021 11 AM|
# | /A/C/3.txt|           file|               3|02-FEB-2021 11 AM|
# |       /A/B|     sub-folder|               2| 02-FEB-2021 9 PM|
# | /A/B/1.txt|           file|               3| 02-FEB-2021 9 PM|
# +-----------+---------------+----------------+-----------------+
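
The roll-up happens in the join condition: max.base_folder.startswith(df.base_folder) matches every aggregated folder located under a row's own folder, so F.max("max_timestamp") takes the maximum over that whole subtree. For the /A row this covers the /A/B and /A/C groups (the /A group itself drops out because its maximum is null), while /A/B only sees its own group. To display the result in the same order as the expected output, the final DataFrame can simply be sorted:

df1.orderBy("File_Folder").show(truncate=False)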

Or, using a SQL query with a CTE:

sql_query = """
WITH folders_data AS (
    SELECT *,
           CASE WHEN Folder_File_Ind = 'file' THEN regexp_extract(File_Folder, '(.*)/.*', 1)
                ELSE File_Folder
           END AS base_folder
    FROM src_table
),
max_per_folder AS (
    SELECT base_folder,
           MAX(Timestamp) AS max_timestamp
    FROM folders_data
    GROUP BY base_folder
    HAVING MAX(Timestamp) IS NOT NULL
)
SELECT File_Folder,
       FIRST(Folder_File_Ind) AS Folder_File_Ind,
       FIRST(folder_level_ind) AS folder_level_ind,
       COALESCE(FIRST(Timestamp), MAX(max_timestamp)) AS Timestamp
FROM folders_data AS t1
LEFT JOIN max_per_folder t2
ON t2.base_folder LIKE CONCAT(t1.base_folder, '%')
GROUP BY File_Folder
"""

spark.sql(sql_query).show()
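
This variant reads from the same src_table temporary view as the question's query, so the DataFrame has to be registered first, for example:

df.createOrReplaceTempView("src_table")

In both variants the speed-up over the original self-join comes from the same idea: the large table is joined against the small per-folder aggregate (max_df above, max_per_folder here), which has one row per folder rather than one row per file.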