Conditionally update or add multiple Spark DataFrame columns

4bbkushb  asked on 2021-07-12  in Spark
Follow (0) | Answers (1) | Views (320)

I have a DataFrame with the following schema and some sample records:

// df.printSchema
root
 |-- CUST_NAME: string (nullable = true)
 |-- DIRECTION: string (nullable = true)
 |-- BANK_NAME: string (nullable = true)
 |-- TXN_AMT: double (nullable = false)

// df.show(false)
+---------+---------+---------+-------+
|CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|
+---------+---------+---------+-------+
|ABC      |D        |Bank1    |300.0  |
|DEF      |C        |Bank2    |10.0   |
|GHI      |C        |Bank3    |12.0   |
|JKL      |D        |Bank4    |500.0  |
+---------+---------+---------+-------+

Now, depending on the value in the DIRECTION column, I need to conditionally add two new columns:
FROM_BANK
TO_BANK
In plain procedural code, the logic would look like this:

var from_bank: String = null
var to_bank: String = null
val direction = "D"
val bank_name = "Test"

direction match {
  case "D" =>
    from_bank = bank_name
    to_bank = null
  case "C" =>
    from_bank = null
    to_bank = bank_name
}

The code above is just an illustration of what I am trying to achieve; I know it won't work on a Spark DataFrame as written.
I know I can get what I want with multiple when/otherwise clauses, like this:

val df2 = df.withColumn(
  "FROM_BANK",
    when($"DIRECTION" === "D", $"BANK_NAME")
    .otherwise(lit(null))
  )
  .withColumn(
    "TO_BANK",
    when($"DIRECTION" === "C", $"BANK_NAME")
      .otherwise(lit(null))
  )

df2.show(100,false)
//    +---------+---------+---------+-------+---------+-------+
//    |CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|FROM_BANK|TO_BANK|
//    +---------+---------+---------+-------+---------+-------+
//    |ABC      |D        |Bank1    |300.0  |Bank1    |null   |
//    |DEF      |C        |Bank2    |10.0   |null     |Bank2  |
//    |GHI      |C        |Bank3    |12.0   |null     |Bank3  |
//    |JKL      |D        |Bank4    |500.0  |Bank4    |null   |
//    +---------+---------+---------+-------+---------+-------+

The approach above is simple enough, but very verbose, because in reality I need to do this for a total of 8 columns. Another option I considered is using a .map function over the DataFrame, as follows:

import spark.implicits._
val df3 = test_df.map(row => {
      val direction = row.getAs[String]("DIRECTION")

      if (direction == "D")
        (row.getAs[String]("CUST_NAME"),
          row.getAs[String]("DIRECTION"),
          row.getAs[String]("BANK_NAME"),
          row.getAs[Double]("TXN_AMT"),
          row.getAs[String]("BANK_NAME"), // This will become the FROM_BANK column
          null // This will become the TO_BANK column
        )
      else if (direction == "C")
        (row.getAs[String]("CUST_NAME"),
          row.getAs[String]("DIRECTION"),
          row.getAs[String]("BANK_NAME"),
          row.getAs[Double]("TXN_AMT"),
          null, // This will become the FROM_BANK column
          row.getAs[String]("BANK_NAME") // This will become the TO_BANK column
        )
    }).toDF("CUST_NAME","DIRECTION","BANK_NAME","TXN_AMT","FROM_BANK","TO_BANK")

However, when running the above, I get the following error:

Error:(35, 26) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val df3 = test_df.map(row => {

I tried modifying the above by creating a statically typed Dataset, but I still hit the same problem:

import spark.implicits._

case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)

val test_df4 = test_df.as[Record].map(row => {
  val direction = row.DIRECTION

  if (direction == "D")
    (
      row.CUST_NAME,
      row.DIRECTION,
      row.BANK_NAME,
      row.TXN_AMT,
      row.BANK_NAME, // This will become the FROM_BANK column
      null // This will become the TO_BANK column
    )
  else if (direction == "C")
    (
      row.CUST_NAME,
      row.DIRECTION,
      row.BANK_NAME,
      row.TXN_AMT,
      null, // This will become the FROM_BANK column
      row.BANK_NAME // This will become the TO_BANK column
    )
}).toDF("CUST_NAME","DIRECTION","BANK_NAME","TXN_AMT","FROM_BANK","TO_BANK")
test_df4.show(100,false)
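For reference, the encoder error above comes from the bare `null` literals: Scala types them as `Null`, and Spark cannot derive an encoder for a tuple containing `Null`. A hedged sketch of one way around this, using `Option[String]` (which Spark encodes as a nullable string column) and a pure row-level function; the function and variable names here are illustrative, not from the original post:

```scala
// Sketch: returning Option[String] instead of null lets Spark derive an encoder.
case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)

// Pure row-level logic; easy to unit-test without a SparkSession.
def withBanks(r: Record): (String, String, String, Double, Option[String], Option[String]) =
  r.DIRECTION match {
    case "D" => (r.CUST_NAME, r.DIRECTION, r.BANK_NAME, r.TXN_AMT, Some(r.BANK_NAME), None)
    case "C" => (r.CUST_NAME, r.DIRECTION, r.BANK_NAME, r.TXN_AMT, None, Some(r.BANK_NAME))
    case _   => (r.CUST_NAME, r.DIRECTION, r.BANK_NAME, r.TXN_AMT, None, None)
  }

// Applied to the DataFrame (assuming `test_df` and `spark.implicits._` are in scope):
// val df4 = test_df.as[Record].map(withBanks _)
//   .toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMT", "FROM_BANK", "TO_BANK")
```

`None` surfaces as `null` in the resulting column, so the output matches the when/otherwise version.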

I know the first option works, but I am hoping for a more programmatic way to achieve this, since I need to derive all of these columns based on the value of the DIRECTION column. Any feedback or suggestions would be greatly appreciated.
Thanks!

6gpjuf90  1#

You can put the `when` expressions into a list and then select them, so you don't need to chain a bunch of `withColumn` calls. Also note that `.otherwise(null)` is unnecessary, since null is the default when no condition matches.

import org.apache.spark.sql.functions.{col, when}

val newcols = List(
    col("*"),
    when($"DIRECTION" === "D", $"BANK_NAME").as("FROM_BANK"),
    when($"DIRECTION" === "C", $"BANK_NAME").as("TO_BANK")
)

val df2 = df.select(newcols: _*)
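To scale this to the 8 columns mentioned in the question, the `when` expressions can be generated from a mapping instead of written out one by one. A hedged sketch; the `bankCols` mapping below is illustrative, assuming each new column is populated from BANK_NAME for exactly one DIRECTION value:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, when}

// Illustrative mapping: new column name -> DIRECTION value that populates it.
val bankCols: Seq[(String, String)] = Seq(
  "FROM_BANK" -> "D",
  "TO_BANK"   -> "C"
  // ...extend with the remaining column/direction pairs as needed
)

// Generate one `when` column per mapping entry; unmatched rows stay null.
val generatedCols: Seq[Column] =
  col("*") +: bankCols.map { case (name, dir) =>
    when(col("DIRECTION") === dir, col("BANK_NAME")).as(name)
  }

// val df2 = df.select(generatedCols: _*)
```

Adding a column then becomes a one-line change to `bankCols` rather than another `withColumn` call.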
