无聚集Spark枢轴

5vf7fwbs  于 2023-02-13  发布在  Apache
关注(0)|答案(6)|浏览(130)

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html很好地解释了Spark的枢轴是如何工作的。
在我的python代码中,我使用了panda,但没有聚合,只是重置了索引和连接:

pd.pivot_table(data=dfCountries, index=['A'], columns=['B'])
countryToMerge.index.name = 'ISO'
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')

这在spark中是如何工作的?
我尝试手动分组和加入,如下所示:

val grouped = countryKPI.groupBy("A").pivot("B")
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show

但那是行不通的。reset_index将如何融入spark/它将如何以spark原生的方式实现?

编辑

Python代码的最小示例:

import pandas as pd
from datetime import datetime, timedelta
import numpy as np
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"])
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO'])
dates['ISO'] = isos.ISO
dates['ISO'] = dates['ISO'].astype("category")
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'],
                       'indicator_id':['a','a','b','b'],
                       'value':[7,8,9,7]})
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id'])
countryToMerge.index.name = 'ISO'
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner'))

  dates  ISO  a  b
0  2016-01-01  ABC  7  9
1  2016-01-03  ABC  7  9
2  2016-01-05  ABC  7  9
3  2016-01-07  ABC  7  9
4  2016-01-09  ABC  7  9
5  2016-01-02  POL  8  7
6  2016-01-04  POL  8  7
7  2016-01-06  POL  8  7
8  2016-01-08  POL  8  7
9  2016-01-10  POL  8  7

跟随scala/spark

val dates = Seq(("2016-01-01", "ABC"),
    ("2016-01-02", "ABC"),
    ("2016-01-03", "POL"),
    ("2016-01-04", "ABC"),
    ("2016-01-05", "POL"),
    ("2016-01-06", "ABC"),
    ("2016-01-07", "POL"),
    ("2016-01-08", "ABC"),
    ("2016-01-09", "POL"),
    ("2016-01-10", "ABC")
  ).toDF("dates", "ISO")
    .withColumn("dates", 'dates.cast("Date"))

  dates.show
  dates.printSchema

  val countryKPI = Seq(("ABC", "a", 7),
    ("ABC", "b", 8),
    ("POL", "a", 9),
    ("POL", "b", 7)
  ).toDF("country_id3", "indicator_id", "value")

  countryKPI.show
  countryKPI.printSchema

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")
ifsvaxew

ifsvaxew1#

这对我很有效--不是因为我支持Georg Heiler关于使用“平均值”进行聚合的主张。为了在不进行聚合的情况下应用透视,您只需以尽可能大的粒度指定groupBy项。如果满足此条件,则可以使用任何聚合项(平均值、最小值、最大值等)。

countryKPI.groupBy("country_id3").pivot("indicator_id").agg(avg("value").alias("value_term"))
t98cgbkg

t98cgbkg2#

在pyspark中,您可以使用以下命令:
类似于上面提到的@Derek Kaknes;创建一个唯一ID列,然后使用sum或某些其他聚集函数进行聚集。确保分组依据的列包括新创建的id_column。

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id_column", monotonically_increasing_id())
groupby_columns = ["id_column"] + your_desired_columns
df = df.groupBy(groupby_columns).pivot(pivot_column).sum(value_column)
lrpiutwd

lrpiutwd3#

将pyspark.sql函数导入为F df_new=df.select(“列1”,“列2”,“列3”).groupBy(“列1”,“列2”).agg(F.concat_ws(“,",F.收集集(“列3”))
在这里,无论你想在一行中包含哪一列的值,你都可以在collect_set中传递它们,我相信它应该可以工作!

dba5bblo

dba5bblo4#

不知道这是否是你要找的,但我需要把值透视到列中,并注意每列中每个值的存在:

df.groupBy('A').pivot('B').agg(lit(1))

如果存在则生成1,否则生成null。奇怪的是,在lit()中指定字符串并不起作用-所有列都返回该值。

9fkzdhlc

9fkzdhlc5#

在Spark中没有一个好的方法可以在不进行聚合的情况下进行透视,基本上它假设您只使用OneHotEncoder来实现该功能,但这缺乏直接透视的可读性。我发现的最好的方法是:

val pivot = countryKPI
  .groupBy("country_id3", "value")
  .pivot("indicator_id", Seq("a", "b"))
  .agg(first(col("indicator_id")))

pivot.show
+-----------+-----+----+----+
|country_id3|value|   a|   b|
+-----------+-----+----+----+
|        ABC|    8|null|   b|
|        POL|    9|   a|null|
|        POL|    7|null|   b|
|        ABC|    7|   a|null|
+-----------+-----+----+----+

但是,如果(country_id3, value)在数据集中不是唯一的,那么您将折叠行,并可能从透视列中获取一个有点无意义的first()值。
另一种方法是向数据集添加一个id列,根据该新id分组,透视所需列,然后联接回原始数据集。

val countryWithId = countryKPI.withColumn("id", monotonically_increasing_id)
val pivotted = countryWithId
.groupBy("id")
.pivot("indicator_id")
.agg(first(col("indicator_id")))

val pivot2 = countryWithId.join(pivotted, Seq("id")).drop("id") //.drop("indicator_id")

pivot2.show
+-----------+------------+-----+----+----+
|country_id3|indicator_id|value|   a|   b|
+-----------+------------+-----+----+----+
|        ABC|           a|    7|   a|null|
|        ABC|           b|    8|null|   b|
|        POL|           a|    9|   a|null|
|        POL|           b|    7|null|   b|
+-----------+------------+-----+----+----+

在本例中,您仍然拥有原始的透视列,但是如果愿意,也可以.drop()该透视列。

ozxc1zmp

ozxc1zmp6#

下面的代码片段看起来很有效--但我不确定按avg进行的聚合是否正确--尽管输出是“拟合数”。

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show

我不确定对于更大的数据量(平均值)来说,与仅仅重用值相比,这是否是“低效的”(因为我不想聚合)。

相关问题