带有scala的spark中dataframe列内的列名

2admgd59  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(515)


我正在使用scala的spark。2.4.3
我的salesperson数据框是这样的:它总共有54salesperson,我只举了3列的例子

Schema of SalesPerson table.
root
 |-- col: struct (nullable = false)
 |    |-- SalesPerson_1: string (nullable = true)
 |    |-- SalesPerson_2: string (nullable = true)
 |    |-- SalesPerson_3: string (nullable = true)

销售人员视图的数据。

SalesPerson_1|SalesPerson_2|SalesPerson_3
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    [Customer_1793,  Customer_202,  Customer_2461]
    [Customer_2424, Customer_130, Customer_787]
    [Customer_1061, Customer_318, Customer_706]
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++

我的salesplace数据框看起来像

Schema of salesplace

 root
 |-- Place: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of salesplace
Place|Customer
Online| Customer_1793
Retail| Customer_1793
Retail| Customer_130
Online| Customer_130
Online| Customer_2461
Retail| Customer_2461
Online| Customer_2461

我试图检查salesperson表中的哪个客户在salesplace表中可用。有两个 additional column shows customer belong to salesperson 以及salesplace表中的客户发生计数
预期产量:

CustomerBelongstoSalesperson|Customer     |occurance|
SalesPerson_1               |Customer_1793|2
SalesPerson_2               |Customer_130 |2 
SalesPerson_3               |Customer_2461|3
SalesPerson_2               |Customer_202 |0
SalesPerson_1               |Customer_2424|0
SalesPerson_1               |Customer_1061|0
SalesPerson_2               |Customer_318 |0
SalesPerson_3               |Customer_787 |0

代码:

Error:
The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 54 aliases but got Salesperson,Customer ;

在spark中似乎没有什么关键。我不确定是否可以将columnname作为值放入列中。。。。有人能帮我想一想怎么做吗。。。。。。。。谢谢

muk1a3rh

muk1a3rh1#

试试这个-

加载提供的测试数据

val data1 =
      """
        |salesperson1          |  salesperson2
        |Customer_17         |Customer_202
        |Customer_24         |Customer_130
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +------------+------------+
      * |salesperson1|salesperson2|
      * +------------+------------+
      * |Customer_17 |Customer_202|
      * |Customer_24 |Customer_130|
      * +------------+------------+
      *
      * root
      * |-- salesperson1: string (nullable = true)
      * |-- salesperson2: string (nullable = true)
      */

    val data2 =
      """
        |Place  |Customer
        |shop  |Customer_17
        |Home  |Customer_17
        |shop  |Customer_17
        |Home  |Customer_130
        |Shop  |Customer_202
      """.stripMargin
    val stringDS2 = data2.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS2)
    df2.show(false)
    df2.printSchema()
    /**
      * +-----+------------+
      * |Place|Customer    |
      * +-----+------------+
      * |shop |Customer_17 |
      * |Home |Customer_17 |
      * |shop |Customer_17 |
      * |Home |Customer_130|
      * |Shop |Customer_202|
      * +-----+------------+
      *
      * root
      * |-- Place: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

取消PIVOT和左连接

val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
    processedDF.show(false)
    /**
      * +------------+------------+
      * |Salesperson |Customer    |
      * +------------+------------+
      * |salesperson1|Customer_17 |
      * |salesperson2|Customer_202|
      * |salesperson1|Customer_24 |
      * |salesperson2|Customer_130|
      * +------------+------------+
      */

    processedDF.join(df2, Seq("Customer"), "left")
      .groupBy("Customer")
      .agg(count("Place").as("Occurance"), first("Salesperson").as("Salesperson"))
      .show(false)

    /**
      * +------------+---------+------------+
      * |Customer    |Occurance|Salesperson |
      * +------------+---------+------------+
      * |Customer_130|1        |salesperson2|
      * |Customer_17 |3        |salesperson1|
      * |Customer_202|1        |salesperson2|
      * |Customer_24 |0        |salesperson1|
      * +------------+---------+------------+
      */

相关问题