我有一个dataframes,其中有几行已经存在于db中。我想更新现有行的几列。我们怎么做?我看到我们有savemodes:append和override,这可能会起到作用,但在这两种情况下都有限制。使用append,我得到了主键错误,因为这个选项试图在db中创建新行使用ovverride,我将为元组中未更改的属性释放值。有人能建议我如何更新行(元组)的几个属性(列值)吗。?
xmd2e60i1#
这可以在mysql级别处理,这个概念称为upsert。case when:主键是new,sql将作为新行插入mysql db案例:主键存在时,您可以使用
INSERT ON DUPLICATE KEY UPDATE
它将用新的条目/更改更新密钥。在这里阅读更多 here .这种用例的理想方法是,首先将数据插入mysql数据库中的临时表,然后使用触发器将数据加载到原始表中。从Spark中触发。
here
bzzcjhmw2#
在spark中,Dataframe是不可变的。因此不能就地更改值。一种方法是读取完整的表,进行修改,然后以覆盖模式写回完整的表。这需要时间。如果您的修改总是针对特定的组,比如说基于用户id或基于日期,那么您可以使用partitionby()基于该列写入数据。然后可以使用.filter()读取该分区进行修改,并使用insertinto()仅覆盖该分区-来自pyspark 2.3.0请参阅pyspark的其他版本的答案:覆盖sparkDataframe写入方法中的特定分区
2条答案
按热度按时间xmd2e60i1#
这可以在mysql级别处理,这个概念称为upsert。
case when:主键是new,sql将作为新行插入mysql db
案例:主键存在时,您可以使用
它将用新的条目/更改更新密钥。
在这里阅读更多
here
.这种用例的理想方法是,首先将数据插入mysql数据库中的临时表,然后使用触发器将数据加载到原始表中。从Spark中触发。
bzzcjhmw2#
在spark中,Dataframe是不可变的。因此不能就地更改值。一种方法是读取完整的表,进行修改,然后以覆盖模式写回完整的表。这需要时间。如果您的修改总是针对特定的组,比如说基于用户id或基于日期,那么您可以使用partitionby()基于该列写入数据。然后可以使用.filter()读取该分区进行修改,并使用insertinto()仅覆盖该分区-来自pyspark 2.3.0请参阅pyspark的其他版本的答案:覆盖sparkDataframe写入方法中的特定分区