How to join the nearest time before the current row's time (PySpark 2.4.4 DataFrames)

mqkwyuun · posted 2021-05-27 in Spark

I'm having trouble figuring out how to do the following:
I have 2 DataFrames in PySpark. "df1" looks like this:

+----+-------------+---------+
| id | SMS Created | Content |
+----+-------------+---------+
|  1 |    12:00:00 |       a |
|  2 |    13:00:00 |       b |
|  3 |    11:00:00 |       c |
+----+-------------+---------+

df2 looks like this:

+---------+----------+----+---------+
| Event   | Time     | id | Members |
+---------+----------+----+---------+
| Created | 11:30:00 |  1 | [1,2]   |
| Updated | 11:42:00 |  1 | [1,2,3] |
| Updated | 11:50:00 |  1 | [1,2,4] |
| Updated | 12:50:00 |  1 | [1,2]   |
| Created | 12:30:00 |  2 | [1,2]   |
| Updated | 12:42:00 |  2 | [1,2,3] |
| Updated | 12:50:00 |  2 | [1,2,4] |
| Updated | 13:10:00 |  2 | [1,2]   |
| Created | 10:30:00 |  3 | [1,2]   |
| Updated | 10:42:00 |  3 | [1,2,3] |
| Updated | 10:50:00 |  3 | [1,2,4] |
| Updated | 12:10:00 |  3 | [1,2]   |
+---------+----------+----+---------+

df2 is updated every time the members change, but the message should only be sent to the "Members" as of the last event before the "SMS Created" time.
Note that there are update times after the "SMS Created" time, so a plain max() over all events won't work here. I just can't figure out how to do this.
How would you join the most recent "Event" before "SMS Created", so the table looks like this:

+----+-------------+---------+---------+----------+---------+
| id | SMS Created | Content | Event   | Time     | Members |
+----+-------------+---------+---------+----------+---------+
|  1 |    12:00:00 |       a | Updated | 11:50:00 | [1,2,4] |
|  2 |    13:00:00 |       b | Updated | 12:50:00 | [1,2,4] |
|  3 |    11:00:00 |       c | Updated | 10:50:00 | [1,2,4] |
+----+-------------+---------+---------+----------+---------+

I'm using PySpark 2.4.4 with the DataFrame API. Any help would be greatly appreciated!
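
For reproducibility, here is a minimal sketch of how the sample data could be built (keeping the times as plain strings is an assumption on my part; HH:mm:ss strings compare correctly as text):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nearest-event-join").getOrCreate()

# df1: one row per SMS
df1 = spark.createDataFrame(
    [(1, "12:00:00", "a"), (2, "13:00:00", "b"), (3, "11:00:00", "c")],
    ["id", "SMS Created", "Content"])

# df2: one row per membership event
df2 = spark.createDataFrame(
    [("Created", "11:30:00", 1, [1, 2]),
     ("Updated", "11:42:00", 1, [1, 2, 3]),
     ("Updated", "11:50:00", 1, [1, 2, 4]),
     ("Updated", "12:50:00", 1, [1, 2]),
     ("Created", "12:30:00", 2, [1, 2]),
     ("Updated", "12:42:00", 2, [1, 2, 3]),
     ("Updated", "12:50:00", 2, [1, 2, 4]),
     ("Updated", "13:10:00", 2, [1, 2]),
     ("Created", "10:30:00", 3, [1, 2]),
     ("Updated", "10:42:00", 3, [1, 2, 3]),
     ("Updated", "10:50:00", 3, [1, 2, 4]),
     ("Updated", "12:10:00", 3, [1, 2])],
    ["Event", "Time", "id", "Members"])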

jgwigjjp 1#

Welcome to SO! Try this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only events strictly before each row's "SMS Created" time,
# then select the latest of those per id.
w = Window().partitionBy("id")

df1.join(df2.withColumnRenamed("id", "id2"),
         (F.col("id") == F.col("id2")) & (F.col("SMS Created") > F.col("Time")))\
   .drop("id2")\
   .withColumn("max", F.max("Time").over(w))\
   .filter("max = Time").drop("max")\
   .orderBy("id").show()

# +---+-----------+-------+-------+--------+---------+
# | id|SMS Created|Content|  Event|    Time|  Members|
# +---+-----------+-------+-------+--------+---------+
# |  1|   12:00:00|      a|Updated|11:50:00|[1, 2, 4]|
# |  2|   13:00:00|      b|Updated|12:50:00|[1, 2, 4]|
# |  3|   11:00:00|      c|Updated|10:50:00|[1, 2, 4]|
# +---+-----------+-------+-------+--------+---------+
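
The trick is to filter the join down to events strictly before "SMS Created" first, and only then take the per-id maximum, so updates that happen after the SMS can never win. An equivalent variant of the same idea (a sketch, assuming the df1/df2 above) ranks each id's qualifying events with row_number over a window ordered by Time descending and keeps the top row, avoiding the temporary max column:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank qualifying events per id from latest to earliest.
w = Window.partitionBy("id").orderBy(F.col("Time").desc())

df1.join(df2.withColumnRenamed("id", "id2"),
         (F.col("id") == F.col("id2")) & (F.col("SMS Created") > F.col("Time")))\
   .drop("id2")\
   .withColumn("rn", F.row_number().over(w))\
   .filter("rn = 1").drop("rn")\
   .orderBy("id").show()

Both versions compare the HH:mm:ss strings lexicographically, which works for times within one day; if the data ever spans dates, cast "Time" and "SMS Created" to timestamps first.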