如何在pyspark中将结构类似(key1,list(key2,value))的列表转换为Dataframe?

3bygqnnd  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(432)

我的清单如下:
其类型如下所示:

[(key1, [(key11, value11), (key12, value12)]), (key2, [(key21, value21), (key22, value22)...])...]

示例结构如下所示:

[('1052762305',
  [('1007819788', 0.9206884810054885),
   ('1005886801', 0.913818268123084),
   ('1003863766', 0.9131746152849486),
   ('1007811435', 0.9128666156173751),
   ('1005879599', 0.9126368405937075),
   ('1003705572', 0.9122051062936369),
   ('1007804896', 0.9083424459788203),
   ('1005890270', 0.8982097535650703),
   ('1007806781', 0.8708761186829758),
   ('1003670458', 0.8452789033694487)]),
 ('1064808607',
  [('1007804896', 0.9984397647563017),
   ('1003705572', 0.9970498347406341),
   ('1005879599', 0.9951581013190172),
   ('1007811435', 0.9934813787902085),
   ('1005886801', 0.9930572794622374),
   ('1003863766', 0.9928815742735568),
   ('1007819788', 0.9869723713790797),
   ('1005890270', 0.9642640856016443),
   ('1007806781', 0.9211558765137313),
   ('1003670458', 0.8519872445941068)])]

我想把它转换成

key1          key2             score
1052762305    1007819788    0.9206884810054885
1052762305    1005886801    0.913818268123084
1052762305    1003863766    0.9131746152849486
  ...            ...              ...
1064808607    1007804896    0.9984397647563017
1064808607    1003705572    0.9970498347406341
1064808607    1005879599    0.9951581013190172
  ...            ...              ...

我们如何在pyspark中实现这一点?

u7up0aaq

u7up0aaq1#

您基本上需要执行以下操作:
从列表中创建Dataframe
使用 explode 通过从对中提取键值(&V) select 可以这样做(源数据位于名为 a ):

from pyspark.sql.functions import explode, col
df = spark.createDataFrame(a, ['key1', 'val'])
df2 = df.select(col('key1'), explode(col('val')).alias('val'))
df3 = df2.select('key1', col('val')._1.alias('key2'), col('val')._2.alias('value'))

我们可以检查架构和数据是否匹配:

>>> df3.printSchema()
root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- value: double (nullable = true)

>>> df3.show(2)
+----------+----------+------------------+
|      key1|      key2|             value|
+----------+----------+------------------+
|1052762305|1007819788|0.9206884810054885|
|1052762305|1005886801| 0.913818268123084|
+----------+----------+------------------+
only showing top 2 rows

我们还可以检查中间结果的模式:

>>> df.printSchema()
root
 |-- key1: string (nullable = true)
 |-- val: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: double (nullable = true)

>>> df2.printSchema()
root
 |-- key1: string (nullable = true)
 |-- val: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: double (nullable = true)
svdrlsy4

svdrlsy42#

您可以使用输入预先创建一个模式。使用explode并访问value结构中的元素。

from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import  StructType, StructField,StringType,ArrayType, DoubleType

    spark = SparkSession.builder \
        .appName('SO')\
        .getOrCreate()

        schema = StructType([StructField("key1",StringType()), StructField("value",ArrayType(
            StructType([ StructField("key2", StringType()),
               StructField("score", DoubleType())])
        )) ])

    df = spark.createDataFrame(
        [('1052762305',
          [('1007819788', 0.9206884810054885),
           ('1005886801', 0.913818268123084),
           ('1003863766', 0.9131746152849486),
           ('1007811435', 0.9128666156173751),
           ('1005879599', 0.9126368405937075),
           ('1003705572', 0.9122051062936369),
           ('1007804896', 0.9083424459788203),
           ('1005890270', 0.8982097535650703),
           ('1007806781', 0.8708761186829758),
           ('1003670458', 0.8452789033694487)]),

         ('1064808607',
          [('1007804896', 0.9984397647563017),
           ('1003705572', 0.9970498347406341),
           ('1005879599', 0.9951581013190172),
           ('1007811435', 0.9934813787902085),
           ('1005886801', 0.9930572794622374),
           ('1003863766', 0.9928815742735568),
           ('1007819788', 0.9869723713790797),
           ('1005890270', 0.9642640856016443),
           ('1007806781', 0.9211558765137313),
           ('1003670458', 0.8519872445941068)])
         ],schema
    )

    df.show()

    +----------+--------------------+
    |      key1|          value    |
    +----------+--------------------+
    |1052762305|[[1007819788, 0.9...|
    |1064808607|[[1007804896, 0.9...|
    +----------+--------------------+

    df.printSchema()

    root
     |-- key1: string (nullable = true)
     |-- value: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- key2: string (nullable = true)
     |    |    |-- score: double (nullable = true)

    df1=df.select('key1',F.explode('value').alias('value'))
    df1.show()
    +----------+--------------------+
    |      key1|          value     |
    +----------+--------------------+
    |1052762305|[1007819788, 0.92...|
    |1052762305|[1005886801, 0.91...|
    |1052762305|[1003863766, 0.91...|
    |1052762305|[1007811435, 0.91...|
    |1052762305|[1005879599, 0.91...|
    |1052762305|[1003705572, 0.91...|
    |1052762305|[1007804896, 0.90...|
    |1052762305|[1005890270, 0.89...|
    |1052762305|[1007806781, 0.87...|
    |1052762305|[1003670458, 0.84...|
    |1064808607|[1007804896, 0.99...|
    |1064808607|[1003705572, 0.99...|
    |1064808607|[1005879599, 0.99...|
    |1064808607|[1007811435, 0.99...|
    |1064808607|[1005886801, 0.99...|
    |1064808607|[1003863766, 0.99...|
    |1064808607|[1007819788, 0.98...|
    |1064808607|[1005890270, 0.96...|
    |1064808607|[1007806781, 0.92...|
    |1064808607|[1003670458, 0.85...|
    +----------+--------------------+

    df1.printSchema()

    root
     |-- key1: string (nullable = true)
     |-- value: struct (nullable = true)
     |    |-- key2: string (nullable = true)
     |    |-- score: double (nullable = true)

    df1.select("key1", "value.key2","value.score").show()

    +----------+----------+------------------+
    |      key1|      key2|             score|
    +----------+----------+------------------+
    |1052762305|1007819788|0.9206884810054885|
    |1052762305|1005886801| 0.913818268123084|
    |1052762305|1003863766|0.9131746152849486|
    |1052762305|1007811435|0.9128666156173751|
    |1052762305|1005879599|0.9126368405937075|
    |1052762305|1003705572|0.9122051062936369|
    |1052762305|1007804896|0.9083424459788203|
    |1052762305|1005890270|0.8982097535650703|
    |1052762305|1007806781|0.8708761186829758|
    |1052762305|1003670458|0.8452789033694487|
    |1064808607|1007804896|0.9984397647563017|
    |1064808607|1003705572|0.9970498347406341|
    |1064808607|1005879599|0.9951581013190172|
    |1064808607|1007811435|0.9934813787902085|
    |1064808607|1005886801|0.9930572794622374|
    |1064808607|1003863766|0.9928815742735568|
    |1064808607|1007819788|0.9869723713790797|
    |1064808607|1005890270|0.9642640856016443|
    |1064808607|1007806781|0.9211558765137313|
    |1064808607|1003670458|0.8519872445941068|

相关问题