我有一个自动化sparksql作业的用例,我想在其中执行以下操作:
使用spark从phoenix读取一个表(我们称之为table1),并在Dataframe(我们称之为df1)中收集找到的所有负值
然后,我想从另一个表(表2)中删除记录,其中一列的值在df1中(我考虑过使用连接查询,但我想知道是否可以使用dataframe,以及是否有使用hbase和spark dataframes的api)
afaik phoenix不直接支持通过spark进行删除操作(如果我错了,请纠正我,如果有什么方法我很乐意听到),这就是为什么我更倾向于使用hbase spark api的原因
下面是一个更直观地解释的模式:
这里有一些代码。
在Dataframe中收集负值:
// Collect negative values
val negativeValues = spark
.sqlContext
.phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
.select('COLUMN1)
.where('COLUMN2.lt(0))
// Send the query
[...]
从表2中删除列1在negativevalues中的值,因此在sql中类似于这样(如果可以将in直接应用于df):
DELETE FROM table2 WHERE COLUMN1 IN negativeValues
我的预期结果是:
table1
column1 | column2
|
123456 | 123
234567 | 456
345678 | -789
456789 | 012
567891 | -123
table2
column1 | column2
|
123456 | 321
234567 | 654
345678 | 945 <---- same column1 as table1's, so delete
456789 | 987
567891 | 675 <---- same column1 as table1's, so delete
最后,我想知道是否有一种方法可以通过spark将删除请求发送到hbase,而不必太麻烦。
谢谢您。
1条答案
按热度按时间vxf3dgd41#
如果需要从spark通过phoenix(sql引擎)向hbase运行“delete”查询,则必须创建自定义api。
可以使用以下方法,
从源dataframe获取table2 rowkey列以进行删除(在table2上)。
构造对源Dataframe的每个分区进行操作的代码,并构建“delete”查询。假设查询为“delete from table2 where column1=?”,准备它,并以正确的批量大小执行它。因为我们在Dataframe的每个分区上并行执行它,所以源Dataframe中的分区数驱动了并行性。因此,您可以尝试使用适当的大小对其进行重新分区,以查看适当的性能数据。
如果选择跳过sql引擎,还可以使用spark hbase direct api。这里有一个这样的例子-https://github.com/tmalaska/sparkonhbase/blob/master/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/hbasebulkdeleteexample.scala