使用mapreduce并行查询hbase中的行键列表

gab6jxml  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(330)

我想在hbase中执行查询操作,以使用提供的行键列表获取记录。因为mapreduce中的Map器是并行工作的,所以我想使用它。
行键的输入列表将在~100000的范围内,我已经创建了一个 customInputFormat 对于Map器,这将为每个Map器提供1000行键的列表,以便查询hbase表。这些查询到的记录可能在hbase表中存在,也可能不存在,我只想返回那些存在的记录。
我看到了各种各样的例子,我发现的是hbase表 scan 执行操作以获取行键的范围,范围由指定 startingRowKey 以及 endingRowKey ,但我只想查询提供的行键列表。
如何使用mapreduce执行此操作?欢迎任何帮助!

u1ehiz5o

u1ehiz5o1#

你可以在你的Map器中使用这种方法,对我来说效果很好,它将返回结果数组。

/**
     * Method getDetailRecords.
     * 
     * @param listOfRowKeys List<String>
     * @return Result[]
     * @throws IOException
     */
    private Result[] getDetailRecords(final List<String> listOfRowKeys) throws IOException {
        final HTableInterface table = HBaseConnection.getHTable(TBL_DETAIL);
        final List<Get> listOFGets = new ArrayList<Get>();
        Result[] results = null;
        try {
            for (final String rowkey : listOfRowKeys) {// prepare batch of get with row keys
   // System.err.println("get 'yourtablename', '" + saltIndexPrefix + rowkey + "'");
                final Get get = new Get(Bytes.toBytes(saltedRowKey(rowkey)));
                get.addColumn(COLUMN_FAMILY, Bytes.toBytes(yourcolumnname));
                listOFGets.add(get);
            }
            results = table.get(listOFGets);

        } finally {
            table.close();
        }
        return results;
    }
gfttwv5a

gfttwv5a2#

当您将行键列表传递给Map器时,您应该 get 对hbase的请求。每 get 返回请求密钥的数据,如果密钥不存在,则返回nothing。
首先,您应该在 setup() Map器的方法:

private Table table;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration hbaseConfig = HBaseConfiguration.create();
    Connection conn = ConnectionFactory.createConnection(hbaseConfig);
    this.table = conn.getTable(TableName.valueOf("hbaseTable"));
}

那你就可以 get 对hbase表的请求 map() 方法,通过get和result示例:

String key = "keyString";
Get getValue = new Get(key.getBytes());

//add column family and column qualifier if you desire
getValue.addColumn("columnFamily".getBytes(), "columnQual".getBytes());

try {
    Result result = table.get(getValue);
    if (!table.exists(getValue)) {

        //requested key doesn't exist
        return;
    }

    // do what you want with result instance 
}

在mapper的工作完成之后,您需要关闭到 cleanup() 方法;

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    table.close();
}

此外,你可以自由地通过考试的结果 get 要求减速器或使用 cleanup() 方法将它们结合起来。这只取决于你的目的。

相关问题