apache spark页面结果或查看大型数据集上的结果

rwqw0loc  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(346)

我正在使用带有spark 1.6.3的hive
我有一个很大的数据集(40000行,20列左右,每列包含500字节-3kb的数据)
查询是3个数据集的连接
我希望能够页面的最终连接数据集,我发现我可以使用 row_number() OVER (ORDER BY 1) 为数据集中的每一行生成唯一的行号。
在这之后我可以做

SELECT * FROM dataset WHERE row between 1 AND 100

但是,有些资源建议不要使用 ORDER BY 由于它将所有数据放在一个分区中(我可以在日志中看到这种情况,洗牌分配将数据移动到一个分区),因此当发生这种情况时,会出现内存不足异常。
如何以更有效的方式分页数据集?
我已经- MEMORY_AND_DISK 因此,如果分区太大,它将溢出到磁盘(对于某些转换,我可以看到,在我不使用时,至少有一些数据溢出到磁盘) row_number() )

gjmwrych

gjmwrych1#

好吧,你可以把这个方法应用到你的最终连接Dataframe上。
您还应该将Dataframe持久化为一个文件,以保证顺序,因为重新评估可能会创建不同的顺序。

juud5qan

juud5qan2#

一种策略可以是首先只选择数据集的唯一\u键,然后只对该数据集应用行\u数函数。因为您是从一个大的数据集中选择一个列,所以在单个分区中容纳该列的可能性更高。

val dfKey = df.select("uniqueKey")
dfKey.createOrUpdateTempTable("dfKey")
val dfWithRowNum = spark.sql(select dfKey*, row_number() as row_number OVER (ORDER BY 1))
// save dfWithRowNum

完成对uniquekey的行号操作后;保存Dataframe。现在在下一阶段中,将这个Dataframe与更大的Dataframe连接起来,并将row\u number列附加到该Dataframe上。

dfOriginal.createOrUpdateTempTable("dfOriginal")
dfWithRowNum.createOrUpdateTempTable("dfWithRowNum")
val joined = spark.sql("select dfOriginal.* from dfOriginal join dfWithRowNum on dfOriginal.uniqueKey = dfWithRowNum.uniqueKey")
// save joined

现在您可以查询

SELECT * FROM joineddataset WHERE row between 1 AND 100

对于persist with memory\ U盘,我发现偶尔会因为内存不足而失败。我宁愿只在性能受到影响的地方使用disk\u,尽管执行是有保证的。

相关问题