如何使用pyspark基于checkDate仅选择最新记录

ecfdbz9o  于 2023-10-15  发布在  Spark
关注(0)|答案(2)|浏览(141)

我有一个Spark

vehicle_coalesce  vehicleNumber  productionNumber pin  checkDate
V123              V123           P123             null 27/08/2023 01:03
P123              null           P123             W123 27/08/2023 01:05
P123              null           P123             W123 27/08/2023 01:05
V234              V234           P234             null 27/08/2023 01:03
V234              V234           null             W234 27/08/2023 01:05
V234              V234           null             W234 27/08/2023 01:05
P456              null           P456             W456 27/08/2023 01:03
v456              V456           null             W456 27/08/2023 01:05
V456              V456           P456             W456 27/08/2023 01:05

我必须按vehicleNumberproductionNumberpin进行分组,并按vehicleNumberproductionNumberpin进行分区,并仅选择基于checkDate的最新记录。
所需输出为:

vehicle_coalesce  vehicleNumber  productionNumber pin  checkDate
P123              null           P123             W123 27/08/2023 01:05
P123              null           P123             W123 27/08/2023 01:05
V234              V234           null             W234 27/08/2023 01:05
V234              V234           null             W234 27/08/2023 01:05
v456              V456           null             W456 27/08/2023 01:05
V456              V456           P456             W456 27/08/2023 01:05

在此,由于V123车辆具有相同的productionNumber,因此按productionNumber对其进行分组并挑选最新记录,对于V234,由于vehicleNumber具有相同的vehicleNumber,因此按V234对其进行分组并挑选最新记录,对于V456,由于pin具有相同的pin,因此按V456对其进行分组并挑选最新记录
如何使用pyspark?

gkn4icbw

gkn4icbw1#

由于您希望按'vehicleNumber'、'productionNumber''pin'分组,因此我将使用按checkDate排序的三个不同窗口。对于每个窗口,每列一个,我将保留感兴趣的列和最后一个checkDate的行的空值。
可以这样写:

from pyspark.sql import functions as F
from pyspark.sql import Window

result = df
for column in ['vehicleNumber', 'productionNumber', 'pin']:
    window = Window.partitionBy(column).orderBy(F.col("checkDate").desc())
    result = result\
        .withColumn("r", F.rank().over(window))\
        .where(F.col(column).isNull() | (F.col("r") == 1))\
        .drop("r")

result.show()
+----------------+-------------+----------------+----+----------------+
|vehicle_coalesce|vehicleNumber|productionNumber| pin|       checkDate|
+----------------+-------------+----------------+----+----------------+
|            P123|         null|            P123|W123|27/08/2023 01:05|
|            P123|         null|            P123|W123|27/08/2023 01:05|
|            V234|         V234|            null|W234|27/08/2023 01:05|
|            V234|         V234|            null|W234|27/08/2023 01:05|
|            v456|         V456|            null|W456|27/08/2023 01:05|
|            V456|         V456|            P456|W456|27/08/2023 01:05|
+----------------+-------------+----------------+----+----------------+
rhfm7lfc

rhfm7lfc2#

我建议在整个DataFrame中使用一致的日期时间格式,以确保正确的日期解析。这里有一个更新的方法,它假设“checkDate”列具有常量日期时间格式:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Create a Spark session
spark = SparkSession.builder.appName("LatestRecords").getOrCreate()

# Sample DataFrame
data = [("V123", "V123", "P123", None, "2023-08-27 01:03:00"),
        ("P123", None, "P123", "W123", "2023-08-27 01:05:00"),
        ("P123", None, "P123", "W123", "2023-08-27 01:05:00"),
        ("V234", "V234", "P234", None, "2023-08-27 01:03:00"),
        ("V234", "V234", None, "W234", "2023-08-27 01:05:00"),
        ("V234", "V234", None, "W234", "2023-08-27 01:05:00"),
        ("P456", None, "P456", "W456", "2023-08-27 01:03:00"),
        ("V456", "V456", None, "W456", "2023-08-27 01:05:00"),
        ("V456", "V456", "P456", "W456", "2023-08-27 01:05:00")]

columns = ["vehicle_coalesce", "vehicleNumber", "productionNumber", "pin", "checkDate"]

df = spark.createDataFrame(data, columns)

# Define a Window specification
window_spec = Window.partitionBy("vehicleNumber", "productionNumber", "pin").orderBy(F.desc("checkDate"))

# Add a new column with row numbers based on the Window specification
df = df.withColumn("row_num", F.row_number().over(window_spec))

# Filter only the rows with row_num == 1 (latest records within each partition)
result_df = df.filter(F.col("row_num") == 1).drop("row_num")

# Show the result
result_df.show()

对于一致的日期解析,在此代码中,“checkDate”字段假定为“yyyy-MM-dd HH:mm:ss”格式。

相关问题