pyspark 如何将合并两个write_dynamic_frame.from_options合并到一个事务中?

rvpgvaaj  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(137)

我正在使用Glue和pyspark。我正在像这样写入两个表:

  1. glueContext.write_dynamic_frame.from_options(
  2. frame=DynamicFrame.fromDF(df1, glueContext, "df1"),
  3. connection_type="custom.spark",
  4. connection_options=combinedConf
  5. )
  6. glueContext.write_dynamic_frame.from_options(
  7. frame=DynamicFrame.fromDF(df2, glueContext, "df2"),
  8. connection_type="custom.spark",
  9. connection_options=combinedConf
  10. )

字符串
我想把它们合并合并到一个事务中,这样要么两个表都写,要么两个表都不写。我想到了Transactions,但是我怎么把transaction id传递给glueContext.write_dynamic_frame.from_options函数,我检查了文档,但是没有找到。
有没有办法强制这两行代码一起执行或者根本不执行?谢谢
编辑:我这样添加的,但得到了

  1. TypeError: from_options() got an unexpected keyword argument 'transactionId'


tx_id = context.start_transaction(read_only=False)context.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(df1,gluecontext,“df1”),connection_type=“custom.spark”,connection_options=config1,transactionId=tx_id)

  1. context.write_dynamic_frame.from_options(
  2. frame=DynamicFrame.fromDF(df2, gluecontext, "df2"),
  3. connection_type="custom.spark",
  4. connection_options=config2,
  5. transactionId=tx_id
  6. )
  7. try:
  8. context.commit_transaction(tx_id)
  9. print("done writing after transaction")
  10. except Exception:
  11. context.cancel_transaction(tx_id)
  12. print("cancel trans")
  13. raise Exception("the transaction cancelled")

kupeojn6

kupeojn61#

在additional_options中添加transactionId

相关问题