mapDataframe列

egmofgnx  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(237)

我对python非常陌生,尝试使用azuredatabricks将标签分隔文件中的数据转换为json。我有以下输入数据:

ID  Title   NumberA NumberB
1   test1   0       1
2   test2   2       3

我试着把它转换成json格式:

[
    {
        "ID": 1,
        "Title": "title1",
        "Numbers": [
            {
                "Type": "TypeA",
                "Code": "0"
            },
            {
                "Type": "TypeB",
                "Code": "1"
            }
        ]
    },
    {
        "ID": 2,
        "Title": "title2",
        "Numbers": [
            {
                "Type": "TypeA",
                "Code": "2"
            },
            {
                "Type": "TypeB",
                "Code": "3"
            }
        ]
    }
]

我的输入数据有这样一个模式:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

class MySchemas:
    def input_struct():
        schema = StructType()
        schema.add('ID', IntegerType(), True)
        schema.add('Title', StringType(), True)
        schema.add('NumberA', IntegerType(), True)
        schema.add('NumberB', IntegerType(), True)
        schema.add('_corrupt_record', StringType(), True)
        return schema

在另一个类中,我使用模式读取输入数据,如下所示:

df_schema = MySchemas.input_struct()
inputs = self.spark.read.option('sep', "\t").option("header","true").option('mode', 'PERMISSIVE').schema(df_schema).csv(self.sourceFilePath, quote="", escape="")

我不知道最后一部分怎么做。我要把它拿走 NumberA 以及 NumberB 列,并用一个名为 Numbers 它是一个对象列表,其中包含一个文本字符串列 Type 另一列包含 NumberA 以及 NumberB .
有人能告诉我怎么做吗?我正在使用databricks runtime 5.5 lts和spark 2.4.3。

esyap4oy

esyap4oy1#

您可以创建结构数组并转换为json:

import pyspark.sql.functions as F

result = inputs.select(
    'ID', 'Title', 
    F.array(
        F.struct(
            F.lit('TypeA').alias('Type'), 
            F.col('NumberA').alias('Code')
        ), 
        F.struct(
            F.lit('TypeB').alias('Type'), 
            F.col('NumberB').alias('Code')
        )
    ).alias('Numbers')
).agg(
    F.collect_list(
        F.to_json(F.struct('ID', 'Title', 'Numbers'))
    ).alias('col')
)

result.show(truncate=False)
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|col                                                                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"ID":1,"Title":"test1","Numbers":[{"Type":"TypeA","Code":0},{"Type":"TypeB","Code":1}]}, {"ID":2,"Title":"test2","Numbers":[{"Type":"TypeA","Code":2},{"Type":"TypeB","Code":3}]}]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

如果您想输出json文件,可以跳过 .agg 分道扬镳 result.write.json('filepath') .

相关问题