Pivot in PySpark on two columns that have both numeric and categorical values

esbemjvw · posted 2021-05-27 in Spark

I have a dataset like this in PySpark:

from collections import namedtuple

user_row = namedtuple('user_row', 'id time category value'.split())
data = [
    user_row(1,1,'speed','50'),
    user_row(1,1,'speed','60'),
    user_row(1,2,'door','open'),
    user_row(1,2,'door','open'),
    user_row(1,2,'door','close'),
    user_row(1,2,'speed','75'),
    user_row(2,10,'speed','30'),
    user_row(2,11,'door','open'),
    user_row(2,12,'door','open'),
    user_row(2,13,'speed','50'),
    user_row(2,13,'speed','40')
]

user_df = spark.createDataFrame(data)
user_df.show()
+---+----+--------+-----+
| id|time|category|value|
+---+----+--------+-----+
|  1|   1|   speed|   50|
|  1|   1|   speed|   60|
|  1|   2|    door| open|
|  1|   2|    door| open|
|  1|   2|    door|close|
|  1|   2|   speed|   75|
|  2|  10|   speed|   30|
|  2|  11|    door| open|
|  2|  12|    door| open|
|  2|  13|   speed|   50|
|  2|  13|   speed|   40|
+---+----+--------+-----+

What I would like to get is something like the following: grouped by id and time, pivoted on category, returning the average when the value is numeric and the mode when it is categorical.

+---+----+--------+-----+
| id|time|    door|speed|
+---+----+--------+-----+
|  1|   1|    null|   55|
|  1|   2|    open|   75|
|  2|  10|    null|   30|
|  2|  11|    open| null|
|  2|  12|    open| null|
|  2|  13|    null|   45|
+---+----+--------+-----+

I have tried the following, but it returns null for the categorical values (I am not worried about the nulls in the speed column):

from pyspark.sql.functions import avg

df = user_df\
    .groupBy('id','time')\
    .pivot('category')\
    .agg(avg('value'))\
    .orderBy(['id', 'time'])

df.show()

+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  1|   2|null| 75.0|
|  2|  10|null| 30.0|
|  2|  11|null| null|
|  2|  12|null| null|
|  2|  13|null| 45.0|
+---+----+----+-----+
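The empty door column here is expected: avg('value') implicitly casts the string column to double, and non-numeric strings such as 'open' cast to null, so the door groups have nothing to average. A quick way to see that cast on the same user_df:

import pyspark.sql.functions as F

# non-numeric strings become null under the numeric cast that avg() applies
user_df.select('category', 'value',
               F.col('value').cast('double').alias('value_as_double')).show()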

0sgqnhkj #1

You can do an additional pivot and coalesce them. Try this:

import pyspark.sql.functions as F
from collections import namedtuple

user_row = namedtuple('user_row', 'id time category value'.split())
data = [
    user_row(1,1,'speed','50'),
    user_row(1,1,'speed','60'),
    user_row(1,2,'door', 'open'),
    user_row(1,2,'door','open'),
    user_row(1,2,'door','close'),
    user_row(1,2,'speed','75'),
    user_row(2,10,'speed','30'), 
    user_row(2,11,'door', 'open'),
    user_row(2,12,'door','open'),
    user_row(2,13,'speed','50'),
    user_row(2,13,'speed','40')
]

user_df = spark.createDataFrame(data)

# %%

# user_df.show()

# pivot twice: avg covers the numeric values, max carries the categorical ones
df = user_df.groupBy('id','time')\
            .pivot('category')\
            .agg(F.avg('value').alias('avg'), F.max('value').alias('max'))

# %%

# pair up the pivoted columns, e.g. ('door_avg', 'door_max'), ('speed_avg', 'speed_max')
expr1 = [x for x in df.columns if '_avg' in x]
expr2 = [x for x in df.columns if '_max' in x]
expr = zip(expr1, expr2)

# %%

# take the avg when it is non-null, otherwise fall back to the max (categorical) value
sel_expr = [F.coalesce(x[0], x[1]).alias(x[0].split('_')[0]) for x in expr]

# %%

df_final = df.select('id','time',*sel_expr).orderBy('id','time')

df_final.show()
+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  1|   2|open| 75.0|
|  2|  10|null| 30.0|
|  2|  11|open| null|
|  2|  12|open| null|
|  2|  13|null| 45.0|
+---+----+----+-----+
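Note that F.max('value') is only a stand-in for the mode: it picks the lexicographically largest string, which happens to coincide with the most frequent door value in this sample. If a true mode is needed, one possible sketch (reusing the same user_df and hard-coding the 'door' category from the question) is to rank each value by its count per (id, time) group, keep the top one, and join it back onto the numeric pivot:

import pyspark.sql.functions as F
from pyspark.sql import Window

# numeric part: the plain avg pivot (its door column is all null, so drop it)
avg_df = user_df.groupBy('id', 'time').pivot('category').agg(F.avg('value'))

# categorical part: the most frequent 'door' value per (id, time)
w = Window.partitionBy('id', 'time').orderBy(F.desc('cnt'), 'value')
door_mode = (user_df.filter(F.col('category') == 'door')
             .groupBy('id', 'time', 'value').agg(F.count('*').alias('cnt'))
             .withColumn('rn', F.row_number().over(w))
             .filter('rn = 1')
             .select('id', 'time', F.col('value').alias('door')))

(avg_df.drop('door')
       .join(door_mode, ['id', 'time'], 'left')
       .select('id', 'time', 'door', 'speed')
       .orderBy('id', 'time')
       .show())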

imzjd6km #2

Try collecting the values and then transforming them as required (Spark 2.4+):

from pyspark.sql.functions import col, collect_list, expr

user_df.groupby('id','time').pivot('category').agg(collect_list('value'))\
    .select('id', 'time',
            col('door')[0].alias('door'),
            expr('''aggregate(speed, cast(0.0 as double), (acc, x) -> acc + x, acc -> acc/size(speed))''').alias('speed'))\
    .show()

+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  2|  13|null| 45.0|
|  2|  11|open| null|
|  2|  12|open| null|
|  2|  10|null| 30.0|
|  1|   2|open| 75.0|
+---+----+----+-----+
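This takes the first collected door value (col('door')[0]) rather than a mode, and folds the collected speed list with the SQL aggregate higher-order function. On Spark 3.1 or later (newer than the Spark 2.4 baseline above), the same fold can also be written with the Python-side F.aggregate, which some find easier to read; a sketch under that assumption:

import pyspark.sql.functions as F

# same collect_list pivot as above
pivoted = user_df.groupby('id', 'time').pivot('category').agg(F.collect_list('value'))

# fold the collected speed strings into their mean (Spark 3.1+ Python API)
mean_speed = F.aggregate(
    'speed',
    F.lit(0.0),                              # running sum starts at 0.0
    lambda acc, x: acc + x.cast('double'),   # add each collected value
    lambda acc: acc / F.size('speed'))       # divide by the list length

pivoted.select('id', 'time',
               F.col('door')[0].alias('door'),
               mean_speed.alias('speed'))\
       .orderBy('id', 'time').show()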
