我在过滤大型数据集时遇到了一个问题,我认为这是因为连接效率低下。我想做的是:
数据集信息包含大量用户数据,由用户id和时间戳标识。dataset filter\u dates为每个用户id包含最近处理的时间戳。
然后,处理作业需要从一个大的输入源中确定info中哪些数据是新的,哪些数据需要处理。
我试着做的是:
info
.join(
broadcast(filter_dates),
info("userid") === filter_dates("userid"),
"left"
)
.filter('info_date >= 'latest_date || 'latest_date.isNull)
但这是难以置信的慢(需要很多小时),在spark ui中,我看到它处理大量的数据作为输入。
然后,为了好玩,我试了一下:
val startDate = // Calculate the minimum startDate from filter_dates
info
.filter('info_date >= startDate)
这是非常快的(需要几分钟),但这当然不是最佳的,因为它最终会重新处理一些数据。这让我相信我加入数据集的方式有根本的问题。
有人知道我如何改进连接吗?
暂无答案!
目前还没有任何答案,快来回答吧!