如何使用pyspark收集两个连续日期之间的新id列表

wyyhbhjk  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(361)

我在pysparkDataframe中做了一个groupby over-week列,并在两个成功的星期数之间收集新的不同id。
我试着做一个groupby over week列,然后在dataframe上聚合collect\u set方法以获得所有可用的id,然后我逐个比较列表,以获得两个连续列表之间的不同id,按周数排列,如下所示:
输入:
周末ID\u 11id\u 21id\u 32id\u 12id\u 42id\u 53id\u 6
输出:
weekid\u listdiff\u listnew\u different\u id\u count1id\u 1,id\u 2,id\u 3id\u 1,id\u 2,id\u 3-2id\u 1,id\u 4,id\u 5id\u 4,id\u 523id\u 2,id\u 6id\u 61
这里的问题是,在我的例子中,我拥有大量的id(超过900万个id),spark会话由于内存不足而被终止,我想是(错误500)!
有没有其他解决方案可以让pyspark连续两周获得新的不同id的列表?

6kkfgxo0

6kkfgxo01#

为了扩展,您需要改为按id聚合,而不收集任何结果。请尝试以下方法:

import pyspark.sql.functions as F
from pyspark.sql import Window

data = spark.createDataFrame([(1, "ID_1"), (1, "ID_2"), (1, "ID_3"),
                              (2, "ID_1"), (2, "ID_4"), (2, "ID_5"),
                              (3, "ID_6")], ["Week", "ID"])

win = Window.partitionBy('ID').orderBy('Week')

agg_data = (
  data
  .withColumn("prevWeek", F.lag("Week", offset=1).over(win))
  .withColumn("isInPrevWeek", 
              F.col("prevWeek").isNotNull() & ((F.col("Week") - F.col("prevWeek")) == 1))
  .filter(~F.col("isInPrevWeek"))
  .groupBy("Week")
  .agg(F.count("*").alias("newIDs"),
       F.array_sort(F.collect_list("ID")).alias("showNewIDs"))  # Remove in production
  .orderBy("Week")
)

agg_data.show()

第一,功能 lag 使用一个窗口函数创建前一周的新列,该函数允许单独考虑每个id(分区)并按时间顺序对周进行排序。这可以很好地扩展,因为spark任务是由一组id组成的。
那么, isInPrevWeek 检查身份证是否确实在前一周。如果是这样,则过滤掉记录。现在,你只需要按周计算重新录制的ID。

+----+------+------------------+
|Week|newIDs|        showNewIDs|
+----+------+------------------+
|   1|     3|[ID_1, ID_2, ID_3]|
|   2|     2|      [ID_4, ID_5]|
|   3|     1|            [ID_6]|
+----+------+------------------+

请注意,代码段收集id只是为了说明目的,但这不是计算所必需的。

相关问题