I am trying to delete rows from an HBase table with a MapReduce job.
I get the following error:
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Delete cannot be cast to org.apache.hadoop.hbase.KeyValue
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:124)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.
This appears to be caused by configureIncrementalLoad setting the output value type to KeyValue. It ships only PutSortReducer and KeyValueSortReducer, but no DeleteSortReducer.
My code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DeleteRows extends Configured implements Tool {

    public static class Map extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Delete> {

        ImmutableBytesWritable hKey = new ImmutableBytesWritable();
        Delete delRow;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            hKey.set(value.getBytes());
            delRow = new Delete(hKey.get());
            context.write(hKey, delRow);
            // Update counters
            context.getCounter("RowsDeleted", "Success").increment(1);
        }
    }

    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "Delete stuff!");
        job.setJarByClass(DeleteRows.class);
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Delete.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        HTable hTable = new HTable(args[2]);
        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        return (0);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DeleteRows(), args);
        System.exit(exitCode);
    }
}
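Is there a better or faster way to delete a large number of rows by row key? Deleting each row from inside the mapper is obviously possible, but I assume that is slower than pushing the deletes in bulk to the correct region servers.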
2 Answers

Answer 1:
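Your goal is to produce HFiles that contain a stream of Delete operations (in practice, delete markers stored as KeyValue). The standard way to do this is HFileOutputFormat. In fact, this format accepts only a stream of KeyValue changes, and there are two standard reducers for it: PutSortReducer and KeyValueSortReducer. If you set the number of reduce tasks to 0, every Delete is passed straight to the output format, which of course cannot work. Your most obvious options:

1. Add your own reducer, DeleteSortReducer. Such reducers are quite simple and you can almost copy an existing one: you only need to extract the individual KeyValue entries from each Delete and sort them. PutSortReducer is a good example to follow. Put changes arrive unsorted, which is why such a reducer is needed.
2. Do not emit a stream of Delete at all; emit a stream of the appropriate KeyValue changes that contain the delete markers. This is probably the best option for speed.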
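The second option above (emitting KeyValue delete markers directly) could be sketched roughly as follows. This is a minimal sketch, not a tested bulk-delete tool, and it assumes the HBase 0.94-era client API used in the question. The class name DeleteMarkerMapper, the helper deleteMarkers, and the column family "cf" are placeholders invented for illustration. It also assumes that a whole-row Delete is expanded into per-family markers only on the region server, so the mapper has to name the families itself.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DeleteMarkerMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    // Assumption: the table's column families are known up front; "cf" is a placeholder.
    private static final byte[][] FAMILIES = { Bytes.toBytes("cf") };

    /** Builds one DeleteFamily marker per column family for the given row. */
    static List<KeyValue> deleteMarkers(byte[] row, byte[][] families, long timestamp) {
        List<KeyValue> markers = new ArrayList<KeyValue>();
        for (byte[] family : families) {
            // A DeleteFamily marker masks every cell of that family in the row
            // whose timestamp is less than or equal to the marker's timestamp.
            markers.add(new KeyValue(row, family, null, timestamp,
                    KeyValue.Type.DeleteFamily));
        }
        return markers;
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Text.getBytes() returns the whole backing buffer, which can be longer
        // than the current line, so convert through toString() instead.
        byte[] row = Bytes.toBytes(line.toString());
        ImmutableBytesWritable outKey = new ImmutableBytesWritable(row);
        long now = System.currentTimeMillis();
        for (KeyValue marker : deleteMarkers(row, FAMILIES, now)) {
            context.write(outKey, marker);
        }
    }
}
```

With a mapper like this, the driver from the question would call job.setMapOutputValueClass(KeyValue.class) instead of Delete.class, so that configureIncrementalLoad can select KeyValueSortReducer and the resulting HFiles can be handed to completebulkload.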
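Answer 2:

The code works fine when the reducer is set up with TableMapReduceUtil.initTableReducerJob instead of HFileOutputFormat.configureIncrementalLoad. However, this still does not create delete markers for the completebulkload utility; it only issues Delete RPCs.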
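One possible shape of such a job is sketched below, assuming a map-only variant in which initTableReducerJob is called with a null reducer class so that TableOutputFormat receives the Delete objects directly. The class name DeleteRowsViaRpc and the argument layout (input path, then table name) are assumptions for illustration; the answer does not show its actual driver.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class DeleteRowsViaRpc {

    /** Turns each input line (one row key per line) into a whole-row Delete. */
    public static class DeleteMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Delete> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            byte[] row = Bytes.toBytes(line.toString());
            context.write(new ImmutableBytesWritable(row), new Delete(row));
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed arguments: args[0] = input path, args[1] = table name.
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "delete rows via TableOutputFormat");
        job.setJarByClass(DeleteRowsViaRpc.class);
        job.setMapperClass(DeleteMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Configures TableOutputFormat for the target table; no reducer class is set.
        TableMapReduceUtil.initTableReducerJob(args[1], null, job);
        // Map-only: each Delete goes from the mapper straight to TableOutputFormat.
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

As the answer notes, this path sends ordinary Delete requests through the client API, so it avoids the ClassCastException but produces no HFiles for completebulkload.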