I have a DataFrame with the following schema and some sample records:
// df.printSchema
root
|-- CUST_NAME: string (nullable = true)
|-- DIRECTION: string (nullable = true)
|-- BANK_NAME: string (nullable = true)
|-- TXN_AMT: double (nullable = false)
// df.show(false)
+---------+---------+---------+-------+
|CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|
+---------+---------+---------+-------+
|ABC |D |Bank1 |300.0 |
|DEF |C |Bank2 |10.0 |
|GHI |C |Bank3 |12.0 |
|JKL |D |Bank4 |500.0 |
+---------+---------+---------+-------+
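For reference, an equivalent sample frame can be constructed like this (a sketch, assuming an active SparkSession named spark):
import spark.implicits._

val df = Seq(
  ("ABC", "D", "Bank1", 300.0),
  ("DEF", "C", "Bank2", 10.0),
  ("GHI", "C", "Bank3", 12.0),
  ("JKL", "D", "Bank4", 500.0)
).toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMT")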
Now, based on the value in the DIRECTION column, I need to conditionally add two new columns:
FROM_BANK
TO_BANK
In plain (non-Spark) code, it would look something like this:
var from_bank: String = null
var to_bank: String = null
val direction = "D"
val bank_name = "Test"
direction match {
  case "D" => {
    from_bank = bank_name
    to_bank = null
  }
  case "C" => {
    from_bank = null
    to_bank = bank_name
  }
}
The code above is just an illustration of what I am trying to achieve; I know it will not work on a Spark DataFrame as-is.
I know I can get what I want with multiple when/otherwise clauses, like so:
import org.apache.spark.sql.functions.{lit, when}

val df2 = df.withColumn(
"FROM_BANK",
when($"DIRECTION" === "D", $"BANK_NAME")
.otherwise(lit(null))
)
.withColumn(
"TO_BANK",
when($"DIRECTION" === "C", $"BANK_NAME")
.otherwise(lit(null))
)
df2.show(100,false)
// +---------+---------+---------+-------+---------+-------+
// |CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|FROM_BANK|TO_BANK|
// +---------+---------+---------+-------+---------+-------+
// |ABC |D |Bank1 |300.0 |Bank1 |null |
// |DEF |C |Bank2 |10.0 |null |Bank2 |
// |GHI |C |Bank3 |12.0 |null |Bank3 |
// |JKL |D |Bank4 |500.0 |Bank4 |null |
// +---------+---------+---------+-------+---------+-------+
The approach above is simple enough, but it is very verbose, since in reality I need to do this for a total of 8 columns. Another option I considered is using a .map function on the DataFrame, as follows:
import spark.implicits._
val df3 = test_df.map(row => {
val direction = row.getAs[String]("DIRECTION")
if (direction == "D")
(row.getAs[String]("CUST_NAME"),
row.getAs[String]("DIRECTION"),
row.getAs[String]("BANK_NAME"),
row.getAs[Double]("TXN_AMT"),
row.getAs[String]("BANK_NAME"), // This will become the FROM_BANK column
null // This will become the TO_BANK column
)
else if (direction == "C")
(row.getAs[String]("CUST_NAME"),
row.getAs[String]("DIRECTION"),
row.getAs[String]("BANK_NAME"),
row.getAs[Double]("TXN_AMT"),
null, // This will become the FROM_BANK column
row.getAs[String]("BANK_NAME") // This will become the TO_BANK column
)
}).toDF("CUST_NAME","DIRECTION","BANK_NAME","TXN_AMOUNT","FROM_BANK","TO_BANK")
However, when I run the above, I get the following error:
Error:(35, 26) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val df3 = test_df.map(row => {
I tried to modify the above to use a statically typed Dataset instead, but I still hit the same problem:
import spark.implicits._
case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)
val test_df4 = test_df.as[Record].map(row => {
val direction = row.DIRECTION
if (direction == "D")
(
row.CUST_NAME,
row.DIRECTION,
row.BANK_NAME,
row.TXN_AMT,
row.BANK_NAME, // This will become the FROM_BANK column
null // This will become the TO_BANK column
)
else if (direction == "C")
(
row.CUST_NAME,
row.DIRECTION,
row.BANK_NAME,
row.TXN_AMT,
null, // This will become the FROM_BANK column
row.BANK_NAME // This will become the TO_BANK column
)
}).toDF("CUST_NAME","DIRECTION","BANK_NAME","TXN_AMOUNT","FROM_BANK","TO_BANK")
test_df4.show(100,false)
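As an aside, the encoder error in both attempts comes from the conditional itself: an if/else-if chain without a final else returns Unit on the unmatched path, so the lambda's result type is inferred as Any, and Spark has no encoder for Any (the bare nulls inside the tuples, typed as Null, are similarly problematic). Below is a minimal sketch of a typed map version that should compile, assuming nullable fields are modeled as Option[String] (which Spark can encode, writing None as null) and unknown directions fall through to a final case:
import spark.implicits._

case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)

val df3 = test_df.as[Record].map { r =>
  // Both values infer to Option[String], for which an encoder exists.
  val (from, to) = r.DIRECTION match {
    case "D" => (Some(r.BANK_NAME), None)
    case "C" => (None, Some(r.BANK_NAME))
    case _   => (None, None) // exhaustive match keeps the result type concrete
  }
  (r.CUST_NAME, r.DIRECTION, r.BANK_NAME, r.TXN_AMT, from, to)
}.toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMT", "FROM_BANK", "TO_BANK")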
I know the first option works, but I am hoping for a more programmatic way to do this, since I need to populate all 8 columns based on the value of the DIRECTION column. Any feedback or suggestions would be greatly appreciated.
Thanks!
1 Answer
You can put the when expressions in a list and then select them, so you don't need to chain a bunch of withColumn statements. Also note that .otherwise(null) is not necessary, since that is the default behavior.
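A minimal sketch of that idea, assuming the DIRECTION-to-column mapping can be written down as data (the two pairs below mirror FROM_BANK/TO_BANK; for the real case the list would simply grow to cover the remaining columns):
import org.apache.spark.sql.functions.when
import spark.implicits._

// One entry per derived column: (new column name, DIRECTION value that fills it).
val derived = Seq(
  "FROM_BANK" -> "D",
  "TO_BANK"   -> "C"
)

// Build one when-expression per target column. Rows whose DIRECTION does not
// match are left as null, which is when's default, so no .otherwise is needed.
val newCols = derived.map { case (name, dir) =>
  when($"DIRECTION" === dir, $"BANK_NAME").as(name)
}

// Keep the original columns and append all derived ones in a single select.
val df2 = df.select($"*" +: newCols: _*)
df2.show(false)
Since the new columns are generated from a plain Seq, covering the remaining DIRECTION-driven columns is just a matter of extending the derived list rather than adding more withColumn calls.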