https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html很好地解释了Spark的枢轴是如何工作的。
在我的python代码中,我使用了panda,但没有聚合,只是重置了索引和连接:
pd.pivot_table(data=dfCountries, index=['A'], columns=['B'])
countryToMerge.index.name = 'ISO'
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')
这在spark中是如何工作的?
我尝试手动分组和加入,如下所示:
val grouped = countryKPI.groupBy("A").pivot("B")
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show
但那是行不通的。reset_index将如何融入spark/它将如何以spark原生的方式实现?
编辑
Python代码的最小示例:
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"])
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO'])
dates['ISO'] = isos.ISO
dates['ISO'] = dates['ISO'].astype("category")
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'],
'indicator_id':['a','a','b','b'],
'value':[7,8,9,7]})
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id'])
countryToMerge.index.name = 'ISO'
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner'))
dates ISO a b
0 2016-01-01 ABC 7 9
1 2016-01-03 ABC 7 9
2 2016-01-05 ABC 7 9
3 2016-01-07 ABC 7 9
4 2016-01-09 ABC 7 9
5 2016-01-02 POL 8 7
6 2016-01-04 POL 8 7
7 2016-01-06 POL 8 7
8 2016-01-08 POL 8 7
9 2016-01-10 POL 8 7
跟随scala/spark
val dates = Seq(("2016-01-01", "ABC"),
("2016-01-02", "ABC"),
("2016-01-03", "POL"),
("2016-01-04", "ABC"),
("2016-01-05", "POL"),
("2016-01-06", "ABC"),
("2016-01-07", "POL"),
("2016-01-08", "ABC"),
("2016-01-09", "POL"),
("2016-01-10", "ABC")
).toDF("dates", "ISO")
.withColumn("dates", 'dates.cast("Date"))
dates.show
dates.printSchema
val countryKPI = Seq(("ABC", "a", 7),
("ABC", "b", 8),
("POL", "a", 9),
("POL", "b", 7)
).toDF("country_id3", "indicator_id", "value")
countryKPI.show
countryKPI.printSchema
val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")
6条答案
按热度按时间ifsvaxew1#
这对我很有效--不是因为我支持Georg Heiler关于使用“平均值”进行聚合的主张。为了在不进行聚合的情况下应用透视,您只需以尽可能大的粒度指定groupBy项。如果满足此条件,则可以使用任何聚合项(平均值、最小值、最大值等)。
t98cgbkg2#
在pyspark中,您可以使用以下命令:
类似于上面提到的@Derek Kaknes;创建一个唯一ID列,然后使用sum或某些其他聚集函数进行聚集。确保分组依据的列包括新创建的id_column。
lrpiutwd3#
将pyspark.sql函数导入为F df_new=df.select(“列1”,“列2”,“列3”).groupBy(“列1”,“列2”).agg(F.concat_ws(“,",F.收集集(“列3”))
在这里,无论你想在一行中包含哪一列的值,你都可以在collect_set中传递它们,我相信它应该可以工作!
dba5bblo4#
不知道这是否是你要找的,但我需要把值透视到列中,并注意每列中每个值的存在:
如果存在则生成
1
,否则生成null
。奇怪的是,在lit()
中指定字符串并不起作用-所有列都返回该值。9fkzdhlc5#
在Spark中没有一个好的方法可以在不进行聚合的情况下进行透视,基本上它假设您只使用OneHotEncoder来实现该功能,但这缺乏直接透视的可读性。我发现的最好的方法是:
但是,如果
(country_id3, value)
在数据集中不是唯一的,那么您将折叠行,并可能从透视列中获取一个有点无意义的first()
值。另一种方法是向数据集添加一个id列,根据该新id分组,透视所需列,然后联接回原始数据集。
在本例中,您仍然拥有原始的透视列,但是如果愿意,也可以
.drop()
该透视列。ozxc1zmp6#
下面的代码片段看起来很有效--但我不确定按avg进行的聚合是否正确--尽管输出是“拟合数”。
我不确定对于更大的数据量(平均值)来说,与仅仅重用值相比,这是否是“低效的”(因为我不想聚合)。