我需要使用
(rdd.)partitionBy(npartitions, custom_partitioner)
所有的DataFrame方法都只引用DataFrame结果。那么如何从DataFrame数据创建RDD呢?
注:这是对1.2.0的变更(在1.3.0中)。
更新来自@dpangmao的回答:方法是. rdd。我很想了解(a)它是否是公共的以及(b)性能影响是什么。
(a)是的,(B)-您可以看到,这里有重要的性能影响:必须通过调用mapPartitions创建新的RDD:
在dataframe.py中(注意文件名也发生了变化(原来是sql.py)):
@property
def rdd(self):
"""
Return the content of the :class:`DataFrame` as an :class:`RDD`
of :class:`Row` s.
"""
if not hasattr(self, '_lazy_rdd'):
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
return itertools.imap(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
return self._lazy_rdd
3条答案
按热度按时间eh57zj3b1#
使用方法
.rdd
,如下所示:mf98qq942#
@dapangmao的答案有效,但是它没有给予常规的spark RDD,它返回一个Row对象.如果你想拥有常规的RDD格式。
试试看:
或
vs3odd8k3#
kennyut/Kistian给出的答案非常好用,但要获得精确的RDD(如输出),RDD由属性列表组成例如,[1,2,3,4],我们可以使用flatmap命令,如下所示:
或