pyspark: merging "parallel arrays" into a DataFrame structure

bkhjykvo · posted on 2023-03-11 in Spark

I have the following data structure in JSON, which I am trying to load into a DataFrame using AWS Glue:

{
    "out": [
        {
            "attr": [ "a1", "a2", "a3" ],
            "val": [ 1, 2, 3 ],
            "text": "test1"
        },
        {
            "attr": [ "a4", "a5", "a6" ],
            "val": [ 4, 5, 6 ],
            "text": "test2"
        }
        
    ],
    "ids": [
        "id1",
        "id2"
    ]    
}

The "ids" field is an array that is parallel to the entries in "out". I have been trying to get the following:

id     text     attr            val
--     ----     ----            ---
id1    test1    [a1, a2, a3]    [1,2,3]
id2    test2    [a4, a5, a6]    [4,5,6]

I have been able to split the ids and the contents of "out" into two DataFrames (see the sketch after the schema below), but I cannot find a way to join them horizontally.
Using

from pyspark.context import SparkContext
from awsglue.context import GlueContext

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

print("Loading data...")
df = spark.read.json(<location>)
df.printSchema()

I get the following schema:

root
 |-- out: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attr: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- val: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |-- ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
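
For context, the two-DataFrame dead end described above might look like the following (a minimal sketch, assuming the df loaded above; the column aliases are illustrative):

from pyspark.sql import functions as f

# Split into two "parallel" DataFrames:
ids_df = df.select(f.explode('ids').alias('id'))
out_df = df.select(f.explode('out').alias('entry')).select('entry.*')

# Dead end: Spark has no positional join and row order is not guaranteed,
# so these two DataFrames cannot be safely re-aligned. The fix, as both
# answers below show, is to zip the parallel arrays *before* exploding.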

kuarbcqp1#

Using the DataFrame API,

from pyspark.sql import functions as f

df = spark.read.json('test.json', multiLine=True)

df.select(f.explode(f.arrays_zip('ids', 'out')).alias('data')) \
  .select('data.ids', 'data.out.*') \
  .show(truncate=False)

+---+------------+-----+---------+
|ids|attr        |text |val      |
+---+------------+-----+---------+
|id1|[a1, a2, a3]|test1|[1, 2, 3]|
|id2|[a4, a5, a6]|test2|[4, 5, 6]|
+---+------------+-----+---------+
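
A small follow-up, not part of the original answer: to get the id column name from the desired output, alias the zipped field, e.g.:

df.select(f.explode(f.arrays_zip('ids', 'out')).alias('data')) \
  .select(f.col('data.ids').alias('id'), 'data.out.*') \
  .show(truncate=False)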

8hhllhi22#

In my case I used Spark SQL, but I am sure you can achieve the same with plain PySpark. The key idea is to use the arrays_zip function. There is also zip_with, which you may want to look at (a sketch follows the output below).

import json

from pyspark.sql import SparkSession

data = """
{
    "out": [
        {
            "attr": [ "a1", "a2", "a3" ],
            "val": [ 1, 2, 3 ],
            "text": "test1"
        },
        {
            "attr": [ "a4", "a5", "a6" ],
            "val": [ 4, 5, 6 ],
            "text": "test2"
        }
        
    ],
    "ids": [
        "id1",
        "id2"
    ]    
}
"""

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    data=[json.loads(data)],
    schema="struct<out:array<struct<attr:array<string>,val:array<int>,text:string>>,ids:array<string>>")
df.createOrReplaceTempView("input")

spark.sql("""
SELECT
    zipped.ids as id, 
    zipped.out.*
FROM input 
    LATERAL VIEW explode(arrays_zip(out, ids)) _z as zipped
""").show(truncate=False)

This returns:

+---+------------+---------+-----+
|id |attr        |val      |text |
+---+------------+---------+-----+
|id1|[a1, a2, a3]|[1, 2, 3]|test1|
|id2|[a4, a5, a6]|[4, 5, 6]|test2|
+---+------------+---------+-----+
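
As a sketch of the zip_with alternative mentioned above (reusing the input view registered earlier; the lambda and the rec/t aliases are illustrative, not from the original answer), zip_with merges ids and out element-wise with a lambda instead of arrays_zip:

spark.sql("""
SELECT rec.*
FROM (
    SELECT explode(zip_with(ids, out,
        (i, o) -> struct(i AS id, o.attr AS attr, o.val AS val, o.text AS text))) AS rec
    FROM input
) t
""").show(truncate=False)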
