pyspark: How to melt a Spark DataFrame?

os8fio9y  posted 2022-11-01 in Spark

Is there an equivalent of the Pandas melt function in Apache Spark, in PySpark or at least in Scala?
So far I have been running an example dataset in Python with Pandas, and now I want to use Spark on the entire dataset.


55ooxyrt1#

There is no built-in function (if you work with SQL and Hive support you can use the stack function, but it is not exposed in Spark and has no native implementation), but rolling your own is trivial.

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable

Example implementation:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

And some tests (based on the Pandas doctests):
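A minimal usage sketch along those lines, with data borrowed from the pandas.melt documentation example (not the original doctest block; it assumes an active `spark` session):

import pandas as pd

# Hypothetical test data, modeled on the pandas.melt docs
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})
sdf = spark.createDataFrame(pdf)

melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
# +---+--------+-----+
# |  A|variable|value|
# +---+--------+-----+
# |  a|       B|    1|
# |  a|       C|    2|
# |  b|       B|    3|
# |  b|       C|    4|
# |  c|       B|    5|
# |  c|       C|    6|
# +---+--------+-----+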
Note: to use this with legacy Python versions, remove the type annotations.


rbpvctlc2#

I came across this question while searching for an implementation of melt in Spark for Scala.
Posting my Scala port in case someone else stumbles upon this as well.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
/**Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /**Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
 *  @see the reshape package in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}

Since I am not that advanced with Scala yet, I am sure there is room for improvement.
Any comments are welcome.


4c8rllxm3#

Upvoted user6910411's answer. It works as expected; however, it does not handle None values well. So I refactored his melt function into the following:

from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )

    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)

    return _tmp

Tested with the following DataFrame:
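A small made-up stand-in for the test data (not the author's original example), with None values to exercise the change; it assumes an active `spark` session:

# Hypothetical data with None values in column C
sdf = spark.createDataFrame(
    [('a', 1, None), ('b', 3, 6), ('c', 5, None)],
    ['A', 'B', 'C'])

melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
# +---+--------+-----+
# |  A|variable|value|
# +---+--------+-----+
# |  a|       B|    1|
# |  a|       C| null|
# |  b|       B|    3|
# |  b|       C|    6|
# |  c|       B|    5|
# |  c|       C| null|
# +---+--------+-----+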


7gcisfzg4#

UPD

Finally, I found the most effective implementation for me. It uses all the resources of Yarn in my cluster configuration.

from pyspark.sql.functions import explode

def melt(df):
    # All columns except the first (the id column) are treated as value columns
    sp = df.columns[1:]
    return (df
            .rdd
            # Build [id, [(column_name, value), ...]] per row; None (or falsy) values become 0
            .map(lambda x: [str(x[0]), [(str(i[0]),
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
                 preservesPartitioning=True)
            .toDF()
            # Explode the array of (name, value) pairs into one row per pair
            .withColumn('_2', explode('_2'))
            # Flatten the struct into [id, column_name, value]
            .rdd.map(lambda x: [str(x[0]),
                                str(x[1][0]),
                                float(x[1][1] if x[1][1] else 0)],
                     preservesPartitioning=True)
            .toDF()
            )
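A quick usage sketch for this helper (made-up data and variable name `wide`; the function assumes the first column is the id, all remaining columns are numeric, and None is replaced with 0):

wide = spark.createDataFrame(
    [('x', 1.0, 2.0), ('y', 3.0, None)],
    ['id', 'b', 'c'])

melt(wide).show()
# +---+---+---+
# | _1| _2| _3|
# +---+---+---+
# |  x|  b|1.0|
# |  x|  c|2.0|
# |  y|  b|3.0|
# |  y|  c|0.0|
# +---+---+---+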

For very wide DataFrames I saw performance degrade at the _vars_and_vals generation from user6910411's answer.
It was useful to implement the melting via selectExpr:

import pandas as pd

columns = ['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
df.show()
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join("'{}', {}".format(i, i) for i in cols))).show()
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...
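As a side note (not part of the original answer), the generated col0/col1 names can be replaced by giving the stack expression a multi-column alias:

df.selectExpr('a', "stack({}, {}) as (variable, value)".format(
    len(cols), ', '.join("'{}', {}".format(i, i) for i in cols))).show()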

5kgi1eie5#

Use a list comprehension to create a struct column of column names and values, and explode the new column with the magic inline function.

from pyspark.sql import functions as F

melted_df = (df.withColumn(
                 # Create an array of structs of column names and corresponding values
                 'tab', F.array(*[F.struct(F.lit(x).alias('var'), F.col(x).alias('val')) for x in df.columns if x != 'A']))
             # Explode the array column with the SQL inline generator
             .selectExpr('A', "inline(tab)")
             )

melted_df.show()

+---+---+---+
|  A|var|val|
+---+---+---+
|  a|  B|  1|
|  a|  C|  2|
|  b|  B|  3|
|  b|  C|  4|
|  c|  B|  5|
|  c|  C|  6|
+---+---+---+

sg3maiej6#

1) Copy and paste
2) Change the first 2 variables

to_melt = {'latin', 'greek', 'chinese'}
new_names = '(lang, letter)'

melt_list = [f"\'{c}\', `{c}`" for c in to_melt]
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(melt_list)}, {','.join(melt_list)}) {new_names}")
)

Full test:

from pyspark.sql import functions as F
df = spark.createDataFrame([(101, "A", "Σ", "西"), (102, "B", "Ω", "诶")], ['ID', 'latin', 'greek', 'chinese'])
df.show()

# +---+-----+-----+-------+
# | ID|latin|greek|chinese|
# +---+-----+-----+-------+
# |101|    A|    Σ|     西|
# |102|    B|    Ω|     诶|
# +---+-----+-----+-------+

to_melt = {'latin', 'greek', 'chinese'}
new_names = '(lang, letter)'

melt_list = [f"\'{c}\', `{c}`" for c in to_melt]
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(melt_list)}, {','.join(melt_list)}) {new_names}")
)

df.show()

# +---+-------+------+
# | ID|   lang|letter|
# +---+-------+------+
# |101|  latin|     A|
# |101|  greek|     Σ|
# |101|chinese|    西|
# |102|  latin|     B|
# |102|  greek|     Ω|
# |102|chinese|    诶|
# +---+-------+------+
