How to join the most recent time before the current row's time (pyspark 2.4.4 dataframes)

Asked by mqkwyuun on 2021-05-27 in Spark

I'm having a hard time figuring out how to do the following.
I have two DataFrames in pyspark. "df1" looks like this:

+----+-------------+---------+
| id | SMS Created | Content |
+----+-------------+---------+
| 1  | 12:00:00    | a       |
| 2  | 13:00:00    | b       |
| 3  | 11:00:00    | c       |
+----+-------------+---------+

And "df2" looks like this:

+---------+----------+----+---------+
| Event   | Time     | id | Members |
+---------+----------+----+---------+
| Created | 11:30:00 | 1  | [1,2]   |
| Updated | 11:42:00 | 1  | [1,2,3] |
| Updated | 11:50:00 | 1  | [1,2,4] |
| Updated | 12:50:00 | 1  | [1,2]   |
| Created | 12:30:00 | 2  | [1,2]   |
| Updated | 12:42:00 | 2  | [1,2,3] |
| Updated | 12:50:00 | 2  | [1,2,4] |
| Updated | 13:10:00 | 2  | [1,2]   |
| Created | 10:30:00 | 3  | [1,2]   |
| Updated | 10:42:00 | 3  | [1,2,3] |
| Updated | 10:50:00 | 3  | [1,2,4] |
| Updated | 12:10:00 | 3  | [1,2]   |
+---------+----------+----+---------+

df2 gets a new row every time the members change, but the message is only sent to the "Members" in effect at the "SMS Created" time.
Note that each id also has Updated rows after its "SMS Created" time, so a plain max() over all of an id's events won't work here. I just can't figure out how to approach this.
How would you join the most recent "Event" before "SMS Created", so that the result looks like this:

+----+-------------+---------+---------+----------+---------+
| id | SMS Created | Content | Event   | Time     | Members |
+----+-------------+---------+---------+----------+---------+
| 1  | 12:00:00    | a       | Updated | 11:50:00 | [1,2,4] |
| 2  | 13:00:00    | b       | Updated | 12:50:00 | [1,2,4] |
| 3  | 11:00:00    | c       | Updated | 10:50:00 | [1,2,4] |
+----+-------------+---------+---------+----------+---------+

I'm using pyspark 2.4.4 and the DataFrame API. Any help would be greatly appreciated!
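
For reference, a minimal sketch that reproduces the sample data, assuming a local SparkSession; the times are kept as plain HH:mm:ss strings, which compare correctly with < and > because they sort lexicographically:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# HH:mm:ss strings order lexicographically, so string comparison
# works for these times without casting to timestamps.
df1 = spark.createDataFrame(
    [(1, "12:00:00", "a"), (2, "13:00:00", "b"), (3, "11:00:00", "c")],
    ["id", "SMS Created", "Content"],
)
df2 = spark.createDataFrame(
    [("Created", "11:30:00", 1, [1, 2]),
     ("Updated", "11:42:00", 1, [1, 2, 3]),
     ("Updated", "11:50:00", 1, [1, 2, 4]),
     ("Updated", "12:50:00", 1, [1, 2]),
     ("Created", "12:30:00", 2, [1, 2]),
     ("Updated", "12:42:00", 2, [1, 2, 3]),
     ("Updated", "12:50:00", 2, [1, 2, 4]),
     ("Updated", "13:10:00", 2, [1, 2]),
     ("Created", "10:30:00", 3, [1, 2]),
     ("Updated", "10:42:00", 3, [1, 2, 3]),
     ("Updated", "10:50:00", 3, [1, 2, 4]),
     ("Updated", "12:10:00", 3, [1, 2])],
    ["Event", "Time", "id", "Members"],
)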


Answer 1 (jgwigjjp):

Welcome to SO! Try this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# For each id, join only the events strictly before "SMS Created",
# then keep the row(s) with the latest such Time per id.
w = Window.partitionBy("id")

(df1.join(df2.withColumnRenamed("id", "id2"),
          (F.col("id") == F.col("id2")) & (F.col("SMS Created") > F.col("Time")))
    .drop("id2")
    .withColumn("max", F.max("Time").over(w))
    .filter(F.col("Time") == F.col("max"))
    .drop("max").orderBy("id").show())

# +---+-----------+-------+-------+--------+---------+
# | id|SMS Created|Content|  Event|    Time|  Members|
# +---+-----------+-------+-------+--------+---------+
# |  1|   12:00:00|      a|Updated|11:50:00|[1, 2, 4]|
# |  2|   13:00:00|      b|Updated|12:50:00|[1, 2, 4]|
# |  3|   11:00:00|      c|Updated|10:50:00|[1, 2, 4]|
# +---+-----------+-------+-------+--------+---------+
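
As a design note, the max-over-window plus filter keeps every event tied for the latest Time. If you only ever want a single row per id, an equivalent sketch (same assumptions and column names as above) uses row_number instead:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank each id's pre-"SMS Created" events by Time, newest first,
# and keep only the top-ranked one. row_number breaks ties, so this
# returns exactly one row per id even if two events share a Time.
w = Window.partitionBy("id").orderBy(F.col("Time").desc())

(df1.join(df2.withColumnRenamed("id", "id2"),
          (F.col("id") == F.col("id2")) & (F.col("SMS Created") > F.col("Time")))
    .drop("id2")
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn").orderBy("id").show())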
