spark批量删除hbase上的记录

c9x0cxw0  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(348)

我发现了很多关于从hbase在spark中加载数据的例子,其中一个对我有用的是

Configuration config = HBaseConfiguration.create();
    config.set(TableInputFormat.INPUT_TABLE, props.getProperty(ConfigConstants.HBASE_SRC_TABLE_NAME));
    config.set(TableInputFormat.SCAN_MAXVERSIONS, props.getProperty(ConfigConstants.HBASE_SRC_TABLE_VERSIONS));
    config.set(TableInputFormat.SCAN_COLUMN_FAMILY, HbaseConstants.MAPPING_FAMILY);
    config.set(TableInputFormat.SCAN_TIMERANGE_START, "0");
    config.set(TableInputFormat.SCAN_TIMERANGE_END, startTimestamp + "000");

    RDD<Tuple2<ImmutableBytesWritable, Result>> tupleRDD = context.newAPIHadoopRDD(config, TableInputFormat.class,
            ImmutableBytesWritable.class, Result.class);

然而,我真的需要一种方法来删除记录已加载到Spark一旦他们被处理。
试图将tuplerddMap到 JavaPairRDD<ImmutableBytesWritable, Delete> 然后是

JobConf jobConf = new JobConf(config);
    jobConf.setOutputFormat(org.apache.hadoop.hbase.mapred.TableOutputFormat.class);
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, props.getProperty(ConfigConstants.HBASE_TARGET_TABLE_NAME));
    outputPairsRDD.saveAsHadoopDataset(jobConf);

但这给了我一个例外如下

"main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.

有没有办法从spark中删除?

sdnqo3pr

sdnqo3pr1#

事实证明,没有简单的方法可以做到这一点,我的最终解决方案是抓取数据并通过hbase delete函数删除它们,而不是spark version delete for hbase。

相关问题