PySpark: get the first date on which a count increased to the highest count in the list

Asked by fnx2tebb on 2021-07-12, tagged Spark

Using PySpark, I need to get the first date on which a count increased to the highest count in the list. So in the AZ example below, I need the 2021-03-06 row, because that is the first date on which the count of 820198 appears, and it is higher than all the other counts. If the count never changes across dates, I just want the earliest date in the log. For AK, that would be 2021-01-23.

+-----+------+----------+
|state| count|  log_date|
+-----+------+----------+
|   AZ|820198|2021-03-07|
|   AZ|820198|2021-03-06|
|   AZ|818784|2021-03-05|
|   AZ|801115|2021-03-03|
|   AK| 46819|2021-03-07|
|   AK| 46819|2021-03-06|
|   AK| 46819|2021-03-05|
|   AK| 46819|2021-01-23|
+-----+------+----------+
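
For reference, the sample input above can be reproduced with a minimal DataFrame like the one below; an existing SparkSession named `spark` is assumed, and `log_date` is left as a string (ISO-formatted dates also sort correctly as text, so casting with `F.to_date` is optional here).

data = [
    ("AZ", 820198, "2021-03-07"),
    ("AZ", 820198, "2021-03-06"),
    ("AZ", 818784, "2021-03-05"),
    ("AZ", 801115, "2021-03-03"),
    ("AK", 46819, "2021-03-07"),
    ("AK", 46819, "2021-03-06"),
    ("AK", 46819, "2021-03-05"),
    ("AK", 46819, "2021-01-23"),
]

# the question's code refers to this input as df_merged; the answer below calls it df
df = spark.createDataFrame(data, ["state", "count", "log_date"])
df_merged = df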

This is what I have so far, and it seems to work under these specific data conditions. I have been on this for three days, and every time a new condition/date is added to the data I have had to rework it.
I would like to know whether this can be simplified back to the original requirement. It looks like a lot of partitioning, and PySpark may have a smarter approach that I am not aware of.


from pyspark.sql import functions as F, Window

# start by getting the earliest date a count appears in the list

w = Window.partitionBy("state", "count").orderBy(F.desc("count"))
df = df_merged.withColumn("min_date_count_appears", F.min("log_date").over(w)).orderBy("state")
df.show()

# filter out records with the same count that appear at later dates

df = df.where(F.col("min_date_count_appears") == F.col("log_date")).orderBy("state")
df.show()

# add rank by log_date to get last two logs

w = Window.partitionBy("state").orderBy(F.desc("log_date"), F.desc("count"))
df = df.withColumn("rank", F.dense_rank().over(w))
df.show()

# get the lowest count in the list in case there has been no increase

window = Window.partitionBy("state")
df = df.withColumn("min_count", F.min("count").over(window)).withColumn("min_date", F.min("log_date").over(window.orderBy(F.desc("min_count"))))
df.show()

# get the top two logs to compare their counts. If there is no increase then fall back to the min_count

df1 = df.where(F.col("rank") == 1)
df2 = df.where(F.col("rank") == 2).select("state", "count", "log_date")
df2 = df2.withColumnRenamed("count", "prev_count").withColumnRenamed("log_date", "prev_date")
df = df1.join(df2, "state", "inner")
df.show()

case_increase = F.col("count") > F.col("prev_count")
df = df.withColumn("last_import", F.when(case_increase, F.col("log_date")).otherwise(F.col("min_date")))
df = df.withColumn("days_since_import", F.datediff(F.current_date(), df.last_import))
df.show()

Answer 1 (by 4ngedf3f):

You can get the minimum date for the maximum count like this:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'max_count',
    F.max('count').over(Window.partitionBy('state'))
).withColumn(
    'first_date',
    F.min(
        F.when(
            F.col('count') == F.col('max_count'), 
            F.col('log_date')
        )
    ).over(Window.partitionBy('state'))
)

df2.show()
+-----+------+----------+---------+----------+
|state| count|  log_date|max_count|first_date|
+-----+------+----------+---------+----------+
|   AZ|820198|2021-03-07|   820198|2021-03-06|
|   AZ|820198|2021-03-06|   820198|2021-03-06|
|   AZ|818784|2021-03-05|   820198|2021-03-06|
|   AZ|801115|2021-03-03|   820198|2021-03-06|
|   AK| 46819|2021-03-07|    46819|2021-01-23|
|   AK| 46819|2021-03-06|    46819|2021-01-23|
|   AK| 46819|2021-03-05|    46819|2021-01-23|
|   AK| 46819|2021-01-23|    46819|2021-01-23|
+-----+------+----------+---------+----------+

If you only want the date and the count, you can do:

df3 = df2.select('state', 'max_count', 'first_date').distinct()

df3.show()
+-----+---------+----------+
|state|max_count|first_date|
+-----+---------+----------+
|   AZ|   820198|2021-03-06|
|   AK|    46819|2021-01-23|
+-----+---------+----------+
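
For completeness, the same result can also be reached without window functions, using only an aggregation and a join. This is just a sketch under the same column names as above, not the answerer's method, and whether it is simpler in practice depends on the data:

# max count per state
max_counts = df.groupBy("state").agg(F.max("count").alias("max_count"))

# earliest log_date on which each state's max count appears;
# when the count never changes, this is simply the earliest log_date
result = (
    df.join(max_counts, "state")
      .where(F.col("count") == F.col("max_count"))
      .groupBy("state", "max_count")
      .agg(F.min("log_date").alias("first_date"))
)
result.show()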
