给定以下文件:
员工
技能
报告
等。
员工与其他每个文件之间存在1对n关系,例如,一个员工对应多个技能。每个文件在500mb到1.5gb之间,总共大约有10个文件。对于每个员工,我希望聚合/收集所有文件(技能、报告等)中的所有信息,并将其写入xml结构:
<employees>
<employee>
<skills>
<skill>...</skill>
<skill>...</skill>
...
</skills>
<reports
<report>...</report>
<report>...</report>
...
</reports>
...
</employee>
...
</employees>
我在做一些类似的事情:
val employeesRdd = employeesDf.map(r => (r.getAs[String]("employeeId"), r))
val skillsRdd = skillsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey()
val reportsRdd = reportsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey()
...
employeesRdd
.leftOuterJoin(skillsRdd)
.leftOuterJoin(reportsRdd)
...
.toLocalIterator
... // write <employee> nodes one by one
我遇到的问题是,所有groupbykey()操作都非常慢,就像长时间的操作一样。在运行了这么长时间之后,由于java.lang.outofmemoryerror:超出了gc开销限制,它崩溃了。我在本地模式下使用spark1.5.1,为jvm分配了大约20gb的内存。
1条答案
按热度按时间pb3s4cty1#
sparkDataframe的分区应该是您的最佳选择。
有针对性地将有助于在近距离内存储相关信息的数据。因此,它有助于快速获取所需信息。
官方文件,文件