from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
实施示例:
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
我在Spark for Scala中寻找melt的实现时遇到了这个问题。 发布我的Scala端口,以防有人也偶然发现这一点。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/**Extends the [[org.apache.spark.sql.DataFrame]] class
*
* @param df the data frame to melt
*/
implicit class DataFrameFunctions(df: DataFrame) {
/**Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
*
* melt is (kind of) the inverse of pivot
* melt is currently (02/2017) not implemented in spark
*
* @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
* @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
*
* @todo method overloading for simple calling
*
* @param id_vars the columns to preserve
* @param value_vars the columns to melt
* @param var_name the name for the column holding the melted columns names
* @param value_name the name for the column holding the values of the melted columns
*
*/
def melt(
id_vars: Seq[String], value_vars: Seq[String],
var_name: String = "variable", value_name: String = "value") : DataFrame = {
// Create array<struct<variable: str, value: ...>>
val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)
// Add to the DataFrame and explode
val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}
return _tmp.select(cols: _*)
}
}
6条答案
按热度按时间55ooxyrt1#
这里没有内置函数(如果你使用SQL和Hive支持,你可以使用
stack
函数,但是它没有在Spark中公开,也没有本地实现),但是你自己的调用很简单。实施示例:
还有一些测试(基于Pandas doctests):
第一个
注意:为了与旧Python版本一起使用,请移除类型注解。
相关:
rbpvctlc2#
我在Spark for Scala中寻找
melt
的实现时遇到了这个问题。发布我的Scala端口,以防有人也偶然发现这一点。
因为我还没有那么先进的考虑
Scala
,我相信有改进的空间。欢迎提出任何意见。
4c8rllxm3#
投票支持user6910411的答案。它按预期工作,但是,它不能很好地处理None值。因此,我将他的melt函数重构为以下形式:
使用以下 Dataframe 进行测试:
第一个
7gcisfzg4#
UPD
最后,我找到了最有效的实现,它使用了我的Yarn配置中的所有资源。
对于非常宽的 Dataframe ,在user6910411答案的_vars_and_vals生成处,我发现性能下降。
通过selectExpr实现熔化非常有用
5kgi1eie5#
使用列表解析创建列名和列值的struct列,并使用magic inline分解新列。
sg3maiej6#
1)
复制和粘贴2)
更改前2个变量完整测试: