我在pysparkDataframe中做了一个groupby over-week列,并在两个成功的星期数之间收集新的不同id。
我试着做一个groupby over week列,然后在dataframe上聚合collect\u set方法以获得所有可用的id,然后我逐个比较列表,以获得两个连续列表之间的不同id,按周数排列,如下所示:
输入:
周末ID\u 11id\u 21id\u 32id\u 12id\u 42id\u 53id\u 6
输出:
weekid\u listdiff\u listnew\u different\u id\u count1id\u 1,id\u 2,id\u 3id\u 1,id\u 2,id\u 3-2id\u 1,id\u 4,id\u 5id\u 4,id\u 523id\u 2,id\u 6id\u 61
这里的问题是,在我的例子中,我拥有大量的id(超过900万个id),spark会话由于内存不足而被终止,我想是(错误500)!
有没有其他解决方案可以让pyspark连续两周获得新的不同id的列表?
1条答案
按热度按时间6kkfgxo01#
为了扩展,您需要改为按id聚合,而不收集任何结果。请尝试以下方法:
第一,功能
lag
使用一个窗口函数创建前一周的新列,该函数允许单独考虑每个id(分区)并按时间顺序对周进行排序。这可以很好地扩展,因为spark任务是由一组id组成的。那么,
isInPrevWeek
检查身份证是否确实在前一周。如果是这样,则过滤掉记录。现在,你只需要按周计算重新录制的ID。请注意,代码段收集id只是为了说明目的,但这不是计算所必需的。