Column values not aligned correctly after unpivot in Spark 2.4

falq053o asked on 2021-05-27 in Spark

Spark 2.4. The first DataFrame looks like:

Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615
Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605
Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265
Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178

The second DataFrame (two columns, Type and Customer) looks like the one in the attached image, which is not reproduced here.
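For reference, a minimal sketch of what it presumably contains, with sample values taken from the answer below:

import spark.implicits._

// Hypothetical reconstruction of df2 from the sample data in the answer
val df2Sample = Seq(
  ("shop", "Customer_17"),
  ("Home", "Customer_2703"),
  ("shop", "Customer_2703"),
  ("Home", "Customer_575"),
  ("Shop", "Customer_202")
).toDF("Type", "Customer")
df2Sample.show(false)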

Code:

val df1 = spark.read.schema("Salesperson STRING, Customer STRING")
  .option("sep", ":")
  .csv("D:/New folder/prints/salesperson.txt")
df1.show(false)
df1.printSchema()

val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv("D:/New folder/prints/Type.csv")
df2.show(false)
df2.printSchema()

The problem comes here: when I print the result of the code below, the column values are not aligned correctly.

val processedDF = df1.withColumn("Customer", explode(split(trim(col("Customer")), ",")))
processedDF.show(false)

processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)


Expected output

The expected output brings all customers and their salespersons from dataframe1, along with the number of times each customer appears in DataFrame2 (called Occurance). I have added snapshot images for better visibility. Please help.


lyr7nygr1#

You don't need stack here -
1. Read the text file using the separator ':'
2. split and explode Customer
3. Left join with df2
4. Compute the occurrence count

1. Load the data: read the text file using the separator ':'

import org.apache.spark.sql.functions._
import spark.implicits._

val data1 =
  """
    |Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615
    |Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605
    |Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265
    |Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178
  """.stripMargin

// Each input line is "Salesperson: Customer,Customer,...", so ':' separates the two fields
val df1 = spark.read.schema("Salesperson STRING, Customer STRING")
  .option("sep", ":")
  .csv(data1.split(System.lineSeparator()).toSeq.toDS())
df1.show(false)
df1.printSchema()
    /**
      * +--------------+-------------------------------------------------------+
      * |Salesperson   |Customer                                               |
      * +--------------+-------------------------------------------------------+
      * |Salesperson_21| Customer_575,Customer_2703,Customer_2682,Customer_2615|
      * |Salesperson_11| Customer_454,Customer_158,Customer_1859,Customer_2605 |
      * |Salesperson_10| Customer_1760,Customer_613,Customer_3008,Customer_1265|
      * |Salesperson_4 | Customer_1545,Customer_1312,Customer_861,Customer_2178|
      * +--------------+-------------------------------------------------------+
      *
      * root
      * |-- Salesperson: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

val data2 =
  """
    |Type  |Customer
    |shop  |Customer_17
    |Home  |Customer_2703
    |shop  |Customer_2703
    |Home  |Customer_575
    |Shop  |Customer_202
  """.stripMargin

// Split each line on '|', trim the cells, and rejoin with ',' so the
// pipe-formatted sample can be parsed as a comma-separated CSV
val stringDS2 = data2.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS2)
df2.show(false)
df2.printSchema()
    /**
      * +----+-------------+
      * |Type|Customer     |
      * +----+-------------+
      * |shop|Customer_17  |
      * |Home|Customer_2703|
      * |shop|Customer_2703|
      * |Home|Customer_575 |
      * |Shop|Customer_202 |
      * +----+-------------+
      *
      * root
      * |-- Type: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

2. split and explode Customer

// Split the comma-separated customer list and explode into one row per customer;
// trim removes the leading space left after the ':' separator
val processedDF = df1.withColumn("Customer", explode(split(trim(col("Customer")), ",")))
processedDF.show(false)
    /**
      * +--------------+-------------+
      * |Salesperson   |Customer     |
      * +--------------+-------------+
      * |Salesperson_21|Customer_575 |
      * |Salesperson_21|Customer_2703|
      * |Salesperson_21|Customer_2682|
      * |Salesperson_21|Customer_2615|
      * |Salesperson_11|Customer_454 |
      * |Salesperson_11|Customer_158 |
      * |Salesperson_11|Customer_1859|
      * |Salesperson_11|Customer_2605|
      * |Salesperson_10|Customer_1760|
      * |Salesperson_10|Customer_613 |
      * |Salesperson_10|Customer_3008|
      * |Salesperson_10|Customer_1265|
      * |Salesperson_4 |Customer_1545|
      * |Salesperson_4 |Customer_1312|
      * |Salesperson_4 |Customer_861 |
      * |Salesperson_4 |Customer_2178|
      * +--------------+-------------+
      */
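Note the trim above: in df1 the values after ':' carry a leading space (visible in the df1.show output), so without it the first customer of each row (e.g. " Customer_575") would not match df2's keys in the join. A quick sketch of that failure mode, as an illustration only:

// Illustration only: without trim, exploded values keep the leading space
// left by the ':' separator, so they fail to match df2's Customer keys
df1.withColumn("Customer", explode(split(col("Customer"), ",")))
  .join(df2, Seq("Customer"), "left")
  .where(col("Customer").startsWith(" "))
  .show(false)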

3. Left join with df2 and compute the occurrence count

// The left join keeps customers with no match in df2; count("Type") skips the
// resulting nulls, so unmatched customers get an Occurance of 0
processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

    /**
      * +-------------+---------+--------------+
      * |Customer     |Occurance|Salesperson   |
      * +-------------+---------+--------------+
      * |Customer_1312|0        |Salesperson_4 |
      * |Customer_1545|0        |Salesperson_4 |
      * |Customer_1760|0        |Salesperson_10|
      * |Customer_2682|0        |Salesperson_21|
      * |Customer_2703|2        |Salesperson_21|
      * |Customer_3008|0        |Salesperson_10|
      * |Customer_454 |0        |Salesperson_11|
      * |Customer_613 |0        |Salesperson_10|
      * |Customer_1265|0        |Salesperson_10|
      * |Customer_158 |0        |Salesperson_11|
      * |Customer_1859|0        |Salesperson_11|
      * |Customer_2178|0        |Salesperson_4 |
      * |Customer_2605|0        |Salesperson_11|
      * |Customer_2615|0        |Salesperson_21|
      * |Customer_575 |1        |Salesperson_21|
      * |Customer_861 |0        |Salesperson_4 |
      * +-------------+---------+--------------+
      */
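One caveat with the aggregation above: first("Salesperson") is non-deterministic if a customer ever appears under more than one salesperson. A minimal variant (an assumption about the desired semantics, not part of the original answer) that avoids first by grouping on both columns:

processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer", "Salesperson")
  .agg(count("Type").as("Occurance"))
  .orderBy("Customer")
  .show(false)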
