spark,delta-lake嵌套列的自动模式演化

hsgswve4  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(463)

模式演化在合并时的工作深度是多少?
在以下情况下,合并时自动模式演化不起作用。

  1. import json
  2. d1 = {'a':'b','b':{'c':{'1':1}}}
  3. d2 = {'a':'s','b':{'c':{'1':2,'2':2}}}
  4. d3 = {'a':'v','b':{'c':{'1':4}}}
  5. df1 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d1)]))
  6. # passes
  7. df1.write.saveAsTable('test_table4',format='delta',mode='overwrite', path=f"hdfs://hdmaster:9000/dest/test_table4")
  8. df2 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d2)]))
  9. df2.createOrReplaceTempView('updates')
  10. query = """
  11. MERGE INTO test_table4 existing_records
  12. USING updates updates
  13. ON existing_records.a=updates.a
  14. WHEN MATCHED THEN UPDATE SET *
  15. WHEN NOT MATCHED THEN INSERT *
  16. """
  17. spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
  18. spark.sql(query) #passes
  19. df3 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d3)]))
  20. df3.createOrReplaceTempView('updates')
  21. query = """
  22. MERGE INTO test_table4 existing_records
  23. USING updates updates
  24. ON existing_records.a=updates.a
  25. WHEN MATCHED THEN UPDATE SET *
  26. WHEN NOT MATCHED THEN INSERT *
  27. """
  28. spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
  29. spark.sql(query) #FAILS #FAILS

当深度大于2并且传入的df缺少列时,这看起来会失败。这是故意的吗?这件事处理得很好 option("mergeSchema", "true") 如果要附加。但我想把数据插上去。但merge无法处理此架构更改
使用delta lake版本0.8.0

r7xajy2e

r7xajy2e1#

在delta 0.8中,应通过设置 spark.databricks.delta.schema.autoMerge.enabledtrue ,除了 mergeSchema 这对我来说更重要 append 模式。
有关此功能的更多详细信息,请参阅delta 0.8公告博客文章。

相关问题