嗨,我试图添加一列在我的SparkDataframe和计算值的基础上现有的Dataframe列。我正在写下面的代码。
val df1=spark.sql("select id,dt1,salary frm dbdt1.tabledt1")
val df2=df1.withColumn("new_date",WHEN (month(to_date(from_unixtime(unix_timestamp(dt1), 'dd-MM- yyyy')))
IN (01,02,03)) THEN
CONCAT(CONCAT(year(to_date(from_unixtime(unix_timestamp(dt1), 'dd-MM- yyyy')))-1,'-'),
substr(year(to_date(from_unixtime(unix_timestamp(dt1), 'dd-MM-yyyy'))),3,4))
.otherwise(CONCAT(CONCAT(year(to_date(from_unixtime(unix_timestamp(dt1), 'dd-MM- yyyy'))),'-')
,SUBSTR(year(to_date(from_unixtime(unix_timestamp(dt1), 'dd-MM-yyyy')))+1,3,4))))
但它总是显示问题错误:未关闭的字符文本。有人能告诉我该如何添加这个新列或修改现有代码吗。
1条答案
按热度按时间5kgi1eie1#
很多地方语法不正确。首先,我建议您在线查看一些spark sql示例,以及org.apache.spark.sql.functions api文档,因为您对when、concat和in的使用都是不正确的。
scala字符串用双引号括起来,您似乎在使用sql字符串语法。
'dd-MM-yyyy'
应该是"dd-MM-yyyy"
要引用Dataframedf1上的列dt1,可以使用以下方法之一:例如: