hbase更新现有行

z9ju0rcb  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(241)

我在hbase中的日志数据格式如下。
hbase源表

---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1

dest表(在destination表中,在2013/09/25运行mapreduce后,单词作为key添加,count的和作为column.data)

------------------
word(table key) count
------------------
apple 7
oranges 6
mangoes 6

数据每天都会被添加到源表中。但是我不想对所有的源表数据进行map reduce。所以我试着只为那天添加的数据做map reduce。
2013/09/26新增数据来源表。

---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
2013/09/26 apple 10
2013/09/26 oranges 20

当我只为2013/09/26数据使用mapreduce时,我在dest表中得到了以下内容。
包含新数据的dest表(由于键相同,apple和oranges的计数将更新为2013/09/26数据。2013/09/25之前的旧数据将消失):

------------------
word(table key) count
------------------
apple 10
oranges 10
mangoes 6

预期目标表:

------------------
word(table key) count
------------------
apple 17
oranges 16
mangoes 6

我是否可以Mapreduce部分数据并将计数添加到dest table count列,还是每次都需要Mapreduce所有数据?
如果我能Mapreduce部分数据并更新计数,我该怎么做呢。
Map功能:

public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException {
    ImmutableBytesWritable key = new  ImmutableBytesWritable(row.get());
    String cf = "data";
    String column1 = "word";
    String column2 = "count";
    String word   = new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column1)));
    Text t = new Text(word);
    context.write(t,value); 

}

减少功能:

public void reduce(Text key,Iterable<Result> values,Context context) throws IOException,InterruptedException {
    int count=0;
    String cf = "data";
    String column = "count";
    for(Result val :values) {
        int d = Integer.parseInt(new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column))))
        count += d;
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(cf.getBytes(), column.getBytes(), String.valueOf(count).getBytes());
    context.write(null, put);
}
v8wbuo2f

v8wbuo2f1#

使用hbase时,可以将列视为计数器。您可以对其递增或递增ColumnValue。它的一个很好的特性是,每个增量都是原子的,因此您可以同时从多个源(Map)中获取增量,并且总数将是正确的。
要在map(或reduce)中使用它,您需要自己编写hbase,而不是通过上下文—您可以在setup方法中打开表,在cleanup方法中关闭(甚至增加总数)

sr4lhrrt

sr4lhrrt2#

数据不在任何地方。由于您将数据放在同一单元格中,因此它将作为一个新版本。扫描表时,默认情况下只能看到最新版本。您需要编写逻辑,将新计数添加到上一个计数中,然后将最终值插入表中。
如果不想保留多个版本,则需要在将最终计数放入表中之前,通过删除旧版本来自行处理。

相关问题