I have the following DataFrame in PySpark:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()
# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
("2011-12-15 15:08", None, None, None)]
# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])
# Show the DataFrame
df.show()
Output:
+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|2011-11-13 11:00:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:01:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:15:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-12-15 15:00:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:08:00|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+
I need to select only the rows that:
1. have null values in `start_timestamp` and `end_timestamp`, or
2. have the `timestamp` closest to the `start_timestamp` value (per `myid`).
For the example above, the expected result is:
+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|   2011-11-13 11:04|   1|   2011-11-13 11:06|   2011-11-14 11:00|
|   2011-12-15 15:06|   2|   2011-12-15 15:05|   2012-01-02 15:00|
|   2011-12-15 15:08|null|               null|               null|
|   2011-12-17 16:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+
Here is my current code, but it gives the wrong result. Moreover, on my real dataset it fails with the error `Aggregate/Window/Generate expressions are not valid in where clause of the query`:
from pyspark.sql import Window
from pyspark.sql.functions import abs, col, min
# Create a window to partition the data by "myid" and order by "timestamp"
window = Window.partitionBy("myid").orderBy("timestamp")
# Add a new column "time_diff" that calculates the absolute difference between "timestamp" and "start_timestamp"
df = df.withColumn("time_diff", abs(col("timestamp").cast("long") - col("start_timestamp").cast("long")))
# Add a new column "min_time_diff" that contains the minimum "time_diff" for each "myid"
df = df.withColumn("min_time_diff", min("time_diff").over(window))
# Select the rows that have null values in "start_timespamp" and "end_timestamp"
# or have the minimum value in "time_diff" for each "myid"
df = df.filter((col("start_timestamp").isNull() & col("end_timestamp").isNull()) |
(col("time_diff") == col("min_time_diff")))
1 Answer
Here is my approach: