Composite-axis unpivot in Scala

eqqqjvef · posted 2021-05-27 in Spark

I am using Spark 2.4 with Scala.
My salesperson dataframe looks like this. It has 54 salesperson columns in total; only 4 of them are shown here as an example.
Schema of the salesperson table:

root
 |-- col: struct (nullable = false)
 |    |-- salesperson_4: string (nullable = true)
 |    |-- salesperson_10: string (nullable = true)
 |    |-- salesperson_11: string (nullable = true)
 |    |-- salesperson_21: string (nullable = true)
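
Note that in this schema the salesperson fields are nested inside a single struct column named col, so they have to be promoted to top-level columns before any unpivot. A minimal sketch, assuming the dataframe is called df; the name df1 matches the code further down:

// Hypothetical flattening step for the struct schema shown above:
// promote the fields of the struct column `col` to top-level columns.
val df1 = df.select("col.*")
df1.printSchema()
// root
//  |-- salesperson_4: string (nullable = true)
//  |-- salesperson_10: string (nullable = true)
//  ...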

Salesperson table data:

+--------------+--------------+--------------+--------------+
|salesperson_4 |salesperson_10|salesperson_11|salesperson_21|
+--------------+--------------+--------------+--------------+
|Customer_933  |Customer_1760 |Customer_454  |Customer_127  |
|Customer_1297 |Customer_2411 |Customer_158  |Customer_2703 |
|Customer_861  |Customer_1550 |Customer_812  |Customer_2976 |
+--------------+--------------+--------------+--------------+
My salestype dataframe looks like this.
salestype schema:

root
 |-- Type: string (nullable = true)
 |-- Customer: string (nullable = true)

salestype data:

+------+-------------+
|Type  |Customer     |
+------+-------------+
|Online|Customer_933 |
|inshop|Customer_933 |
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online|Customer_812 |
|Online|Customer_812 |
|inshop|Customer_127 |
+------+-------------+

I am trying to check which customers from the salesperson table appear in the salestype table. I need two additional columns: one showing which salesperson each customer belongs to, and one with the number of times that customer occurs in the salestype table. Essentially, the output should cover every customer from the salesperson table, with a count of its occurrences in the salestype table (0 if it is absent there).

Expected output:

+----------------------------+-------------+---------+
|CustomerBelongstoSalesperson|Customer     |Occurance|
+----------------------------+-------------+---------+
|salesperson_4               |Customer_933 |2        |
|salesperson_10              |Customer_2411|2        |
|salesperson_4               |Customer_1297|1        |
|salesperson_10              |Customer_1550|1        |
|salesperson_21              |Customer_2976|1        |
|salesperson_11              |Customer_812 |2        |
|salesperson_21              |Customer_127 |1        |
|salesperson_4               |Customer_861 |0        |
|salesperson_10              |Customer_1760|0        |
|salesperson_11              |Customer_454 |0        |
|salesperson_11              |Customer_158 |0        |
|salesperson_21              |Customer_2703|0        |
+----------------------------+-------------+---------+

Code:

import org.apache.spark.sql.functions.{count, first}

// df1 = salesperson dataframe, df2 = salestype dataframe
val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson")) // count("Type"): df2 has no "Place" column
  .show(false)
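
For reference, stack(n, expr1, expr2, ...) is a built-in Spark SQL generator: it lays its argument expressions out row by row, producing n rows. A tiny standalone illustration, independent of the tables above:

// stack(2, ...) with four expressions yields two rows of two columns each.
spark.range(1).selectExpr("stack(2, 'a', 1, 'b', 2) as (key, value)").show()
// +---+-----+
// |key|value|
// +---+-----+
// |  a|    1|
// |  b|    2|
// +---+-----+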

Thanks... please share your suggestions.


Answer: the following works on Spark 2.4.0+.

// toDF on a Seq needs the implicits in scope (available by default in spark-shell):
import spark.implicits._

val sourceDF = Seq(
    ("Customer_933","Customer_1760","Customer_454","Customer_127"),
    ("Customer_1297","Customer_2411","Customer_158","Customer_2703"),
    ("Customer_861","Customer_1550","Customer_812","Customer_2976")
).toDF("salesperson_4","salesperson_10","salesperson_11","salesperson_21")
sourceDF.show()

/*
+-------------+--------------+--------------+--------------+
|salesperson_4|salesperson_10|salesperson_11|salesperson_21|
+-------------+--------------+--------------+--------------+
| Customer_933| Customer_1760|  Customer_454|  Customer_127|
|Customer_1297| Customer_2411|  Customer_158| Customer_2703|
| Customer_861| Customer_1550|  Customer_812| Customer_2976|
+-------------+--------------+--------------+--------------+

*/

val salesDF = Seq(
  ("Online","Customer_933"),
  ("inshop","Customer_933"),
  ("inshop","Customer_1297"),
  ("Online","Customer_2411"),
  ("Online","Customer_2411"),
  ("Online","Customer_1550"),
  ("Online","Customer_2976"),
  ("Online","Customer_812"),
  ("Online","Customer_812"),
  ("inshop","Customer_127")
).toDF("Type","Customer")

salesDF.show()

/*
+------+-------------+
|  Type|     Customer|
+------+-------------+
|Online| Customer_933|
|inshop| Customer_933|
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online| Customer_812|
|Online| Customer_812|
|inshop| Customer_127|
+------+-------------+

*/

// Build the stack() argument list: pairs of ('column-name', column-value-as-string).
val stringCol = sourceDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
// stack(n, ...) unpivots the n salesperson columns into (Salesperson, Customer) rows.
val processedDF = sourceDF.selectExpr(s"stack(${sourceDF.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

/*
+--------------+-------------+
|Salesperson   |Customer     |
+--------------+-------------+
|salesperson_4 |Customer_933 |
|salesperson_10|Customer_1760|
|salesperson_11|Customer_454 |
|salesperson_21|Customer_127 |
|salesperson_4 |Customer_1297|
|salesperson_10|Customer_2411|
|salesperson_11|Customer_158 |
|salesperson_21|Customer_2703|
|salesperson_4 |Customer_861 |
|salesperson_10|Customer_1550|
|salesperson_11|Customer_812 |
|salesperson_21|Customer_2976|
+--------------+-------------+

*/
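
As a side note, the same unpivot can be written without selectExpr by building an array of structs and exploding it. A minimal sketch, assuming the same sourceDF as above:

import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

// One struct per salesperson column: (column-name literal, column value).
val pairs = sourceDF.columns.map(c =>
  struct(lit(c).as("Salesperson"), col(c).cast("string").as("Customer")))

// Collect the structs into an array, explode to one row per pair, then flatten.
val unpivoted = sourceDF
  .select(explode(array(pairs: _*)).as("sp"))
  .select("sp.Salesperson", "sp.Customer")

unpivoted.show(false) // same rows as processedDF above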

import org.apache.spark.sql.functions.{count, first}

// Left join keeps unmatched customers; count("Type") is then 0 for them.
processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

/*
+-------------+---------+--------------+
|Customer     |Occurance|Salesperson   |
+-------------+---------+--------------+
|Customer_2411|2        |salesperson_10|
|Customer_158 |0        |salesperson_11|
|Customer_812 |2        |salesperson_11|
|Customer_1760|0        |salesperson_10|
|Customer_2703|0        |salesperson_21|
|Customer_861 |0        |salesperson_4 |
|Customer_127 |1        |salesperson_21|
|Customer_2976|1        |salesperson_21|
|Customer_1297|1        |salesperson_4 |
|Customer_454 |0        |salesperson_11|
|Customer_933 |2        |salesperson_4 |
|Customer_1550|1        |salesperson_10|
+-------------+---------+--------------+

*/
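
To reproduce the ordering in the expected output (customers with sales first), an orderBy can be appended to the aggregation. For what it's worth, Spark 3.4+ also ships a built-in Dataset.unpivot that replaces the stack step; it does not exist in 2.4. A sketch, assuming the processedDF and salesDF defined above:

import org.apache.spark.sql.functions.{count, desc, first}

// Same aggregation as above, sorted so the highest occurrence counts come first.
processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .orderBy(desc("Occurance"))
  .show(false)

// On Spark 3.4+ only, the unpivot itself could be written as:
// sourceDF.unpivot(Array.empty[org.apache.spark.sql.Column], "Salesperson", "Customer")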
