如何在pyspark中将DataFrame转换回正常的RDD?

cidc1ykv  于 2022-11-16  发布在  Apache
关注(0)|答案(3)|浏览(261)

我需要使用

(rdd.)partitionBy(npartitions, custom_partitioner)

所有的DataFrame方法都只引用DataFrame结果。那么如何从DataFrame数据创建RDD呢?
注:这是对1.2.0的变更(在1.3.0中)。

更新来自@dpangmao的回答:方法是. rdd。我很想了解(a)它是否是公共的以及(b)性能影响是什么。

(a)是的,(B)-您可以看到,这里有重要的性能影响:必须通过调用mapPartitions创建新的RDD:
dataframe.py中(注意文件名也发生了变化(原来是sql.py)):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
eh57zj3b

eh57zj3b1#

使用方法.rdd,如下所示:

rdd = df.rdd
mf98qq94

mf98qq942#

@dapangmao的答案有效,但是它没有给予常规的spark RDD,它返回一个Row对象.如果你想拥有常规的RDD格式。
试试看:

rdd = df.rdd.map(tuple)

rdd = df.rdd.map(list)
vs3odd8k

vs3odd8k3#

kennyut/Kistian给出的答案非常好用,但要获得精确的RDD(如输出),RDD由属性列表组成例如,[1,2,3,4],我们可以使用flatmap命令,如下所示:

rdd = df.rdd.flatMap(list)

rdd = df.rdd.flatMap(lambda x: list(x))

相关问题