我正在尝试使用phoenix连接器从pyspark读取hbase和向hbase写入数据。
我在中看到了示例代码https://phoenix.apache.org/phoenix_spark.html 复制此处的示例代码以便于参考:
df.write \
.format("org.apache.phoenix.spark") \
.mode("overwrite") \
.option("table", "TABLE1") \
.option("zkUrl", "localhost:2181") \
.save()
我已经能够在hbase中读写pyspark。但是,本例只给出了“overwrite”模式,这意味着整个表都将被覆盖。但是,我需要一个选项,可以从pyspark将数据或更新(upsert)附加到hbase中。
我看到的大多数示例都是基于scala的,我无法获得任何关于pyspark支持的清晰文档。
任何相同的示例代码或Phoenix连接器支持的其他模式的见解,将不胜感激。
2条答案
按热度按时间of1yzvn41#
在我用相同的“覆盖”模式尝试了数据插入和更新的各种组合之后,我意识到这个相同的模式充当“附加”和“更新”。
如果hbase中已存在密钥,则更新数据,如果密钥不存在,则插入数据。
但是,如果我试图用一个删除了一些数据的新Dataframe“覆盖”来删除数据,它似乎不起作用。所以,事实上,它是在做更大的改变,而不是像这个词所暗示的那样覆盖。
svujldwt2#
apachesparksql通常不支持updates/upserts。由于它不提供跨执行器的原子事务,而且更新通常不是幂等的,因此很容易最终导致损坏状态,如果是大型数据集,很难从中恢复。
有时用户使用
RDD
/mapPartitions
手动推送数据,但将数据推送到队列(如kafka)更有意义,并使用粒度更高的客户端和方法从错误中优雅地恢复。