spark scalaDataframe:如何将自定义类型应用于现有Dataframe?

ui7jx7zq  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(388)

我有一个Dataframe(datadf),它包含如下数据:

firstColumn;secondColumn;thirdColumn
myText;123;2010-08-12 00:00:00

在我的例子中,所有这些列都是stringtype。
另一方面,我有另一个Dataframe(customtypedf),它可以修改,并包含一些列的自定义类型,如:

columnName;customType
secondColumn;IntegerType
thirdColumn; TimestampType

如何在datadf dataframe上动态应用新类型?

pjngdqdw

pjngdqdw1#

您可以使用收集为seq的customtypedfMap列名:

val colTypes = customTypeDF.rdd.map(x => x.toSeq.asInstanceOf[Seq[String]]).collect

val result = dataDF.select(
    dataDF.columns.map(c => 
        if (colTypes.map(_(0)).contains(c)) 
        col(c).cast(colTypes.filter(_(0) == c)(0)(1).toLowerCase.replace("type","")).as(c) 
        else col(c)
    ):_*
)

result.show
+-----------+------------+-------------------+
|firstColumn|secondColumn|        thirdColumn|
+-----------+------------+-------------------+
|     myText|         123|2010-08-12 00:00:00|
+-----------+------------+-------------------+

result.printSchema
root
 |-- firstColumn: string (nullable = true)
 |-- secondColumn: integer (nullable = true)
 |-- thirdColumn: timestamp (nullable = true)

相关问题