使用pyspark,我需要得到一个计数增加到列表中最高计数的第一个日期。所以在az的这个例子中,我需要2021-03-06行,因为这是第一个日期,820198的计数出现,它比其他所有的计数都高。如果计数在每个日期都没有变化,我只需要日志中最早的日期。对于正义与发展党来说,那将是2021年1月23日。
+-----+------+----------+
|state| count| log_date|
+-----+------+----------+
| AZ|820198|2021-03-07|
| AZ|820198|2021-03-06|
| AZ|818784|2021-03-05|
| AZ|801115|2021-03-03|
| AK| 46819|2021-03-07|
| AK| 46819|2021-03-06|
| AK| 46819|2021-03-05|
| AK| 46819|2021-01-23|
+-----+------+----------+
这是我到目前为止,它似乎在这些特定的数据条件下工作。我已经在这三天了,每天当一个新的条件/日期被添加到数据中时,我都不得不返工。
我想知道是否可以在原始需求的基础上简化这一点——这看起来像是很多分区,而pyspark可能有一种更聪明的方法,我不知道。
# start by getting the earliest date a count appears in the list
w = Window.partitionBy("state", "count").orderBy(F.desc("count"))
df = df_merged.withColumn("min_date_count_appears", F.min("log_date").over(w)).orderBy("state")
df.show()
# filter out records with the same count that appear at later dates
df = df.where(F.col("min_date_count_appears") == F.col("log_date")).orderBy("state")
df.show()
# add rank by log_date to get last two logs
w = Window.partitionBy("state").orderBy(F.desc("log_date"), F.desc("count"))
df = df.withColumn("rank", F.dense_rank().over(w))
df.show()
# get the lowest count in the list in case there has been no increase
window = Window.partitionBy("state")
df = df.withColumn("min_count", F.min("count").over(window)).withColumn("min_date", F.min("log_date").over(window.orderBy(F.desc("min_count"))))
df.show()
# get the top two logs to compare their counts. If there is no increase then fall back to the min_count
df1 = df.where(F.col("rank") == 1)
df2 = df.where(F.col("rank") == 2).select("state", "count", "log_date")
df2 = df2.withColumnRenamed("count", "prev_count").withColumnRenamed("log_date", "prev_date")
df = df1.join(df2, "state", "inner")
df.show()
case_increase = F.col("count") > F.col("prev_count")
df = df.withColumn("last_import", F.when(case_increase, F.col("log_date")).otherwise(F.col("min_date")))
df = df.withColumn("days_since_import", F.datediff(F.current_date(), df.last_import))
df.show()
1条答案
按热度按时间4ngedf3f1#
您可以获得最大计数的最小日期,如下所示:
如果你只想要日期和计数,你可以做