pyspark: usage of variables in a Delta merge call in Spark

x759pob2 · published 2023-11-16 in Spark

We have a Delta merge case in our Spark + Scala code, as shown below:

  deltaTable.as("existing")
    .merge(dfNewData.as("new"), "new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid")
    .whenMatched()
    .update(Map("Id1" -> col("new.Id1"), ... // around 20 columns
    ))
    .whenNotMatched()
    .insertAll()
    .execute()

To improve performance, I wanted to apply partition pruning (https://kb.databricks.com/delta/delta-merge-into). In the code below, the SerialNumber field is filtered in chunks bounded by Min and Max values (held in the Int variables SerialMin and SerialMax):

  deltaTable.as("existing")
    .merge(dfNewData.as("new"), "existing.SerialNumber >= lit(SerialMin) and existing.SerialNumber < lit(SerialMax) and new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid")
    .whenMatched()
    .update(Map("Id1" -> col("new.Id1"), ... // around 20 columns
    ))
    .whenNotMatched()
    .insertAll()
    .execute()


But the code above throws an error saying this is not supported. Is there a way to use variables in a Delta merge call?
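For reference, the condition argument to merge() is parsed by Spark as a SQL string, so a Scala call like lit(SerialMin) written inside the quotes is never evaluated; the parser just sees the literal characters. A minimal plain-Scala sketch of the difference (the SerialMin/SerialMax values here are made-up placeholders):

```scala
// Hypothetical stand-in values for the real chunk bounds.
val SerialMin = 1000
val SerialMax = 2000

// Inside a plain string, lit(SerialMin) is just text handed to the SQL parser.
val badCondition =
  "existing.SerialNumber >= lit(SerialMin) and existing.SerialNumber < lit(SerialMax)"

// Scala string interpolation substitutes the variable values
// before the string ever reaches the SQL parser.
val goodCondition =
  s"existing.SerialNumber >= $SerialMin and existing.SerialNumber < $SerialMax"

println(goodCondition)
// existing.SerialNumber >= 1000 and existing.SerialNumber < 2000
```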


falq053o1#

Solved the problem with the code below:

  deltaTable.as("existing")
    .merge(dfNewData.as("new"), s"existing.SerialNumber >= $SerialMin and existing.SerialNumber < $SerialMax and new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid")
    .whenMatched()
    .update(Map("Id1" -> col("new.Id1"), ... // around 20 columns
    ))
    .whenNotMatched()
    .insertAll()
    .execute()
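The chunking the question describes can then be driven by a loop that builds one interpolated condition per SerialNumber range. A sketch in plain Scala, with made-up bounds and chunk size (the real values would come from the data):

```scala
// Made-up overall range and chunk size for illustration.
val serialLo  = 0
val serialHi  = 10000
val chunkSize = 2500

// (min, max) bounds for each SerialNumber chunk.
val bounds: Seq[(Int, Int)] =
  (serialLo until serialHi by chunkSize)
    .map(min => (min, math.min(min + chunkSize, serialHi)))

// One interpolated merge condition per chunk, in the form shown above.
val conditions = bounds.map { case (serialMin, serialMax) =>
  s"existing.SerialNumber >= $serialMin and existing.SerialNumber < $serialMax and " +
    "new.SerialNumber = existing.SerialNumber and new.RSid = existing.RSid"
}

conditions.foreach(println)
```

The merge(...) chain from the answer would then run once per entry in bounds. As an alternative to interpolated strings, Delta's Scala API also has a merge overload that accepts a Column condition, where expressions such as lit(SerialMin) are valid.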

