Transposing a Spark DataFrame based on its sub-columns

sqyvllje posted on 2021-05-27 in Spark

I have a Spark DataFrame that looks like this:

root
|-- 0000154d-7585-5eb283ff985c: struct (nullable = true)
|    |-- collaborative_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- content_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- curated_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- discovery_score: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- original_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- recipe_id: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|-- 00005426-2675-68085cd359c7: struct (nullable = true)
|    |-- collaborative_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- content_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- curated_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- discovery_score: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- original_rank: array (nullable = true)
|    |    |-- element: long (containsNull = true)
|    |-- recipe_id: array (nullable = true)
|    |    |-- element: long (containsNull = true)

Each column name is a user id, e.g. 0000154d-7585-5eb283ff985c, and each row is made up of 15,000 users (the data comes from JSON files, each containing 15,000 users).
I want to transpose it so that each user id becomes a row and each sub-column (collaborative_rank, content_rank, curated_rank, discovery_score, original_rank and recipe_id) becomes a column whose value is the corresponding array. I'm new to Spark; is there a painless way to do this?
Edit:
For reference, the input.json file I'm reading from looks like this:

{"0000154d-7585-4096-a71a-5eb283ff985c": {"recipe_id": [1, 2, 3], "collaborative_rank": [1, 2, 3], "curated_rank": [1, 2, 3], "discovery_score": [1]}, "00005426-2675-4940-8394-e8085cd359c7": {"recipe_id": [] ... }

and so on.
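To make the target layout concrete, the result I am after would look roughly like this (illustrative only; the remaining rank columns follow the same pattern):

+--------------------------+------------------+---------+
|user_id                   |collaborative_rank|recipe_id|
+--------------------------+------------------+---------+
|0000154d-7585-5eb283ff985c|[1, 2, 3]         |[1, 2, 3]|
+--------------------------+------------------+---------+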


goucqfw61#

If you don't want to convert to an RDD and run a UDF, you can consider stacking the DataFrame instead.

from pyspark.sql.functions import expr

df = spark.read.json(r'C:\stackoverflow\samples\inp.json')

# build the stack() argument list: the number of columns, followed by
# pairs of ('column name as string literal', `column reference`)
stack_characteristics = str(len(df.columns)) + ',' + ','.join([f"'{v}',`{v}`" for v in df.columns])

# stack the user-id columns into rows, then expand the struct fields into columns
df.select(expr(f'''stack({stack_characteristics})''').alias('userId', 'vals')).\
   select('userId', 'vals.*').show()

+--------------------+------------------+------------+---------------+---------+
|              userId|collaborative_rank|curated_rank|discovery_score|recipe_id|
+--------------------+------------------+------------+---------------+---------+
|0000154d-7585-409...|         [1, 2, 3]|   [1, 2, 3]|            [1]|[1, 2, 3]|
|00005426-2675-494...|         [1, 2, 3]|   [1, 2, 3]|            [1]|[1, 2, 3]|
+--------------------+------------------+------------+---------------+---------+
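For anyone who wants to try the stack approach without the JSON file, here is a minimal self-contained sketch on a toy two-user DataFrame (the ids, field names and values below are made up for illustration):

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# toy frame: two user-id columns, each holding a struct of arrays
toy = spark.createDataFrame([Row(**{
    "user_a": Row(collaborative_rank=[1, 2], recipe_id=[1, 2]),
    "user_b": Row(collaborative_rank=[3, 4], recipe_id=[3, 4]),
})])

# same recipe as above: stack(n, 'name1', `name1`, 'name2', `name2`, ...)
pairs = ','.join([f"'{c}',`{c}`" for c in toy.columns])
toy.select(expr(f"stack({len(toy.columns)},{pairs})").alias('userId', 'vals')) \
   .select('userId', 'vals.*').show()

# expected output, roughly:
# +------+------------------+---------+
# |userId|collaborative_rank|recipe_id|
# +------+------------------+---------+
# |user_a|            [1, 2]|   [1, 2]|
# |user_b|            [3, 4]|   [3, 4]|
# +------+------------------+---------+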

uqxowvwt2#

OK, the code below should solve your problem. Given the input JSON:

{"0000154d-7585-4096-a71a-5eb283ff985c": {"recipe_id": [1, 2, 3], "collaborative_rank": [1, 2, 3], "curated_rank": [1, 2, 3], "discovery_score": [1] }}
from pyspark.sql import Row

# read the input data
df = spark.read.json("/home/sathya/Desktop/stackoverflo/input.json")

# turn each top-level column (a user id) into its own Row,
# pulling the struct fields out by name
def extract_json(row):
    out_array = []
    data_dict = row.asDict()
    for k in data_dict.keys():
        user = data_dict[k]
        out_array.append(Row(k, user['recipe_id'], user['collaborative_rank'],
                             user['curated_rank'], user['discovery_score']))
    return Row(*out_array)

# flatMap over the single input row to get one output row per user id
rdd = df.rdd.flatMap(extract_json)

# create the DataFrame and give the positional columns proper names
df1 = spark.createDataFrame(rdd)

df1.selectExpr("_1 as user_id", "_2 as recipe_id", "_3 as collaborative_rank",
               "_4 as curated_rank", "_5 as discovery_score").show(truncate=False)
/*
+------------------------------------+---------+------------------+------------+---------------+
|user_id                             |recipe_id|collaborative_rank|curated_rank|discovery_score|
+------------------------------------+---------+------------------+------------+---------------+
|0000154d-7585-4096-a71a-5eb283ff985c|[1, 2, 3]|[1, 2, 3]         |[1, 2, 3]   |[1]            |
+------------------------------------+---------+------------------+------------+---------------+
*/
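If the structs carry more fields than the four in this sample file (the schema in the question also has content_rank and original_rank), a variant of extract_json that builds each output Row from the struct's own field names avoids hard-coding positions. A rough sketch along the same lines (extract_json_generic is my own name for it, untested against the original data):

# same flatMap idea, but take the field names from the struct itself
def extract_json_generic(row):
    out = []
    for user_id, ranks in row.asDict().items():
        rec = ranks.asDict()          # {'collaborative_rank': [...], 'recipe_id': [...], ...}
        rec['user_id'] = user_id
        out.append(Row(**rec))
    return out                        # flatMap accepts any iterable of Rows

df2 = spark.createDataFrame(df.rdd.flatMap(extract_json_generic))
df2.show(truncate=False)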
