python—将spark df按一列分组,并将一列的结果拆分为多列—旋转和选择性合并

tf7tbtn2  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(420)

我有以下几点建议:
请注意,只有在已通过运行以下命令安装spark的情况下,才可以在本地运行此命令。否则,在databricks集群上复制该问题,该集群将自动初始化spark上下文。

from pyspark.sql import SparkSession

spark =  SparkSession.builder.appName("test").getOrCreate()

sc = spark.sparkContext
spark_dataframe = pd.DataFrame({'id' : ['867', '430', '658', '157', '521', '867', '430', '867'],
                                'Probability':[0.12, 0.72, 0.32, 0.83, 0.12, 0.49, 0.14, 0.12], 
                                'RAG': ['G', 'R', 'A', 'R', 'G', 'A', 'G', 'G'],
                                'Timestamp': ['2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 16-45-32', '2020-07-01 16-45-32', '2020-07-01 15-45-32']})
spark_dataframe = spark.createDataFrame(spark_dataframe)

现在我想按id对sparkDataframe进行分组,并计算rag列的值,将它们拆分为不同的列。所以就这样吧,

+---+--------------------+-------------+------------+
| id||G(count)|A(count)|R(count)|Timestamp(max)     |
+---+--------------------+-------------+------------+
|867|        2|       1|       0|2020-07-01 17-49-32|
|430|        1|       0|       1|2020-07-01 17-49-32|
|658|        0|       1|       0|2020-07-01 17-49-32|
|157|        0|       0|       1|2020-07-01 17-49-32|
|521|        1|       0|       0|2020-07-01 17-49-32|
+---+--------------------+-------------+------------+

根据上面的spark数据框创建一个字典列表,如下所示:

final_list=[]
map_dictionary={"R":0.6, "A":0.3, "G":0.1}

final_list=[{"id": "867", "RAG": "G", "Timestamp": "2020-07-01 17-49-32"}, #because for the id 867 the G column had 2 counts greater than the rest A, R column values on the same row.
 {"id": "430", "RAG": "R", "Timestamp": "2020-07-01 17-49-32"} #because G and R had 1 occurrence but R has greater weight based on the map dictionary,...
] #length of the list is equal to 5 since five are the unique rows of the spark df above.
lnxxn5zx

lnxxn5zx1#

您可以对它们进行分组和旋转。

import pyspark.sql.functions as F

# %%

tst = sqlContext.createDataFrame([(867,0.12,'G','2020-07-01 17-49-32'),(430,0.72,'R','2020-07-01 17-49-32'),(658,0.32,'A','2020-07-01 17-49-32'),\
                                              (157,0.83,'R','2020-07-01 17-49-32'),(521,0.12,'G','2020-07-01 17-49-32'),(867,0.49,'A','2020-07-01 16-45-32'),
                                              (430,0.14,'G','2020-07-01 16-45-32'),(867,0.12,'G','2020-07-01 16-45-32')],
                                               schema=['id','Probability','RAG','Timestamp'])
tst1 = tst.groupby('id').pivot('RAG').agg(F.count('Probability').alias('count'),F.max('Timestamp').alias('time_stamp'))

# there will be one time stamp per value of 'RAG'. The below code will find maximum among them

ts_coln = [F.col(x) for x in tst1.columns if 'time_stamp' in x]

tst2 = tst1.withColumn('max_ts',F.greatest(*ts_coln))

结果:
+---+-------+

ogq8wdun

ogq8wdun4#

--

-------+-------------------+
| id|A_count|       A_time_stamp|G_count|       G_time_stamp|R_count|       R_time_stamp|             max_ts|
+---+-------+-------------------+-------+-------------------+-------+-------------------+-------------------+
|658|      1|2020-07-01 17-49-32|      0|               null|      0|               null|2020-07-01 17-49-32|
|430|      0|               null|      1|2020-07-01 16-45-32|      1|2020-07-01 17-49-32|2020-07-01 17-49-32|
|521|      0|               null|      1|2020-07-01 17-49-32|      0|               null|2020-07-01 17-49-32|
|157|      0|               null|      0|               null|      1|2020-07-01 17-49-32|2020-07-01 17-49-32|
|867|      1|2020-07-01 16-45-32|      2|2020-07-01 17-49-32|      0|               null|2020-07-01 17-49-32|
+---+-------+-------------------+-------+-------------------+-------+-------------------+-------------------+

最后,可以删除不相关的列

相关问题