Delta Lake compaction in Spark after a merge operation fails with "'DeltaTable' object has no attribute '_get_object_id'"

cyvaqqii · asked 2021-05-29 · in Spark

I am performing a Delta Lake merge operation using the Python API with PySpark. After the merge completes I trigger a compaction step, but the compaction fails with the following error:

Error:

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 170, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1212, in _build_args
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1199, in _get_args
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_collections.py", line 501, in convert
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in _build_args
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in <listcomp>
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 298, in get_command_part
AttributeError: 'DeltaTable' object has no attribute '_get_object_id'

Code:

from delta.tables import DeltaTable

delta_table = "delta_lake_path"

df = spark.read.csv("s3n://input_file.csv", header=True)

delta_table = DeltaTable.forPath(spark, delta_table)

delta_table.merge(df, "df.id = delta_table.id").whenNotMatchedInsertAll().execute()

# compaction
spark.read.format("delta").load(delta_table) \
    .repartition(1) \
    .write.option("dataChange", "False") \
    .format("delta").mode("overwrite") \
    .save(delta_table)

Can someone tell me why the Spark session cannot create another DeltaTable instance? I need to run both the merge and the compaction in the same script, because I only want to compact the partitions affected by the merge. Those partitions are derived from the unique values present in the DataFrame df created from input_file.csv.

ldioqlga (answer 1)

I think your problem is with the delta_table variable: at the start it is a string containing the Delta Lake path, but you then overwrite it with a DeltaTable object and pass that object into the .load() method, which expects a path string. Under the hood, py4j can only forward primitives and Java-backed objects to the JVM; when it receives the pure-Python DeltaTable wrapper it looks for the _get_object_id attribute that JavaObject instances carry, does not find it, and raises the AttributeError you see. Separating the two variables helps:

from delta.tables import DeltaTable

delta_table_path = "delta_lake_path"

df = spark.read.csv("s3n://input_file.csv", header=True)

delta_table = DeltaTable.forPath(spark, delta_table_path)

# Alias both sides so the "df"/"delta_table" qualifiers in the
# merge condition actually resolve
delta_table.alias("delta_table") \
    .merge(df.alias("df"), "df.id = delta_table.id") \
    .whenNotMatchedInsertAll() \
    .execute()

# compaction: load()/save() get the path string, not the DeltaTable object
spark.read.format("delta").load(delta_table_path) \
    .repartition(1) \
    .write.option("dataChange", "false") \
    .format("delta").mode("overwrite") \
    .save(delta_table_path)
