Is there a way to use "set *" in a Spark/Databricks merge query when not all columns are in the source?

Posted by 3duebb1j on 2021-07-12 in Spark

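I am merging data from one table into another in Spark/Databricks. I can use update set * when all columns are selected, but it fails when not all columns are selected (for example, if the dst table in the query below has a col9). Is there a way to do the merge without repeating the column list in both the when matched part (i.e. matching by column name against the source query) and the when not matched part?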

    merge into demo dst
    using (
      select distinct
        col1,
        col2,
        col3,
        col4,
        col5,
        col6,
        col7,
        col8
      from
        demo_raw
      where 1=1
        and col1 = 'ac'
        and col2 is not null
    ) src
    on src.col1 = dst.col1
    when matched then
      update set *
    when not matched then
      insert *
    ;
Answer #1, by raogr8fs

Here is a Python solution that I am fairly happy with.
Function definition:

    %python
    def merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols, debug=False):
        """Build and run a MERGE that updates/inserts only the columns in cols."""
        # select list for the source subquery (entries may be aliased expressions)
        selectList = ",\n".join("  " + c for c in selectCols)
        # "col = s.col" assignments for the "when matched" branch
        updateList = ",\n".join("  " + c + " = s." + c for c in cols)
        # target column names and source values for the "when not matched" branch
        insertCols = ",\n".join("  " + c for c in cols)
        insertVals = ",\n".join("  s." + c for c in cols)

        # create the sql string
        sqlString = "\n".join([
            "merge into " + dstTableName + " t",
            "using (",
            "select",
            selectList,
            "from " + srcTableName,
            whereStr.strip(),
            ") s",
            "on " + joinStr,
            "when matched then update set",
            updateList,
            "when not matched then insert (",
            insertCols,
            ") values (",
            insertVals,
            ")",
        ])
        if debug:
            print(sqlString)
        # spark is the SparkSession that Databricks notebooks provide
        spark.sql(sqlString)
        return sqlString
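The same column-driven idea can also be expressed through the Delta Lake Python API instead of a concatenated SQL string. The following is only a minimal sketch, assuming the destination is a Delta table and the `delta` Python package is available on the cluster. The helper names `column_map` and `merge_with_delta_api` are hypothetical and not part of the answer.

```python
def column_map(cols, source_alias="s"):
    """Map each target column to the same-named column of the source alias."""
    return {c: source_alias + "." + c for c in cols}


def merge_with_delta_api(spark, source_df, dst_table_name, join_str, cols):
    """Upsert only `cols` from source_df into a Delta table (hypothetical helper).

    Assumes dst_table_name refers to a Delta table and that join_str uses
    the aliases t (target) and s (source), as in the merge function above.
    """
    # Imported here so column_map stays usable without Delta Lake installed.
    from delta.tables import DeltaTable

    mapping = column_map(cols)
    (
        DeltaTable.forName(spark, dst_table_name).alias("t")
        .merge(source_df.alias("s"), join_str)
        .whenMatchedUpdate(set=mapping)
        .whenNotMatchedInsert(values=mapping)
        .execute()
    )


print(column_map(["col1", "col2"]))  # {'col1': 's.col1', 'col2': 's.col2'}
```

Here the source would be a DataFrame (for example the result of spark.sql on the select statement) rather than a subquery string, so the where clause and any aliased expressions move into that DataFrame.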

Example call of the merge function:

    # define the tables (FROM MY_PROJECT.DEMO TO DEMO)
    srcTableName = "my_project.demo"
    dstTableName = "demo"
    # define the where clause (ONLY AC DATA)
    whereStr = """
    where 1=1
    and org = 'ac'
    and org_patient_id is not null
    """
    # define the join (MATCH ON PATIENT_ID)
    joinStr = "t.patient_id = s.patient_id"
    # define the columns (JUST THESE COLUMNS)
    cols = [
        'org',
        'data_lot',
        'raw_table',
        'org_patient_id',
        'patient_id',
        'sex',
        'sexual_orientation',
        'year_of_birth',
        'year_of_death',
        'race',
        'ethnicity',
        'city',
        'state',
        'zip'
    ]
    # create any needed aliases for query string
    selectCols = [x if x != 'year_of_birth' else '(2020 - age) as year_of_birth' for x in cols]
    # do the merge (debug=True would also print the generated SQL)
    merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols, debug=False)
    print("Done.")
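The cols list in the example call is typed by hand. Since the question asks how to avoid repeating column names, one option is to derive the list from the two table schemas. This is a minimal sketch, assuming same-named columns are meant to correspond. `shared_columns` is a hypothetical helper that is not part of the answer.

```python
def shared_columns(src_cols, dst_cols):
    """Return the columns present in both lists, in destination order.

    Matching is case-insensitive (an assumption that mirrors Spark's default
    spark.sql.caseSensitive=false); the destination's spelling is kept.
    """
    src_lower = {c.lower() for c in src_cols}
    return [c for c in dst_cols if c.lower() in src_lower]


# In a notebook the inputs would come from the table schemas, e.g.
#   cols = shared_columns(spark.table(srcTableName).columns,
#                         spark.table(dstTableName).columns)
# Plain lists are used here so the sketch runs without a cluster.
src = ["col1", "col2", "col3"]
dst = ["col1", "col2", "col3", "col9"]
print(shared_columns(src, dst))  # ['col1', 'col2', 'col3']
```

Columns that are computed in the select list, such as year_of_birth derived from age in the example call, do not exist in the source schema and would still have to be added to cols by hand.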