How to merge two lists by position in a DataFrame using PySpark

iswrvxsc  posted on 2021-05-29 in Spark

I have the DataFrame shown below.
Current DataFrame:

+---+--------+----------+
| id|size    |variantID |
+---+--------+----------+
|  1|[10, 20]|[150, 160]|
|  2|[2]     |[1]       |
|  3|[]      |[]        |
+---+--------+----------+

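I want to add a new column that merges the size array and the variantID array by position, joining each pair of elements with the pipe symbol (|). The result should be a new array column named sizeMap. The size array always has the same number of elements as the variantID array.
Expected output: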

+---+--------+----------+----------------+
| id|size    |variantID |sizeMap         |
+---+--------+----------+----------------+
|  1|[10, 20]|[150, 160]|[10|150, 20|160]|
|  2|[2]     |[1]       |[2|1]           |
|  3|[]      |[]        |[]              |
+---+--------+----------+----------------+

Can you help me solve this?

xiozqbni1#

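I have the following solution, which works. Because it relies on a UDF, though, it may be slow on large data. The final column is also a string, because it contains the pipe character "|".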

# Assumes an active SparkSession `spark` and SparkContext `sc` (e.g. the pyspark shell)
from pyspark.sql.functions import *
from pyspark.sql.types import *
values = [(1, [10, 20], [150, 160]),
          (2, [2], [3]),  # 3 here (the question's sample uses 1); matches the output below
          (3, [], [])]
rdd = sc.parallelize(values)
schema = StructType([StructField("id", IntegerType(), True),
                     StructField("size", ArrayType(IntegerType()), True),
                     StructField("variantID", ArrayType(IntegerType()), True)])
data = spark.createDataFrame(rdd, schema)
data.show()
"""
+---+--------+----------+
| id|    size| variantID|
+---+--------+----------+
|  1|[10, 20]|[150, 160]|
|  2|     [2]|       [3]|
|  3|      []|        []|
+---+--------+----------+
"""
def arrangeAsReuired(inputString):
    # inputString looks like "[10, 20]&[150, 160]"; drop the brackets first
    inputString = inputString.replace("[", "").replace("]", "")
    if inputString.strip() == "&":
        # both arrays were empty
        sizeMapPopulated = "[]"
    else:
        firstArray = inputString.split("&")[0].split(",")
        secondArray = inputString.split("&")[1].split(",")
        # pair elements by index and join each pair with "|"
        sizeMapPopulated = [str(firstArray[x]) + "|" + str(secondArray[x]) for x in range(len(firstArray))]
    return str(sizeMapPopulated)

udfToReturnData = udf(arrangeAsReuired, StringType())
spark.udf.register("udfToReturnData", udfToReturnData)

data = data.withColumn("sizeMap", \
                       udfToReturnData(concat(col("size").cast("string"), lit("&"), col("variantID").cast("string")).cast("string"))) \
           .select("id","size","sizeMap")
data.show(20,False)
"""
+---+--------+----------------------+
|id |size    |sizeMap               |
+---+--------+----------------------+
|1  |[10, 20]|['10|150', ' 20| 160']|
|2  |[2]     |['2|3']               |
|3  |[]      |[]                    |
+---+--------+----------------------+
"""
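
If a UDF is acceptable, the string casting and parsing above can probably be avoided by passing both array columns to the UDF directly and declaring an array<string> return type. The following is only a sketch under that assumption: the names merge_by_position and add_size_map are made up for illustration, and returning null when either input array is null is an assumption not covered by the question.

```python
from typing import List, Optional


def merge_by_position(sizes: Optional[List[int]],
                      variant_ids: Optional[List[int]]) -> Optional[List[str]]:
    """Pair elements at the same index and join each pair with '|'."""
    if sizes is None or variant_ids is None:
        # Assumption: a null input array yields a null result.
        return None
    # zip stops at the shorter list; the question states both have equal length.
    return [f"{s}|{v}" for s, v in zip(sizes, variant_ids)]


def add_size_map(df):
    """Return df with a sizeMap column of type array<string>.

    Expects df to have array columns named size and variantID.
    """
    # Imported here so merge_by_position can be used and tested without Spark.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import ArrayType, StringType

    merge_udf = udf(merge_by_position, ArrayType(StringType()))
    return df.withColumn("sizeMap", merge_udf(col("size"), col("variantID")))
```

Calling add_size_map(data) on the DataFrame built above (before sizeMap is added) should give a real array column, so the elements carry no stray quotes or spaces. It is still a Python UDF, so the performance concern remains.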

e0uiprwp2#

Perhaps this is useful (it is written in Scala, but it can be used in PySpark with only minor changes).

Load the provided test data

val df =
      spark.sql(
        """
          |select id, size, variantID from values
          | (1, array(10, 20), array(150, 160)),
          | (2, array(2), array(1)),
          | (3, array(null), array(null))
          | T(id, size, variantID)
        """.stripMargin)
    df.show(false)
    df.printSchema()
    /**
      * +---+--------+----------+
      * |id |size    |variantID |
      * +---+--------+----------+
      * |1  |[10, 20]|[150, 160]|
      * |2  |[2]     |[1]       |
      * |3  |[]      |[]        |
      * +---+--------+----------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- size: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- variantID: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      */

Zip the two arrays by position (without a UDF)

val p = df.withColumn("sizeMap", arrays_zip($"size", $"variantID"))
      .withColumn("sizeMap", expr("TRANSFORM(sizeMap, x -> concat_ws('|', x.size, x.variantID))"))
    p.show(false)
    p.printSchema()

    /**
      * +---+--------+----------+----------------+
      * |id |size    |variantID |sizeMap         |
      * +---+--------+----------+----------------+
      * |1  |[10, 20]|[150, 160]|[10|150, 20|160]|
      * |2  |[2]     |[1]       |[2|1]           |
      * |3  |[]      |[]        |[]              |
      * +---+--------+----------+----------------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- size: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- variantID: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- sizeMap: array (nullable = false)
      * |    |-- element: string (containsNull = false)
      */
