从现有列在spark中添加列

hts6caw3  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(313)

所以我生成的Dataframe df 看起来像这样:

+---------------------------------------------------------------------+-----------------+
|constraint_message                                                   |constraint_status|
+---------------------------------------------------------------------+-----------------+
|                                                                     |Success          |
|Value: 8.109213053982745E-6 does not meet the constraint requirement!|Failure          |
|                                                                     |Success          |
|                                                                     |Success          |
|Value: 0.98 does not meet the constraint requirement!                |Failure          |
+---------------------------------------------------------------------+-----------------+

我想在这个Dataframe中有一个新的列,我在函数中为它定义了一个逻辑:

def metric = (status: String, valu:Double) => {
  if (status == "Success"){ 1 }  
  else{ valu } 
}
val putMetric = spark.udf.register("Metric",metric)

现在,当我这样调用它时:[注意:我稍后将用双变量替换0] df.withColumn("Metric",putMetric(col("constraint_status"),0)).show() 我得到一个错误:

try.scala:48: error: type mismatch;
 found   : Int(0)
 required: org.apache.spark.sql.Column
    df.withColumn("Metric",putMetric(col("constraint_status"),0))

如何纠正?我试着把 col(0) 但这也不管用

jhdbpxl9

jhdbpxl91#

正则表达式改编自以下答案:

val df2 = df.withColumn(
    "newcol",
    when(
        col("constraint_message").isNull || length(col("constraint_message")) === 0,
        lit(1)
    )
    .otherwise(
        regexp_extract(
            col("constraint_message"),
            raw"(\d+(\.\d+)?(E[+-]\d+)?)",
            1
        )
    )
    .cast("double")
)

df2.show(false)
+---------------------------------------------------------------------+-----------------+--------------------+
|constraint_message                                                   |constraint_status|newcol              |
+---------------------------------------------------------------------+-----------------+--------------------+
|null                                                                 |Success          |1.0                 |
|Value: 8.109213053982745E-6 does not meet the constraint requirement!|Failure          |8.109213053982745E-6|
|null                                                                 |Success          |1.0                 |
|null                                                                 |Success          |1.0                 |
|Value: 0.98 does not meet the constraint requirement!                |Failure          |0.98                |
+---------------------------------------------------------------------+-----------------+--------------------+
hc2pp10m

hc2pp10m2#

你可以用 regexp_extract ```
val df1 = df.withColumn(
"passed",
when(
col("constraint_status") === "Failure",
regexp_extract(col("constraint_message"), "Value: (\d*\.?\d+([e|E][+-]?[0-9]+)?).*", 1)
).otherwise(1).cast("double")
)

df1.show
//+--------------------+-----------------+--------------------+
//| constraint_message|constraint_status| passed|
//+--------------------+-----------------+--------------------+
//| | Success| 1|
//|Value: 8.10921305...| Failure|8.109213053982745E-6|
//| | Success| 1|
//| | Success| 1|
//|Value: 0.98 does ...| Failure| 0.98|
//+--------------------+-----------------+--------------------+

相关问题