Conditionally filtering rows in PySpark: "Aggregate/Window/Generate expressions are not valid in where clause of the query"

bsxbgnwa · posted 2023-01-20 in Spark

I have the following DataFrame in PySpark:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()

# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:08", None, None, None)]

# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])

# Show the DataFrame
df.show()

This prints:

+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|2011-11-13 11:00:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:01:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-11-13 11:15:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|
|2011-12-15 15:00:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|
|2011-12-15 15:08:00|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+

I need to select only the rows that:
1. have null values in "start_timestamp" and "end_timestamp", or
2. have the "timestamp" closest to the "start_timestamp" value.
For the example above, the expected result is:

+-------------------+----+-------------------+-------------------+
|          timestamp|myid|    start_timestamp|      end_timestamp|
+-------------------+----+-------------------+-------------------+
|   2011-11-13 11:04|   1|   2011-11-13 11:06|   2011-11-14 11:00|
|   2011-12-15 15:06|   2|   2011-12-15 15:05|   2012-01-02 15:00|
|   2011-12-15 15:08|null|               null|               null|
|2011-12-17 16:00:00|null|               null|               null|
+-------------------+----+-------------------+-------------------+

This is my current code, but it gives the wrong result:

In addition, on my real dataset this code fails with the error Aggregate/Window/Generate expressions are not valid in where clause of the query.

from pyspark.sql import Window
from pyspark.sql.functions import abs, col, min

# Create a window to partition the data by "myid" and order by "timestamp"
window = Window.partitionBy("myid").orderBy("timestamp")

# Add a new column "time_diff" that calculates the absolute difference between "timestamp" and "start_timestamp"
df = df.withColumn("time_diff", abs(col("timestamp").cast("long") - col("start_timestamp").cast("long")))

# Add a new column "min_time_diff" that contains the minimum "time_diff" for each "myid"
df = df.withColumn("min_time_diff", min("time_diff").over(window))

# Select the rows that have null values in "start_timespamp" and "end_timestamp"
# or have the minimum value in "time_diff" for each "myid"
df = df.filter((col("start_timestamp").isNull() & col("end_timestamp").isNull()) | 
              (col("time_diff") == col("min_time_diff")))

bksxznpy · answer #1

Here is my approach:

from pyspark.sql import functions as psf
from pyspark.sql.window import Window

# Create a list of rows for the DataFrame
rows = [("2011-11-13 11:00", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:01", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:04", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-11-13 11:15", 1, "2011-11-13 11:06", "2011-11-14 11:00"),
        ("2011-12-15 15:00", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:06", 2, "2011-12-15 15:05", "2012-01-02 15:00"),
        ("2011-12-15 15:08", None, None, None),
        ("2011-12-17 16:00:00", None, None, None)]

# Create a DataFrame from the rows
df = spark.createDataFrame(rows, ["timestamp", "myid", "start_timestamp", "end_timestamp"])

# cast timestamps
df = df.withColumn(
        "timestamp", psf.col("timestamp").cast("timestamp")
    ).withColumn(
        "start_timestamp", psf.col("start_timestamp").cast("timestamp")
    ).withColumn(
        "end_timestamp", psf.col("end_timestamp").cast("timestamp")
    )

# as per your code 
df = df.withColumn("time_diff", psf.abs(psf.col("timestamp").cast("long") - psf.col("start_timestamp").cast("long")))

# generate row number for selection of min time diff
window_spec = Window.partitionBy('myid').orderBy(psf.col('time_diff'))
df = df.withColumn('rn', psf.row_number().over(window_spec))

# apply selection criteria
df.filter(
    (psf.col('start_timestamp').isNull() & psf.col('end_timestamp').isNull()) 
    | (psf.col('rn')==psf.lit(1))
).show()

+-------------------+----+-------------------+-------------------+---------+---+
|          timestamp|myid|    start_timestamp|      end_timestamp|time_diff| rn|
+-------------------+----+-------------------+-------------------+---------+---+
|2011-12-15 15:08:00|null|               null|               null|     null|  1|
|2011-12-17 16:00:00|null|               null|               null|     null|  2|
|2011-11-13 11:04:00|   1|2011-11-13 11:06:00|2011-11-14 11:00:00|      120|  1|
|2011-12-15 15:06:00|   2|2011-12-15 15:05:00|2012-01-02 15:00:00|       60|  1|
+-------------------+----+-------------------+-------------------+---------+---+
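
As a small optional follow-up (using the same df and filter as above): if you don't want the intermediate columns in the final result, you can drop them after filtering so the output matches the expected schema.

# Drop the helper columns before showing the result
result = df.filter(
    (psf.col('start_timestamp').isNull() & psf.col('end_timestamp').isNull())
    | (psf.col('rn') == psf.lit(1))
).drop('time_diff', 'rn')

result.show()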
