Spark: join two DataFrames and convert the resulting DataFrame to the desired JSON format

kx1ctssn · posted on 2021-05-29 · in Spark

Given two DataFrames.
df1 is:

|emp_id|emp_site     |emp_name
|1     |Washigton    |[Will, Smith]
|2     |null         |null
|3     |New York     |[Norman, Smith]
|4     |Iowa         |[Ian, Smith]

df2 is:

|emp_id|emp_site     |emp_name
|1     |Washigton    |[Watson, Smith]
|2     |Wisconsin    |[Sam, Robinson]
|3     |New York     |null
|4     |Illinois     |[Ican, Robinson]
|5     |Pennsylvania |[Patrick, Robinson]

There should be a df3 that is the join of df1 and df2 on the emp_id column (an outer join, so that all df2 records are available). For every column other than emp_id, df3 should express the value in terms of "from", "to" and "change", but only when the corresponding column values in df1 and df2 differ (note: if the column values of df1 and df2 are equal, the result should be null).
Note: while "from" and "to" are self-explanatory and hold the values from df1 and df2, "change" should be "insert" when the "from" value is null, and "update" otherwise.

|emp_id|emp_site                                        |emp_name
|1     |null                                            |[from -> [Will, Smith], to -> [Watson, Smith], change -> update]
|2     |[to -> Wisconsin, change -> insert]             |[to -> [Sam, Robinson], change -> update]
|3     |null                                            |[from -> [Norman, Smith], change -> update]
|4     |[from -> Iowa, to -> Illinois, change -> insert]|[from -> [Ian, Smith], to -> [Ican, Robinson], change -> insert]
|5     |[to -> Pennsylvania, change -> insert]          |[to -> [Patrick, Robinson], change -> insert]

I tried to implement the desired df3 using map, without success (in particular because the column types are not always strings, i.e. "from" and "to" may hold struct types depending on the column types in df1 and df2). The reason for trying to build "from", "to" and "change" in df3 as a map data structure is that this df3 eventually needs to be converted to JSON.
Any help is much appreciated.
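
(For reference, a minimal sketch of how sample DataFrames matching the tables above could be built, assuming a SparkSession named spark and emp_name modeled as an array of strings; the Seq literals below simply mirror the tables and are not the actual loading code:)

import spark.implicits._

// df1: nulls where the table shows null, emp_name as Option[Seq[String]]
val df1 = Seq(
  (1, Some("Washigton"), Some(Seq("Will", "Smith"))),
  (2, None,              None),
  (3, Some("New York"),  Some(Seq("Norman", "Smith"))),
  (4, Some("Iowa"),      Some(Seq("Ian", "Smith")))
).toDF("emp_id", "emp_site", "emp_name")

// df2: same schema, one extra emp_id (5)
val df2 = Seq(
  (1, Some("Washigton"),    Some(Seq("Watson", "Smith"))),
  (2, Some("Wisconsin"),    Some(Seq("Sam", "Robinson"))),
  (3, Some("New York"),     None),
  (4, Some("Illinois"),     Some(Seq("Ican", "Robinson"))),
  (5, Some("Pennsylvania"), Some(Seq("Patrick", "Robinson")))
).toDF("emp_id", "emp_site", "emp_name")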

t30tvxxf1#

Try this -

1. Load the input

import org.apache.spark.sql.functions._   // when, map, array, lit, expr, col
import spark.implicits._                   // needed for .toDS() / .toDF()

val data =
      """
        |emp_id|emp_site             |emp_name
        |1     |Washigton            | Will
        |2     |null                 | null
        |3     |New York             | Norman
        |4     |Iowa                 | Ian
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)
    df1.show(false)
    df1.printSchema()
    /**
      * +------+---------+--------+
      * |emp_id|emp_site |emp_name|
      * +------+---------+--------+
      * |1     |Washigton|Will    |
      * |2     |null     |null    |
      * |3     |New York |Norman  |
      * |4     |Iowa     |Ian     |
      * +------+---------+--------+
      *
      * root
      * |-- emp_id: integer (nullable = true)
      * |-- emp_site: string (nullable = true)
      * |-- emp_name: string (nullable = true)
      */

    val data1 =
      """
        |emp_id|emp_site             |emp_name
        |1     |Washigton            | Watson
        |2     |Wisconsin            | Sam
        |3     |New York             | null
        |4     |Illinois             | Ican
        |5     |Pennsylvania         | Patrick
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df2.show(false)
    df2.printSchema()
    /**
      * +------+------------+--------+
      * |emp_id|emp_site    |emp_name|
      * +------+------------+--------+
      * |1     |Washigton   |Watson  |
      * |2     |Wisconsin   |Sam     |
      * |3     |New York    |null    |
      * |4     |Illinois    |Ican    |
      * |5     |Pennsylvania|Patrick |
      * +------+------------+--------+
      *
      * root
      * |-- emp_id: integer (nullable = true)
      * |-- emp_site: string (nullable = true)
      * |-- emp_name: string (nullable = true)
      */

2. Process into the required format

val joiningKey = "emp_id"
    val cols =
      df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
        .map { c  =>
            val (df1Col, df2Col) = df1.col(c._1) -> df2.col(c._2)
            when(df1Col.isNull && df2Col.isNotNull,
              array(map(lit("to"), df2Col), map(lit("change"), lit("insert"))))
            .when(df1Col.isNotNull && df2Col.isNull,
              array(map(lit("from"), df1Col), map(lit("change"), lit("delete"))))
              .when(df1Col.isNotNull && df2Col.isNotNull && df1Col === df2Col,
                lit(null))
              .when(df1Col.isNull && df2Col.isNull,
                lit(null))
              .otherwise(array(map(lit("from"), df1Col), map(lit("to"), df2Col), map(lit("change"), lit("update"))))
            .as(c._1)

          }

    df1.join(df2, Seq(joiningKey), "outer")
      .select(cols ++ Seq(col(colName = joiningKey)): _*)
      .orderBy(joiningKey)
      .show(false)

    /**
      * +------------------------------------------------------+----------------------------------------------------+------+
      * |emp_site                                              |emp_name                                            |emp_id|
      * +------------------------------------------------------+----------------------------------------------------+------+
      * |null                                                  |[[from -> Will], [to -> Watson], [change -> update]]|1     |
      * |[[to -> Wisconsin], [change -> insert]]               |[[to -> Sam], [change -> insert]]                   |2     |
      * |null                                                  |[[from -> Norman], [change -> delete]]              |3     |
      * |[[from -> Iowa], [to -> Illinois], [change -> update]]|[[from -> Ian], [to -> Ican], [change -> update]]   |4     |
      * |[[to -> Pennsylvania], [change -> insert]]            |[[to -> Patrick], [change -> insert]]               |5     |
      * +------------------------------------------------------+----------------------------------------------------+------+
      */
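
As a side note, the expected df3 in the question shows a single map per column (with "from", "to" and "change" as keys of one map) rather than an array of single-entry maps. For string-typed columns the same when logic could build one multi-entry map instead; a minimal sketch, mirroring the branches above (colsSingleMap is a name introduced here, and since all map values must share one type this only works when the diffed columns are strings):

val colsSingleMap = df1.columns.filterNot(_ == joiningKey).map { name =>
  val (df1Col, df2Col) = df1.col(name) -> df2.col(name)
  when(df1Col.isNull && df2Col.isNotNull,
    map(lit("to"), df2Col, lit("change"), lit("insert")))
    .when(df1Col.isNotNull && df2Col.isNull,
      map(lit("from"), df1Col, lit("change"), lit("delete")))
    .when(df1Col <=> df2Col, lit(null))          // equal values or both null
    .otherwise(map(lit("from"), df1Col, lit("to"), df2Col, lit("change"), lit("update")))
    .as(name)
}

df1.join(df2, Seq(joiningKey), "outer")
  .select(colsSingleMap :+ col(joiningKey): _*)
  .orderBy(joiningKey)
  .show(false)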

3. Use a struct if the data type of the input column is not string (based on the comments)

// in case column is not of type string
    val getExpr = (fromExpr: String, toExpr: String, changeExpr: String) =>
      s"named_struct('from', $fromExpr, 'to', $toExpr, 'change', '$changeExpr')"
    val cols1 =
      df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
        .map { c =>
          val (c1, c2) = s"df1.${c._1}" -> s"df2.${c._2}"
          when(expr(s"$c1 is null and $c2 is not null"), expr(getExpr("null", c2, "insert")))
            .when(expr(s"$c1 is not null and $c2 is null"), expr(getExpr(c1, "null", "delete")))
            .when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "null")))
            .when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "null")))
            .otherwise(expr(getExpr(c1, c2, "update")))
            .as(c._1)
        }

    val processedDF = df1.as("df1").join(df2.as("df2"), Seq(joiningKey), "outer")
      .select(cols1 ++ Seq(col(colName = joiningKey)): _*)
      .orderBy(joiningKey)
    processedDF.show(false)
    processedDF.printSchema()

    /**
      * +------------------------+----------------------+------+
      * |emp_site                |emp_name              |emp_id|
      * +------------------------+----------------------+------+
      * |[,, null]               |[Will, Watson, update]|1     |
      * |[, Wisconsin, insert]   |[, Sam, insert]       |2     |
      * |[,, null]               |[Norman,, delete]     |3     |
      * |[Iowa, Illinois, update]|[Ian, Ican, update]   |4     |
      * |[, Pennsylvania, insert]|[, Patrick, insert]   |5     |
      * +------------------------+----------------------+------+
      *
      * root
      * |-- emp_site: struct (nullable = false)
      * |    |-- from: string (nullable = true)
      * |    |-- to: string (nullable = true)
      * |    |-- change: string (nullable = false)
      * |-- emp_name: struct (nullable = false)
      * |    |-- from: string (nullable = true)
      * |    |-- to: string (nullable = true)
      * |    |-- change: string (nullable = false)
      * |-- emp_id: integer (nullable = true)
      */

Please note that when from and to are not present, I'm using null for change; you can change it to no-op.
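
Since the original goal is JSON, the struct-typed columns from step 3 serialize directly, so the processed DataFrame can be turned into JSON strings (or written out) without further shaping. A minimal sketch; the output path is a placeholder, and the commented lines show one way to apply the no-op tweak mentioned above:

// optional: flag unchanged columns as 'no-op' instead of the string 'null'
// by adjusting the two "equal / both null" branches in cols1 above:
//   .when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "no-op")))
//   .when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "no-op")))

// struct columns map 1:1 onto JSON objects
processedDF.toJSON.show(false)                                // one JSON string per row
processedDF.write.mode("overwrite").json("/tmp/emp_changes")  // placeholder output path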
